diff --git a/activity-definitions/weekly-sbom-staleness.md b/activity-definitions/weekly-sbom-staleness.md index 353918b..c3a5a46 100644 --- a/activity-definitions/weekly-sbom-staleness.md +++ b/activity-definitions/weekly-sbom-staleness.md @@ -16,6 +16,9 @@ context_sources: params: repos: all bind_to: context.repos +# Resolver returns a summary keyed off the worst repo so the rule expression +# below can match without comprehensions (the sandboxed evaluator does not +# support them). See _repo_sbom_status in context_resolvers/state_hub.py. --- # Weekly SBOM Staleness Check @@ -32,3 +35,14 @@ action: priority: medium labels: ["sbom", "security", "automated"] ``` + +NOTE: in the production bulk-mode resolver path the condition matches against +the **worst** repo's age (the resolver hoists the worst entry's +`sbom_age_days`, `repo_slug`, `last_sbom_at`, `has_sbom` to the top of +`context.repos` alongside the per-repo list and summary counts). The rule +therefore fires at most once per workflow run, not once per stale repo. The +aspirational per-stale-repo task fan-out is exercised by the integration +tests' simulated pipeline but is not delivered by the current workflow — +landing it requires (a) per-iteration context binding in the workflow and +(b) `context.*` interpolation in rule action fields. Both are tracked as +`ADHOC-2026-06-01-T02`. diff --git a/src/activity_core/context_resolvers/state_hub.py b/src/activity_core/context_resolvers/state_hub.py index 6348ae8..e1e693e 100644 --- a/src/activity_core/context_resolvers/state_hub.py +++ b/src/activity_core/context_resolvers/state_hub.py @@ -3,7 +3,8 @@ Registered as source type 'state-hub'. Supported queries: - domain_summary: GET {STATE_HUB_URL}/state/domain/{domain} - - repo_sbom_status: GET {STATE_HUB_URL}/sbom/status?repo={repo_slug} + - repo_sbom_status: single-repo -> GET {STATE_HUB_URL}/sbom/{repo_slug} + bulk (repos:all) -> GET {STATE_HUB_URL}/repos/ - state_summary: GET {STATE_HUB_URL}/state/summary - next_steps: GET {STATE_HUB_URL}/state/next_steps - workplan_index: GET {STATE_HUB_URL}/workstreams/workplan-index @@ -20,6 +21,7 @@ from __future__ import annotations import json import os +from datetime import datetime, timezone from typing import Any import httpx @@ -30,6 +32,10 @@ _DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" _TIMEOUT_SECONDS = 10.0 _OPEN_WORKSTREAM_STATUSES = {"active", "ready", "blocked"} _OPEN_TASK_STATUSES = {"todo", "in_progress", "blocked"} +# Sentinel age for repos that have never had an SBOM ingested. Large enough +# that any threshold-based staleness rule treats them as "very stale" without +# forcing the rule expression to special-case None. +_NEVER_SCANNED_AGE_DAYS = 99999 def _base_url() -> str: @@ -74,8 +80,7 @@ class StateHubContextResolver(ContextResolver): domain = params.get("domain", "") return _fetch_json(f"/state/domain/{domain}") if query == "repo_sbom_status": - repo_slug = params.get("repo_slug", "") - return _fetch_json("/sbom/status", {"repo": repo_slug}) + return _repo_sbom_status(params) if query == "state_summary": return _fetch_json("/state/summary") if query == "next_steps": @@ -105,6 +110,102 @@ class StateHubContextResolver(ContextResolver): CONTEXT_RESOLVER_REGISTRY["state-hub"] = StateHubContextResolver +def _repo_sbom_status(params: dict[str, Any]) -> dict[str, Any]: + """Resolve SBOM staleness against the State Hub. + + Two modes, selected by params: + + - Single-repo: params = {"repo_slug": ""} -> GET /sbom/{slug}. + Returns {repo_slug, last_sbom_at, sbom_age_days, has_sbom}. + + - Bulk: params = {"repos": "all"} -> GET /repos/. Computes age per repo + and returns a summary the rule layer can match against without + comprehensions (the AST evaluator does not support them): + { + "repos": [{repo_slug, last_sbom_at, sbom_age_days, has_sbom}, ...], + "stale_count": int, + "total_count": int, + "worst_repo_slug": str | None, + "worst_age_days": int | None, + "worst_last_sbom_at": str | None, + } + + Returns {} on HTTP error to preserve the resolver's graceful-degradation + contract. + """ + repo_slug = params.get("repo_slug") + bulk = str(params.get("repos", "")).lower() == "all" + + if repo_slug and not bulk: + payload = _fetch_json(f"/sbom/{repo_slug}") + if not isinstance(payload, dict) or not payload: + return {} + return _sbom_status_entry( + repo_slug=str(payload.get("repo_slug") or repo_slug), + last_sbom_at=payload.get("last_sbom_at"), + ) + + if bulk: + repos = _fetch_json("/repos/") + if not isinstance(repos, list): + return {} + entries = [ + _sbom_status_entry( + repo_slug=str(r.get("slug") or ""), + last_sbom_at=r.get("last_sbom_at"), + ) + for r in repos + if r.get("slug") + ] + stale = [e for e in entries if e["sbom_age_days"] > 30] + worst = max(entries, key=lambda e: e["sbom_age_days"], default=None) + # Hoist the worst-repo fields to the top so a sandboxed rule expression + # `context.repos.sbom_age_days > 30` matches when any repo is stale, + # without needing comprehensions. Bulk-only summary fields live + # alongside, and the full per-repo list is exposed under `repos`. + result: dict[str, Any] = { + "repos": entries, + "stale_count": len(stale), + "total_count": len(entries), + "worst_repo_slug": worst["repo_slug"] if worst else None, + "worst_age_days": worst["sbom_age_days"] if worst else None, + "worst_last_sbom_at": worst["last_sbom_at"] if worst else None, + } + if worst: + result.update({ + "repo_slug": worst["repo_slug"], + "last_sbom_at": worst["last_sbom_at"], + "sbom_age_days": worst["sbom_age_days"], + "has_sbom": worst["has_sbom"], + }) + return result + + return {} + + +def _sbom_status_entry(repo_slug: str, last_sbom_at: Any) -> dict[str, Any]: + age_days, has_sbom, normalised = _sbom_age_days(last_sbom_at) + return { + "repo_slug": repo_slug, + "last_sbom_at": normalised, + "sbom_age_days": age_days, + "has_sbom": has_sbom, + } + + +def _sbom_age_days(last_sbom_at: Any) -> tuple[int, bool, str | None]: + if not isinstance(last_sbom_at, str) or not last_sbom_at: + return _NEVER_SCANNED_AGE_DAYS, False, None + try: + ts = datetime.fromisoformat(last_sbom_at.replace("Z", "+00:00")) + except ValueError: + return _NEVER_SCANNED_AGE_DAYS, False, last_sbom_at + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + delta = datetime.now(timezone.utc) - ts + return max(0, delta.days), True, last_sbom_at + + def _daily_triage_digest(params: dict[str, Any]) -> str: """Return a compact JSON string safe to inject into an instruction prompt. diff --git a/tests/test_state_hub_context_resolver.py b/tests/test_state_hub_context_resolver.py index 6056ad5..1200195 100644 --- a/tests/test_state_hub_context_resolver.py +++ b/tests/test_state_hub_context_resolver.py @@ -82,27 +82,79 @@ def test_existing_queries_still_resolve(monkeypatch) -> None: def fake_get(url: str, **kwargs: Any) -> DummyResponse: calls.append({"url": url, **kwargs}) - return DummyResponse({"ok": True}) + if url.endswith("/state/domain/custodian"): + return DummyResponse({"ok": True}) + if url.endswith("/sbom/activity-core"): + return DummyResponse({ + "repo_slug": "activity-core", + "last_sbom_at": "2026-04-26T11:37:56+00:00", + "entry_count": 38, + "entries": [], + }) + raise AssertionError(f"unexpected url {url}") monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") monkeypatch.setattr(httpx, "get", fake_get) resolver = StateHubContextResolver() assert resolver.resolve("domain_summary", None, {"domain": "custodian"}) == {"ok": True} - assert resolver.resolve("repo_sbom_status", None, {"repo_slug": "activity-core"}) == {"ok": True} + + sbom = resolver.resolve("repo_sbom_status", None, {"repo_slug": "activity-core"}) + assert sbom["repo_slug"] == "activity-core" + assert sbom["has_sbom"] is True + assert sbom["last_sbom_at"] == "2026-04-26T11:37:56+00:00" + assert isinstance(sbom["sbom_age_days"], int) and sbom["sbom_age_days"] >= 0 + + assert [c["url"] for c in calls] == [ + "http://state-hub.test/state/domain/custodian", + "http://state-hub.test/sbom/activity-core", + ] + + +def test_repo_sbom_status_bulk_returns_worst_repo(monkeypatch) -> None: + calls: list[dict[str, Any]] = [] + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + calls.append({"url": url, **kwargs}) + return DummyResponse([ + {"slug": "fresh-repo", "last_sbom_at": "2099-01-01T00:00:00+00:00"}, + {"slug": "stale-repo", "last_sbom_at": "2024-01-01T00:00:00+00:00"}, + {"slug": "never-scanned", "last_sbom_at": None}, + ]) + + monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") + monkeypatch.setattr(httpx, "get", fake_get) + + result = StateHubContextResolver().resolve( + "repo_sbom_status", None, {"repos": "all"} + ) assert calls == [ - { - "url": "http://state-hub.test/state/domain/custodian", - "params": None, - "timeout": 10.0, - }, - { - "url": "http://state-hub.test/sbom/status", - "params": {"repo": "activity-core"}, - "timeout": 10.0, - }, + {"url": "http://state-hub.test/repos/", "params": None, "timeout": 10.0}, ] + assert result["total_count"] == 3 + # both stale-repo and never-scanned exceed the 30-day staleness threshold + assert result["stale_count"] == 2 + assert result["worst_repo_slug"] == "never-scanned" + assert result["worst_age_days"] == 99999 + + by_slug = {entry["repo_slug"]: entry for entry in result["repos"]} + assert by_slug["fresh-repo"]["has_sbom"] is True + assert by_slug["fresh-repo"]["sbom_age_days"] == 0 + assert by_slug["never-scanned"]["has_sbom"] is False + assert by_slug["never-scanned"]["last_sbom_at"] is None + + +def test_repo_sbom_status_returns_empty_on_failure(monkeypatch) -> None: + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + return DummyResponse(None, status_error=httpx.HTTPError("boom")) + + monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") + monkeypatch.setattr(httpx, "get", fake_get) + resolver = StateHubContextResolver() + + assert resolver.resolve("repo_sbom_status", None, {"repo_slug": "x"}) == {} + assert resolver.resolve("repo_sbom_status", None, {"repos": "all"}) == {} def test_resolver_failure_returns_empty(monkeypatch) -> None: diff --git a/workplans/ADHOC-2026-06-01.md b/workplans/ADHOC-2026-06-01.md index 1f83418..8435379 100644 --- a/workplans/ADHOC-2026-06-01.md +++ b/workplans/ADHOC-2026-06-01.md @@ -24,7 +24,7 @@ context resolver that is independent of the daily triage canary. ```task id: ADHOC-2026-06-01-T01 -status: todo +status: done priority: low state_hub_task_id: "87b56da9-e692-4350-9aff-47080414ec06" ``` @@ -80,3 +80,90 @@ Out of scope for this adhoc: Done when `weekly-sbom-staleness` runs cleanly against a live State Hub on Monday and either spawns SBOM rescan tasks for stale repos or leaves a clear "all SBOMs fresh" audit row — not a 404 log line and a silent no-op. + +**Completion — 2026-06-01:** + +Resolver now supports two modes selected by params: +- single-repo: `params: {repo_slug: foo}` → `GET /sbom/{foo}` +- bulk: `params: {repos: all}` → `GET /repos/`, computes per-repo age, + returns the worst-repo fields hoisted to the top of the result alongside + `stale_count`, `total_count`, `worst_*` fields, and the full per-repo list + +Never-scanned repos use a `99999` sentinel age so threshold rules treat them +as very stale without forcing the rule expression to special-case `None`. + +`activity-definitions/weekly-sbom-staleness.md` kept its existing rule +expression `context.repos.sbom_age_days > 30` (the resolver hoists the worst +repo's age to that path). The definition now documents that the rule fires +at most once per workflow run, not once per stale repo, and that the +aspirational per-stale-repo fan-out exercised by the integration tests is +not delivered by the current workflow. + +Live validation against the running State Hub on 2026-06-01: +- single: `activity-core` → 36 days since SBOM ingest at 2026-04-26 +- bulk: 48 repos total, 46 stale (>30d); worst is `info-tech-canon` + (`last_sbom_at: null` → 99999d sentinel); rule expression evaluates True + +Tests: `uv run pytest -q` → 120 passed, 1 skipped (previously 116 passed + +4 broken integration tests; broken-on-my-change reverted by hoisting the +worst-repo fields to the top of `context.repos`). + +### T02 - Rule action context interpolation and per-iteration binding + +```task +id: ADHOC-2026-06-01-T02 +status: todo +priority: low +state_hub_task_id: "6b3a185e-cbea-454c-82fb-8b4c16cefef0" +``` + +Discovered while completing T01: `RunActivityWorkflow` builds each +`TaskSpec` by lifting raw YAML fields out of the rule action without ever +interpolating `context.*` references: + +```python +# src/activity_core/workflows.py +task_spec_dicts.append({ + "title": action.get("task_template", rule.get("id", "")), + "target_repo": action.get("target_repo"), + ... +}) +``` + +So `target_repo: context.repos.repo_slug` in an ActivityDefinition rule is +emitted to the spawn log as the literal string `"context.repos.repo_slug"`, +not the actual stale repo slug. The aspirational per-stale-repo fan-out +exercised by `test_pipeline_emits_one_task_for_stale_repo_only` and friends +in `tests/test_integration_event_bridge.py` is *not* delivered by the +workflow — those tests simulate a per-repo iteration the real workflow +does not perform. + +Two pieces of work, likely related: + +1. **Action field interpolation.** Define and implement a safe template + grammar for `action.target_repo`, `action.task_template`, + `action.priority`, `action.labels`, etc. Reuse the rule-condition AST + walker (no `exec`, no comprehensions) or a constrained string + `{context.foo.bar}` substitution. Decide on grammar — instruction + prompt rendering uses `{...}` placeholders today + (`rules/executor.py::_render_prompt`); consistent with that is probably + right. + +2. **Per-iteration context binding.** Decide whether the workflow should + evaluate a rule once per element of a list-valued context field (the + integration-test contract), or whether the spawn-once semantics is + actually desired and the tests should be relaxed. If iteration is the + answer, the resolver shape from T01 already gives a clean `repos` list + to iterate over; the workflow would need an explicit `for_each:` + directive on the rule, or implicit iteration when `condition` references + a list element. + +This is borderline workplan-grade work (design decision + security review of +the interpolation grammar + workflow change + test updates). Promote to a +full workplan if anyone decides to actually do it; the adhoc T02 is just to +make sure the gap doesn't get forgotten. + +Done when either: (a) rule action fields interpolate `context.*` +expressions and a stale-repo workflow run emits a TaskSpec with the actual +repo slug, or (b) a recorded decision explicitly defers/declines the change +with reasoning.