From 41d3e75a88e95061c58722ac0606ba6b85548b44 Mon Sep 17 00:00:00 2001 From: tegwick Date: Fri, 5 Jun 2026 23:16:40 +0200 Subject: [PATCH] Implement ops inventory probe evidence slice --- event-types/ops-access-path-checked.md | 55 +++ event-types/ops-backup-verified.md | 54 +++ event-types/ops-endpoint-verified.md | 63 ++++ event-types/ops-inventory-drift.md | 56 +++ event-types/ops-service-observed.md | 53 +++ src/activity_core/activities.py | 7 + .../context_resolvers/__init__.py | 2 +- .../context_resolvers/ops_inventory.py | 322 ++++++++++++++++++ src/activity_core/ops_evidence_sinks.py | 280 +++++++++++++++ src/activity_core/worker.py | 2 + src/activity_core/workflows.py | 27 +- tests/test_ops_event_types.py | 44 +++ tests/test_ops_evidence_sinks.py | 195 +++++++++++ tests/test_ops_inventory_context_resolver.py | 283 +++++++++++++++ tests/test_schedule_lifecycle.py | 12 +- tests/test_sync_activity_definitions.py | 54 +++ ...VITY-WP-0007-ops-inventory-probe-runner.md | 26 +- 17 files changed, 1521 insertions(+), 14 deletions(-) create mode 100644 event-types/ops-access-path-checked.md create mode 100644 event-types/ops-backup-verified.md create mode 100644 event-types/ops-endpoint-verified.md create mode 100644 event-types/ops-inventory-drift.md create mode 100644 event-types/ops-service-observed.md create mode 100644 src/activity_core/context_resolvers/ops_inventory.py create mode 100644 src/activity_core/ops_evidence_sinks.py create mode 100644 tests/test_ops_event_types.py create mode 100644 tests/test_ops_evidence_sinks.py create mode 100644 tests/test_ops_inventory_context_resolver.py diff --git a/event-types/ops-access-path-checked.md b/event-types/ops-access-path-checked.md new file mode 100644 index 0000000..b6c7ac4 --- /dev/null +++ b/event-types/ops-access-path-checked.md @@ -0,0 +1,55 @@ +--- +type_id: ops-access-path-checked +version: "1.0" +publisher: activity-core +governance: publisher-declared +status: active +--- + +# ops-access-path-checked + +## Intent + +Published when an inventory access path is checked or deliberately skipped. +The first activity-core implementation records non-HTTP/k8s/ssh/tunnel paths as +`skipped` / unsupported rather than executing commands. + +## Attributes + +| Name | Type | Required | Description | +|---|---|---|---| +| activity_core_run_id | uuid | yes | UUID of the activity-core run that produced this evidence. | +| idempotency_key | string | yes | Stable key for deduplicating this access-path evidence. | +| service_id | string | yes | Stable service id from the inventory. | +| access_path_id | string | yes | Stable or derived access path id. | +| access_path_type | string | yes | Declared access path type such as `http`, `k8s`, `ssh`, or `tunnel`. | +| declared_status | string | no | Status declared in the inventory. | +| observed_status | string | yes | One of `ok`, `degraded`, `down`, or `skipped`. | +| reason | string | no | Compact non-secret reason such as `unsupported_access_path_type`. | +| observed_at | datetime | yes | UTC time when the evidence was generated. | + +## Example Payload + +```json +{ + "type": "ops-access-path-checked", + "version": "1.0", + "publisher": "activity-core", + "attributes": { + "activity_core_run_id": "12345678-aaaa-bbbb-cccc-123456789abc", + "idempotency_key": "12345678:gitea:gitea-access-1:ops-access-path-checked", + "service_id": "gitea", + "access_path_id": "gitea-access-1", + "access_path_type": "k8s", + "declared_status": "unknown", + "observed_status": "skipped", + "reason": "unsupported_access_path_type", + "observed_at": "2026-06-05T10:15:01Z" + } +} +``` + +## Safety + +Do not include secrets, authorization headers, cookies, tokens, raw response +bodies, command output, private key material, or unredacted URL query strings. diff --git a/event-types/ops-backup-verified.md b/event-types/ops-backup-verified.md new file mode 100644 index 0000000..bf5659f --- /dev/null +++ b/event-types/ops-backup-verified.md @@ -0,0 +1,54 @@ +--- +type_id: ops-backup-verified +version: "1.0" +publisher: activity-core +governance: publisher-declared +status: active +--- + +# ops-backup-verified + +## Intent + +Published when backup or restore evidence for a service backing store has been +verified from non-secret metadata. The initial probe runner may emit `skipped` +until backup evidence is available. + +## Attributes + +| Name | Type | Required | Description | +|---|---|---|---| +| activity_core_run_id | uuid | yes | UUID of the activity-core run that produced this evidence. | +| idempotency_key | string | yes | Stable key for deduplicating this backup evidence. | +| service_id | string | yes | Stable service id from the inventory. | +| backing_store_ref | string | yes | Non-secret backing store reference from the inventory. | +| backup_evidence_ref | string | no | Non-secret document, progress, or artifact reference. | +| restore_verified | boolean | no | Whether restore evidence has been verified. | +| observed_status | string | yes | One of `ok`, `degraded`, `down`, or `skipped`. | +| reason | string | no | Compact non-secret reason for non-OK status. | +| observed_at | datetime | yes | UTC time when the evidence was generated. | + +## Example Payload + +```json +{ + "type": "ops-backup-verified", + "version": "1.0", + "publisher": "activity-core", + "attributes": { + "activity_core_run_id": "12345678-aaaa-bbbb-cccc-123456789abc", + "idempotency_key": "12345678:gitea:database:gitea-db:ops-backup-verified", + "service_id": "gitea", + "backing_store_ref": "database:gitea-db", + "restore_verified": false, + "observed_status": "skipped", + "reason": "backup_probe_not_implemented", + "observed_at": "2026-06-05T10:15:01Z" + } +} +``` + +## Safety + +Do not include secrets, authorization headers, cookies, tokens, raw response +bodies, command output, private key material, or unredacted URL query strings. diff --git a/event-types/ops-endpoint-verified.md b/event-types/ops-endpoint-verified.md new file mode 100644 index 0000000..cba2a86 --- /dev/null +++ b/event-types/ops-endpoint-verified.md @@ -0,0 +1,63 @@ +--- +type_id: ops-endpoint-verified +version: "1.0" +publisher: activity-core +governance: publisher-declared +status: active +--- + +# ops-endpoint-verified + +## Intent + +Published when activity-core checks an inventory endpoint and compares the +non-secret response metadata to the declared expected status and signal. + +## Attributes + +| Name | Type | Required | Description | +|---|---|---|---| +| activity_core_run_id | uuid | yes | UUID of the activity-core run that produced this evidence. | +| idempotency_key | string | yes | Stable key for deduplicating this endpoint evidence. | +| service_id | string | yes | Stable service id from the inventory. | +| endpoint_id | string | yes | Stable endpoint id from the inventory. | +| endpoint_type | string | yes | Endpoint type, usually `http` or `https` for the first implementation. | +| endpoint_url | string | yes | Sanitized URL without credentials, query string, or fragment. | +| expected_status | integer | no | Declared expected HTTP status. | +| status_code | integer | no | Observed HTTP status code, if a response was received. | +| matched_expected_status | boolean | no | Whether the observed status matched the declaration. | +| matched_expected_signal | boolean | no | Whether the expected signal was found without storing the response body. | +| observed_status | string | yes | One of `ok`, `degraded`, `down`, or `skipped`. | +| reason | string | no | Compact non-secret reason such as `expected_status_mismatch`. | +| observed_at | datetime | yes | UTC time when the endpoint evidence was generated. | +| widget_ref | string | no | Optional ops widget reference from the inventory. | + +## Example Payload + +```json +{ + "type": "ops-endpoint-verified", + "version": "1.0", + "publisher": "activity-core", + "attributes": { + "activity_core_run_id": "12345678-aaaa-bbbb-cccc-123456789abc", + "idempotency_key": "12345678:gitea:gitea-oci-registry:ops-endpoint-verified", + "service_id": "gitea", + "endpoint_id": "gitea-oci-registry", + "endpoint_type": "https", + "endpoint_url": "https://gitea.coulomb.social/v2/", + "expected_status": 401, + "status_code": 401, + "matched_expected_status": true, + "matched_expected_signal": true, + "observed_status": "ok", + "observed_at": "2026-06-05T10:15:01Z", + "widget_ref": "ops:endpoint:gitea-registry" + } +} +``` + +## Safety + +Do not include secrets, authorization headers, cookies, tokens, raw response +bodies, command output, private key material, or unredacted URL query strings. diff --git a/event-types/ops-inventory-drift.md b/event-types/ops-inventory-drift.md new file mode 100644 index 0000000..a83b8ff --- /dev/null +++ b/event-types/ops-inventory-drift.md @@ -0,0 +1,56 @@ +--- +type_id: ops-inventory-drift +version: "1.0" +publisher: activity-core +governance: publisher-declared +status: active +--- + +# ops-inventory-drift + +## Intent + +Published when observed non-secret runtime evidence differs from the declared +ops inventory and the difference should be visible to ops-hub or operators. + +## Attributes + +| Name | Type | Required | Description | +|---|---|---|---| +| activity_core_run_id | uuid | yes | UUID of the activity-core run that produced this evidence. | +| idempotency_key | string | yes | Stable key for deduplicating this drift evidence. | +| service_id | string | yes | Stable service id from the inventory. | +| inventory_object_id | string | no | Endpoint, access path, backing store, or runtime object id. | +| drift_kind | string | yes | Compact drift category such as `missing_endpoint` or `status_mismatch`. | +| declared_summary | string | no | Bounded non-secret summary of the declared value. | +| observed_summary | string | no | Bounded non-secret summary of the observed value. | +| observed_status | string | yes | Usually `degraded` for drift evidence. | +| reason | string | no | Compact non-secret reason for the drift event. | +| observed_at | datetime | yes | UTC time when the drift evidence was generated. | + +## Example Payload + +```json +{ + "type": "ops-inventory-drift", + "version": "1.0", + "publisher": "activity-core", + "attributes": { + "activity_core_run_id": "12345678-aaaa-bbbb-cccc-123456789abc", + "idempotency_key": "12345678:gitea:gitea-oci-registry:ops-inventory-drift", + "service_id": "gitea", + "inventory_object_id": "gitea-oci-registry", + "drift_kind": "status_mismatch", + "declared_summary": "expected_status=401", + "observed_summary": "status_code=200", + "observed_status": "degraded", + "reason": "expected_status_mismatch", + "observed_at": "2026-06-05T10:15:01Z" + } +} +``` + +## Safety + +Do not include secrets, authorization headers, cookies, tokens, raw response +bodies, command output, private key material, or unredacted URL query strings. diff --git a/event-types/ops-service-observed.md b/event-types/ops-service-observed.md new file mode 100644 index 0000000..e37239d --- /dev/null +++ b/event-types/ops-service-observed.md @@ -0,0 +1,53 @@ +--- +type_id: ops-service-observed +version: "1.0" +publisher: activity-core +governance: publisher-declared +status: active +--- + +# ops-service-observed + +## Intent + +Published when activity-core observes a service from the declared ops inventory +and records compact non-secret runtime evidence. + +## Attributes + +| Name | Type | Required | Description | +|---|---|---|---| +| activity_core_run_id | uuid | yes | UUID of the activity-core run that produced this evidence. | +| idempotency_key | string | yes | Stable key for deduplicating this evidence event. | +| service_id | string | yes | Stable service id from `ops/service-inventory.yml`. | +| service_name | string | no | Human-readable service name. | +| environment | string | no | Inventory environment id. | +| lifecycle_state | string | no | Declared service lifecycle state. | +| observed_status | string | yes | One of `ok`, `degraded`, `down`, or `skipped`. | +| observed_at | datetime | yes | UTC time when the evidence was generated. | +| reason | string | no | Compact non-secret reason for non-OK status. | + +## Example Payload + +```json +{ + "type": "ops-service-observed", + "version": "1.0", + "publisher": "activity-core", + "attributes": { + "activity_core_run_id": "12345678-aaaa-bbbb-cccc-123456789abc", + "idempotency_key": "12345678:state-hub:ops-service-observed", + "service_id": "state-hub", + "service_name": "State Hub", + "environment": "local", + "lifecycle_state": "observed", + "observed_status": "ok", + "observed_at": "2026-06-05T10:15:01Z" + } +} +``` + +## Safety + +Do not include secrets, authorization headers, cookies, tokens, raw response +bodies, command output, private key material, or unredacted URL query strings. diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index a0ae1bb..b662a45 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -26,6 +26,7 @@ from activity_core.orm import ActivityDefinition as ActivityDefinitionRow from activity_core.orm import ActivityRun, TaskInstance, TaskSpawnLog from activity_core.llm_client import get_llm_client from activity_core.models import InstructionDef +from activity_core.ops_evidence_sinks import persist_ops_inventory_evidence from activity_core.report_sinks import persist_reports from activity_core.rules.actions import expand_rule_actions from activity_core.rules.executor import execute_instruction_with_audit @@ -356,6 +357,12 @@ async def persist_instruction_reports(payload: dict) -> list[dict]: return persist_reports(payload) +@activity.defn +async def persist_ops_evidence(payload: dict) -> list[dict]: + """Persist compact deterministic ops inventory evidence.""" + return persist_ops_inventory_evidence(payload) + + @activity.defn async def emit_tasks(payload: dict) -> list[str]: """Emit TaskSpecs to IssueSink and write task_spawn_log rows. diff --git a/src/activity_core/context_resolvers/__init__.py b/src/activity_core/context_resolvers/__init__.py index f00a5d6..d76fb80 100644 --- a/src/activity_core/context_resolvers/__init__.py +++ b/src/activity_core/context_resolvers/__init__.py @@ -1 +1 @@ -from activity_core.context_resolvers import repo_scoping, state_hub # noqa: F401 +from activity_core.context_resolvers import ops_inventory, repo_scoping, state_hub # noqa: F401 diff --git a/src/activity_core/context_resolvers/ops_inventory.py b/src/activity_core/context_resolvers/ops_inventory.py new file mode 100644 index 0000000..d763c69 --- /dev/null +++ b/src/activity_core/context_resolvers/ops_inventory.py @@ -0,0 +1,322 @@ +"""Ops service inventory probe context adapter. + +Registered as source type ``ops-inventory``. + +The resolver reads the Custodian's non-secret service inventory and performs +bounded HTTP/HTTPS checks for declared endpoints. It deliberately records only +compact probe metadata: stable inventory ids, sanitized endpoint URLs, status +codes, boolean match results, and summary counts. +""" + +from __future__ import annotations + +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from urllib.parse import urlsplit, urlunsplit + +import httpx +import yaml + +from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, ContextResolver + +_DEFAULT_INVENTORY_PATH = "/home/worsch/the-custodian/ops/service-inventory.yml" +_DEFAULT_TIMEOUT_SECONDS = 10.0 +_SUPPORTED_ENDPOINT_TYPES = {"http", "https"} + + +class OpsInventoryContextResolver(ContextResolver): + """Resolve lightweight ops inventory probes from a non-secret YAML file.""" + + def resolve(self, query: str, event: Any, params: dict[str, Any]) -> dict[str, Any]: + if query != "probe_services": + return {} + return _probe_services(params) + + +CONTEXT_RESOLVER_REGISTRY["ops-inventory"] = OpsInventoryContextResolver + + +def _probe_services(params: dict[str, Any]) -> dict[str, Any]: + inventory_path = Path( + str( + params.get("inventory_path") + or os.environ.get("OPS_INVENTORY_PATH") + or _DEFAULT_INVENTORY_PATH + ) + ) + timeout_seconds = float(params.get("timeout_seconds", _DEFAULT_TIMEOUT_SECONDS)) + allow_network = _bool_param(params.get("allow_network", True)) + required = _bool_param(params.get("required", False)) + include_kinds = _include_kinds(params.get("include_kinds")) + + if not inventory_path.exists(): + if required: + raise FileNotFoundError(f"ops inventory not found: {inventory_path}") + return _empty_result( + inventory_path, + reason="inventory_not_found", + status="skipped", + skipped=1, + ) + + inventory = _load_inventory(inventory_path) + raw_services = inventory.get("services") + if not isinstance(raw_services, list): + if required: + raise ValueError("ops inventory missing services list") + return _empty_result( + inventory_path, + reason="invalid_inventory", + status="skipped", + skipped=1, + ) + + result = _empty_result(inventory_path) + for raw_service in raw_services: + if not isinstance(raw_service, dict): + continue + service = _service_summary(raw_service) + result["services"].append(service) + + for endpoint in _endpoint_entries( + raw_service, + include_kinds, + allow_network, + timeout_seconds, + ): + result["endpoints"].append(endpoint) + _increment_summary(result["summary"], endpoint["status"]) + + for access_path in _access_path_entries(raw_service): + result["access_paths"].append(access_path) + _increment_summary(result["summary"], access_path["status"]) + + return result + + +def _load_inventory(path: Path) -> dict[str, Any]: + with path.open("r", encoding="utf-8") as handle: + payload = yaml.safe_load(handle) or {} + if not isinstance(payload, dict): + raise ValueError("ops inventory root must be a mapping") + return payload + + +def _empty_result( + inventory_path: Path, + *, + reason: str | None = None, + status: str | None = None, + skipped: int = 0, +) -> dict[str, Any]: + summary: dict[str, int] = { + "ok": 0, + "degraded": 0, + "down": 0, + "skipped": skipped, + } + result: dict[str, Any] = { + "services": [], + "endpoints": [], + "access_paths": [], + "summary": summary, + "generated_at": datetime.now(timezone.utc).isoformat(), + "inventory_path": str(inventory_path), + } + if reason is not None: + result["reason"] = reason + if status is not None: + result["status"] = status + return result + + +def _service_summary(service: dict[str, Any]) -> dict[str, Any]: + endpoints = service.get("endpoints") if isinstance(service.get("endpoints"), list) else [] + access_paths = ( + service.get("access_paths") if isinstance(service.get("access_paths"), list) else [] + ) + owner_repos = service.get("owner_repos") + return { + "service_id": str(service.get("id") or ""), + "name": str(service.get("name") or service.get("id") or ""), + "kind": str(service.get("kind") or ""), + "environment": str(service.get("environment") or ""), + "lifecycle_state": str(service.get("lifecycle_state") or ""), + "declared_health_status": str(service.get("health_status") or ""), + "owner_repos": owner_repos if isinstance(owner_repos, list) else [], + "endpoint_count": len(endpoints), + "access_path_count": len(access_paths), + } + + +def _endpoint_entries( + service: dict[str, Any], + include_kinds: set[str], + allow_network: bool, + timeout_seconds: float, +) -> list[dict[str, Any]]: + service_id = str(service.get("id") or "") + service_name = str(service.get("name") or service_id) + raw_endpoints = service.get("endpoints") + if not isinstance(raw_endpoints, list): + return [] + + entries: list[dict[str, Any]] = [] + for raw_endpoint in raw_endpoints: + if not isinstance(raw_endpoint, dict): + continue + endpoint_type = str(raw_endpoint.get("type") or "").lower() + entry = _endpoint_base(service_id, service_name, raw_endpoint, endpoint_type) + + if endpoint_type not in include_kinds: + entry.update({"status": "skipped", "reason": "kind_not_included"}) + entries.append(entry) + continue + if endpoint_type not in _SUPPORTED_ENDPOINT_TYPES: + entry.update({"status": "skipped", "reason": "unsupported_endpoint_type"}) + entries.append(entry) + continue + if not raw_endpoint.get("url"): + entry.update({"status": "skipped", "reason": "missing_url"}) + entries.append(entry) + continue + if not allow_network: + entry.update({"status": "skipped", "reason": "network_disabled"}) + entries.append(entry) + continue + + entry.update(_probe_http_endpoint(raw_endpoint, timeout_seconds)) + entries.append(entry) + + return entries + + +def _endpoint_base( + service_id: str, + service_name: str, + endpoint: dict[str, Any], + endpoint_type: str, +) -> dict[str, Any]: + expected_status = endpoint.get("expected_status") + return { + "service_id": service_id, + "service_name": service_name, + "endpoint_id": str(endpoint.get("id") or ""), + "endpoint_type": endpoint_type, + "url": _sanitize_url(str(endpoint.get("url") or "")), + "expected_status": expected_status if isinstance(expected_status, int) else None, + "expected_signal_present": bool(endpoint.get("expected_signal")), + "widget_ref": str(endpoint.get("widget_ref") or ""), + "status": "skipped", + "reason": None, + "status_code": None, + "matched_expected_status": None, + "matched_expected_signal": None, + } + + +def _probe_http_endpoint( + endpoint: dict[str, Any], + timeout_seconds: float, +) -> dict[str, Any]: + url = str(endpoint.get("url") or "") + expected_status = endpoint.get("expected_status") + expected_signal = endpoint.get("expected_signal") + try: + response = httpx.get(url, timeout=timeout_seconds, follow_redirects=False) + except httpx.HTTPError as exc: + return { + "status": "down", + "reason": type(exc).__name__, + "status_code": None, + "matched_expected_status": False if isinstance(expected_status, int) else None, + "matched_expected_signal": False if expected_signal else None, + } + + status_match = ( + response.status_code == expected_status + if isinstance(expected_status, int) + else True + ) + signal_match = ( + str(expected_signal) in response.text + if isinstance(expected_signal, str) and expected_signal + else True + ) + status = "ok" if status_match and signal_match else "degraded" + reason = None + if not status_match: + reason = "expected_status_mismatch" + elif not signal_match: + reason = "expected_signal_missing" + + return { + "status": status, + "reason": reason, + "status_code": response.status_code, + "matched_expected_status": status_match, + "matched_expected_signal": signal_match, + } + + +def _access_path_entries(service: dict[str, Any]) -> list[dict[str, Any]]: + service_id = str(service.get("id") or "") + service_name = str(service.get("name") or service_id) + raw_paths = service.get("access_paths") + if not isinstance(raw_paths, list): + return [] + + entries: list[dict[str, Any]] = [] + for index, raw_path in enumerate(raw_paths, start=1): + if not isinstance(raw_path, dict): + continue + path_type = str(raw_path.get("type") or "").lower() + entries.append({ + "service_id": service_id, + "service_name": service_name, + "access_path_id": str(raw_path.get("id") or f"{service_id}-access-{index}"), + "access_path_type": path_type, + "declared_status": str(raw_path.get("status") or ""), + "status": "skipped", + "reason": "unsupported_access_path_type", + }) + return entries + + +def _include_kinds(raw: Any) -> set[str]: + if raw is None: + return set(_SUPPORTED_ENDPOINT_TYPES) + if isinstance(raw, str): + return {part.strip().lower() for part in raw.split(",") if part.strip()} + if isinstance(raw, list): + return {str(part).strip().lower() for part in raw if str(part).strip()} + return set(_SUPPORTED_ENDPOINT_TYPES) + + +def _bool_param(raw: Any) -> bool: + if isinstance(raw, bool): + return raw + if isinstance(raw, str): + return raw.strip().lower() not in {"0", "false", "no", "off"} + return bool(raw) + + +def _increment_summary(summary: dict[str, int], status: str) -> None: + if status not in summary: + status = "skipped" + summary[status] += 1 + + +def _sanitize_url(raw_url: str) -> str: + if not raw_url: + return "" + parsed = urlsplit(raw_url) + if not parsed.scheme or not parsed.netloc: + return raw_url.split("?", 1)[0].split("#", 1)[0] + + hostname = parsed.hostname or "" + if parsed.port is not None: + hostname = f"{hostname}:{parsed.port}" + return urlunsplit((parsed.scheme, hostname, parsed.path, "", "")) diff --git a/src/activity_core/ops_evidence_sinks.py b/src/activity_core/ops_evidence_sinks.py new file mode 100644 index 0000000..1e7db24 --- /dev/null +++ b/src/activity_core/ops_evidence_sinks.py @@ -0,0 +1,280 @@ +"""Deterministic sinks for ops inventory probe evidence.""" + +from __future__ import annotations + +import os +from typing import Any + +import httpx + +from activity_core.context_resolvers.ops_inventory import _sanitize_url + +_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" +_INTER_HUB_SINK_TYPES = { + "inter-hub", + "inter-hub-event", + "inter-hub-interaction-event", +} + + +def persist_ops_inventory_evidence(payload: dict[str, Any]) -> list[dict[str, Any]]: + """Persist compact non-secret ops inventory evidence for configured sources. + + The workflow passes all context sources and the resolved context snapshot. + This function filters to ``type: ops-inventory`` sources and only emits + evidence when the source params contain an explicit ``evidence_sinks`` list. + """ + results: list[dict[str, Any]] = [] + for source in payload.get("context_sources", []): + if not isinstance(source, dict) or source.get("type") != "ops-inventory": + continue + + params = source.get("params") or {} + sinks = _normalise_sinks(params.get("evidence_sinks") or params.get("evidence_sink")) + if not sinks: + continue + + bind_key = _context_bind_key(source) + probe_result = (payload.get("context") or {}).get(bind_key) + if not isinstance(probe_result, dict): + results.extend( + { + "type": sink.get("type", "unknown"), + "status": "skipped", + "reason": "missing_probe_result", + "context_key": bind_key, + } + for sink in sinks + ) + continue + + for sink in sinks: + sink_type = sink.get("type") + try: + if sink_type == "state-hub-progress": + results.append( + _post_state_hub_progress(payload, bind_key, probe_result, sink) + ) + elif sink_type in _INTER_HUB_SINK_TYPES: + results.append(_inter_hub_result(sink)) + else: + results.append({ + "type": sink_type or "unknown", + "status": "skipped", + "reason": "unknown_sink_type", + "context_key": bind_key, + }) + except Exception as exc: + results.append({ + "type": sink_type or "unknown", + "status": "error", + "error": str(exc), + "context_key": bind_key, + }) + + errors = [result for result in results if result.get("status") == "error"] + if errors: + raise RuntimeError(f"ops evidence sink failure: {errors!r}") + return results + + +def _post_state_hub_progress( + payload: dict[str, Any], + context_key: str, + probe_result: dict[str, Any], + sink: dict[str, Any], +) -> dict[str, Any]: + base_url = sink.get("state_hub_url") or os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL) + base_url = str(base_url).rstrip("/") + event_type = sink.get("event_type", "ops_inventory_probe") + run_id = payload["run_id"] + idempotency_key = f"{run_id}:{context_key}:{event_type}" + + if _progress_exists(base_url, event_type, idempotency_key): + return { + "type": "state-hub-progress", + "status": "exists", + "event_type": event_type, + "idempotency_key": idempotency_key, + "context_key": context_key, + } + + compact = _compact_probe_result(probe_result) + body: dict[str, Any] = { + "event_type": event_type, + "author": sink.get("author", "activity-core"), + "summary": _summary_text(compact.get("summary", {})), + "detail": { + "activity_id": payload.get("activity_id"), + "activity_core_run_id": run_id, + "scheduled_for": payload.get("scheduled_for"), + "source_type": "ops-inventory", + "context_key": context_key, + "idempotency_key": idempotency_key, + "probe": compact, + }, + } + for key in ("topic_id", "workstream_id", "task_id", "decision_id"): + if sink.get(key): + body[key] = sink[key] + + resp = httpx.post( + f"{base_url}/progress/", + json=body, + timeout=float(sink.get("timeout_seconds", 10.0)), + ) + resp.raise_for_status() + data = resp.json() + return { + "type": "state-hub-progress", + "status": "posted", + "event_type": event_type, + "progress_id": data.get("id"), + "idempotency_key": idempotency_key, + "context_key": context_key, + } + + +def _progress_exists(base_url: str, event_type: str, idempotency_key: str) -> bool: + resp = httpx.get( + f"{base_url}/progress/", + params={"limit": 100}, + timeout=10.0, + ) + resp.raise_for_status() + for item in resp.json(): + detail = item.get("detail") or {} + if ( + item.get("event_type") == event_type + and detail.get("idempotency_key") == idempotency_key + ): + return True + return False + + +def _inter_hub_result(sink: dict[str, Any]) -> dict[str, Any]: + missing: list[str] = [] + if not (sink.get("inter_hub_url") or os.environ.get("INTER_HUB_URL")): + missing.append("INTER_HUB_URL") + if not os.environ.get("OPS_HUB_KEY"): + missing.append("OPS_HUB_KEY") + if not (sink.get("widget_mapping") or sink.get("capability_mapping")): + missing.append("widget_mapping") + + if missing: + return { + "type": sink.get("type"), + "status": "skipped", + "reason": "missing_inter_hub_config", + "missing": missing, + } + return { + "type": sink.get("type"), + "status": "skipped", + "reason": "inter_hub_sink_deferred", + } + + +def _compact_probe_result(probe_result: dict[str, Any]) -> dict[str, Any]: + return { + "generated_at": probe_result.get("generated_at"), + "inventory_path": probe_result.get("inventory_path"), + "status": probe_result.get("status"), + "reason": probe_result.get("reason"), + "summary": _compact_summary(probe_result.get("summary")), + "services": [ + _compact_service(service) + for service in probe_result.get("services", []) + if isinstance(service, dict) + ], + "endpoints": [ + _compact_endpoint(endpoint) + for endpoint in probe_result.get("endpoints", []) + if isinstance(endpoint, dict) + ], + "access_paths": [ + _compact_access_path(access_path) + for access_path in probe_result.get("access_paths", []) + if isinstance(access_path, dict) + ], + } + + +def _compact_summary(raw: Any) -> dict[str, int]: + if not isinstance(raw, dict): + raw = {} + return { + "ok": int(raw.get("ok", 0) or 0), + "degraded": int(raw.get("degraded", 0) or 0), + "down": int(raw.get("down", 0) or 0), + "skipped": int(raw.get("skipped", 0) or 0), + } + + +def _compact_service(service: dict[str, Any]) -> dict[str, Any]: + return { + "service_id": service.get("service_id"), + "name": service.get("name"), + "kind": service.get("kind"), + "environment": service.get("environment"), + "lifecycle_state": service.get("lifecycle_state"), + "declared_health_status": service.get("declared_health_status"), + "owner_repos": service.get("owner_repos") if isinstance(service.get("owner_repos"), list) else [], + "endpoint_count": service.get("endpoint_count"), + "access_path_count": service.get("access_path_count"), + } + + +def _compact_endpoint(endpoint: dict[str, Any]) -> dict[str, Any]: + return { + "service_id": endpoint.get("service_id"), + "service_name": endpoint.get("service_name"), + "endpoint_id": endpoint.get("endpoint_id"), + "endpoint_type": endpoint.get("endpoint_type"), + "url": _sanitize_url(str(endpoint.get("url") or "")), + "expected_status": endpoint.get("expected_status"), + "expected_signal_present": bool(endpoint.get("expected_signal_present")), + "widget_ref": endpoint.get("widget_ref"), + "status": endpoint.get("status"), + "reason": endpoint.get("reason"), + "status_code": endpoint.get("status_code"), + "matched_expected_status": endpoint.get("matched_expected_status"), + "matched_expected_signal": endpoint.get("matched_expected_signal"), + } + + +def _compact_access_path(access_path: dict[str, Any]) -> dict[str, Any]: + return { + "service_id": access_path.get("service_id"), + "service_name": access_path.get("service_name"), + "access_path_id": access_path.get("access_path_id"), + "access_path_type": access_path.get("access_path_type"), + "declared_status": access_path.get("declared_status"), + "status": access_path.get("status"), + "reason": access_path.get("reason"), + } + + +def _summary_text(summary: dict[str, Any]) -> str: + return ( + "Ops inventory probe: " + f"{summary.get('ok', 0)} ok, " + f"{summary.get('degraded', 0)} degraded, " + f"{summary.get('down', 0)} down, " + f"{summary.get('skipped', 0)} skipped" + ) + + +def _context_bind_key(source: dict[str, Any]) -> str: + raw_bind = source.get("bind_to") or source.get("name") or source.get("type", "") + return raw_bind.removeprefix("context.") if raw_bind.startswith("context.") else raw_bind + + +def _normalise_sinks(raw: Any) -> list[dict[str, Any]]: + if raw is None: + return [] + if isinstance(raw, dict): + return [raw] + if isinstance(raw, list): + return [sink for sink in raw if isinstance(sink, dict)] + return [] diff --git a/src/activity_core/worker.py b/src/activity_core/worker.py index d1cad29..fedb05c 100644 --- a/src/activity_core/worker.py +++ b/src/activity_core/worker.py @@ -40,6 +40,7 @@ from activity_core.activities import ( load_activity_definition, log_run, persist_instruction_reports, + persist_ops_evidence, persist_task_instance, resolve_context, ) @@ -102,6 +103,7 @@ async def run() -> None: evaluate_rules, evaluate_instructions, persist_instruction_reports, + persist_ops_evidence, emit_tasks, ], ) diff --git a/src/activity_core/workflows.py b/src/activity_core/workflows.py index ee60bc8..3c3aa08 100644 --- a/src/activity_core/workflows.py +++ b/src/activity_core/workflows.py @@ -26,6 +26,7 @@ with workflow.unsafe.imports_passed_through(): load_activity_definition, log_run, persist_instruction_reports, + persist_ops_evidence, persist_task_instance, resolve_context, ) @@ -105,6 +106,26 @@ class RunActivityWorkflow: retry_policy=_RETRY_POLICY, ) + if trigger_key == SCHEDULED_TRIGGER_KEY: + dedup_source = workflow.info().workflow_id + else: + dedup_source = f"{activity_id}:{trigger_key}" + run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, dedup_source)) + + await workflow.execute_activity( + persist_ops_evidence, + { + "context_sources": defn.get("context_sources", []), + "context": context_snapshot, + "activity_id": activity_id, + "run_id": run_id, + "scheduled_for": scheduled_for, + "version_used": defn["version"], + }, + start_to_close_timeout=_ACTIVITY_TIMEOUT, + retry_policy=_RETRY_POLICY, + ) + # ── 3. Evaluate rules ───────────────────────────────────────────────── import json as _json event_attrs: dict = {} @@ -140,12 +161,6 @@ class RunActivityWorkflow: task_spec_dicts.extend(instruction_result.get("task_specs", [])) report_dicts.extend(instruction_result.get("reports", [])) - if trigger_key == SCHEDULED_TRIGGER_KEY: - dedup_source = workflow.info().workflow_id - else: - dedup_source = f"{activity_id}:{trigger_key}" - run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, dedup_source)) - # ── 4. Persist reports and emit tasks ──────────────────────────────── if report_dicts: await workflow.execute_activity( diff --git a/tests/test_ops_event_types.py b/tests/test_ops_event_types.py new file mode 100644 index 0000000..16adf09 --- /dev/null +++ b/tests/test_ops_event_types.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from pathlib import Path + +from activity_core.event_type_registry import parse_event_type_file + +_EVENT_DIR = Path(__file__).parent.parent / "event-types" +_OPS_EVENT_TYPES = { + "ops-service-observed", + "ops-endpoint-verified", + "ops-access-path-checked", + "ops-backup-verified", + "ops-inventory-drift", +} + + +def test_ops_event_type_definitions_parse_and_expose_required_fields() -> None: + for type_id in _OPS_EVENT_TYPES: + path = _EVENT_DIR / f"{type_id}.md" + event_type = parse_event_type_file(path) + + assert event_type.type_id == type_id + assert event_type.publisher == "activity-core" + assert event_type.status == "active" + assert event_type.attribute_schema["activity_core_run_id"]["required"] is True + assert event_type.attribute_schema["idempotency_key"]["required"] is True + assert event_type.attribute_schema["service_id"]["required"] is True + assert event_type.attribute_schema["observed_status"]["required"] is True + assert "raw response" in event_type.raw_md + assert "unredacted URL query strings" in event_type.raw_md + + +def test_endpoint_event_contract_captures_probe_result_fields() -> None: + event_type = parse_event_type_file(_EVENT_DIR / "ops-endpoint-verified.md") + + for field in ( + "endpoint_id", + "endpoint_url", + "expected_status", + "status_code", + "matched_expected_status", + "matched_expected_signal", + ): + assert field in event_type.attribute_schema diff --git a/tests/test_ops_evidence_sinks.py b/tests/test_ops_evidence_sinks.py new file mode 100644 index 0000000..426b660 --- /dev/null +++ b/tests/test_ops_evidence_sinks.py @@ -0,0 +1,195 @@ +from __future__ import annotations + +import json +from typing import Any + +import httpx + +from activity_core.ops_evidence_sinks import persist_ops_inventory_evidence + + +class DummyResponse: + def __init__(self, payload: Any) -> None: + self.payload = payload + + def raise_for_status(self) -> None: + return None + + def json(self) -> Any: + return self.payload + + +def _payload(sinks: list[dict[str, Any]]) -> dict[str, Any]: + return { + "activity_id": "activity-1", + "run_id": "12345678-aaaa-bbbb-cccc-123456789abc", + "scheduled_for": "2026-06-05T10:15:00+00:00", + "version_used": 1, + "context_sources": [ + { + "type": "ops-inventory", + "query": "probe_services", + "bind_to": "context.ops_probe", + "params": {"evidence_sinks": sinks}, + } + ], + "context": { + "ops_probe": { + "generated_at": "2026-06-05T10:15:01+00:00", + "inventory_path": "/tmp/service-inventory.yml", + "summary": {"ok": 1, "degraded": 0, "down": 0, "skipped": 1}, + "services": [ + { + "service_id": "state-hub", + "name": "State Hub", + "kind": "coordination-service", + "environment": "local", + "lifecycle_state": "observed", + "declared_health_status": "unknown", + "owner_repos": ["state-hub"], + "endpoint_count": 1, + "access_path_count": 1, + } + ], + "endpoints": [ + { + "service_id": "state-hub", + "service_name": "State Hub", + "endpoint_id": "state-hub-health", + "endpoint_type": "http", + "url": "http://user:pass@state-hub.test/health?token=secret", + "expected_status": 200, + "expected_signal_present": True, + "widget_ref": "ops:endpoint:state-hub-health", + "status": "ok", + "status_code": 200, + "matched_expected_status": True, + "matched_expected_signal": True, + "response_body": "secret response body", + "headers": {"Authorization": "Bearer secret"}, + } + ], + "access_paths": [ + { + "service_id": "state-hub", + "service_name": "State Hub", + "access_path_id": "state-hub-access-1", + "access_path_type": "k8s", + "declared_status": "unknown", + "status": "skipped", + "reason": "unsupported_access_path_type", + } + ], + } + }, + } + + +def test_state_hub_progress_sink_posts_compact_probe_summary(monkeypatch) -> None: + posts: list[dict[str, Any]] = [] + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + assert url == "http://state-hub.test/progress/" + return DummyResponse([]) + + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + posts.append({"url": url, **kwargs}) + return DummyResponse({"id": "progress-1"}) + + monkeypatch.setattr(httpx, "get", fake_get) + monkeypatch.setattr(httpx, "post", fake_post) + + result = persist_ops_inventory_evidence( + _payload([ + { + "type": "state-hub-progress", + "state_hub_url": "http://state-hub.test", + "event_type": "ops_inventory_probe", + "workstream_id": "workstream-1", + "task_id": "task-1", + } + ]) + ) + + assert result == [ + { + "type": "state-hub-progress", + "status": "posted", + "event_type": "ops_inventory_probe", + "progress_id": "progress-1", + "idempotency_key": "12345678-aaaa-bbbb-cccc-123456789abc:ops_probe:ops_inventory_probe", + "context_key": "ops_probe", + } + ] + body = posts[0]["json"] + assert body["summary"] == "Ops inventory probe: 1 ok, 0 degraded, 0 down, 1 skipped" + assert body["workstream_id"] == "workstream-1" + assert body["task_id"] == "task-1" + assert body["detail"]["activity_core_run_id"] == _run_id() + assert body["detail"]["idempotency_key"] == result[0]["idempotency_key"] + assert body["detail"]["probe"]["endpoints"][0]["url"] == "http://state-hub.test/health" + + serialized = json.dumps(body, sort_keys=True) + assert "secret response body" not in serialized + assert "Authorization" not in serialized + assert "user:pass" not in serialized + assert "token=secret" not in serialized + + +def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None: + idempotency_key = f"{_run_id()}:ops_probe:ops_inventory_probe" + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + return DummyResponse([ + { + "event_type": "ops_inventory_probe", + "detail": {"idempotency_key": idempotency_key}, + } + ]) + + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + raise AssertionError("post should not be called") + + monkeypatch.setattr(httpx, "get", fake_get) + monkeypatch.setattr(httpx, "post", fake_post) + + result = persist_ops_inventory_evidence( + _payload([ + { + "type": "state-hub-progress", + "state_hub_url": "http://state-hub.test", + } + ]) + ) + + assert result[0]["status"] == "exists" + assert result[0]["idempotency_key"] == idempotency_key + + +def test_inter_hub_sink_skips_cleanly_when_config_missing(monkeypatch) -> None: + monkeypatch.delenv("INTER_HUB_URL", raising=False) + monkeypatch.delenv("OPS_HUB_KEY", raising=False) + + result = persist_ops_inventory_evidence( + _payload([{"type": "inter-hub-interaction-event"}]) + ) + + assert result == [ + { + "type": "inter-hub-interaction-event", + "status": "skipped", + "reason": "missing_inter_hub_config", + "missing": ["INTER_HUB_URL", "OPS_HUB_KEY", "widget_mapping"], + } + ] + + +def test_no_evidence_sinks_returns_no_results() -> None: + payload = _payload([]) + payload["context_sources"][0]["params"] = {} + + assert persist_ops_inventory_evidence(payload) == [] + + +def _run_id() -> str: + return "12345678-aaaa-bbbb-cccc-123456789abc" diff --git a/tests/test_ops_inventory_context_resolver.py b/tests/test_ops_inventory_context_resolver.py new file mode 100644 index 0000000..73d6c1b --- /dev/null +++ b/tests/test_ops_inventory_context_resolver.py @@ -0,0 +1,283 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import httpx +import pytest + +from activity_core.context_resolvers.ops_inventory import OpsInventoryContextResolver + + +class DummyResponse: + def __init__(self, status_code: int, text: str = "") -> None: + self.status_code = status_code + self.text = text + + +def _write_inventory(tmp_path: Path, services: str) -> Path: + path = tmp_path / "service-inventory.yml" + path.write_text( + f""" +version: 1 +last_reviewed: "2026-06-05" +environments: [] +hosts: [] +clusters: [] +services: +{services} +""", + encoding="utf-8", + ) + return path + + +def test_probe_services_reports_ok_endpoint_and_skipped_access_path( + tmp_path, + monkeypatch, +) -> None: + inventory = _write_inventory( + tmp_path, + """ + - id: state-hub + name: State Hub + kind: coordination-service + lifecycle_state: observed + health_status: unknown + environment: local + owner_repos: [state-hub] + endpoints: + - id: state-hub-health + type: http + url: "http://127.0.0.1:8000/state/health" + expected_status: 200 + expected_signal: "health response" + access_paths: + - type: k8s + target: local + status: unknown +""", + ) + calls: list[dict[str, Any]] = [] + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + calls.append({"url": url, **kwargs}) + return DummyResponse(200, "ok: health response") + + monkeypatch.setattr(httpx, "get", fake_get) + + result = OpsInventoryContextResolver().resolve( + "probe_services", + None, + {"inventory_path": str(inventory)}, + ) + + assert result["summary"] == {"ok": 1, "degraded": 0, "down": 0, "skipped": 1} + assert result["services"][0]["service_id"] == "state-hub" + assert result["endpoints"][0]["status"] == "ok" + assert result["endpoints"][0]["matched_expected_status"] is True + assert result["endpoints"][0]["matched_expected_signal"] is True + assert result["access_paths"][0]["status"] == "skipped" + assert result["access_paths"][0]["reason"] == "unsupported_access_path_type" + assert calls == [ + { + "url": "http://127.0.0.1:8000/state/health", + "timeout": 10.0, + "follow_redirects": False, + } + ] + + +def test_probe_services_marks_status_mismatch_degraded(tmp_path, monkeypatch) -> None: + inventory = _write_inventory( + tmp_path, + """ + - id: gitea + name: Gitea + kind: application + lifecycle_state: observed + health_status: unknown + environment: coulombcore + owner_repos: [railiance-apps] + endpoints: + - id: gitea-registry + type: https + url: "https://gitea.coulomb.social/v2/" + expected_status: 401 + expected_signal: "OCI registry auth challenge" + access_paths: [] +""", + ) + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + return DummyResponse(200, "OCI registry auth challenge") + + monkeypatch.setattr(httpx, "get", fake_get) + + result = OpsInventoryContextResolver().resolve( + "probe_services", + None, + {"inventory_path": str(inventory)}, + ) + + endpoint = result["endpoints"][0] + assert result["summary"] == {"ok": 0, "degraded": 1, "down": 0, "skipped": 0} + assert endpoint["status"] == "degraded" + assert endpoint["reason"] == "expected_status_mismatch" + assert endpoint["matched_expected_status"] is False + assert endpoint["matched_expected_signal"] is True + + +def test_probe_services_marks_signal_mismatch_degraded(tmp_path, monkeypatch) -> None: + inventory = _write_inventory( + tmp_path, + """ + - id: inter-hub + name: Inter-Hub + kind: governance-service + lifecycle_state: observed + health_status: unknown + environment: threephoenix-prod + owner_repos: [inter-hub] + endpoints: + - id: inter-hub-openapi + type: https + url: "https://hub.coulomb.social/api/v2/openapi.json" + expected_status: 200 + expected_signal: "OpenAPI document" + access_paths: [] +""", + ) + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + return DummyResponse(200, "{}") + + monkeypatch.setattr(httpx, "get", fake_get) + + result = OpsInventoryContextResolver().resolve( + "probe_services", + None, + {"inventory_path": str(inventory)}, + ) + + endpoint = result["endpoints"][0] + assert result["summary"] == {"ok": 0, "degraded": 1, "down": 0, "skipped": 0} + assert endpoint["status"] == "degraded" + assert endpoint["reason"] == "expected_signal_missing" + assert endpoint["matched_expected_status"] is True + assert endpoint["matched_expected_signal"] is False + + +def test_probe_services_marks_network_error_down_and_sanitizes_output( + tmp_path, + monkeypatch, +) -> None: + inventory = _write_inventory( + tmp_path, + """ + - id: private-api + name: Private API + kind: application + lifecycle_state: observed + health_status: unknown + environment: local + owner_repos: [secret-repo] + endpoints: + - id: private-api-health + type: https + url: "https://user:pass@example.test/health?token=super-secret" + expected_status: 200 + expected_signal: "secret response body" + access_paths: [] +""", + ) + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + raise httpx.ConnectError("offline") + + monkeypatch.setattr(httpx, "get", fake_get) + + result = OpsInventoryContextResolver().resolve( + "probe_services", + None, + {"inventory_path": str(inventory)}, + ) + serialized = json.dumps(result, sort_keys=True) + + endpoint = result["endpoints"][0] + assert result["summary"] == {"ok": 0, "degraded": 0, "down": 1, "skipped": 0} + assert endpoint["status"] == "down" + assert endpoint["url"] == "https://example.test/health" + assert "super-secret" not in serialized + assert "user:pass" not in serialized + assert "secret response body" not in serialized + + +def test_probe_services_skips_unsupported_and_network_disabled( + tmp_path, + monkeypatch, +) -> None: + inventory = _write_inventory( + tmp_path, + """ + - id: bridge + name: Ops Bridge + kind: bridge + lifecycle_state: observed + health_status: unknown + environment: local + owner_repos: [ops-bridge] + endpoints: + - id: bridge-ssh + type: ssh + url: "ssh://bridge.example" + - id: bridge-http + type: http + url: "http://bridge.example/health" + access_paths: [] +""", + ) + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + raise AssertionError("network should be disabled") + + monkeypatch.setattr(httpx, "get", fake_get) + + result = OpsInventoryContextResolver().resolve( + "probe_services", + None, + {"inventory_path": str(inventory), "allow_network": False}, + ) + + assert result["summary"] == {"ok": 0, "degraded": 0, "down": 0, "skipped": 2} + assert [entry["reason"] for entry in result["endpoints"]] == [ + "kind_not_included", + "network_disabled", + ] + + +def test_probe_services_missing_inventory_optional_and_required(tmp_path) -> None: + missing = tmp_path / "missing.yml" + resolver = OpsInventoryContextResolver() + + optional = resolver.resolve( + "probe_services", + None, + {"inventory_path": str(missing), "required": False}, + ) + + assert optional["status"] == "skipped" + assert optional["reason"] == "inventory_not_found" + assert optional["summary"] == {"ok": 0, "degraded": 0, "down": 0, "skipped": 1} + + with pytest.raises(FileNotFoundError): + resolver.resolve( + "probe_services", + None, + {"inventory_path": str(missing), "required": True}, + ) + + +def test_unknown_query_returns_empty() -> None: + assert OpsInventoryContextResolver().resolve("unknown", None, {}) == {} diff --git a/tests/test_schedule_lifecycle.py b/tests/test_schedule_lifecycle.py index 9069635..3af0d37 100644 --- a/tests/test_schedule_lifecycle.py +++ b/tests/test_schedule_lifecycle.py @@ -125,9 +125,15 @@ async def test_delete_schedule_removes_schedule(env: WorkflowEnvironment) -> Non await upsert_schedule(env.client, defn) await delete_schedule(env.client, defn.id) - schedules = await list_schedules(env.client) - ids = [s["schedule_id"] for s in schedules] - assert schedule_id(defn.id) not in ids, "Schedule should be gone after delete" + sid = schedule_id(defn.id) + ids: list[str] = [] + for _ in range(10): + schedules = await list_schedules(env.client) + ids = [s["schedule_id"] for s in schedules] + if sid not in ids: + break + await asyncio.sleep(0.3) + assert sid not in ids, "Schedule should be gone after delete" # ── T25e: delete_schedule is idempotent (no-op for non-existent schedule) ──── diff --git a/tests/test_sync_activity_definitions.py b/tests/test_sync_activity_definitions.py index e6ee0bd..f34a7e3 100644 --- a/tests/test_sync_activity_definitions.py +++ b/tests/test_sync_activity_definitions.py @@ -1,5 +1,6 @@ import uuid +from activity_core.definition_parser import scan_and_parse from activity_core.models import ActivityDefinition from activity_core.sync_activity_definitions import _definition_uuid @@ -41,3 +42,56 @@ def test_activity_definition_accepts_adr_style_context_source_without_name() -> ) assert defn.context_sources[0].name == "" + + +def test_scan_and_parse_reads_external_activity_definition_dirs( + tmp_path, + monkeypatch, +) -> None: + repo_root = tmp_path / "activity-core" + external_root = tmp_path / "the-custodian" + definitions_dir = external_root / "activity-definitions" + repo_root.mkdir() + definitions_dir.mkdir(parents=True) + (definitions_dir / "ops-service-inventory-probes.md").write_text( + """--- +id: "40d15a87-7ff6-4d8e-992c-37df15f95110" +name: "Ops Service Inventory Probes" +enabled: false +owner: custodian +governance: custodian +status: proposed +trigger: + type: cron + cron_expression: "15 * * * *" + timezone: Europe/Berlin + misfire_policy: skip +context_sources: + - type: ops-inventory + query: probe_services + bind_to: context.ops_probe + params: + inventory_path: /tmp/service-inventory.yml + evidence_sinks: + - type: state-hub-progress + event_type: ops_inventory_probe +--- + +# Ops Service Inventory Probes +""", + encoding="utf-8", + ) + + monkeypatch.chdir(repo_root) + monkeypatch.setenv("ACTIVITY_DEFINITION_DIRS", str(external_root)) + + definitions = scan_and_parse() + + assert len(definitions) == 1 + definition = definitions[0] + assert definition.name == "Ops Service Inventory Probes" + assert definition.enabled is False + assert definition.context_sources[0]["type"] == "ops-inventory" + assert definition.context_sources[0]["params"]["evidence_sinks"][0]["type"] == ( + "state-hub-progress" + ) diff --git a/workplans/ACTIVITY-WP-0007-ops-inventory-probe-runner.md b/workplans/ACTIVITY-WP-0007-ops-inventory-probe-runner.md index d1c16cc..ed1c0e3 100644 --- a/workplans/ACTIVITY-WP-0007-ops-inventory-probe-runner.md +++ b/workplans/ACTIVITY-WP-0007-ops-inventory-probe-runner.md @@ -46,7 +46,7 @@ Known gaps this workplan closes: ```task id: ACTIVITY-WP-0007-T01 -status: todo +status: done priority: high state_hub_task_id: "dbe49dfb-f073-4245-8e86-d0355a6bb8bb" ``` @@ -85,11 +85,16 @@ Done when fixture-based resolver tests cover `ok`, expected-status mismatch, expected-signal mismatch, network/down, unsupported, and optional/required inventory failure behavior. +2026-06-05: Completed the first resolver slice. Added +`src/activity_core/context_resolvers/ops_inventory.py`, registered source type +`ops-inventory`, and covered ok/degraded/down/skipped results plus required vs +optional inventory failure and no-secret output behavior. + ## Add Ops Evidence Sink ```task id: ACTIVITY-WP-0007-T02 -status: todo +status: done priority: high state_hub_task_id: "c6b5f49d-6f05-4be9-a968-de42195170cb" ``` @@ -113,11 +118,16 @@ Prepare the contract for later Inter-Hub submission without making it mandatory: Done when sink idempotency, State Hub fallback posting, missing Inter-Hub config, and no-secret-leak behavior are covered by tests. +2026-06-05: Completed the State Hub fallback sink slice. Added +`src/activity_core/ops_evidence_sinks.py`, a `persist_ops_evidence` Temporal +activity, workflow/worker wiring, idempotent `ops_inventory_probe` progress +posting, missing-Inter-Hub-config skip behavior, and no-secret compaction tests. + ## Register Ops Evidence Event Definitions ```task id: ACTIVITY-WP-0007-T03 -status: todo +status: done priority: medium state_hub_task_id: "70eb470e-9b0a-448f-ae3a-f5b1bed49e04" ``` @@ -142,11 +152,14 @@ Each definition must document: Done when event registry tests or parser coverage prove the definitions are valid and reviewable. +2026-06-05: Completed. Added the five ops evidence event definitions under +`event-types/` and parser tests covering required fields and safety language. + ## Wire Custodian Definition Safely ```task id: ACTIVITY-WP-0007-T04 -status: todo +status: progress priority: medium state_hub_task_id: "45132f9f-da3c-44f1-a488-195aa0e46428" ``` @@ -166,6 +179,11 @@ Requirements: Done when activity-core can scan the Custodian definition path without enabling it prematurely. +2026-06-05: Started. Added test coverage that +`ACTIVITY_DEFINITION_DIRS=/home/worsch/the-custodian` style external roots can +scan a disabled `ops-service-inventory-probes.md` definition carrying an +`ops-inventory` context source and explicit `state-hub-progress` evidence sink. + ## Wire Railiance Runtime Inputs ```task