Add Core Hub ops evidence sink

This commit is contained in:
2026-06-27 20:34:25 +02:00
parent 18fcce87fe
commit 30043348f0
3 changed files with 364 additions and 0 deletions

View File

@@ -2,7 +2,9 @@
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any
import httpx
@@ -16,6 +18,10 @@ _INTER_HUB_SINK_TYPES = {
"inter-hub-event",
"inter-hub-interaction-event",
}
_CORE_HUB_SINK_TYPES = {
"core-hub",
"core-hub-interaction-event",
}
def persist_ops_inventory_evidence(payload: dict[str, Any]) -> list[dict[str, Any]]:
@@ -56,6 +62,12 @@ def persist_ops_inventory_evidence(payload: dict[str, Any]) -> list[dict[str, An
results.append(
_post_state_hub_progress(payload, bind_key, probe_result, sink)
)
elif sink_type in _CORE_HUB_SINK_TYPES:
results.append(
_post_core_hub_interaction_event(
payload, bind_key, probe_result, sink
)
)
elif sink_type in _INTER_HUB_SINK_TYPES:
results.append(_inter_hub_result(sink))
else:
@@ -159,6 +171,213 @@ def _progress_exists(base_url: str, event_type: str, idempotency_key: str) -> bo
return False
def _post_core_hub_interaction_event(
payload: dict[str, Any],
context_key: str,
probe_result: dict[str, Any],
sink: dict[str, Any],
) -> dict[str, Any]:
raw_base_url = (
sink.get("core_hub_url")
or sink.get("base_url")
or os.environ.get("CORE_HUB_BASE_URL")
or ""
)
base_url = str(raw_base_url).rstrip("/")
runtime_token = _core_hub_runtime_token(sink)
widget_id = _core_hub_widget_id(sink, probe_result)
missing: list[str] = []
if not base_url:
missing.append("CORE_HUB_BASE_URL")
if not runtime_token:
missing.append("CORE_HUB_RUNTIME_TOKEN or CORE_HUB_RUNTIME_TOKEN_FILE")
if not widget_id:
missing.append("widget_id or CORE_HUB_WIDGET_ID")
if missing:
return {
"type": sink.get("type"),
"status": "skipped",
"reason": "missing_core_hub_config",
"missing": missing,
"context_key": context_key,
}
endpoint = _selected_endpoint(probe_result, sink)
event_type = sink.get("event_type", "ops-endpoint-verified")
timeout = float(sink.get("timeout_seconds", 10.0))
body = {
"widgetId": widget_id,
"eventType": event_type,
"viewContext": _core_hub_view_context(payload, context_key, endpoint, sink),
"metadata": _core_hub_metadata(payload, context_key, probe_result, endpoint),
}
resp = httpx.post(
f"{base_url}/api/v2/interaction-events",
json=body,
headers=_core_hub_headers(runtime_token),
timeout=timeout,
)
resp.raise_for_status()
data = resp.json()
event_id = data.get("id")
if not event_id:
raise RuntimeError("Core Hub interaction event response did not include an id")
if not _core_hub_event_exists(base_url, runtime_token, str(event_id), timeout):
raise RuntimeError("Core Hub interaction event was not visible after create")
return {
"type": sink.get("type"),
"status": "posted",
"event_type": data.get("eventType", event_type),
"event_id": event_id,
"widget_id": data.get("widgetId", widget_id),
"verified": True,
"context_key": context_key,
}
def _core_hub_headers(runtime_token: str) -> dict[str, str]:
return {
"Accept": "application/json",
"Authorization": f"Bearer {runtime_token}",
"Content-Type": "application/json",
"User-Agent": "activity-core-ops-evidence/0.1",
}
def _core_hub_runtime_token(sink: dict[str, Any]) -> str:
token_file = (
sink.get("runtime_token_file")
or sink.get("token_file")
or os.environ.get("CORE_HUB_RUNTIME_TOKEN_FILE")
)
if token_file:
return Path(str(token_file)).read_text(encoding="utf-8").strip()
env_name = (
sink.get("runtime_token_env")
or os.environ.get("CORE_HUB_RUNTIME_TOKEN_ENV")
or "CORE_HUB_RUNTIME_TOKEN"
)
return os.environ.get(str(env_name), "").strip()
def _core_hub_widget_id(sink: dict[str, Any], probe_result: dict[str, Any]) -> str:
direct = sink.get("widget_id") or os.environ.get("CORE_HUB_WIDGET_ID")
if direct:
return str(direct)
endpoint = _selected_endpoint(probe_result, sink)
widget_ref = endpoint.get("widget_ref") if endpoint else None
if not widget_ref:
return ""
mapping = sink.get("widget_mapping") or sink.get("capability_mapping")
if mapping is None:
mapping = os.environ.get("CORE_HUB_WIDGET_MAPPING")
parsed = _parse_widget_mapping(mapping)
return parsed.get(str(widget_ref), "")
def _parse_widget_mapping(raw: Any) -> dict[str, str]:
if isinstance(raw, dict):
return {str(key): str(value) for key, value in raw.items() if value}
if not isinstance(raw, str) or not raw.strip():
return {}
value = raw.strip()
if value.startswith("{"):
try:
loaded = json.loads(value)
except json.JSONDecodeError:
return {}
if isinstance(loaded, dict):
return {str(key): str(item) for key, item in loaded.items() if item}
return {}
if "=" not in value:
return {}
pairs: dict[str, str] = {}
for part in value.split(","):
key, _, item = part.partition("=")
if key.strip() and item.strip():
pairs[key.strip()] = item.strip()
return pairs
def _selected_endpoint(probe_result: dict[str, Any], sink: dict[str, Any]) -> dict[str, Any]:
endpoints = [
endpoint
for endpoint in probe_result.get("endpoints", [])
if isinstance(endpoint, dict)
]
endpoint_id = sink.get("endpoint_id")
if endpoint_id:
match = next(
(endpoint for endpoint in endpoints if endpoint.get("endpoint_id") == endpoint_id),
None,
)
if match:
return match
return next(
(endpoint for endpoint in endpoints if endpoint.get("widget_ref")),
endpoints[0] if endpoints else {},
)
def _core_hub_view_context(
payload: dict[str, Any],
context_key: str,
endpoint: dict[str, Any],
sink: dict[str, Any],
) -> str:
return str(
sink.get("view_context")
or endpoint.get("view_context")
or f"activity-core/ops-inventory/{payload.get('run_id', 'unknown')}/{context_key}"
)
def _core_hub_metadata(
payload: dict[str, Any],
context_key: str,
probe_result: dict[str, Any],
endpoint: dict[str, Any],
) -> dict[str, Any]:
compact = _compact_probe_result(probe_result)
return {
"activity_id": payload.get("activity_id"),
"activity_core_run_id": payload.get("run_id"),
"scheduled_for": payload.get("scheduled_for"),
"source_type": "ops-inventory",
"context_key": context_key,
"probe": {
"generated_at": compact.get("generated_at"),
"inventory_path": compact.get("inventory_path"),
"status": compact.get("status"),
"reason": compact.get("reason"),
"summary": compact.get("summary", {}),
},
"endpoint": _compact_endpoint(endpoint) if endpoint else {},
}
def _core_hub_event_exists(
base_url: str,
runtime_token: str,
event_id: str,
timeout: float,
) -> bool:
resp = httpx.get(
f"{base_url}/api/v2/interaction-events",
headers=_core_hub_headers(runtime_token),
timeout=timeout,
)
resp.raise_for_status()
payload = resp.json()
data = payload.get("data") if isinstance(payload, dict) else []
if not isinstance(data, list):
return False
return any(isinstance(item, dict) and item.get("id") == event_id for item in data)
def _inter_hub_result(sink: dict[str, Any]) -> dict[str, Any]:
missing: list[str] = []
if not (sink.get("inter_hub_url") or os.environ.get("INTER_HUB_URL")):