Files
railiance-cluster/tools/cmd/railiance-verify-activity-core
tegwick dddc7ebd81
Some checks failed
railiance-tests / smoke (push) Has been cancelled
Add activity-core cluster verifier
2026-06-16 03:51:01 +02:00

385 lines
13 KiB
Bash
Executable File

#!/usr/bin/env bash
# Reconciles the cluster-owned activity-core runtime bundle, verifies the ops
# inventory probe evidence path, and records the outcome as a State Hub note.
set -euo pipefail

# Directory two levels above the one that holds this script.
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"

# --- activity-core target; each value can be overridden from the environment ---
NAMESPACE="${ACTIVITY_CORE_NAMESPACE:-activity-core}"
DEFINITION_ID="${ACTIVITY_CORE_OPS_DEFINITION_ID:-40d15a87-7ff6-4d8e-992c-37df15f95110}"
DEFINITION_SLUG="${ACTIVITY_CORE_OPS_DEFINITION_SLUG:-ops-service-inventory-probes}"
DEFINITION_NAME="${ACTIVITY_CORE_OPS_DEFINITION_NAME:-Ops Service Inventory Probes}"

# --- State Hub endpoint and polling budget ---
: "${STATE_HUB_URL:=http://127.0.0.1:8000}"
: "${STATE_HUB_PROGRESS_TIMEOUT_SECONDS:=180}"
: "${STATE_HUB_PROGRESS_POLL_SECONDS:=5}"

# --- repository locations and cluster executor ---
: "${ACTIVITY_CORE_REPO:=/home/worsch/activity-core}"
: "${ACTIVITY_CORE_REMOTE_REPO:=}"
: "${ACTIVITY_CORE_CLUSTER_HOST:=railiance01}"
: "${ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE:=auto}"
: "${ACTIVITY_CORE_RESTART_DEPLOYMENTS:=0}"

# The literal host "local" is mapped to an empty host, which cluster_bash
# treats as "run in a local bash instead of over ssh".
[[ "$ACTIVITY_CORE_CLUSTER_HOST" != "local" ]] || ACTIVITY_CORE_CLUSTER_HOST=""

# Without an explicit remote repo, derive one: the ssh login directory plus
# /activity-core for a remote executor, the local checkout otherwise.
if [[ -z "$ACTIVITY_CORE_REMOTE_REPO" ]]; then
  case "$ACTIVITY_CORE_CLUSTER_HOST" in
    "")
      ACTIVITY_CORE_REMOTE_REPO="$ACTIVITY_CORE_REPO"
      ;;
    *)
      ACTIVITY_CORE_REMOTE_REPO="$(ssh "$ACTIVITY_CORE_CLUSTER_HOST" pwd)/activity-core"
      ;;
  esac
fi

# --- evidence routing ---
EVIDENCE_WORKSTREAM_ID="${STATE_HUB_EVIDENCE_WORKSTREAM_ID:-c91a0946-92f9-4b41-8a92-005b29952916}"
EVIDENCE_TASK_ID="${STATE_HUB_EVIDENCE_TASK_ID:-d15fc947-3fbe-4269-93c6-d98577352149}"
: "${INTER_HUB_SUBMISSION_STATUS:=deferred}"
: "${INTER_HUB_DEFER_REASON:=ops-hub key custody and Inter-Hub production intake remain operator-gated; State Hub fallback evidence is accepted for this handoff}"

# --- run state; filled in gate by gate and read back by post_evidence ---
STARTED_AT="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
CURRENT_GATE="startup"
REMOTE_REVISION="" API_IMAGE="" SYNC_STATUS_JSON=""
DEFINITION_JSON="" TRIGGER_JSON="" PROGRESS_JSON=""
EVIDENCE_NOTE_JSON=""

# Exported so the embedded python3 helpers can read them via os.environ.
export NAMESPACE DEFINITION_ID DEFINITION_SLUG DEFINITION_NAME \
  STATE_HUB_URL EVIDENCE_WORKSTREAM_ID EVIDENCE_TASK_ID \
  STATE_HUB_PROGRESS_TIMEOUT_SECONDS STATE_HUB_PROGRESS_POLL_SECONDS \
  INTER_HUB_SUBMISSION_STATUS INTER_HUB_DEFER_REASON STARTED_AT \
  ACTIVITY_CORE_CLUSTER_HOST ACTIVITY_CORE_REMOTE_REPO \
  ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE ACTIVITY_CORE_RESTART_DEPLOYMENTS \
  REMOTE_REVISION API_IMAGE SYNC_STATUS_JSON DEFINITION_JSON TRIGGER_JSON PROGRESS_JSON
# Write one line to stdout: the verifier prefix followed by all arguments
# joined into a single string.
log() {
  local message="$*"
  printf '%s\n' "[activity-core-verify] ${message}"
}
# Print $1 shell-escaped (printf %q) with no trailing newline, so it can be
# spliced into a script that another bash will parse.
quote() {
  local escaped
  printf -v escaped '%q' "$1"
  printf '%s' "$escaped"
}
# Run a script on the cluster executor.
# Arguments: $1 - script text, fed to "bash -s" on stdin.
# Globals:   ACTIVITY_CORE_CLUSTER_HOST (read) - ssh target; empty means run
#            the script in a local bash.
# Returns:   exit status of the ssh/bash invocation.
cluster_bash() {
  local payload="$1"
  local -a runner=(bash -s)
  if [[ -n "$ACTIVITY_CORE_CLUSTER_HOST" ]]; then
    runner=(ssh "$ACTIVITY_CORE_CLUSTER_HOST" "bash -s")
  fi
  "${runner[@]}" <<<"$payload"
}
# Decide whether the local runtime bundle should be copied to the executor.
# Globals: ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE, ACTIVITY_CORE_CLUSTER_HOST,
#          ACTIVITY_CORE_REPO (all read).
# Returns: 0 to sync, 1 to skip. In "auto" mode a sync happens only when a
#          cluster host is set and the local k8s/railiance directory exists.
#          Any unrecognised mode prints an error and exits the shell with 2.
should_sync_runtime_bundle() {
  local mode="$ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE"

  if [[ "$mode" == "1" || "$mode" == "true" || "$mode" == "yes" ]]; then
    return 0
  fi
  if [[ "$mode" == "0" || "$mode" == "false" || "$mode" == "no" ]]; then
    return 1
  fi
  if [[ "$mode" == "auto" ]]; then
    if [[ -n "$ACTIVITY_CORE_CLUSTER_HOST" && -d "$ACTIVITY_CORE_REPO/k8s/railiance" ]]; then
      return 0
    fi
    return 1
  fi

  printf 'invalid ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE=%s\n' "$ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE" >&2
  exit 2
}
# Post a "note" progress event describing this verification run to State Hub.
# Arguments: $1 - run status; "passed" selects the success summary, any other
#                 value selects the failure summary.
#            $2 - name of the failing gate (optional, may be empty).
# Globals:   exports EVIDENCE_STATUS and FAILING_GATE; the embedded Python
#            reads them plus the exported run-state and configuration
#            variables from the environment.
# Outputs:   the State Hub response body on stdout; an error line on stderr
#            when the POST fails.
# Returns:   non-zero when the POST fails.
post_evidence() {
  local status="$1"
  local failing_gate="${2:-}"
  export EVIDENCE_STATUS="$status"
  export FAILING_GATE="$failing_gate"
  python3 - <<'PY'
import json
import os
import sys
import urllib.request


def from_json_env(name):
    """Decode a JSON env var: None when unset/empty, {"raw": ...} when not JSON."""
    raw = os.environ.get(name, "")
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {"raw": raw}


status = os.environ["EVIDENCE_STATUS"]
failing_gate = os.environ.get("FAILING_GATE") or None
# Use the configured slug so the summary names the definition that was
# actually verified; fall back to the historical default when it is not set.
definition_slug = os.environ.get("DEFINITION_SLUG") or "ops-service-inventory-probes"
definition = from_json_env("DEFINITION_JSON")
trigger = from_json_env("TRIGGER_JSON")
progress = from_json_env("PROGRESS_JSON")
sync_status = from_json_env("SYNC_STATUS_JSON")

detail = {
    "producer": "railiance-cluster",
    "verification": "activity-core cluster-owned deploy/verify",
    "status": status,
    "failing_gate": failing_gate,
    "cluster_host": os.environ.get("ACTIVITY_CORE_CLUSTER_HOST") or "local-kubectl",
    "namespace": os.environ.get("NAMESPACE"),
    "activity_core_repo": os.environ.get("ACTIVITY_CORE_REMOTE_REPO"),
    "activity_core_revision": os.environ.get("REMOTE_REVISION") or None,
    "api_image": os.environ.get("API_IMAGE") or None,
    "runtime_bundle": "k8s/railiance/20-runtime.yaml",
    "sync_job": sync_status,
    "definition": definition,
    "manual_trigger": trigger,
    "state_hub_progress": progress,
    "inter_hub_submission": {
        "status": os.environ.get("INTER_HUB_SUBMISSION_STATUS"),
        "reason": os.environ.get("INTER_HUB_DEFER_REASON"),
    },
    "started_at": os.environ.get("STARTED_AT"),
}

if status == "passed":
    workflow_id = trigger.get("workflow_id") if isinstance(trigger, dict) else "unknown"
    progress_id = progress.get("id") if isinstance(progress, dict) else "unknown"
    summary = (
        "Railiance activity-core deploy/verify passed: runtime reconciled, "
        f"actcore-sync completed, {definition_slug} remains disabled, "
        f"manual trigger {workflow_id} ran, "
        f"and State Hub ops_inventory_probe progress {progress_id} exists."
    )
else:
    summary = (
        "Railiance activity-core deploy/verify failed"
        + (f" at {failing_gate}" if failing_gate else "")
        + "; see non-secret evidence detail for the last completed gate."
    )

payload = {
    "summary": summary,
    "event_type": "note",
    "author": "railiance-cluster",
    "detail": detail,
}
# Workstream/task links are optional: only attach the ones that are set.
if os.environ.get("EVIDENCE_WORKSTREAM_ID"):
    payload["workstream_id"] = os.environ["EVIDENCE_WORKSTREAM_ID"]
if os.environ.get("EVIDENCE_TASK_ID"):
    payload["task_id"] = os.environ["EVIDENCE_TASK_ID"]

body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
    os.environ["STATE_HUB_URL"].rstrip("/") + "/progress/",
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
try:
    with urllib.request.urlopen(req, timeout=20) as resp:
        sys.stdout.write(resp.read().decode("utf-8"))
except Exception as exc:
    # One readable line is enough; exit non-zero without a traceback.
    sys.stderr.write(f"failed to post State Hub evidence note: {exc}\n")
    raise SystemExit(1)
PY
}
# Failure hook: post a "failed" evidence note naming the gate that was running,
# then exit with the original status.
#
# Installed on both ERR and EXIT. ERR alone misses two cases: bash does not
# inherit the ERR trap into shell functions unless errtrace is on, so a command
# failing inside a function makes `set -e` exit without running it; and an
# explicit `exit N` never raises ERR. EXIT fires on every exit path of the main
# shell, so the handler posts only when the status is non-zero and clears both
# traps first so the note is posted at most once.
#
# Globals: CURRENT_GATE (read).
on_error() {
  local code=$?
  trap - ERR EXIT
  if (( code != 0 )); then
    # Best effort: a failing State Hub must not mask the original exit status.
    post_evidence "failed" "$CURRENT_GATE" >/dev/null || true
  fi
  exit "$code"
}
trap on_error ERR EXIT
# Gate: the selected executor (ssh host or local bash) must be able to find kubectl.
CURRENT_GATE="cluster executor preflight"
log "using cluster executor: ${ACTIVITY_CORE_CLUSTER_HOST:-local kubectl}"
cluster_bash $'set -euo pipefail\ncommand -v kubectl >/dev/null'
# Gate: when syncing is enabled, mirror the local k8s/railiance bundle onto the
# cluster host. With a local executor the bundle is already in place, so the
# step only logs where it lives.
CURRENT_GATE="runtime bundle sync"
if should_sync_runtime_bundle; then
  if [[ -n "$ACTIVITY_CORE_CLUSTER_HOST" ]]; then
    log "syncing runtime bundle to ${ACTIVITY_CORE_CLUSTER_HOST}:${ACTIVITY_CORE_REMOTE_REPO}/k8s/railiance"
    # Create the target directory first, then mirror into it; --delete removes
    # remote files that no longer exist in the local bundle.
    ssh "$ACTIVITY_CORE_CLUSTER_HOST" "mkdir -p $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance"
    rsync -a --delete "$ACTIVITY_CORE_REPO/k8s/railiance/" "${ACTIVITY_CORE_CLUSTER_HOST}:${ACTIVITY_CORE_REMOTE_REPO}/k8s/railiance/"
  else
    log "runtime bundle already local at ${ACTIVITY_CORE_REPO}/k8s/railiance"
  fi
fi
# Gate: both manifests must exist on the executor and the runtime manifest must
# mention the definition slug. The slug is matched as a fixed string (-F) and
# placed after "--" so regex metacharacters or a leading dash in an overridden
# slug cannot change what grep does.
CURRENT_GATE="runtime bundle preflight"
cluster_bash "$(cat <<EOF
set -euo pipefail
test -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/00-namespace.yaml
test -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/20-runtime.yaml
grep -qF -- $(quote "$DEFINITION_SLUG") $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/20-runtime.yaml
EOF
)"
# Gate: record the short git revision of the executor-side checkout. The remote
# command swallows git errors, so REMOTE_REVISION is simply empty when the
# revision cannot be read.
CURRENT_GATE="activity-core revision inspection"
REMOTE_REVISION="$(
  cluster_bash "$(
    printf 'set -euo pipefail\ngit -C %q rev-parse --short HEAD 2>/dev/null || true' \
      "$ACTIVITY_CORE_REMOTE_REPO"
  )"
)"
export REMOTE_REVISION
# Gate: apply the namespace and runtime manifests, then wait for the jobs and
# deployments to settle. The remote script is assembled one command per array
# element and joined with newlines before it is handed to the executor.
CURRENT_GATE="runtime bundle reconcile"
log "reconciling activity-core runtime bundle"
reconcile_script=(
  "set -euo pipefail"
  "kubectl apply -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/00-namespace.yaml"
  # The migrate and sync jobs are deleted before the runtime manifest is applied.
  "kubectl -n $(quote "$NAMESPACE") delete job actcore-migrate actcore-sync --ignore-not-found"
  "kubectl apply -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/20-runtime.yaml"
  # Optional forced restart, enabled with ACTIVITY_CORE_RESTART_DEPLOYMENTS=1.
  "if [[ $(quote "$ACTIVITY_CORE_RESTART_DEPLOYMENTS") == \"1\" ]]; then"
  "kubectl -n $(quote "$NAMESPACE") rollout restart deploy/actcore-api deploy/actcore-worker deploy/actcore-event-router"
  "fi"
  "kubectl -n $(quote "$NAMESPACE") wait --for=condition=complete job/actcore-migrate --timeout=180s"
  "kubectl -n $(quote "$NAMESPACE") rollout status deploy/actcore-api --timeout=180s"
  "kubectl -n $(quote "$NAMESPACE") rollout status deploy/actcore-worker --timeout=180s"
  "kubectl -n $(quote "$NAMESPACE") rollout status deploy/actcore-event-router --timeout=180s"
  "kubectl -n $(quote "$NAMESPACE") wait --for=condition=complete job/actcore-sync --timeout=180s"
)
cluster_bash "$(printf '%s\n' "${reconcile_script[@]}")"
unset reconcile_script
# Gate: the running API image must be able to import the ops inventory
# resolver and evidence sink modules.
CURRENT_GATE="live image capability check"
cluster_bash "$(
  printf "set -euo pipefail\nkubectl -n %q exec deploy/actcore-api -- python -c 'import activity_core.context_resolvers.ops_inventory; import activity_core.ops_evidence_sinks'" \
    "$NAMESPACE"
)"
# Gate: capture the API deployment image and a compact summary of the
# actcore-sync job (name, succeeded, failed, completion time) for the evidence note.
CURRENT_GATE="runtime status capture"
API_IMAGE="$(
  cluster_bash "$(
    printf "set -euo pipefail\nkubectl -n %q get deploy actcore-api -o jsonpath='{.spec.template.spec.containers[0].image}'" \
      "$NAMESPACE"
  )"
)"
SYNC_STATUS_JSON="$(
  cluster_bash "$(printf 'set -euo pipefail\nkubectl -n %q get job actcore-sync -o json' "$NAMESPACE")" \
    | python3 -c '
import json
import sys

job = json.load(sys.stdin)
job_status = job.get("status", {})
print(json.dumps({
    "name": job["metadata"]["name"],
    "succeeded": job_status.get("succeeded", 0),
    "failed": job_status.get("failed", 0),
    "completion_time": job_status.get("completionTime"),
}))
'
)"
export API_IMAGE SYNC_STATUS_JSON
# Gate: the definition must be listed by the activity-core API inside the pod,
# carry the expected name, and be disabled (enabled is exactly false).
# The configured slug is passed as a third argument so the recorded evidence
# names the definition that was checked instead of a hard-coded default.
CURRENT_GATE="disabled definition check"
log "checking ${DEFINITION_SLUG} is present and disabled"
DEFINITION_JSON="$(
cluster_bash "$(cat <<EOF
set -euo pipefail
kubectl -n $(quote "$NAMESPACE") exec -i deploy/actcore-api -- python - $(quote "$DEFINITION_ID") $(quote "$DEFINITION_NAME") $(quote "$DEFINITION_SLUG") <<'PY'
import json
import sys
import urllib.request

definition_id = sys.argv[1]
expected_name = sys.argv[2]
definition_slug = sys.argv[3]

with urllib.request.urlopen("http://localhost:8010/activity-definitions/", timeout=30) as resp:
    definitions = json.load(resp)

for definition in definitions:
    if definition.get("id") == definition_id:
        if definition.get("enabled") is not False:
            raise SystemExit(f"definition {definition_id} exists but enabled={definition.get('enabled')!r}")
        if definition.get("name") != expected_name:
            raise SystemExit(f"definition {definition_id} name mismatch: {definition.get('name')!r}")
        print(json.dumps({
            "id": definition["id"],
            "slug": definition_slug,
            "name": definition["name"],
            "enabled": definition["enabled"],
            "trigger_type": definition.get("trigger_type"),
            "version": definition.get("version"),
        }))
        break
else:
    # for/else: reached only when no definition matched the id.
    raise SystemExit(f"definition {definition_id} not found")
PY
EOF
)"
)"
if [[ -z "$DEFINITION_JSON" ]]; then
  printf 'definition check produced no output\n' >&2
  # Fail through a plain command, not "exit", so the ERR trap runs and failure
  # evidence is posted for this gate; the script still exits with status 1.
  false
fi
export DEFINITION_JSON
# Gate: POST to the trigger endpoint of the disabled definition from inside the
# API pod and keep the raw response body as TRIGGER_JSON.
CURRENT_GATE="manual disabled trigger"
log "triggering disabled definition manually"
TRIGGER_JSON="$(
cluster_bash "$(cat <<EOF
set -euo pipefail
kubectl -n $(quote "$NAMESPACE") exec -i deploy/actcore-api -- python - $(quote "$DEFINITION_ID") <<'PY'
import sys
import urllib.request

definition_id = sys.argv[1]
req = urllib.request.Request(
    f"http://localhost:8010/activity-definitions/{definition_id}/trigger",
    method="POST",
)
with urllib.request.urlopen(req, timeout=30) as resp:
    print(resp.read().decode("utf-8"))
PY
EOF
)"
)"
if [[ -z "$TRIGGER_JSON" ]]; then
  printf 'manual trigger produced no output\n' >&2
  # Fail through a plain command, not "exit", so the ERR trap runs and failure
  # evidence is posted for this gate; the script still exits with status 1.
  false
fi
export TRIGGER_JSON
# Gate: poll State Hub until an ops_inventory_probe progress event created at
# or after STARTED_AT appears, or the timeout expires. A failed poll (HTTP
# error, socket error, unparsable body) is reported on stderr and retried
# until the deadline instead of aborting the whole wait.
CURRENT_GATE="State Hub ops_inventory_probe evidence"
log "polling State Hub for ops_inventory_probe progress"
PROGRESS_JSON="$(
python3 - <<'PY'
from datetime import datetime, timezone
import json
import os
import sys
import time
import urllib.parse
import urllib.request


def parse_timestamp(value):
    """Parse an ISO-8601 timestamp, accepting a trailing Z for UTC."""
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # NOTE(review): a timestamp without an offset is assumed to be UTC so it
        # can be compared with the aware start time; confirm against State Hub.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


base = os.environ["STATE_HUB_URL"].rstrip("/")
started = parse_timestamp(os.environ["STARTED_AT"])
timeout = int(os.environ["STATE_HUB_PROGRESS_TIMEOUT_SECONDS"])
interval = int(os.environ["STATE_HUB_PROGRESS_POLL_SECONDS"])
deadline = time.monotonic() + timeout
url = base + "/progress/?" + urllib.parse.urlencode({"event_type": "ops_inventory_probe"})

last_error = None
while time.monotonic() < deadline:
    try:
        with urllib.request.urlopen(url, timeout=20) as resp:
            events = json.load(resp)
    except (OSError, ValueError) as exc:
        # URLError, HTTPError and socket timeouts are OSError subclasses;
        # a JSON decode failure is a ValueError.
        last_error = exc
        sys.stderr.write(f"State Hub poll failed, will retry: {exc}\n")
        events = []
    for event in events:
        if parse_timestamp(event["created_at"]) >= started:
            detail = event.get("detail") or {}
            print(json.dumps({
                "id": event["id"],
                "event_type": event.get("event_type"),
                "summary": event.get("summary"),
                "author": event.get("author"),
                "created_at": event.get("created_at"),
                # Only key names are recorded, not the detail values.
                "detail_keys": sorted(detail.keys()) if isinstance(detail, dict) else [],
            }))
            raise SystemExit(0)
    # Never sleep past the deadline.
    remaining = deadline - time.monotonic()
    if remaining <= 0:
        break
    time.sleep(min(interval, remaining))

message = f"no ops_inventory_probe progress found after {timeout}s"
if last_error is not None:
    message += f" (last poll error: {last_error})"
raise SystemExit(message)
PY
)"
export PROGRESS_JSON
# Gate: post the "passed" evidence note, then print the State Hub response.
CURRENT_GATE="State Hub evidence note"
log "posting non-secret evidence note to State Hub"
# Mark for export first; the assignment stays a separate statement so a
# failing post_evidence is not masked by the export builtin.
export EVIDENCE_NOTE_JSON
EVIDENCE_NOTE_JSON="$(post_evidence "passed" "")"
# Every gate has passed: disarm the failure hook before the final output.
trap - ERR
log "verification passed"
printf '%s\n' "${EVIDENCE_NOTE_JSON}"