Wire ops inventory probes for Railiance

This commit is contained in:
2026-06-05 23:40:25 +02:00
parent 5838077327
commit 4b1b3e1b5f
7 changed files with 467 additions and 4 deletions

View File

@@ -184,6 +184,25 @@ def test_inter_hub_sink_skips_cleanly_when_config_missing(monkeypatch) -> None:
]
def test_inter_hub_sink_accepts_widget_mapping_from_env(monkeypatch) -> None:
monkeypatch.delenv("INTER_HUB_URL", raising=False)
monkeypatch.delenv("OPS_HUB_KEY", raising=False)
monkeypatch.setenv("OPS_HUB_WIDGET_MAPPING", "ops:endpoint:gitea-registry")
result = persist_ops_inventory_evidence(
_payload([{"type": "inter-hub-interaction-event"}])
)
assert result == [
{
"type": "inter-hub-interaction-event",
"status": "skipped",
"reason": "missing_inter_hub_config",
"missing": ["INTER_HUB_URL", "OPS_HUB_KEY"],
}
]
def test_no_evidence_sinks_returns_no_results() -> None:
payload = _payload([])
payload["context_sources"][0]["params"] = {}

View File

@@ -0,0 +1,192 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
import httpx
from activity_core.definition_parser import parse_file
from activity_core.context_resolvers.ops_inventory import OpsInventoryContextResolver
from activity_core.ops_evidence_sinks import persist_ops_inventory_evidence
_REPO_ROOT = Path(__file__).parent.parent
_RUNTIME_PATH = _REPO_ROOT / "k8s" / "railiance" / "20-runtime.yaml"
_BOOTSTRAP_SECRETS_PATH = _REPO_ROOT / "k8s" / "railiance" / "bootstrap-secrets.sh"
def _resources() -> list[dict[str, Any]]:
return [
resource
for resource in yaml.safe_load_all(_RUNTIME_PATH.read_text(encoding="utf-8"))
if isinstance(resource, dict)
]
def _by_kind_name(kind: str, name: str) -> dict[str, Any]:
for resource in _resources():
if resource.get("kind") == kind and resource.get("metadata", {}).get("name") == name:
return resource
raise AssertionError(f"missing {kind}/{name}")
def test_runtime_config_has_ops_inventory_placeholders() -> None:
config = _by_kind_name("ConfigMap", "actcore-runtime-config")
assert config["data"]["OPS_INVENTORY_PATH"] == (
"/etc/activity-core/ops/service-inventory.yml"
)
assert config["data"]["INTER_HUB_URL"] == ""
assert config["data"]["OPS_HUB_WIDGET_MAPPING"] == ""
def test_external_configmap_projects_disabled_ops_probe_definition(tmp_path) -> None:
config = _by_kind_name("ConfigMap", "actcore-external-activity-definitions")
raw_definition = config["data"]["ops-service-inventory-probes.md"]
definition_path = tmp_path / "ops-service-inventory-probes.md"
definition_path.write_text(raw_definition, encoding="utf-8")
definition = parse_file(definition_path)
assert definition.name == "Ops Service Inventory Probes"
assert definition.enabled is False
assert definition.trigger_config["cron_expression"] == "15 * * * *"
assert definition.context_sources == [
{
"type": "ops-inventory",
"query": "probe_services",
"required": False,
"params": {
"inventory_path": "/etc/activity-core/ops/service-inventory.yml",
"timeout_seconds": 10,
"include_kinds": ["http", "https"],
"allow_network": True,
"evidence_sinks": [
{
"type": "state-hub-progress",
"event_type": "ops_inventory_probe",
"author": "activity-core",
}
],
},
"bind_to": "context.ops_inventory_probe",
}
]
def test_ops_inventory_configmap_contains_probeable_inventory() -> None:
config = _by_kind_name("ConfigMap", "actcore-ops-service-inventory")
inventory = yaml.safe_load(config["data"]["service-inventory.yml"])
services = {service["id"]: service for service in inventory["services"]}
assert inventory["policy"]["non_secret_inventory"] is True
assert services["gitea"]["endpoints"][0]["id"] == "gitea-oci-registry"
assert services["state-hub"]["endpoints"][0]["url"] == (
"http://actcore-state-hub-bridge:8000/state/health"
)
assert services["inter-hub"]["endpoints"][0]["id"] == "inter-hub-openapi"
assert services["activity-core"]["endpoints"][0]["id"] == "activity-core-api"
def test_worker_mounts_ops_inventory_configmap() -> None:
deployment = _by_kind_name("Deployment", "actcore-worker")
pod_spec = deployment["spec"]["template"]["spec"]
container = pod_spec["containers"][0]
mounts = {mount["name"]: mount for mount in container["volumeMounts"]}
volumes = {volume["name"]: volume for volume in pod_spec["volumes"]}
assert mounts["ops-service-inventory"]["mountPath"] == "/etc/activity-core/ops"
assert mounts["ops-service-inventory"]["readOnly"] is True
assert volumes["ops-service-inventory"]["configMap"]["name"] == (
"actcore-ops-service-inventory"
)
def test_ops_hub_key_is_secret_only_placeholder() -> None:
runtime_config = _by_kind_name("ConfigMap", "actcore-runtime-config")
bootstrap = _BOOTSTRAP_SECRETS_PATH.read_text(encoding="utf-8")
assert "OPS_HUB_KEY" not in runtime_config["data"]
assert '--from-literal=OPS_HUB_KEY=""' in bootstrap
def test_disabled_ops_probe_definition_can_emit_fixture_evidence(
tmp_path,
monkeypatch,
) -> None:
definition_config = _by_kind_name("ConfigMap", "actcore-external-activity-definitions")
inventory_config = _by_kind_name("ConfigMap", "actcore-ops-service-inventory")
definition_path = tmp_path / "ops-service-inventory-probes.md"
inventory_path = tmp_path / "service-inventory.yml"
definition_path.write_text(
definition_config["data"]["ops-service-inventory-probes.md"],
encoding="utf-8",
)
inventory_path.write_text(
inventory_config["data"]["service-inventory.yml"],
encoding="utf-8",
)
definition = parse_file(definition_path)
source = definition.context_sources[0]
source["params"]["inventory_path"] = str(inventory_path)
def fake_endpoint_get(url: str, **kwargs: Any) -> Any:
if url.endswith("/v2/"):
return _HttpResponse(401, "OCI registry auth challenge")
if url.endswith("/state/health"):
return _HttpResponse(200, "health response")
if url.endswith("/openapi.json"):
return _HttpResponse(200, "OpenAPI document")
if url.endswith("/Hubs"):
return _HttpResponse(302, "login redirect when unauthenticated")
raise AssertionError(f"unexpected endpoint probe {url}")
monkeypatch.setattr(httpx, "get", fake_endpoint_get)
probe = OpsInventoryContextResolver().resolve("probe_services", None, source["params"])
posts: list[dict[str, Any]] = []
def fake_progress_get(url: str, **kwargs: Any) -> _JsonResponse:
return _JsonResponse([])
def fake_progress_post(url: str, **kwargs: Any) -> _JsonResponse:
posts.append({"url": url, **kwargs})
return _JsonResponse({"id": "progress-1"})
monkeypatch.setattr(httpx, "get", fake_progress_get)
monkeypatch.setattr(httpx, "post", fake_progress_post)
result = persist_ops_inventory_evidence(
{
"activity_id": definition.id,
"run_id": "12345678-aaaa-bbbb-cccc-123456789abc",
"scheduled_for": "2026-06-05T10:15:00+00:00",
"version_used": 1,
"context_sources": [source],
"context": {"ops_inventory_probe": probe},
}
)
assert definition.enabled is False
assert result[0]["status"] == "posted"
assert posts[0]["json"]["event_type"] == "ops_inventory_probe"
assert posts[0]["json"]["detail"]["probe"]["summary"]["ok"] == 4
class _HttpResponse:
def __init__(self, status_code: int, text: str) -> None:
self.status_code = status_code
self.text = text
class _JsonResponse:
def __init__(self, payload: Any) -> None:
self.payload = payload
def raise_for_status(self) -> None:
return None
def json(self) -> Any:
return self.payload