Files
railiance-cluster/tools/cmd/railiance-deploy-activity-core-triage-robustness
2026-07-02 10:44:06 +02:00

264 lines
13 KiB
Bash
Executable File

#!/usr/bin/env bash
# Deploy ACTIVITY-WP-0016 code/schema/runtime together and prove daily-triage output.
#
# Configuration (environment variables, all optional):
#   ACTIVITY_CORE_NAMESPACE             Kubernetes namespace passed to kubectl -n.
#   ACTIVITY_CORE_CLUSTER_HOST          SSH host that runs kubectl; the value "local"
#                                       selects the local kubectl and additionally
#                                       requires ACTIVITY_CORE_ALLOW_LOCAL_KUBECTL=1.
#   STATE_HUB_URL                       Base URL of State Hub (its /progress/ endpoint
#                                       is polled and receives the evidence note).
#   ACTIVITY_CORE_REPO                  Local activity-core checkout; source of the
#                                       k8s/railiance runtime bundle.
#   ACTIVITY_CORE_REMOTE_REPO           Checkout path on the cluster executor; when
#                                       empty it is resolved later in the script.
#   ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE   1|true|yes, 0|false|no, or auto.
#   ACTIVITY_CORE_RESTART_DEPLOYMENTS   1 restarts the actcore deployments after apply.
#   REQUIRED_ACTIVITY_CORE_REV          Commit that must be an ancestor of the
#                                       executor checkout HEAD.
#   DAILY_TRIAGE_DEFINITION_SLUG        Activity definition to trigger.
#   STATE_HUB_PROGRESS_TIMEOUT_SECONDS  How long to poll State Hub (whole seconds).
#   STATE_HUB_PROGRESS_POLL_SECONDS     Pause between polls (whole seconds).
#   STATE_HUB_EVIDENCE_WORKSTREAM_ID    Workstream id attached to the evidence note.
#   STATE_HUB_EVIDENCE_TASK_ID          Task id attached to the evidence note.
set -euo pipefail

NAMESPACE="${ACTIVITY_CORE_NAMESPACE:-activity-core}"
CLUSTER_HOST="${ACTIVITY_CORE_CLUSTER_HOST:-railiance01}"
STATE_HUB_URL="${STATE_HUB_URL:-http://127.0.0.1:8000}"
ACTIVITY_CORE_REPO="${ACTIVITY_CORE_REPO:-/home/worsch/activity-core}"
ACTIVITY_CORE_REMOTE_REPO="${ACTIVITY_CORE_REMOTE_REPO:-}"
ACTIVITY_CORE_ALLOW_LOCAL_KUBECTL="${ACTIVITY_CORE_ALLOW_LOCAL_KUBECTL:-0}"
ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE="${ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE:-auto}"
ACTIVITY_CORE_RESTART_DEPLOYMENTS="${ACTIVITY_CORE_RESTART_DEPLOYMENTS:-1}"
REQUIRED_ACTIVITY_CORE_REV="${REQUIRED_ACTIVITY_CORE_REV:-bf877b7}"
DAILY_TRIAGE_DEFINITION_SLUG="${DAILY_TRIAGE_DEFINITION_SLUG:-daily-statehub-wsjf-triage}"
STATE_HUB_PROGRESS_TIMEOUT_SECONDS="${STATE_HUB_PROGRESS_TIMEOUT_SECONDS:-240}"
STATE_HUB_PROGRESS_POLL_SECONDS="${STATE_HUB_PROGRESS_POLL_SECONDS:-5}"
EVIDENCE_WORKSTREAM_ID="${STATE_HUB_EVIDENCE_WORKSTREAM_ID:-7cbbe0d6-fea9-41c6-840c-46d0d8e8edde}"
EVIDENCE_TASK_ID="${STATE_HUB_EVIDENCE_TASK_ID:-8096621a-54ee-4be5-943e-5dc2da19ed28}"

# Abort with exit status 2 unless the variable named by $1 holds a
# non-negative integer.
require_uint() {
  local name="$1"
  if [[ ! "${!name}" =~ ^[0-9]+$ ]]; then
    printf 'invalid %s=%s (expected a non-negative integer)\n' "$name" "${!name}" >&2
    exit 2
  fi
}

# The poller parses both values with int() only after the cluster has been
# reconciled and the triage run triggered; reject typos before touching anything.
require_uint STATE_HUB_PROGRESS_TIMEOUT_SECONDS
require_uint STATE_HUB_PROGRESS_POLL_SECONDS

# Run start in UTC; the poller ignores State Hub events created before it.
STARTED_AT="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
# Name of the step currently executing; reported as failing_gate on error.
CURRENT_GATE=startup

# Results gathered along the way. They start empty and are exported now so the
# embedded python3 helpers always see the latest value once one is assigned.
REMOTE_REVISION=""
CONTRACT_JSON=""
API_IMAGE=""
API_IMAGE_ID=""
WORKER_IMAGE=""
WORKER_IMAGE_ID=""
SYNC_STATUS_JSON=""
TRIGGER_JSON=""
DEFINITION_ID=""
TRIGGER_KEY=""
EXPECTED_RUN_ID=""
PROGRESS_JSON=""

export NAMESPACE CLUSTER_HOST STATE_HUB_URL ACTIVITY_CORE_REMOTE_REPO REQUIRED_ACTIVITY_CORE_REV
export DAILY_TRIAGE_DEFINITION_SLUG STARTED_AT EVIDENCE_WORKSTREAM_ID EVIDENCE_TASK_ID
export STATE_HUB_PROGRESS_TIMEOUT_SECONDS STATE_HUB_PROGRESS_POLL_SECONDS
export REMOTE_REVISION CONTRACT_JSON API_IMAGE API_IMAGE_ID WORKER_IMAGE WORKER_IMAGE_ID
export SYNC_STATUS_JSON TRIGGER_JSON DEFINITION_ID TRIGGER_KEY EXPECTED_RUN_ID PROGRESS_JSON
# Print a progress line on stdout, tagged with the script name.
# All arguments are joined into a single message.
log() {
  local message="$*"
  printf '%s %s\n' '[activity-core-triage-robustness]' "$message"
}
# Write $1 to stdout escaped with printf %q, so that bash reads it back as one
# literal word when it is spliced into a generated script.
quote() {
  local word="$1"
  printf '%q' "$word"
}
# Execute the bash script text given in $1 on the cluster executor.
# An empty CLUSTER_HOST means "run locally"; otherwise the script is sent to
# "bash -s" on that host over ssh. The script is delivered on stdin, and the
# exit status of the executing bash (or of ssh) is returned.
cluster_bash() {
  local script="$1"
  if [[ -z "$CLUSTER_HOST" ]]; then
    bash -s <<<"$script"
  else
    ssh "$CLUSTER_HOST" "bash -s" <<<"$script"
  fi
}
# Decide whether the local runtime bundle must be copied to the cluster host.
# Globals:
#   ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE - 1|true|yes, 0|false|no, or auto
#   CLUSTER_HOST                      - remote executor; empty for local kubectl
#   ACTIVITY_CORE_REPO                - local checkout inspected in auto mode
# Returns:
#   0 to sync, 1 to skip. In auto mode a sync happens only when a remote host
#   is configured and the local checkout contains k8s/railiance.
# Exits with status 2 on an unknown setting, or when a sync is forced although
# there is no remote host to sync to.
should_sync_runtime_bundle() {
  case "$ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE" in
    1 | true | yes)
      # The sync step uses ssh/rsync against CLUSTER_HOST; with the local
      # executor that would run 'ssh ""' and fail with an unrelated error.
      if [[ -z "$CLUSTER_HOST" ]]; then
        printf 'ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE=%s requires a remote cluster host (local executor has no sync target)\n' "$ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE" >&2
        exit 2
      fi
      return 0
      ;;
    0 | false | no)
      return 1
      ;;
    auto)
      [[ -n "$CLUSTER_HOST" && -d "$ACTIVITY_CORE_REPO/k8s/railiance" ]]
      return
      ;;
    *)
      printf 'invalid ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE=%s\n' "$ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE" >&2
      exit 2
      ;;
  esac
}
# Post a "note" progress event describing this run to State Hub.
# Arguments:
#   $1 - status; "passed" selects the success summary, any other value the
#        failure summary
#   $2 - optional name of the gate that failed
# Globals:
#   Reads the exported configuration and result variables (STATE_HUB_URL,
#   CLUSTER_HOST, NAMESPACE, CONTRACT_JSON, TRIGGER_JSON, PROGRESS_JSON, ...);
#   exports EVIDENCE_STATUS and FAILING_GATE for the embedded python3 program.
# Outputs:
#   The State Hub response body on stdout, always newline-terminated so that a
#   following log line starts on its own line.
# Returns:
#   Non-zero when python3 or the HTTP request fails.
post_evidence() {
  local status="$1" failing_gate="${2:-}"
  export EVIDENCE_STATUS="$status" FAILING_GATE="$failing_gate"
  python3 - <<'PY'
import json, os, sys, urllib.request


def env_json(name):
    """Return the JSON held in env var `name`; None when empty, {"raw": ...} when not valid JSON."""
    raw = os.environ.get(name, "")
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {"raw": raw}


status = os.environ["EVIDENCE_STATUS"]
failing_gate = os.environ.get("FAILING_GATE") or None
# Values that were never captured are reported as null rather than "".
detail = {
    "producer": "railiance-cluster",
    "verification": "activity-core WP-0016 coupled deploy and daily-triage smoke",
    "status": status,
    "failing_gate": failing_gate,
    "cluster_host": os.environ.get("CLUSTER_HOST") or "local-kubectl",
    "namespace": os.environ.get("NAMESPACE"),
    "activity_core_repo": os.environ.get("ACTIVITY_CORE_REMOTE_REPO"),
    "required_activity_core_revision": os.environ.get("REQUIRED_ACTIVITY_CORE_REV"),
    "activity_core_revision": os.environ.get("REMOTE_REVISION") or None,
    "runtime_bundle": "k8s/railiance/20-runtime.yaml",
    "runtime_contract": env_json("CONTRACT_JSON"),
    "sync_job": env_json("SYNC_STATUS_JSON"),
    "api_image": os.environ.get("API_IMAGE") or None,
    "api_image_id": os.environ.get("API_IMAGE_ID") or None,
    "worker_image": os.environ.get("WORKER_IMAGE") or None,
    "worker_image_id": os.environ.get("WORKER_IMAGE_ID") or None,
    "definition_slug": os.environ.get("DAILY_TRIAGE_DEFINITION_SLUG"),
    "definition_id": os.environ.get("DEFINITION_ID") or None,
    "manual_trigger": env_json("TRIGGER_JSON"),
    "expected_activity_core_run_id": os.environ.get("EXPECTED_RUN_ID") or None,
    "state_hub_progress": env_json("PROGRESS_JSON"),
    "started_at": os.environ.get("STARTED_AT"),
}
summary = (
    "Railiance activity-core WP-0016 deploy/smoke passed: code/schema and bounded runtime contract were reconciled together, daily triage was triggered, and State Hub recorded schema-valid output."
    if status == "passed"
    else "Railiance activity-core WP-0016 deploy/smoke failed" + (f" at {failing_gate}" if failing_gate else "") + "; see non-secret evidence detail."
)
payload = {"summary": summary, "event_type": "note", "author": "railiance-cluster", "detail": detail}
# Workstream/task links are attached only when the ids are configured.
if os.environ.get("EVIDENCE_WORKSTREAM_ID"):
    payload["workstream_id"] = os.environ["EVIDENCE_WORKSTREAM_ID"]
if os.environ.get("EVIDENCE_TASK_ID"):
    payload["task_id"] = os.environ["EVIDENCE_TASK_ID"]
req = urllib.request.Request(
    os.environ["STATE_HUB_URL"].rstrip("/") + "/progress/",
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req, timeout=20) as resp:
    body = resp.read().decode()
# Terminate the echoed response so later output does not run into it.
sys.stdout.write(body if body.endswith("\n") else body + "\n")
PY
}
# ERR handler: report the failing gate to State Hub (best effort) and exit
# with the status of the command that failed.
# Globals: CURRENT_GATE (read)
on_error() {
  local code=$?
  trap - ERR
  # With errtrace the handler also runs inside command substitutions and
  # pipeline subshells. Those only propagate the status; the top-level shell
  # sees the failure next and is the single place that posts evidence.
  if ((BASH_SUBSHELL == 0)); then
    # Reporting must never mask the original failure.
    post_evidence failed "$CURRENT_GATE" >/dev/null || true
  fi
  exit "$code"
}
# errtrace: without it bash does not inherit the ERR trap inside functions, so
# a command failing inside a directly called cluster_bash or post_evidence
# would let errexit terminate the script without on_error ever running.
set -o errtrace
trap on_error ERR
# Resolve the executor: the pseudo-host "local" means "use the local kubectl",
# which has to be allowed explicitly and is represented by an empty CLUSTER_HOST.
if [[ "$CLUSTER_HOST" == local ]]; then
  if [[ "$ACTIVITY_CORE_ALLOW_LOCAL_KUBECTL" != 1 ]]; then
    echo 'ACTIVITY_CORE_CLUSTER_HOST=local requires ACTIVITY_CORE_ALLOW_LOCAL_KUBECTL=1' >&2
    exit 2
  fi
  CLUSTER_HOST=""
fi

# Default the executor-side checkout: <directory reported by "ssh host pwd">/activity-core
# for a remote host, the local checkout otherwise.
if [[ -z "$ACTIVITY_CORE_REMOTE_REPO" ]]; then
  if [[ -z "$CLUSTER_HOST" ]]; then
    ACTIVITY_CORE_REMOTE_REPO="$ACTIVITY_CORE_REPO"
  else
    ACTIVITY_CORE_REMOTE_REPO="$(ssh "$CLUSTER_HOST" pwd)/activity-core"
  fi
fi
export CLUSTER_HOST ACTIVITY_CORE_REMOTE_REPO

# Preflight: the executor needs kubectl and python3 on its PATH.
CURRENT_GATE='cluster executor preflight'
log "using cluster executor: ${CLUSTER_HOST:-local kubectl}"
cluster_bash 'set -euo pipefail
command -v kubectl >/dev/null
command -v python3 >/dev/null'
# Mirror the local k8s/railiance bundle onto the cluster host when requested.
CURRENT_GATE='runtime bundle sync'
if should_sync_runtime_bundle; then
  log "syncing runtime bundle to ${CLUSTER_HOST}:${ACTIVITY_CORE_REMOTE_REPO}/k8s/railiance"
  # Make sure the target directory exists before rsync writes into it.
  ssh "$CLUSTER_HOST" "mkdir -p $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance"
  # --delete keeps the remote copy an exact mirror of the local bundle.
  rsync_args=(
    -a --delete
    "$ACTIVITY_CORE_REPO/k8s/railiance/"
    "${CLUSTER_HOST}:${ACTIVITY_CORE_REMOTE_REPO}/k8s/railiance/"
  )
  rsync "${rsync_args[@]}"
fi
# Revision gate: record the executor checkout HEAD and require that
# REQUIRED_ACTIVITY_CORE_REV is one of its ancestors. The short HEAD hash is
# the only stdout produced, so it becomes REMOTE_REVISION.
CURRENT_GATE='activity-core revision gate'
REMOTE_REVISION="$(
  cluster_bash "$(
    cat <<EOF
set -euo pipefail
git -C $(quote "$ACTIVITY_CORE_REMOTE_REPO") rev-parse --short HEAD
git -C $(quote "$ACTIVITY_CORE_REMOTE_REPO") merge-base --is-ancestor $(quote "$REQUIRED_ACTIVITY_CORE_REV") HEAD
EOF
  )"
)"
export REMOTE_REVISION
# Contract gate: scan 20-runtime.yaml on the executor for the bounded
# daily-triage instruction markers. The JSON report is printed before the
# verdict, so CONTRACT_JSON holds it even when the gate fails.
CURRENT_GATE='runtime contract gate'
CONTRACT_JSON="$(
  cluster_bash "$(
    cat <<EOF
set -euo pipefail
python3 - $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/20-runtime.yaml <<'PY'
import json, re, sys

bundle_path = sys.argv[1]
with open(bundle_path, encoding='utf-8') as handle:
    text = handle.read()
lower = text.lower()
# Every numeric max_tokens setting found in the bundle.
max_tokens = list(map(int, re.findall(r"max_tokens\s*[:=]\s*['\"]?(\d+)", text)))
top_seven = re.search(r'(top[- ]?7|<=\s*7|≤\s*7|at most\s+7|no more than\s+7)', lower)
checks = {
    'mentions_daily_instruction': 'daily-statehub-wsjf-triage' in lower,
    'bounded_top_7': top_seven is not None,
    'fewer_well_formed': 'fewer well-formed' in lower,
    'ndjson_or_line_framing': 'ndjson' in lower or 'one recommendation json object per line' in lower,
    'max_tokens_headroom': bool(max_tokens) and max(max_tokens) >= 1800,
}
missing = []
for name, ok in checks.items():
    if not ok:
        missing.append(name)
report = {'path': bundle_path, 'max_tokens': max_tokens, 'checks': checks, 'missing': missing}
print(json.dumps(report, sort_keys=True))
if missing:
    raise SystemExit('runtime bundle contract checks failed: ' + ', '.join(missing))
PY
EOF
  )"
)"
export CONTRACT_JSON
# Reconcile: apply namespace and runtime manifests, recreate the migrate/sync
# jobs, optionally restart the deployments, then wait for everything to settle.
CURRENT_GATE='runtime bundle reconcile'
log 'applying runtime bundle and restarting activity-core deployments'
cluster_bash "$(
  cat <<EOF
set -euo pipefail
kubectl apply -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/00-namespace.yaml
kubectl -n $(quote "$NAMESPACE") delete job actcore-migrate actcore-sync --ignore-not-found
kubectl apply -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/20-runtime.yaml
if [[ $(quote "$ACTIVITY_CORE_RESTART_DEPLOYMENTS") == 1 ]]; then
  kubectl -n $(quote "$NAMESPACE") rollout restart deploy/actcore-api deploy/actcore-worker deploy/actcore-event-router
fi
kubectl -n $(quote "$NAMESPACE") wait --for=condition=complete job/actcore-migrate --timeout=180s
for deployment in actcore-api actcore-worker actcore-event-router; do
  kubectl -n $(quote "$NAMESPACE") rollout status "deploy/\$deployment" --timeout=180s
done
kubectl -n $(quote "$NAMESPACE") wait --for=condition=complete job/actcore-sync --timeout=180s
EOF
)"
# Capture what is now running: the configured image of each deployment, the
# image id reported by the first listed pod of each app, and the sync job status.
CURRENT_GATE='runtime status capture'
deploy_image_path='{.spec.template.spec.containers[0].image}'
pod_image_id_path='{.items[0].status.containerStatuses[0].imageID}'
API_IMAGE="$(
  cluster_bash "kubectl -n $(quote "$NAMESPACE") get deploy actcore-api -o jsonpath='${deploy_image_path}'"
)"
API_IMAGE_ID="$(
  cluster_bash "kubectl -n $(quote "$NAMESPACE") get pod -l app.kubernetes.io/name=actcore-api -o jsonpath='${pod_image_id_path}'"
)"
WORKER_IMAGE="$(
  cluster_bash "kubectl -n $(quote "$NAMESPACE") get deploy actcore-worker -o jsonpath='${deploy_image_path}'"
)"
WORKER_IMAGE_ID="$(
  cluster_bash "kubectl -n $(quote "$NAMESPACE") get pod -l app.kubernetes.io/name=actcore-worker -o jsonpath='${pod_image_id_path}'"
)"
# Reduce the job object to the few status fields kept as evidence.
SYNC_STATUS_JSON="$(
  cluster_bash "kubectl -n $(quote "$NAMESPACE") get job actcore-sync -o json" |
    python3 -c '
import json, sys
job = json.load(sys.stdin)
status = job.get("status", {})
summary = {
    "name": job["metadata"]["name"],
    "succeeded": status.get("succeeded", 0),
    "failed": status.get("failed", 0),
    "completion_time": status.get("completionTime"),
}
print(json.dumps(summary))
'
)"
export API_IMAGE API_IMAGE_ID WORKER_IMAGE WORKER_IMAGE_ID SYNC_STATUS_JSON
# Trigger the daily-triage definition through the API pod and derive the run
# id State Hub is expected to report for it.
CURRENT_GATE='daily-triage manual trigger'
log "triggering ${DAILY_TRIAGE_DEFINITION_SLUG}"
TRIGGER_JSON="$(
  cluster_bash "$(
    cat <<EOF
set -euo pipefail
kubectl -n $(quote "$NAMESPACE") exec -i deploy/actcore-api -- python - $(quote "$DAILY_TRIAGE_DEFINITION_SLUG") <<'PY'
import json, sys, urllib.request

slug = sys.argv[1]
with urllib.request.urlopen('http://localhost:8010/activity-definitions/', timeout=30) as resp:
    definitions = json.load(resp)


def identifiers(definition):
    return [str(definition.get(key) or '') for key in ('slug', 'name', 'id')]


# An exact slug/name/id match wins. The substring match is only a fallback, so
# an earlier definition whose identifier merely contains the slug cannot
# shadow the intended one.
match = next((d for d in definitions if slug in identifiers(d)), None)
if match is None:
    match = next((d for d in definitions if any(slug in value for value in identifiers(d))), None)
if not match:
    raise SystemExit(f'definition matching {slug!r} not found')
definition_id = match['id']
req = urllib.request.Request(f'http://localhost:8010/activity-definitions/{definition_id}/trigger', method='POST')
with urllib.request.urlopen(req, timeout=30) as resp:
    payload = json.loads(resp.read().decode())
payload['definition_id'] = definition_id
print(json.dumps(payload, sort_keys=True))
PY
EOF
  )"
)"
# The python3 helpers below read these values from the environment.
export TRIGGER_JSON
DEFINITION_ID="$(python3 -c 'import json,os; print(json.loads(os.environ["TRIGGER_JSON"])["definition_id"])')"
TRIGGER_KEY="$(python3 -c 'import json,os; t=json.loads(os.environ["TRIGGER_JSON"]); print(t.get("trigger_key") or t.get("workflow_id") or "")')"
export DEFINITION_ID TRIGGER_KEY
# Expected run id: UUIDv5 (URL namespace) of "<definition_id>:<trigger_key>";
# left empty when the trigger response carried no key.
EXPECTED_RUN_ID="$(
  python3 - <<'PY'
import os, uuid
trigger_key = os.environ.get('TRIGGER_KEY')
definition_id = os.environ.get('DEFINITION_ID')
print(uuid.uuid5(uuid.NAMESPACE_URL, f'{definition_id}:{trigger_key}') if trigger_key else '')
PY
)"
export TRIGGER_JSON DEFINITION_ID TRIGGER_KEY EXPECTED_RUN_ID
# Poll State Hub until a schema-valid daily_triage event created after
# STARTED_AT shows up, or STATE_HUB_PROGRESS_TIMEOUT_SECONDS elapses.
# A failed poll (connection error, HTTP error, malformed body) is logged to
# stderr and retried instead of aborting the whole verification window.
CURRENT_GATE='State Hub daily_triage evidence'
log 'polling State Hub for schema-valid daily_triage progress'
PROGRESS_JSON="$(
  python3 - <<'PY'
from datetime import datetime
import json, os, sys, time, urllib.parse, urllib.request

base = os.environ['STATE_HUB_URL'].rstrip('/')
started = datetime.fromisoformat(os.environ['STARTED_AT'].replace('Z', '+00:00'))
deadline = time.monotonic() + int(os.environ['STATE_HUB_PROGRESS_TIMEOUT_SECONDS'])
interval = int(os.environ['STATE_HUB_PROGRESS_POLL_SECONDS'])
expected_run_id = os.environ.get('EXPECTED_RUN_ID')
url = base + '/progress/?' + urllib.parse.urlencode({'event_type': 'daily_triage'})
last_error = None
while time.monotonic() < deadline:
    try:
        with urllib.request.urlopen(url, timeout=20) as resp:
            events = json.load(resp)
        last_error = None
    except (OSError, ValueError) as exc:
        # URLError, HTTPError and socket timeouts are OSError subclasses;
        # JSON and text decoding errors are ValueError subclasses.
        last_error = exc
        print(f'State Hub poll failed, retrying: {exc}', file=sys.stderr)
        events = []
    for event in events:
        created_at = datetime.fromisoformat(event['created_at'].replace('Z', '+00:00'))
        if created_at < started:
            continue
        detail = event.get('detail') or {}
        # Events that name a different run are skipped; events without any run
        # id are still considered.
        if expected_run_id and isinstance(detail, dict):
            run_id = detail.get('activity_core_run_id') or detail.get('run_id')
            if run_id and run_id != expected_run_id:
                continue
        if not isinstance(detail, dict) or detail.get('output_validated') is not True:
            continue
        # A partial result only counts when something was actually quarantined.
        if detail.get('partial') is True and int(detail.get('quarantined_count') or 0) <= 0:
            continue
        print(json.dumps({'id': event['id'], 'event_type': event.get('event_type'), 'summary': event.get('summary'), 'author': event.get('author'), 'created_at': event.get('created_at'), 'output_validated': detail.get('output_validated'), 'partial': detail.get('partial'), 'quarantined_count': detail.get('quarantined_count'), 'activity_core_run_id': detail.get('activity_core_run_id'), 'detail_keys': sorted(detail.keys())}))
        raise SystemExit(0)
    # Never sleep past the deadline.
    time.sleep(max(0, min(interval, deadline - time.monotonic())))
message = 'no schema-valid daily_triage progress found'
if last_error is not None:
    message += f' (last poll error: {last_error})'
raise SystemExit(message)
PY
)"
export PROGRESS_JSON
# Every gate passed: record the success note. A failure while posting it is
# still reported against this gate because the ERR trap is only removed afterwards.
CURRENT_GATE='State Hub evidence note'
log 'posting non-secret evidence note to State Hub'
post_evidence 'passed' ''

# Nothing left that should be reported as a failed gate.
trap - ERR
log 'verification passed'