Fix repo_sbom_status resolver — close ADHOC-2026-06-01-T01

The state-hub resolver was calling GET /sbom/status?repo={slug}, which State
Hub does not expose. Real SBOM routes are /sbom/, /sbom/{slug},
/sbom/snapshots/, /sbom/snapshots/{id}, /sbom/ingest/, /sbom/report/licences/.
The weekly-sbom-staleness ActivityDefinition was passing params {repos: all}
and the resolver was reading params.get("repo_slug", ""), so the URL
collapsed to /sbom/status?repo= and 404'd. _fetch_json swallowed the error,
the rule context.repos.sbom_age_days > 30 evaluated against {} and never
matched, and the weekly SBOM check has been a silent no-op for as long as
the route mismatch has existed.

Resolver now supports two modes selected by params:
- single-repo: {repo_slug: foo} → GET /sbom/{foo}, returns
  {repo_slug, last_sbom_at, sbom_age_days, has_sbom}
- bulk: {repos: all} → GET /repos/, computes per-repo age, returns the
  worst repo's fields hoisted to the top of the result alongside
  stale_count, total_count, worst_* fields, and the full per-repo list

Never-scanned repos get a 99999 sentinel age so threshold rules treat
them as very stale without forcing the rule to special-case None.

Hoisting the worst entry to the top preserves the existing rule
expression context.repos.sbom_age_days > 30 (and target_repo:
context.repos.repo_slug, though that field is a separate interpolation
gap tracked as ADHOC-2026-06-01-T02). The integration tests'
aspirational per-repo iteration model is left intact.

Live validation against State Hub on 2026-06-01:
- single: activity-core → 36 days since 2026-04-26 ingest
- bulk: 48 repos total, 46 stale (>30d), worst is info-tech-canon (never
  scanned), rule expression evaluates True

Tests: 120 passed, 1 skipped.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-02 03:31:56 +02:00
parent 5d3fb33c6b
commit a8d3cc2782
4 changed files with 270 additions and 16 deletions

View File

@@ -16,6 +16,9 @@ context_sources:
params:
repos: all
bind_to: context.repos
# Resolver returns a summary keyed off the worst repo so the rule expression
# below can match without comprehensions (the sandboxed evaluator does not
# support them). See _repo_sbom_status in context_resolvers/state_hub.py.
---
# Weekly SBOM Staleness Check
@@ -32,3 +35,14 @@ action:
priority: medium
labels: ["sbom", "security", "automated"]
```
NOTE: in the production bulk-mode resolver path the condition matches against
the **worst** repo's age (the resolver hoists the worst entry's
`sbom_age_days`, `repo_slug`, `last_sbom_at`, `has_sbom` to the top of
`context.repos` alongside the per-repo list and summary counts). The rule
therefore fires at most once per workflow run, not once per stale repo. The
aspirational per-stale-repo task fan-out is exercised by the integration
tests' simulated pipeline but is not delivered by the current workflow —
landing it requires (a) per-iteration context binding in the workflow and
(b) `context.*` interpolation in rule action fields. Both are tracked as
`ADHOC-2026-06-01-T02`.

View File

@@ -3,7 +3,8 @@
Registered as source type 'state-hub'.
Supported queries:
- domain_summary: GET {STATE_HUB_URL}/state/domain/{domain}
- repo_sbom_status: GET {STATE_HUB_URL}/sbom/status?repo={repo_slug}
- repo_sbom_status: single-repo -> GET {STATE_HUB_URL}/sbom/{repo_slug}
bulk (repos:all) -> GET {STATE_HUB_URL}/repos/
- state_summary: GET {STATE_HUB_URL}/state/summary
- next_steps: GET {STATE_HUB_URL}/state/next_steps
- workplan_index: GET {STATE_HUB_URL}/workstreams/workplan-index
@@ -20,6 +21,7 @@ from __future__ import annotations
import json
import os
from datetime import datetime, timezone
from typing import Any
import httpx
@@ -30,6 +32,10 @@ _DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
_TIMEOUT_SECONDS = 10.0
_OPEN_WORKSTREAM_STATUSES = {"active", "ready", "blocked"}
_OPEN_TASK_STATUSES = {"todo", "in_progress", "blocked"}
# Sentinel age for repos that have never had an SBOM ingested. Large enough
# that any threshold-based staleness rule treats them as "very stale" without
# forcing the rule expression to special-case None.
_NEVER_SCANNED_AGE_DAYS = 99999
def _base_url() -> str:
@@ -74,8 +80,7 @@ class StateHubContextResolver(ContextResolver):
domain = params.get("domain", "")
return _fetch_json(f"/state/domain/{domain}")
if query == "repo_sbom_status":
repo_slug = params.get("repo_slug", "")
return _fetch_json("/sbom/status", {"repo": repo_slug})
return _repo_sbom_status(params)
if query == "state_summary":
return _fetch_json("/state/summary")
if query == "next_steps":
@@ -105,6 +110,102 @@ class StateHubContextResolver(ContextResolver):
CONTEXT_RESOLVER_REGISTRY["state-hub"] = StateHubContextResolver
def _repo_sbom_status(params: dict[str, Any]) -> dict[str, Any]:
"""Resolve SBOM staleness against the State Hub.
Two modes, selected by params:
- Single-repo: params = {"repo_slug": "<slug>"} -> GET /sbom/{slug}.
Returns {repo_slug, last_sbom_at, sbom_age_days, has_sbom}.
- Bulk: params = {"repos": "all"} -> GET /repos/. Computes age per repo
and returns a summary the rule layer can match against without
comprehensions (the AST evaluator does not support them):
{
"repos": [{repo_slug, last_sbom_at, sbom_age_days, has_sbom}, ...],
"stale_count": int,
"total_count": int,
"worst_repo_slug": str | None,
"worst_age_days": int | None,
"worst_last_sbom_at": str | None,
}
Returns {} on HTTP error to preserve the resolver's graceful-degradation
contract.
"""
repo_slug = params.get("repo_slug")
bulk = str(params.get("repos", "")).lower() == "all"
if repo_slug and not bulk:
payload = _fetch_json(f"/sbom/{repo_slug}")
if not isinstance(payload, dict) or not payload:
return {}
return _sbom_status_entry(
repo_slug=str(payload.get("repo_slug") or repo_slug),
last_sbom_at=payload.get("last_sbom_at"),
)
if bulk:
repos = _fetch_json("/repos/")
if not isinstance(repos, list):
return {}
entries = [
_sbom_status_entry(
repo_slug=str(r.get("slug") or ""),
last_sbom_at=r.get("last_sbom_at"),
)
for r in repos
if r.get("slug")
]
stale = [e for e in entries if e["sbom_age_days"] > 30]
worst = max(entries, key=lambda e: e["sbom_age_days"], default=None)
# Hoist the worst-repo fields to the top so a sandboxed rule expression
# `context.repos.sbom_age_days > 30` matches when any repo is stale,
# without needing comprehensions. Bulk-only summary fields live
# alongside, and the full per-repo list is exposed under `repos`.
result: dict[str, Any] = {
"repos": entries,
"stale_count": len(stale),
"total_count": len(entries),
"worst_repo_slug": worst["repo_slug"] if worst else None,
"worst_age_days": worst["sbom_age_days"] if worst else None,
"worst_last_sbom_at": worst["last_sbom_at"] if worst else None,
}
if worst:
result.update({
"repo_slug": worst["repo_slug"],
"last_sbom_at": worst["last_sbom_at"],
"sbom_age_days": worst["sbom_age_days"],
"has_sbom": worst["has_sbom"],
})
return result
return {}
def _sbom_status_entry(repo_slug: str, last_sbom_at: Any) -> dict[str, Any]:
age_days, has_sbom, normalised = _sbom_age_days(last_sbom_at)
return {
"repo_slug": repo_slug,
"last_sbom_at": normalised,
"sbom_age_days": age_days,
"has_sbom": has_sbom,
}
def _sbom_age_days(last_sbom_at: Any) -> tuple[int, bool, str | None]:
if not isinstance(last_sbom_at, str) or not last_sbom_at:
return _NEVER_SCANNED_AGE_DAYS, False, None
try:
ts = datetime.fromisoformat(last_sbom_at.replace("Z", "+00:00"))
except ValueError:
return _NEVER_SCANNED_AGE_DAYS, False, last_sbom_at
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
delta = datetime.now(timezone.utc) - ts
return max(0, delta.days), True, last_sbom_at
def _daily_triage_digest(params: dict[str, Any]) -> str:
"""Return a compact JSON string safe to inject into an instruction prompt.

View File

@@ -82,27 +82,79 @@ def test_existing_queries_still_resolve(monkeypatch) -> None:
def fake_get(url: str, **kwargs: Any) -> DummyResponse:
calls.append({"url": url, **kwargs})
return DummyResponse({"ok": True})
if url.endswith("/state/domain/custodian"):
return DummyResponse({"ok": True})
if url.endswith("/sbom/activity-core"):
return DummyResponse({
"repo_slug": "activity-core",
"last_sbom_at": "2026-04-26T11:37:56+00:00",
"entry_count": 38,
"entries": [],
})
raise AssertionError(f"unexpected url {url}")
monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test")
monkeypatch.setattr(httpx, "get", fake_get)
resolver = StateHubContextResolver()
assert resolver.resolve("domain_summary", None, {"domain": "custodian"}) == {"ok": True}
assert resolver.resolve("repo_sbom_status", None, {"repo_slug": "activity-core"}) == {"ok": True}
sbom = resolver.resolve("repo_sbom_status", None, {"repo_slug": "activity-core"})
assert sbom["repo_slug"] == "activity-core"
assert sbom["has_sbom"] is True
assert sbom["last_sbom_at"] == "2026-04-26T11:37:56+00:00"
assert isinstance(sbom["sbom_age_days"], int) and sbom["sbom_age_days"] >= 0
assert [c["url"] for c in calls] == [
"http://state-hub.test/state/domain/custodian",
"http://state-hub.test/sbom/activity-core",
]
def test_repo_sbom_status_bulk_returns_worst_repo(monkeypatch) -> None:
calls: list[dict[str, Any]] = []
def fake_get(url: str, **kwargs: Any) -> DummyResponse:
calls.append({"url": url, **kwargs})
return DummyResponse([
{"slug": "fresh-repo", "last_sbom_at": "2099-01-01T00:00:00+00:00"},
{"slug": "stale-repo", "last_sbom_at": "2024-01-01T00:00:00+00:00"},
{"slug": "never-scanned", "last_sbom_at": None},
])
monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test")
monkeypatch.setattr(httpx, "get", fake_get)
result = StateHubContextResolver().resolve(
"repo_sbom_status", None, {"repos": "all"}
)
assert calls == [
{
"url": "http://state-hub.test/state/domain/custodian",
"params": None,
"timeout": 10.0,
},
{
"url": "http://state-hub.test/sbom/status",
"params": {"repo": "activity-core"},
"timeout": 10.0,
},
{"url": "http://state-hub.test/repos/", "params": None, "timeout": 10.0},
]
assert result["total_count"] == 3
# both stale-repo and never-scanned exceed the 30-day staleness threshold
assert result["stale_count"] == 2
assert result["worst_repo_slug"] == "never-scanned"
assert result["worst_age_days"] == 99999
by_slug = {entry["repo_slug"]: entry for entry in result["repos"]}
assert by_slug["fresh-repo"]["has_sbom"] is True
assert by_slug["fresh-repo"]["sbom_age_days"] == 0
assert by_slug["never-scanned"]["has_sbom"] is False
assert by_slug["never-scanned"]["last_sbom_at"] is None
def test_repo_sbom_status_returns_empty_on_failure(monkeypatch) -> None:
def fake_get(url: str, **kwargs: Any) -> DummyResponse:
return DummyResponse(None, status_error=httpx.HTTPError("boom"))
monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test")
monkeypatch.setattr(httpx, "get", fake_get)
resolver = StateHubContextResolver()
assert resolver.resolve("repo_sbom_status", None, {"repo_slug": "x"}) == {}
assert resolver.resolve("repo_sbom_status", None, {"repos": "all"}) == {}
def test_resolver_failure_returns_empty(monkeypatch) -> None:

View File

@@ -24,7 +24,7 @@ context resolver that is independent of the daily triage canary.
```task
id: ADHOC-2026-06-01-T01
status: todo
status: done
priority: low
state_hub_task_id: "87b56da9-e692-4350-9aff-47080414ec06"
```
@@ -80,3 +80,90 @@ Out of scope for this adhoc:
Done when `weekly-sbom-staleness` runs cleanly against a live State Hub on
Monday and either spawns SBOM rescan tasks for stale repos or leaves a clear
"all SBOMs fresh" audit row — not a 404 log line and a silent no-op.
**Completion — 2026-06-01:**
Resolver now supports two modes selected by params:
- single-repo: `params: {repo_slug: foo}``GET /sbom/{foo}`
- bulk: `params: {repos: all}``GET /repos/`, computes per-repo age,
returns the worst-repo fields hoisted to the top of the result alongside
`stale_count`, `total_count`, `worst_*` fields, and the full per-repo list
Never-scanned repos use a `99999` sentinel age so threshold rules treat them
as very stale without forcing the rule expression to special-case `None`.
`activity-definitions/weekly-sbom-staleness.md` kept its existing rule
expression `context.repos.sbom_age_days > 30` (the resolver hoists the worst
repo's age to that path). The definition now documents that the rule fires
at most once per workflow run, not once per stale repo, and that the
aspirational per-stale-repo fan-out exercised by the integration tests is
not delivered by the current workflow.
Live validation against the running State Hub on 2026-06-01:
- single: `activity-core` → 36 days since SBOM ingest at 2026-04-26
- bulk: 48 repos total, 46 stale (>30d); worst is `info-tech-canon`
(`last_sbom_at: null` → 99999d sentinel); rule expression evaluates True
Tests: `uv run pytest -q` → 120 passed, 1 skipped (previously 116 passed +
4 broken integration tests; broken-on-my-change reverted by hoisting the
worst-repo fields to the top of `context.repos`).
### T02 - Rule action context interpolation and per-iteration binding
```task
id: ADHOC-2026-06-01-T02
status: todo
priority: low
state_hub_task_id: "6b3a185e-cbea-454c-82fb-8b4c16cefef0"
```
Discovered while completing T01: `RunActivityWorkflow` builds each
`TaskSpec` by lifting raw YAML fields out of the rule action without ever
interpolating `context.*` references:
```python
# src/activity_core/workflows.py
task_spec_dicts.append({
"title": action.get("task_template", rule.get("id", "")),
"target_repo": action.get("target_repo"),
...
})
```
So `target_repo: context.repos.repo_slug` in an ActivityDefinition rule is
emitted to the spawn log as the literal string `"context.repos.repo_slug"`,
not the actual stale repo slug. The aspirational per-stale-repo fan-out
exercised by `test_pipeline_emits_one_task_for_stale_repo_only` and friends
in `tests/test_integration_event_bridge.py` is *not* delivered by the
workflow — those tests simulate a per-repo iteration the real workflow
does not perform.
Two pieces of work, likely related:
1. **Action field interpolation.** Define and implement a safe template
grammar for `action.target_repo`, `action.task_template`,
`action.priority`, `action.labels`, etc. Reuse the rule-condition AST
walker (no `exec`, no comprehensions) or a constrained string
`{context.foo.bar}` substitution. Decide on grammar — instruction
prompt rendering uses `{...}` placeholders today
(`rules/executor.py::_render_prompt`); consistent with that is probably
right.
2. **Per-iteration context binding.** Decide whether the workflow should
evaluate a rule once per element of a list-valued context field (the
integration-test contract), or whether the spawn-once semantics is
actually desired and the tests should be relaxed. If iteration is the
answer, the resolver shape from T01 already gives a clean `repos` list
to iterate over; the workflow would need an explicit `for_each:`
directive on the rule, or implicit iteration when `condition` references
a list element.
This is borderline workplan-grade work (design decision + security review of
the interpolation grammar + workflow change + test updates). Promote to a
full workplan if anyone decides to actually do it; the adhoc T02 is just to
make sure the gap doesn't get forgotten.
Done when either: (a) rule action fields interpolate `context.*`
expressions and a stale-repo workflow run emits a TaskSpec with the actual
repo slug, or (b) a recorded decision explicitly defers/declines the change
with reasoning.