From 30043348f072412f10831cc00916b96dc22e9ba2 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 27 Jun 2026 20:34:25 +0200 Subject: [PATCH] Add Core Hub ops evidence sink --- src/activity_core/ops_evidence_sinks.py | 219 ++++++++++++++++++ tests/test_ops_evidence_sinks.py | 87 +++++++ ...VITY-WP-0017-core-hub-ops-evidence-sink.md | 58 +++++ 3 files changed, 364 insertions(+) create mode 100644 workplans/ACTIVITY-WP-0017-core-hub-ops-evidence-sink.md diff --git a/src/activity_core/ops_evidence_sinks.py b/src/activity_core/ops_evidence_sinks.py index 803479f..8209084 100644 --- a/src/activity_core/ops_evidence_sinks.py +++ b/src/activity_core/ops_evidence_sinks.py @@ -2,7 +2,9 @@ from __future__ import annotations +import json import os +from pathlib import Path from typing import Any import httpx @@ -16,6 +18,10 @@ _INTER_HUB_SINK_TYPES = { "inter-hub-event", "inter-hub-interaction-event", } +_CORE_HUB_SINK_TYPES = { + "core-hub", + "core-hub-interaction-event", +} def persist_ops_inventory_evidence(payload: dict[str, Any]) -> list[dict[str, Any]]: @@ -56,6 +62,12 @@ def persist_ops_inventory_evidence(payload: dict[str, Any]) -> list[dict[str, An results.append( _post_state_hub_progress(payload, bind_key, probe_result, sink) ) + elif sink_type in _CORE_HUB_SINK_TYPES: + results.append( + _post_core_hub_interaction_event( + payload, bind_key, probe_result, sink + ) + ) elif sink_type in _INTER_HUB_SINK_TYPES: results.append(_inter_hub_result(sink)) else: @@ -159,6 +171,213 @@ def _progress_exists(base_url: str, event_type: str, idempotency_key: str) -> bo return False +def _post_core_hub_interaction_event( + payload: dict[str, Any], + context_key: str, + probe_result: dict[str, Any], + sink: dict[str, Any], +) -> dict[str, Any]: + raw_base_url = ( + sink.get("core_hub_url") + or sink.get("base_url") + or os.environ.get("CORE_HUB_BASE_URL") + or "" + ) + base_url = str(raw_base_url).rstrip("/") + runtime_token = _core_hub_runtime_token(sink) + widget_id = _core_hub_widget_id(sink, probe_result) + + missing: list[str] = [] + if not base_url: + missing.append("CORE_HUB_BASE_URL") + if not runtime_token: + missing.append("CORE_HUB_RUNTIME_TOKEN or CORE_HUB_RUNTIME_TOKEN_FILE") + if not widget_id: + missing.append("widget_id or CORE_HUB_WIDGET_ID") + if missing: + return { + "type": sink.get("type"), + "status": "skipped", + "reason": "missing_core_hub_config", + "missing": missing, + "context_key": context_key, + } + + endpoint = _selected_endpoint(probe_result, sink) + event_type = sink.get("event_type", "ops-endpoint-verified") + timeout = float(sink.get("timeout_seconds", 10.0)) + body = { + "widgetId": widget_id, + "eventType": event_type, + "viewContext": _core_hub_view_context(payload, context_key, endpoint, sink), + "metadata": _core_hub_metadata(payload, context_key, probe_result, endpoint), + } + resp = httpx.post( + f"{base_url}/api/v2/interaction-events", + json=body, + headers=_core_hub_headers(runtime_token), + timeout=timeout, + ) + resp.raise_for_status() + data = resp.json() + event_id = data.get("id") + if not event_id: + raise RuntimeError("Core Hub interaction event response did not include an id") + if not _core_hub_event_exists(base_url, runtime_token, str(event_id), timeout): + raise RuntimeError("Core Hub interaction event was not visible after create") + + return { + "type": sink.get("type"), + "status": "posted", + "event_type": data.get("eventType", event_type), + "event_id": event_id, + "widget_id": data.get("widgetId", widget_id), + "verified": True, + "context_key": context_key, + } + + +def _core_hub_headers(runtime_token: str) -> dict[str, str]: + return { + "Accept": "application/json", + "Authorization": f"Bearer {runtime_token}", + "Content-Type": "application/json", + "User-Agent": "activity-core-ops-evidence/0.1", + } + + +def _core_hub_runtime_token(sink: dict[str, Any]) -> str: + token_file = ( + sink.get("runtime_token_file") + or sink.get("token_file") + or os.environ.get("CORE_HUB_RUNTIME_TOKEN_FILE") + ) + if token_file: + return Path(str(token_file)).read_text(encoding="utf-8").strip() + env_name = ( + sink.get("runtime_token_env") + or os.environ.get("CORE_HUB_RUNTIME_TOKEN_ENV") + or "CORE_HUB_RUNTIME_TOKEN" + ) + return os.environ.get(str(env_name), "").strip() + + +def _core_hub_widget_id(sink: dict[str, Any], probe_result: dict[str, Any]) -> str: + direct = sink.get("widget_id") or os.environ.get("CORE_HUB_WIDGET_ID") + if direct: + return str(direct) + + endpoint = _selected_endpoint(probe_result, sink) + widget_ref = endpoint.get("widget_ref") if endpoint else None + if not widget_ref: + return "" + + mapping = sink.get("widget_mapping") or sink.get("capability_mapping") + if mapping is None: + mapping = os.environ.get("CORE_HUB_WIDGET_MAPPING") + parsed = _parse_widget_mapping(mapping) + return parsed.get(str(widget_ref), "") + + +def _parse_widget_mapping(raw: Any) -> dict[str, str]: + if isinstance(raw, dict): + return {str(key): str(value) for key, value in raw.items() if value} + if not isinstance(raw, str) or not raw.strip(): + return {} + value = raw.strip() + if value.startswith("{"): + try: + loaded = json.loads(value) + except json.JSONDecodeError: + return {} + if isinstance(loaded, dict): + return {str(key): str(item) for key, item in loaded.items() if item} + return {} + if "=" not in value: + return {} + pairs: dict[str, str] = {} + for part in value.split(","): + key, _, item = part.partition("=") + if key.strip() and item.strip(): + pairs[key.strip()] = item.strip() + return pairs + + +def _selected_endpoint(probe_result: dict[str, Any], sink: dict[str, Any]) -> dict[str, Any]: + endpoints = [ + endpoint + for endpoint in probe_result.get("endpoints", []) + if isinstance(endpoint, dict) + ] + endpoint_id = sink.get("endpoint_id") + if endpoint_id: + match = next( + (endpoint for endpoint in endpoints if endpoint.get("endpoint_id") == endpoint_id), + None, + ) + if match: + return match + return next( + (endpoint for endpoint in endpoints if endpoint.get("widget_ref")), + endpoints[0] if endpoints else {}, + ) + + +def _core_hub_view_context( + payload: dict[str, Any], + context_key: str, + endpoint: dict[str, Any], + sink: dict[str, Any], +) -> str: + return str( + sink.get("view_context") + or endpoint.get("view_context") + or f"activity-core/ops-inventory/{payload.get('run_id', 'unknown')}/{context_key}" + ) + + +def _core_hub_metadata( + payload: dict[str, Any], + context_key: str, + probe_result: dict[str, Any], + endpoint: dict[str, Any], +) -> dict[str, Any]: + compact = _compact_probe_result(probe_result) + return { + "activity_id": payload.get("activity_id"), + "activity_core_run_id": payload.get("run_id"), + "scheduled_for": payload.get("scheduled_for"), + "source_type": "ops-inventory", + "context_key": context_key, + "probe": { + "generated_at": compact.get("generated_at"), + "inventory_path": compact.get("inventory_path"), + "status": compact.get("status"), + "reason": compact.get("reason"), + "summary": compact.get("summary", {}), + }, + "endpoint": _compact_endpoint(endpoint) if endpoint else {}, + } + + +def _core_hub_event_exists( + base_url: str, + runtime_token: str, + event_id: str, + timeout: float, +) -> bool: + resp = httpx.get( + f"{base_url}/api/v2/interaction-events", + headers=_core_hub_headers(runtime_token), + timeout=timeout, + ) + resp.raise_for_status() + payload = resp.json() + data = payload.get("data") if isinstance(payload, dict) else [] + if not isinstance(data, list): + return False + return any(isinstance(item, dict) and item.get("id") == event_id for item in data) + def _inter_hub_result(sink: dict[str, Any]) -> dict[str, Any]: missing: list[str] = [] if not (sink.get("inter_hub_url") or os.environ.get("INTER_HUB_URL")): diff --git a/tests/test_ops_evidence_sinks.py b/tests/test_ops_evidence_sinks.py index c76d65c..413641c 100644 --- a/tests/test_ops_evidence_sinks.py +++ b/tests/test_ops_evidence_sinks.py @@ -166,6 +166,93 @@ def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None: assert result[0]["idempotency_key"] == idempotency_key +def test_core_hub_interaction_event_sink_posts_and_verifies_compact_event(monkeypatch) -> None: + posts: list[dict[str, Any]] = [] + + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + assert url == "http://core-hub.test/api/v2/interaction-events" + assert kwargs["headers"]["Authorization"] == "Bearer runtime-secret" + posts.append({"url": url, **kwargs}) + return DummyResponse( + { + "id": "event-1", + "eventType": "ops-endpoint-verified", + "widgetId": "widget-1", + } + ) + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + assert url == "http://core-hub.test/api/v2/interaction-events" + assert kwargs["headers"]["Authorization"] == "Bearer runtime-secret" + return DummyResponse({"data": [{"id": "event-1"}]}) + + monkeypatch.setenv("CORE_HUB_RUNTIME_TOKEN", "runtime-secret") + monkeypatch.setattr(httpx, "post", fake_post) + monkeypatch.setattr(httpx, "get", fake_get) + + result = persist_ops_inventory_evidence( + _payload([ + { + "type": "core-hub-interaction-event", + "core_hub_url": "http://core-hub.test", + "widget_id": "widget-1", + "event_type": "ops-endpoint-verified", + } + ]) + ) + + assert result == [ + { + "type": "core-hub-interaction-event", + "status": "posted", + "event_type": "ops-endpoint-verified", + "event_id": "event-1", + "widget_id": "widget-1", + "verified": True, + "context_key": "ops_probe", + } + ] + body = posts[0]["json"] + assert body["widgetId"] == "widget-1" + assert body["eventType"] == "ops-endpoint-verified" + assert body["metadata"]["activity_core_run_id"] == _run_id() + assert body["metadata"]["endpoint"]["url"] == "http://state-hub.test/health" + assert body["metadata"]["endpoint"]["widget_ref"] == "ops:endpoint:state-hub-health" + + serialized = json.dumps(body, sort_keys=True) + assert "runtime-secret" not in serialized + assert "secret response body" not in serialized + assert "Authorization" not in serialized + assert "user:pass" not in serialized + assert "token=secret" not in serialized + + +def test_core_hub_sink_skips_cleanly_when_config_missing(monkeypatch) -> None: + monkeypatch.delenv("CORE_HUB_BASE_URL", raising=False) + monkeypatch.delenv("CORE_HUB_RUNTIME_TOKEN", raising=False) + monkeypatch.delenv("CORE_HUB_RUNTIME_TOKEN_FILE", raising=False) + monkeypatch.delenv("CORE_HUB_WIDGET_ID", raising=False) + monkeypatch.delenv("CORE_HUB_WIDGET_MAPPING", raising=False) + + result = persist_ops_inventory_evidence( + _payload([{"type": "core-hub-interaction-event"}]) + ) + + assert result == [ + { + "type": "core-hub-interaction-event", + "status": "skipped", + "reason": "missing_core_hub_config", + "missing": [ + "CORE_HUB_BASE_URL", + "CORE_HUB_RUNTIME_TOKEN or CORE_HUB_RUNTIME_TOKEN_FILE", + "widget_id or CORE_HUB_WIDGET_ID", + ], + "context_key": "ops_probe", + } + ] + + def test_inter_hub_sink_skips_cleanly_when_config_missing(monkeypatch) -> None: monkeypatch.delenv("INTER_HUB_URL", raising=False) monkeypatch.delenv("OPS_HUB_KEY", raising=False) diff --git a/workplans/ACTIVITY-WP-0017-core-hub-ops-evidence-sink.md b/workplans/ACTIVITY-WP-0017-core-hub-ops-evidence-sink.md new file mode 100644 index 0000000..bd67025 --- /dev/null +++ b/workplans/ACTIVITY-WP-0017-core-hub-ops-evidence-sink.md @@ -0,0 +1,58 @@ +--- +id: ACTIVITY-WP-0017 +type: workplan +title: "Core Hub ops evidence sink" +domain: infotech +repo: activity-core +status: finished +owner: codex +topic_slug: custodian +created: "2026-06-27" +updated: "2026-06-27" +state_hub_workstream_id: "2a073bf4-febf-433e-a721-5daf71760912" +--- + +# Core Hub ops evidence sink + +## Goal + +Provide the activity-core side of the Core Hub replacement evidence path for +`CORE-WP-0008-T03`, without depending on the legacy Haskell Inter-Hub sink and +without placing secret material in activity definitions, logs, State Hub, or +chat. + +## Task: Add Core Hub interaction-event sink + +```task +id: ACTIVITY-WP-0017-T01 +status: done +priority: high +state_hub_task_id: "32aab1af-6be5-4b52-afa1-c11f52c65892" +``` + +Add a `core-hub-interaction-event` ops evidence sink that posts sanitized +ops-inventory probe evidence to Core Hub `/api/v2/interaction-events`, verifies +the created event is visible, and reports only non-secret ids/statuses. + +Acceptance: + +- runtime token is read through `CORE_HUB_RUNTIME_TOKEN_FILE` or a named +environment variable, never from workplan content; +- sink configuration accepts `CORE_HUB_BASE_URL` and a widget id or widget +mapping; +- emitted metadata reuses the existing compact/sanitized probe evidence path; +- missing Core Hub config skips cleanly with explicit non-secret missing keys; +- tests prove the POST/visibility check and secret non-disclosure. + +Verification 2026-06-27: `tests/test_ops_evidence_sinks.py` passed, and +a disposable local Core Hub runtime accepted an activity-core +`core-hub-interaction-event` sink emission, then listed the created +`ops-endpoint-verified` event back through `/api/v2/interaction-events`. +The verification asserted sanitized metadata did not include response body, +authorization header, URL userinfo, or token query material. + +Completed 2026-06-27: implemented the Core Hub interaction-event sink in +`activity_core.ops_evidence_sinks` with unit coverage for POST/visibility +verification, missing config behavior, and secret non-disclosure. This provides +the direct Core Hub consumer path needed by `CORE-WP-0008-T03`; deployed use +still requires an approved Core Hub runtime token and widget id/mapping.