#!/usr/bin/env bash
# Cluster-owned deploy/verify for activity-core: reconciles the runtime bundle,
# manually triggers the disabled ops inventory probe definition, and records
# the resulting evidence in State Hub.
#
# Strict mode: abort on a failing command, on use of an unset variable, and on
# a failure in any stage of a pipeline.
set -o errexit -o nounset -o pipefail

# Directory two levels above the directory that holds this script.
script_dir="$(dirname "${BASH_SOURCE[0]}")"
ROOT="$(cd "${script_dir}/../.." && pwd)"

# Target namespace and the activity definition under verification; each value
# can be overridden through the ACTIVITY_CORE_* variable named on its line.
NAMESPACE="${ACTIVITY_CORE_NAMESPACE:-activity-core}"
DEFINITION_ID="${ACTIVITY_CORE_OPS_DEFINITION_ID:-40d15a87-7ff6-4d8e-992c-37df15f95110}"
DEFINITION_SLUG="${ACTIVITY_CORE_OPS_DEFINITION_SLUG:-ops-service-inventory-probes}"
DEFINITION_NAME="${ACTIVITY_CORE_OPS_DEFINITION_NAME:-Ops Service Inventory Probes}"

# State Hub endpoint plus the time budget and interval of the progress poll.
: "${STATE_HUB_URL:=http://127.0.0.1:8000}"
: "${STATE_HUB_PROGRESS_TIMEOUT_SECONDS:=180}"
: "${STATE_HUB_PROGRESS_POLL_SECONDS:=5}"

# Local checkout, optional explicit cluster-side checkout, executor host, and
# the two behaviour switches (bundle sync mode, deployment restart).
: "${ACTIVITY_CORE_REPO:=/home/worsch/activity-core}"
: "${ACTIVITY_CORE_REMOTE_REPO:=}"
: "${ACTIVITY_CORE_CLUSTER_HOST:=railiance01}"
: "${ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE:=auto}"
: "${ACTIVITY_CORE_RESTART_DEPLOYMENTS:=0}"
# "local" is a sentinel: an empty host makes the rest of the script run
# kubectl on this machine instead of going through ssh.
[[ "$ACTIVITY_CORE_CLUSTER_HOST" != "local" ]] || ACTIVITY_CORE_CLUSTER_HOST=""

# Derive the executor-side checkout path only when none was given explicitly:
# the local checkout when running locally, otherwise "activity-core" under the
# directory that `pwd` reports over ssh.
if [[ -z "$ACTIVITY_CORE_REMOTE_REPO" && -z "$ACTIVITY_CORE_CLUSTER_HOST" ]]; then
  ACTIVITY_CORE_REMOTE_REPO="$ACTIVITY_CORE_REPO"
elif [[ -z "$ACTIVITY_CORE_REMOTE_REPO" ]]; then
  ACTIVITY_CORE_REMOTE_REPO="$(ssh "$ACTIVITY_CORE_CLUSTER_HOST" pwd)/activity-core"
fi

# State Hub workstream/task the verification note is attached to, and how the
# Inter-Hub submission is reported inside that note.
EVIDENCE_WORKSTREAM_ID="${STATE_HUB_EVIDENCE_WORKSTREAM_ID:-c91a0946-92f9-4b41-8a92-005b29952916}"
EVIDENCE_TASK_ID="${STATE_HUB_EVIDENCE_TASK_ID:-d15fc947-3fbe-4269-93c6-d98577352149}"
: "${INTER_HUB_SUBMISSION_STATUS:=deferred}"
: "${INTER_HUB_DEFER_REASON:=ops-hub key custody and Inter-Hub production intake remain operator-gated; State Hub fallback evidence is accepted for this handoff}"

# Run bookkeeping. STARTED_AT (UTC) is also the lower bound used later when
# matching State Hub progress events; CURRENT_GATE names the step in progress
# so a failure can be attributed to it.
STARTED_AT="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
CURRENT_GATE="startup"
EVIDENCE_NOTE_JSON=""

# Make the configuration visible to child processes; the embedded python3
# helpers read several of these from os.environ.
export NAMESPACE DEFINITION_ID DEFINITION_SLUG DEFINITION_NAME \
  STATE_HUB_URL EVIDENCE_WORKSTREAM_ID EVIDENCE_TASK_ID \
  STATE_HUB_PROGRESS_TIMEOUT_SECONDS STATE_HUB_PROGRESS_POLL_SECONDS \
  INTER_HUB_SUBMISSION_STATUS INTER_HUB_DEFER_REASON STARTED_AT \
  ACTIVITY_CORE_CLUSTER_HOST ACTIVITY_CORE_REMOTE_REPO \
  ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE ACTIVITY_CORE_RESTART_DEPLOYMENTS

# Per-gate results start out empty and are filled in as the gates complete.
export REMOTE_REVISION="" API_IMAGE="" SYNC_STATUS_JSON="" \
  DEFINITION_JSON="" TRIGGER_JSON="" PROGRESS_JSON=""

# Print one progress line on stdout, prefixed with the script tag.
# Arguments: all arguments are joined with single spaces into the message.
log() {
  local message="$*"
  printf '%s %s\n' '[activity-core-verify]' "$message"
}

# Emit $1 escaped with bash's %q so it can be pasted into a generated script
# as a single word. No trailing newline is written.
quote() {
  local escaped
  printf -v escaped '%q' "$1"
  printf '%s' "$escaped"
}

# Run a bash script on the cluster executor, feeding it on stdin to `bash -s`.
# Arguments: $1 - full text of the script to run
# Globals:   ACTIVITY_CORE_CLUSTER_HOST (read) - ssh target; empty means run
#            the script with a local bash instead
# Returns:   the exit status of the ssh / bash invocation
cluster_bash() {
  local script="$1"
  local -a runner=(bash -s)
  if [[ -n "$ACTIVITY_CORE_CLUSTER_HOST" ]]; then
    runner=(ssh "$ACTIVITY_CORE_CLUSTER_HOST" "bash -s")
  fi
  "${runner[@]}" <<<"$script"
}

# Decide whether the local runtime bundle has to be copied to the executor.
# Globals: ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE (read) - 1|true|yes, 0|false|no,
#          or auto; ACTIVITY_CORE_CLUSTER_HOST and ACTIVITY_CORE_REPO (read)
#          are consulted in auto mode only.
# Returns: 0 to sync, 1 to skip. In auto mode the answer is "sync" only when
#          a cluster host is set and the local k8s/railiance directory exists.
# Exits:   status 2, with a message on stderr, for any other mode value.
should_sync_runtime_bundle() {
  local mode="$ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE"

  if [[ "$mode" == "1" || "$mode" == "true" || "$mode" == "yes" ]]; then
    return 0
  fi
  if [[ "$mode" == "0" || "$mode" == "false" || "$mode" == "no" ]]; then
    return 1
  fi
  if [[ "$mode" == "auto" ]]; then
    [[ -n "$ACTIVITY_CORE_CLUSTER_HOST" && -d "$ACTIVITY_CORE_REPO/k8s/railiance" ]]
    return
  fi

  printf 'invalid ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE=%s\n' "$mode" >&2
  exit 2
}

# Post a non-secret verification note to State Hub (POST <STATE_HUB_URL>/progress/).
#
# Arguments:
#   $1 - evidence status; "passed" selects the success summary, any other
#        value selects the failure summary
#   $2 - name of the failing gate (optional; empty for a passing run)
# Globals:
#   Exports EVIDENCE_STATUS and FAILING_GATE for the python3 child. The child
#   also reads STATE_HUB_URL, the EVIDENCE_* ids, NAMESPACE, DEFINITION_SLUG,
#   the ACTIVITY_CORE_* target settings, the INTER_HUB_* fields, STARTED_AT
#   and the per-gate result variables (REMOTE_REVISION, API_IMAGE, *_JSON)
#   from its environment.
# Outputs:
#   The State Hub response body on stdout; a diagnostic on stderr when the
#   POST fails.
# Returns:
#   The exit status of python3 (non-zero when the POST raises).
post_evidence() {
  local status="$1"
  local failing_gate="${2:-}"
  export EVIDENCE_STATUS="$status"
  export FAILING_GATE="$failing_gate"
  python3 - <<'PY'
import json
import os
import sys
import urllib.request

def from_json_env(name):
    """Decode a JSON-valued environment variable.

    Returns None when the variable is unset or empty, and {"raw": <text>}
    when the text is not valid JSON, so partial gate output is still recorded.
    """
    raw = os.environ.get(name, "")
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {"raw": raw}

status = os.environ["EVIDENCE_STATUS"]
failing_gate = os.environ.get("FAILING_GATE") or None
# Report the slug that was actually verified; fall back to the historical
# default so the summary text is unchanged when no override is in effect.
slug = os.environ.get("DEFINITION_SLUG") or "ops-service-inventory-probes"
definition = from_json_env("DEFINITION_JSON")
trigger = from_json_env("TRIGGER_JSON")
progress = from_json_env("PROGRESS_JSON")
sync_status = from_json_env("SYNC_STATUS_JSON")

detail = {
    "producer": "railiance-cluster",
    "verification": "activity-core cluster-owned deploy/verify",
    "status": status,
    "failing_gate": failing_gate,
    "cluster_host": os.environ.get("ACTIVITY_CORE_CLUSTER_HOST") or "local-kubectl",
    "namespace": os.environ.get("NAMESPACE"),
    "activity_core_repo": os.environ.get("ACTIVITY_CORE_REMOTE_REPO"),
    "activity_core_revision": os.environ.get("REMOTE_REVISION") or None,
    "api_image": os.environ.get("API_IMAGE") or None,
    "runtime_bundle": "k8s/railiance/20-runtime.yaml",
    "sync_job": sync_status,
    "definition": definition,
    "manual_trigger": trigger,
    "state_hub_progress": progress,
    "inter_hub_submission": {
        "status": os.environ.get("INTER_HUB_SUBMISSION_STATUS"),
        "reason": os.environ.get("INTER_HUB_DEFER_REASON"),
    },
    "started_at": os.environ.get("STARTED_AT"),
}

if status == "passed":
    summary = (
        "Railiance activity-core deploy/verify passed: runtime reconciled, "
        f"actcore-sync completed, {slug} remains disabled, "
        f"manual trigger {trigger.get('workflow_id') if isinstance(trigger, dict) else 'unknown'} ran, "
        f"and State Hub ops_inventory_probe progress {progress.get('id') if isinstance(progress, dict) else 'unknown'} exists."
    )
else:
    summary = (
        "Railiance activity-core deploy/verify failed"
        + (f" at {failing_gate}" if failing_gate else "")
        + "; see non-secret evidence detail for the last completed gate."
    )

payload = {
    "summary": summary,
    "event_type": "note",
    "author": "railiance-cluster",
    "detail": detail,
}
# Workstream and task links are attached only when the ids are non-empty.
if os.environ.get("EVIDENCE_WORKSTREAM_ID"):
    payload["workstream_id"] = os.environ["EVIDENCE_WORKSTREAM_ID"]
if os.environ.get("EVIDENCE_TASK_ID"):
    payload["task_id"] = os.environ["EVIDENCE_TASK_ID"]

body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
    os.environ["STATE_HUB_URL"].rstrip("/") + "/progress/",
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
try:
    with urllib.request.urlopen(req, timeout=20) as resp:
        sys.stdout.write(resp.read().decode("utf-8"))
except Exception as exc:
    # Say what failed, then re-raise so the shell sees a non-zero status.
    sys.stderr.write(f"failed to post State Hub evidence note: {exc}\n")
    raise
PY
}

# Exit handler: when the script is terminating with a non-zero status, post a
# "failed" evidence note naming the gate that was in progress, then exit with
# that same status. A zero status is passed through without posting, so the
# normal end-of-script path is unaffected.
#
# It is installed on EXIT rather than ERR because, without errtrace, an ERR
# trap is not inherited by shell functions: a failure inside a helper such as
# cluster_bash ends the script through errexit without ever running an ERR
# trap, and explicit `exit N` calls never raise ERR at all. EXIT covers both.
#
# Globals: CURRENT_GATE (read)
on_error() {
  local code=$?
  # Disarm first so the exit below cannot re-enter this handler.
  trap - ERR EXIT
  if (( code != 0 )); then
    # Best effort: a failing post must not mask the original exit status.
    post_evidence "failed" "$CURRENT_GATE" >/dev/null || true
  fi
  exit "$code"
}
trap on_error EXIT

CURRENT_GATE="cluster executor preflight"
log "using cluster executor: ${ACTIVITY_CORE_CLUSTER_HOST:-local kubectl}"
# Fail early when the executing side has no kubectl on its PATH.
cluster_bash $'set -euo pipefail\ncommand -v kubectl >/dev/null'

CURRENT_GATE="runtime bundle sync"
if should_sync_runtime_bundle; then
  if [[ -n "$ACTIVITY_CORE_CLUSTER_HOST" ]]; then
    # Mirror the local k8s/railiance directory onto the cluster host; --delete
    # removes remote files that no longer exist locally.
    remote_bundle_dir="${ACTIVITY_CORE_REMOTE_REPO}/k8s/railiance"
    log "syncing runtime bundle to ${ACTIVITY_CORE_CLUSTER_HOST}:${remote_bundle_dir}"
    ssh "$ACTIVITY_CORE_CLUSTER_HOST" "mkdir -p $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance"
    rsync -a --delete \
      "${ACTIVITY_CORE_REPO}/k8s/railiance/" \
      "${ACTIVITY_CORE_CLUSTER_HOST}:${remote_bundle_dir}/"
  else
    # Local executor: the bundle is read straight from the local checkout.
    log "runtime bundle already local at ${ACTIVITY_CORE_REPO}/k8s/railiance"
  fi
fi

CURRENT_GATE="runtime bundle preflight"
# Verify on the executing side that both manifests exist and that the runtime
# manifest mentions the definition slug. The slug is matched as a fixed string
# (-F) and kept out of option parsing (--), so an overridden slug containing
# regex metacharacters or a leading dash is still matched literally.
cluster_bash "$(cat <<EOF
set -euo pipefail
test -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/00-namespace.yaml
test -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/20-runtime.yaml
grep -qF -- $(quote "$DEFINITION_SLUG") $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/20-runtime.yaml
EOF
)"

CURRENT_GATE="activity-core revision inspection"
# Best effort: the generated script swallows git errors, so REMOTE_REVISION is
# simply left empty when git cannot report a short HEAD for the checkout.
revision_script="$(cat <<EOF
set -euo pipefail
git -C $(quote "$ACTIVITY_CORE_REMOTE_REPO") rev-parse --short HEAD 2>/dev/null || true
EOF
)"
REMOTE_REVISION="$(cluster_bash "$revision_script")"
export REMOTE_REVISION

CURRENT_GATE="runtime bundle reconcile"
log "reconciling activity-core runtime bundle"
# Accept the same truthy spellings as ACTIVITY_CORE_SYNC_RUNTIME_BUNDLE
# (1, true, yes); any other value means "do not restart". Previously only the
# literal "1" was honoured and "true"/"yes" were silently ignored.
case "$ACTIVITY_CORE_RESTART_DEPLOYMENTS" in
  1|true|yes) restart_deployments=1 ;;
  *) restart_deployments=0 ;;
esac
# Order matters: the migrate/sync jobs are deleted before the runtime manifest
# is re-applied and are then waited on again; migrate is awaited before the
# deployments roll out, and sync after them.
cluster_bash "$(cat <<EOF
set -euo pipefail
kubectl apply -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/00-namespace.yaml
kubectl -n $(quote "$NAMESPACE") delete job actcore-migrate actcore-sync --ignore-not-found
kubectl apply -f $(quote "$ACTIVITY_CORE_REMOTE_REPO")/k8s/railiance/20-runtime.yaml
if [[ ${restart_deployments} == "1" ]]; then
  kubectl -n $(quote "$NAMESPACE") rollout restart deploy/actcore-api deploy/actcore-worker deploy/actcore-event-router
fi
kubectl -n $(quote "$NAMESPACE") wait --for=condition=complete job/actcore-migrate --timeout=180s
kubectl -n $(quote "$NAMESPACE") rollout status deploy/actcore-api --timeout=180s
kubectl -n $(quote "$NAMESPACE") rollout status deploy/actcore-worker --timeout=180s
kubectl -n $(quote "$NAMESPACE") rollout status deploy/actcore-event-router --timeout=180s
kubectl -n $(quote "$NAMESPACE") wait --for=condition=complete job/actcore-sync --timeout=180s
EOF
)"

CURRENT_GATE="live image capability check"
# Confirm the running API image can import the two ops modules by importing
# them with the python inside the actcore-api deployment.
printf -v capability_script '%s\n%s' \
  'set -euo pipefail' \
  "kubectl -n $(quote "$NAMESPACE") exec deploy/actcore-api -- python -c 'import activity_core.context_resolvers.ops_inventory; import activity_core.ops_evidence_sinks'"
cluster_bash "$capability_script"

CURRENT_GATE="runtime status capture"
# Image of the first container in the actcore-api deployment template.
image_query="$(cat <<EOF
set -euo pipefail
kubectl -n $(quote "$NAMESPACE") get deploy actcore-api -o jsonpath='{.spec.template.spec.containers[0].image}'
EOF
)"
API_IMAGE="$(cluster_bash "$image_query")"

# Reduce the actcore-sync job object to the few status fields that go into the
# evidence note.
sync_job_query="$(cat <<EOF
set -euo pipefail
kubectl -n $(quote "$NAMESPACE") get job actcore-sync -o json
EOF
)"
SYNC_STATUS_JSON="$(
  cluster_bash "$sync_job_query" | python3 -c '
import json
import sys

job = json.load(sys.stdin)
job_status = job.get("status", {})
print(json.dumps({
    "name": job["metadata"]["name"],
    "succeeded": job_status.get("succeeded", 0),
    "failed": job_status.get("failed", 0),
    "completion_time": job_status.get("completionTime"),
}))
'
)"
export API_IMAGE SYNC_STATUS_JSON

CURRENT_GATE="disabled definition check"
log "checking ${DEFINITION_SLUG} is present and disabled"
# The Python below runs inside the actcore-api deployment and queries the API
# on localhost:8010. It receives the definition id, expected name and slug as
# argv[1..3], fails unless the definition exists, is disabled (enabled is
# False) and carries the expected name, and prints a compact JSON summary.
# The slug in that summary is the configured DEFINITION_SLUG rather than a
# hard-coded literal, so an overridden slug is recorded correctly.
DEFINITION_JSON="$(
  cluster_bash "$(cat <<EOF
set -euo pipefail
kubectl -n $(quote "$NAMESPACE") exec -i deploy/actcore-api -- python - $(quote "$DEFINITION_ID") $(quote "$DEFINITION_NAME") $(quote "$DEFINITION_SLUG") <<'PY'
import json
import sys
import urllib.request

definition_id = sys.argv[1]
expected_name = sys.argv[2]
definition_slug = sys.argv[3]
with urllib.request.urlopen("http://localhost:8010/activity-definitions/", timeout=30) as resp:
    definitions = json.load(resp)

for definition in definitions:
    if definition.get("id") == definition_id:
        if definition.get("enabled") is not False:
            raise SystemExit(f"definition {definition_id} exists but enabled={definition.get('enabled')!r}")
        if definition.get("name") != expected_name:
            raise SystemExit(f"definition {definition_id} name mismatch: {definition.get('name')!r}")
        print(json.dumps({
            "id": definition["id"],
            "slug": definition_slug,
            "name": definition["name"],
            "enabled": definition["enabled"],
            "trigger_type": definition.get("trigger_type"),
            "version": definition.get("version"),
        }))
        break
else:
    raise SystemExit(f"definition {definition_id} not found")
PY
EOF
)"
)"
if [[ -z "$DEFINITION_JSON" ]]; then
  printf 'definition check produced no output\n' >&2
  exit 1
fi
export DEFINITION_JSON

CURRENT_GATE="manual disabled trigger"
log "triggering disabled definition manually"
# POST to the trigger endpoint of the definition from inside the actcore-api
# deployment and keep the raw response body.
trigger_script="$(cat <<EOF
set -euo pipefail
kubectl -n $(quote "$NAMESPACE") exec -i deploy/actcore-api -- python - $(quote "$DEFINITION_ID") <<'PY'
import json
import sys
import urllib.request

trigger_url = "http://localhost:8010/activity-definitions/" + sys.argv[1] + "/trigger"
trigger_request = urllib.request.Request(trigger_url, method="POST")
with urllib.request.urlopen(trigger_request, timeout=30) as response:
    print(response.read().decode("utf-8"))
PY
EOF
)"
TRIGGER_JSON="$(cluster_bash "$trigger_script")"
# An empty body counts as a failed trigger.
[[ -n "$TRIGGER_JSON" ]] || {
  printf 'manual trigger produced no output\n' >&2
  exit 1
}
export TRIGGER_JSON

CURRENT_GATE="State Hub ops_inventory_probe evidence"
log "polling State Hub for ops_inventory_probe progress"
# Poll <STATE_HUB_URL>/progress/?event_type=ops_inventory_probe until an event
# created at or after STARTED_AT shows up, or the timeout elapses. A failed
# request or an unparsable response body is retried on the next poll instead
# of aborting the gate, and events without a usable created_at are skipped.
PROGRESS_JSON="$(
  python3 - <<'PY'
from datetime import datetime, timezone
import json
import os
import time
import urllib.parse
import urllib.request


def parse_timestamp(raw):
    """Return an aware datetime for an ISO-8601 string, or None if unusable."""
    if not isinstance(raw, str):
        return None
    try:
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # NOTE(review): offset-less timestamps are treated as UTC - confirm
        # against what State Hub actually emits.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


base = os.environ["STATE_HUB_URL"].rstrip("/")
started = datetime.fromisoformat(os.environ["STARTED_AT"].replace("Z", "+00:00"))
timeout = int(os.environ["STATE_HUB_PROGRESS_TIMEOUT_SECONDS"])
interval = int(os.environ["STATE_HUB_PROGRESS_POLL_SECONDS"])
deadline = time.monotonic() + timeout
url = base + "/progress/?" + urllib.parse.urlencode({"event_type": "ops_inventory_probe"})
last_error = None

while time.monotonic() < deadline:
    try:
        with urllib.request.urlopen(url, timeout=20) as resp:
            events = json.load(resp)
    except (OSError, ValueError) as exc:
        # URLError, HTTPError and socket timeouts derive from OSError; a
        # malformed body raises a ValueError subclass. Keep polling.
        last_error = exc
        events = []
    for event in events:
        if not isinstance(event, dict):
            continue
        created_at = parse_timestamp(event.get("created_at"))
        if created_at is None or created_at < started:
            continue
        detail = event.get("detail") or {}
        print(json.dumps({
            "id": event["id"],
            "event_type": event.get("event_type"),
            "summary": event.get("summary"),
            "author": event.get("author"),
            "created_at": event.get("created_at"),
            "detail_keys": sorted(detail.keys()) if isinstance(detail, dict) else [],
        }))
        raise SystemExit(0)
    time.sleep(interval)

message = f"no ops_inventory_probe progress found after {timeout}s"
if last_error is not None:
    message += f" (last State Hub error: {last_error})"
raise SystemExit(message)
PY
)"
export PROGRESS_JSON

CURRENT_GATE="State Hub evidence note"
log "posting non-secret evidence note to State Hub"
# Keep the State Hub response so it can be printed as the last line of output.
EVIDENCE_NOTE_JSON="$(post_evidence passed '')"
export EVIDENCE_NOTE_JSON

# All gates are done: remove the ERR trap before the final reporting lines.
trap - ERR
log "verification passed"
printf '%s\n' "${EVIDENCE_NOTE_JSON}"
