diff --git a/k8s/railiance/20-runtime.yaml b/k8s/railiance/20-runtime.yaml index 6520f0b..ff2e172 100644 --- a/k8s/railiance/20-runtime.yaml +++ b/k8s/railiance/20-runtime.yaml @@ -10,14 +10,165 @@ data: TEMPORAL_HOST: actcore-temporal:7233 TEMPORAL_NAMESPACE: default NATS_URL: nats://actcore-nats:4222 - STATE_HUB_URL: http://inter-hub.inter-hub.svc.cluster.local:8000 + STATE_HUB_URL: http://actcore-state-hub-bridge:8000 REPO_SCOPING_URL: http://repo-scoping.repo-scoping.svc.cluster.local:8020 ISSUE_CORE_URL: http://issue-core.issue-core.svc.cluster.local:8010 ISSUE_SINK_TYPE: "null" - ACTIVITY_DEFINITION_DIRS: "" + ACTIVITY_DEFINITION_DIRS: /etc/activity-core/external-definitions PROMETHEUS_BIND_ADDR: 0.0.0.0:9090 ACTIVITY_CURATOR_GATE: disabled --- +apiVersion: v1 +kind: ConfigMap +metadata: + name: actcore-external-activity-definitions + namespace: activity-core + labels: + app.kubernetes.io/name: activity-core + app.kubernetes.io/part-of: activity-core +data: + hourly-recently-on-scope.md: | + --- + id: "d104348c-d792-4377-943c-70a31e81a9bc" + name: "Hourly RecentlyOnScope Reports" + type: activity-definition + version: "1.0" + enabled: true + owner: custodian + governance: custodian + status: active + created: "2026-05-22" + trigger: + type: cron + cron_expression: "0 * * * *" + timezone: Europe/Berlin + misfire_policy: skip + context_sources: + - type: state-hub + query: recently_on_scope_hourly + required: true + params: + range: "1h" + active_only: true + include_attention: false + bind_to: context.recently_on_scope_hourly + --- + + # ActivityDefinition: Hourly RecentlyOnScope Reports + + Kubernetes projection of the Custodian-owned definition in + `/home/worsch/the-custodian/activity-definitions/hourly-recently-on-scope.md`. +--- +apiVersion: v1 +kind: Service +metadata: + name: actcore-state-hub-bridge + namespace: activity-core + labels: + app.kubernetes.io/name: actcore-state-hub-bridge + app.kubernetes.io/part-of: activity-core +spec: + selector: + app.kubernetes.io/name: actcore-state-hub-bridge + ports: + - name: http + port: 8000 + targetPort: http +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: actcore-state-hub-bridge + namespace: activity-core + labels: + app.kubernetes.io/name: actcore-state-hub-bridge + app.kubernetes.io/part-of: activity-core +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: actcore-state-hub-bridge + template: + metadata: + labels: + app.kubernetes.io/name: actcore-state-hub-bridge + app.kubernetes.io/part-of: activity-core + spec: + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + containers: + - name: proxy + image: activity-core:railiance01-prod + imagePullPolicy: Never + ports: + - name: http + containerPort: 18080 + command: + - python + - -c + - | + from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + from urllib.error import HTTPError, URLError + from urllib.request import Request, urlopen + + TARGET = "http://127.0.0.1:18000" + HOP_HEADERS = {"connection", "host", "keep-alive", "proxy-authenticate", + "proxy-authorization", "te", "trailers", + "transfer-encoding", "upgrade"} + + class Proxy(BaseHTTPRequestHandler): + def do_GET(self): + self._proxy() + + def do_POST(self): + self._proxy() + + def do_PATCH(self): + self._proxy() + + def _proxy(self): + length = int(self.headers.get("content-length", "0") or "0") + body = self.rfile.read(length) if length else None + headers = { + key: value + for key, value in self.headers.items() + if key.lower() not in HOP_HEADERS + } + request = Request( + TARGET + self.path, + data=body, + headers=headers, + method=self.command, + ) + try: + with urlopen(request, timeout=30) as response: + payload = response.read() + self.send_response(response.status) + for key, value in response.headers.items(): + if key.lower() not in HOP_HEADERS: + self.send_header(key, value) + self.end_headers() + self.wfile.write(payload) + except HTTPError as exc: + payload = exc.read() + self.send_response(exc.code) + self.end_headers() + self.wfile.write(payload) + except URLError as exc: + self.send_response(502) + self.end_headers() + self.wfile.write(str(exc).encode()) + + ThreadingHTTPServer(("0.0.0.0", 18080), Proxy).serve_forever() + readinessProbe: + httpGet: + path: /state/summary + port: http + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 6 +--- apiVersion: batch/v1 kind: Job metadata: @@ -76,6 +227,14 @@ spec: name: actcore-runtime-config - secretRef: name: actcore-runtime-secret + volumeMounts: + - name: external-activity-definitions + mountPath: /etc/activity-core/external-definitions/activity-definitions + readOnly: true + volumes: + - name: external-activity-definitions + configMap: + name: actcore-external-activity-definitions --- apiVersion: v1 kind: Service @@ -125,6 +284,10 @@ spec: name: actcore-runtime-config - secretRef: name: actcore-runtime-secret + volumeMounts: + - name: external-activity-definitions + mountPath: /etc/activity-core/external-definitions/activity-definitions + readOnly: true readinessProbe: httpGet: path: /health @@ -140,6 +303,10 @@ spec: initialDelaySeconds: 45 periodSeconds: 20 timeoutSeconds: 5 + volumes: + - name: external-activity-definitions + configMap: + name: actcore-external-activity-definitions --- apiVersion: v1 kind: Service @@ -189,6 +356,14 @@ spec: name: actcore-runtime-config - secretRef: name: actcore-runtime-secret + volumeMounts: + - name: external-activity-definitions + mountPath: /etc/activity-core/external-definitions/activity-definitions + readOnly: true + volumes: + - name: external-activity-definitions + configMap: + name: actcore-external-activity-definitions --- apiVersion: apps/v1 kind: Deployment diff --git a/src/activity_core/context_resolvers/state_hub.py b/src/activity_core/context_resolvers/state_hub.py index 31918e3..6348ae8 100644 --- a/src/activity_core/context_resolvers/state_hub.py +++ b/src/activity_core/context_resolvers/state_hub.py @@ -53,6 +53,19 @@ def _post_json(path: str, payload: dict[str, Any]) -> Any: return resp.json() +def _validate_recently_on_scope_hourly(result: Any) -> dict[str, Any]: + if not isinstance(result, dict): + raise RuntimeError("recently_on_scope_hourly returned a non-object response") + required_keys = {"generated", "skipped", "failed"} + missing = required_keys - set(result) + if missing: + missing_list = ", ".join(sorted(missing)) + raise RuntimeError( + f"recently_on_scope_hourly response missing required key(s): {missing_list}" + ) + return result + + class StateHubContextResolver(ContextResolver): """Fetches live data from the Custodian State Hub.""" @@ -84,7 +97,8 @@ class StateHubContextResolver(ContextResolver): for key, value in params.items() if key not in {"required"} } - return _post_json("/recently-on-scope/hourly", payload) + result = _post_json("/recently-on-scope/hourly", payload) + return _validate_recently_on_scope_hourly(result) return {} diff --git a/tests/test_state_hub_context_resolver.py b/tests/test_state_hub_context_resolver.py index 2350717..6056ad5 100644 --- a/tests/test_state_hub_context_resolver.py +++ b/tests/test_state_hub_context_resolver.py @@ -123,7 +123,13 @@ def test_recently_on_scope_hourly_posts_batch(monkeypatch) -> None: def fake_post(url: str, **kwargs: Any) -> DummyResponse: calls.append({"url": url, **kwargs}) - return DummyResponse({"generated": [{"domain_slug": "custodian"}]}) + return DummyResponse( + { + "generated": [{"domain_slug": "custodian"}], + "skipped": [], + "failed": [], + } + ) monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test/") monkeypatch.setattr(httpx, "post", fake_post) @@ -139,7 +145,11 @@ def test_recently_on_scope_hourly_posts_batch(monkeypatch) -> None: }, ) - assert result == {"generated": [{"domain_slug": "custodian"}]} + assert result == { + "generated": [{"domain_slug": "custodian"}], + "skipped": [], + "failed": [], + } assert calls == [ { "url": "http://state-hub.test/recently-on-scope/hourly", @@ -159,6 +169,16 @@ def test_recently_on_scope_hourly_failure_bubbles(monkeypatch) -> None: StateHubContextResolver().resolve("recently_on_scope_hourly", None, {"range": "1h"}) +def test_recently_on_scope_hourly_rejects_empty_response(monkeypatch) -> None: + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + return DummyResponse({}) + + monkeypatch.setattr(httpx, "post", fake_post) + + with pytest.raises(RuntimeError, match="missing required key"): + StateHubContextResolver().resolve("recently_on_scope_hourly", None, {"range": "1h"}) + + def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None: payloads = { "/state/summary": {