Add reuse-surface report gaps resolver

This commit is contained in:
2026-06-18 17:58:00 +02:00
parent 23f4956b68
commit 2078915854
6 changed files with 802 additions and 1 deletions

View File

@@ -149,6 +149,8 @@ async def resolve_context(
query = source.get("query", "")
params = source.get("params") or {}
required = bool(source.get("required") or params.get("required", False))
resolver_params = dict(params)
resolver_params["required"] = required
raw_bind = source.get("bind_to") or source.get("name") or source_type
# Strip the 'context.' namespace prefix so evaluator can find the key.
bind_key = raw_bind.removeprefix("context.") if raw_bind.startswith("context.") else raw_bind
@@ -172,7 +174,7 @@ async def resolve_context(
continue
try:
resolved = resolver_cls().resolve(query, event_envelope, params)
resolved = resolver_cls().resolve(query, event_envelope, resolver_params)
snapshot[bind_key] = _bind_resolver_result(bind_key, resolved)
except Exception as exc:
if required:

View File

@@ -4,4 +4,5 @@ from activity_core.context_resolvers import ( # noqa: F401
ops_inventory,
repo_scoping,
state_hub,
reuse_surface,
)

View File

@@ -0,0 +1,516 @@
"""Reuse-surface registry hygiene context adapter.
Registered as source type ``reuse-surface`` and as the ``shell`` resolver
dispatcher for the ``reuse_surface_report_gaps`` query. Other shell queries
continue to delegate to the kaizen resolver for backward compatibility.
"""
from __future__ import annotations
import json
import logging
import os
import socket
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import httpx
import yaml
from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, ContextResolver
from activity_core.context_resolvers.kaizen import KaizenContextResolver
from activity_core.context_resolvers.state_hub import StateHubContextResolver
logger = logging.getLogger(__name__)
# Base URL used for State Hub when the STATE_HUB_URL environment variable is
# not set.
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
# Timeout, in seconds, passed to the `reuse-surface report gaps` subprocess.
_REPORT_TIMEOUT_SECONDS = 60
# Timeout, in seconds, passed to State Hub HTTP requests.
_STATE_HUB_TIMEOUT_SECONDS = 10.0
# Hygiene signal names this module understands. Names outside this set are
# ignored when the signals file is read.
_KNOWN_SIGNALS = frozenset(
    {
        "registry_gap",
        "empty_capability_scaffold",
        "stale_scope",
        "stale_sbom",
        "publish_check_fail",
    }
)
@dataclass(frozen=True)
class RosterEntry:
    """One repository taken from the rollout roster."""

    # Repository slug; the roster loader keeps only the first entry per slug.
    slug: str
    # Name of the roster domain block the repo was listed under, if any.
    domain: str | None = None
    # Lower-cased ``publish_check`` value from the roster item, falling back to
    # the value on its domain block.
    publish_check: str | None = None
def _base_url() -> str:
return os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL).rstrip("/")
def _runner_host(params: dict[str, Any]) -> str:
return str(
params.get("runner_host")
or os.environ.get("KAIZEN_RUNNER_HOST")
or socket.gethostname()
)
def _as_required(params: dict[str, Any]) -> bool:
return bool(params.get("required", False))
def reuse_surface_report_gaps(params: dict[str, Any]) -> dict[str, Any]:
    """Resolve registry-hygiene gaps for the next rollout batch.

    Any failure while resolving is re-raised when the source is marked
    required; otherwise it is logged as a warning and an empty gap list is
    returned, so definitions can opt into either behavior without changing
    rule logic.

    Returns:
        A mapping with a single ``"gaps"`` key holding a list of gap records.
    """
    try:
        resolved = _resolve_reuse_surface_report_gaps(params)
    except Exception as exc:
        if not _as_required(params):
            logger.warning("reuse_surface_report_gaps unavailable: %s", exc)
            return {"gaps": []}
        raise
    return resolved
def _resolve_reuse_surface_report_gaps(params: dict[str, Any]) -> dict[str, Any]:
    """Run one round-robin sweep and return ``{"gaps": [...]}``.

    Steps: load the active roster entries, pick the next batch from the
    persisted cursor, read the enabled signals, resolve repo roots, run the
    CLI report, build gap records, and finally persist the advanced cursor.
    Returns an empty gap list when the roster or the batch is empty.
    """
    roster = _roster_path(params)
    active = _load_active_roster_entries(roster)
    if not active:
        return {"gaps": []}

    cursor_file = _round_robin_state_path(params, roster)
    batch, following_cursor = _select_round_robin_batch(
        active,
        _batch_size(params),
        cursor_file,
    )
    if not batch:
        return {"gaps": []}

    enabled = _enabled_signals(_signals_path(params, roster))
    host = _runner_host(params)
    repo_roots = _resolve_repo_roots(batch, host)
    cli_report = _reuse_surface_report(params, enabled)
    found = _gap_records(batch, repo_roots, enabled, cli_report)
    # The cursor is written last, so it only advances when every step above
    # completed without raising.
    _write_round_robin_state(cursor_file, following_cursor, batch)
    return {"gaps": found}
def _roster_path(params: dict[str, Any]) -> Path:
raw = params.get("roster")
if not raw:
raise ValueError("reuse_surface_report_gaps requires params.roster")
path = Path(str(raw)).expanduser()
if not path.is_file():
raise FileNotFoundError(f"reuse_surface_report_gaps roster not found: {path}")
return path
def _batch_size(params: dict[str, Any]) -> int:
try:
return max(1, int(params.get("batch_size", 3)))
except (TypeError, ValueError):
return 3
def _round_robin_state_path(params: dict[str, Any], roster_path: Path) -> Path:
raw = params.get("round_robin_state")
if raw:
return Path(str(raw)).expanduser()
return roster_path.with_name("round-robin-state.json")
def _signals_path(params: dict[str, Any], roster_path: Path) -> Path:
raw = params.get("signals")
if raw:
return Path(str(raw)).expanduser()
return roster_path.with_name("signals.yml")
def _load_active_roster_entries(path: Path) -> list[RosterEntry]:
    """Load the roster and return entries from domains whose phase is ``active``.

    Entries keep roster order; when a slug appears more than once only the
    first occurrence is kept.

    Raises:
        ValueError: if the YAML document is not a mapping.
    """
    document = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(document, dict):
        raise ValueError(f"reuse_surface rollout roster is not a mapping: {path}")

    by_slug: dict[str, RosterEntry] = {}
    for domain, block in _iter_domain_blocks(document):
        if _domain_phase(block) == "active":
            for item in _repo_items(block):
                candidate = _entry_from_item(item, domain, block)
                if not candidate:
                    continue
                # setdefault leaves an already-registered slug untouched.
                by_slug.setdefault(candidate.slug, candidate)
    return list(by_slug.values())
def _iter_domain_blocks(data: dict[str, Any]) -> list[tuple[str | None, dict[str, Any]]]:
domains = data.get("domains")
if isinstance(domains, dict):
return [
(str(name), block)
for name, block in domains.items()
if isinstance(block, dict)
]
if isinstance(domains, list):
return [
(str(block.get("name") or block.get("domain") or ""), block)
for block in domains
if isinstance(block, dict)
]
if isinstance(data.get("active"), list):
return [(None, {"phase": "active", "repos": data["active"]})]
return [
(str(name), block)
for name, block in data.items()
if isinstance(block, dict) and ("phase" in block or "repos" in block)
]
def _domain_phase(block: dict[str, Any]) -> str:
return str(block.get("phase") or block.get("status") or "").lower()
def _repo_items(block: dict[str, Any]) -> list[Any]:
repos = (
block.get("repos")
or block.get("repo_slugs")
or block.get("repositories")
or block.get("slugs")
or []
)
if isinstance(repos, dict):
items: list[Any] = []
for slug, config in repos.items():
if isinstance(config, dict):
item = dict(config)
item.setdefault("slug", slug)
items.append(item)
else:
items.append(str(slug))
return items
if isinstance(repos, list):
return repos
return []
def _entry_from_item(
item: Any,
domain: str | None,
block: dict[str, Any],
) -> RosterEntry | None:
publish_check = block.get("publish_check")
if isinstance(item, str):
slug = item
elif isinstance(item, dict):
slug = item.get("slug") or item.get("repo") or item.get("name")
publish_check = item.get("publish_check", publish_check)
else:
return None
if not slug:
return None
return RosterEntry(
slug=str(slug),
domain=domain or None,
publish_check=str(publish_check).lower() if publish_check is not None else None,
)
def _select_round_robin_batch(
entries: list[RosterEntry],
batch_size: int,
state_path: Path,
) -> tuple[list[RosterEntry], int]:
if not entries:
return [], 0
cursor = _read_round_robin_cursor(state_path) % len(entries)
size = min(batch_size, len(entries))
selected = [entries[(cursor + offset) % len(entries)] for offset in range(size)]
next_cursor = (cursor + size) % len(entries)
return selected, next_cursor
def _read_round_robin_cursor(path: Path) -> int:
if not path.is_file():
return 0
try:
data = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return 0
if not isinstance(data, dict):
return 0
try:
return int(data.get("cursor", 0))
except (TypeError, ValueError):
return 0
def _write_round_robin_state(
path: Path,
cursor: int,
selected: list[RosterEntry],
) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"cursor": cursor,
"last_batch": [entry.slug for entry in selected],
"updated_at": datetime.now(timezone.utc).isoformat(),
}
path.write_text(
json.dumps(payload, indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
def _enabled_signals(path: Path) -> set[str]:
    """Return the hygiene signals enabled by the signals file.

    The file may hold the signals under a top-level ``signals`` key or be the
    signal collection itself, in either of two shapes:

    * a mapping ``name -> config`` where ``config`` is ``False`` or a mapping
      with ``enabled: false`` to disable the signal;
    * a list of signal names and/or mappings naming the signal via ``id``,
      ``signal`` or ``name``, optionally with ``enabled: false``.

    Unknown signal names are ignored. When the file is missing, or mentions no
    known signal at all, every known signal is enabled. A file that mentions
    known signals only to disable them yields an empty set in both shapes.
    """
    if not path.is_file():
        return set(_KNOWN_SIGNALS)
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    node = data.get("signals") if isinstance(data, dict) else data

    enabled: set[str] = set()
    saw_known_signal = False
    if isinstance(node, dict):
        for name, config in node.items():
            signal = str(name)
            if signal not in _KNOWN_SIGNALS:
                continue
            saw_known_signal = True
            disabled = config is False or (
                isinstance(config, dict) and config.get("enabled") is False
            )
            if not disabled:
                enabled.add(signal)
    elif isinstance(node, list):
        for item in node:
            if isinstance(item, str):
                signal, active = item, True
            elif isinstance(item, dict):
                signal = str(item.get("id") or item.get("signal") or item.get("name"))
                active = item.get("enabled", True) is not False
            else:
                continue
            if signal not in _KNOWN_SIGNALS:
                continue
            # A disabled known signal still counts as "mentioned"; otherwise a
            # list that disables everything would fall through to the
            # enable-all default below.
            saw_known_signal = True
            if active:
                enabled.add(signal)
    return enabled if saw_known_signal else set(_KNOWN_SIGNALS)
def _resolve_repo_roots(
    entries: list[RosterEntry],
    runner_host: str,
) -> dict[str, Path]:
    """Map each requested slug to its local checkout path on ``runner_host``.

    Repositories are fetched from State Hub; slugs that are not in ``entries``
    or that have no usable path for this host are left out of the result.
    """
    wanted = {entry.slug for entry in entries}
    located: dict[str, Path] = {}
    for record in _fetch_repos():
        slug = str(record.get("slug") or "")
        if slug in wanted:
            local = _repo_path_for_host(record, runner_host)
            if local:
                located[slug] = Path(local)
    return located
def _fetch_repos() -> list[dict[str, Any]]:
    """Fetch the repository list from State Hub's ``/repos/`` endpoint.

    Returns:
        The mapping items of the returned JSON list; other items are dropped.

    Raises:
        RuntimeError: if the request fails, returns an error status, the body
            is not valid JSON, or the JSON document is not a list.
    """
    url = f"{_base_url()}/repos/"
    try:
        resp = httpx.get(url, timeout=_STATE_HUB_TIMEOUT_SECONDS)
        resp.raise_for_status()
    except httpx.HTTPError as exc:
        raise RuntimeError(f"State Hub unreachable at {url}: {exc}") from exc
    try:
        payload = resp.json()
    except ValueError as exc:
        # JSON decode errors are ValueError subclasses; report them in the
        # same RuntimeError style as the other State Hub failures.
        raise RuntimeError(f"State Hub /repos/ returned invalid JSON: {exc}") from exc
    if not isinstance(payload, list):
        raise RuntimeError(f"State Hub /repos/ returned non-list: {type(payload)!r}")
    return [repo for repo in payload if isinstance(repo, dict)]
def _repo_path_for_host(repo: dict[str, Any], runner_host: str) -> str | None:
host_paths = repo.get("host_paths") or {}
raw = None
if isinstance(host_paths, dict):
raw = host_paths.get(runner_host)
raw = raw or repo.get("local_path")
if not raw or raw == "(unknown)":
return None
return str(raw)
def _reuse_surface_report(params: dict[str, Any], signals: set[str]) -> dict[str, Any]:
if not (signals & {"registry_gap", "empty_capability_scaffold"}):
return {}
binary = str(params.get("reuse_surface_bin") or "reuse-surface")
try:
completed = subprocess.run(
[binary, "report", "gaps", "--format", "json"],
capture_output=True,
check=False,
text=True,
timeout=_REPORT_TIMEOUT_SECONDS,
)
except FileNotFoundError as exc:
raise RuntimeError(f"reuse-surface CLI not found: {binary}") from exc
except subprocess.TimeoutExpired as exc:
raise RuntimeError("reuse-surface report gaps timed out") from exc
if completed.returncode != 0:
detail = completed.stderr.strip() or completed.stdout.strip()
raise RuntimeError(f"reuse-surface report gaps failed: {detail}")
try:
payload = json.loads(completed.stdout or "{}")
except json.JSONDecodeError as exc:
raise RuntimeError("reuse-surface report gaps returned invalid JSON") from exc
if not isinstance(payload, dict):
raise RuntimeError("reuse-surface report gaps returned non-object JSON")
return payload
def _gap_records(
    entries: list[RosterEntry],
    roots: dict[str, Path],
    signals: set[str],
    report: dict[str, Any],
) -> list[dict[str, Any]]:
    """Build the gap records for the selected entries.

    Entries without a resolved root are logged and skipped. For every other
    entry the enabled signals are checked in a fixed order: empty scaffold,
    registry gap, publish check failure, stale scope, stale SBOM. Each
    ``(repo, signal)`` pair is emitted at most once.
    """
    scaffold_slugs = _repo_set(report, {"empty_scaffolds", "empty_scaffold"})
    publish_fail_slugs = _repo_set(
        report,
        {"publish_fail", "publish_fails", "publish_failures"},
    )
    # NOTE(review): the empty-scaffold gap is gated on *either* CLI-backed
    # signal, so it is also emitted when only registry_gap is enabled —
    # confirm this is intended.
    cli_signal_enabled = bool(signals & {"registry_gap", "empty_capability_scaffold"})

    records: list[dict[str, Any]] = []
    emitted: set[tuple[str, str]] = set()
    for entry in entries:
        slug = entry.slug
        root = roots.get(slug)
        if root is None:
            logger.info("reuse_surface repo_unreachable slug=%s", slug)
            continue
        if cli_signal_enabled and slug in scaffold_slugs:
            _append_gap(records, emitted, slug, root, "empty_capability_scaffold")
        if "registry_gap" in signals and slug in publish_fail_slugs:
            _append_gap(records, emitted, slug, root, "registry_gap")
        if "publish_check_fail" in signals and entry.publish_check == "fail":
            _append_gap(records, emitted, slug, root, "publish_check_fail")
        # The two staleness probes only run when their signal is enabled.
        if "stale_scope" in signals and _scope_is_stale(root):
            _append_gap(records, emitted, slug, root, "stale_scope")
        if "stale_sbom" in signals and _sbom_is_stale(slug):
            _append_gap(records, emitted, slug, root, "stale_sbom")
    return records
def _append_gap(
gaps: list[dict[str, Any]],
seen: set[tuple[str, str]],
slug: str,
root: Path,
signal: str,
) -> None:
key = (slug, signal)
if key in seen:
return
seen.add(key)
gaps.append(
{
"repo": slug,
"root": str(root),
"signal": signal,
"hygiene_signal": signal,
}
)
def _scope_is_stale(root: Path) -> bool:
scope = root / "SCOPE.md"
if not scope.is_file():
return True
age_seconds = datetime.now(timezone.utc).timestamp() - scope.stat().st_mtime
return age_seconds > 90 * 24 * 60 * 60
def _sbom_is_stale(slug: str) -> bool:
    """Return whether State Hub reports the repo's SBOM as older than 30 days.

    Queries ``repo_sbom_status`` through ``StateHubContextResolver``. A
    non-mapping answer, or a ``sbom_age_days`` value that cannot be converted
    to an integer, counts as not stale.
    """
    status = StateHubContextResolver().resolve(
        "repo_sbom_status",
        None,
        {"repo_slug": slug},
    )
    if isinstance(status, dict):
        try:
            age_days = int(status.get("sbom_age_days", 0))
        except (TypeError, ValueError):
            return False
        return age_days > 30
    return False
def _repo_set(report: dict[str, Any], keys: set[str]) -> set[str]:
    """Collect the repo slugs found under any of ``keys`` anywhere in ``report``."""
    per_value = (_slugs_from_value(found) for found in _values_for_keys(report, keys))
    return set().union(*per_value)
def _values_for_keys(value: Any, keys: set[str]) -> list[Any]:
values: list[Any] = []
if isinstance(value, dict):
for key, nested in value.items():
if key in keys:
values.append(nested)
values.extend(_values_for_keys(nested, keys))
elif isinstance(value, list):
for item in value:
values.extend(_values_for_keys(item, keys))
return values
def _slugs_from_value(value: Any) -> set[str]:
if isinstance(value, str):
return {value}
if isinstance(value, list):
slugs: set[str] = set()
for item in value:
slugs.update(_slugs_from_value(item))
return slugs
if isinstance(value, dict):
for key in ("repo", "repo_slug", "slug", "name"):
if value.get(key):
return {str(value[key])}
slugs: set[str] = set()
for key, nested in value.items():
if nested is True or isinstance(nested, (dict, list)):
slugs.add(str(key))
slugs.update(_slugs_from_value(nested))
return slugs
return set()
class ReuseSurfaceContextResolver(ContextResolver):
    """Resolves reuse-surface registry hygiene gap reports."""

    def resolve(self, query: str, event: Any, params: dict[str, Any]) -> dict[str, Any]:
        """Answer ``reuse_surface_report_gaps``; any other query gives ``{}``.

        ``event`` is accepted for interface compatibility and not used.
        """
        if query != "reuse_surface_report_gaps":
            return {}
        return reuse_surface_report_gaps(params)
class ShellContextResolver(ContextResolver):
    """Dispatch shell-backed context queries without breaking kaizen aliases."""

    def resolve(self, query: str, event: Any, params: dict[str, Any]) -> dict[str, Any]:
        """Handle ``reuse_surface_report_gaps`` here; delegate everything else.

        All other queries are forwarded unchanged to a fresh
        ``KaizenContextResolver``.
        """
        if query != "reuse_surface_report_gaps":
            return KaizenContextResolver().resolve(query, event, params)
        return reuse_surface_report_gaps(params)
# Register the dedicated ``reuse-surface`` source type, and install
# ShellContextResolver as the ``shell`` source type; it forwards every query
# other than ``reuse_surface_report_gaps`` to KaizenContextResolver.
CONTEXT_RESOLVER_REGISTRY["reuse-surface"] = ReuseSurfaceContextResolver
CONTEXT_RESOLVER_REGISTRY["shell"] = ShellContextResolver

View File

@@ -88,6 +88,43 @@ def test_for_each_binds_each_list_item_before_condition_and_action_rendering() -
]
def test_for_each_can_gate_registry_hygiene_gaps_on_signal() -> None:
    """A ``for_each`` rule over ``context.gaps`` only fires for gaps whose
    ``hygiene_signal`` is non-empty, and renders the signal into the labels."""
    gated_rule = {
        "id": "flag-registry-hygiene-gap",
        "for_each": "context.gaps",
        "bind_as": "g",
        "condition": 'context.g.hygiene_signal != ""',
        "action": {
            "task_template": "Close registry hygiene gap for {context.g.repo}",
            "target_repo": "context.g.repo",
            "priority": "medium",
            "labels": ["registry-hygiene", "{context.g.hygiene_signal}"],
        },
    }
    gap_with_signal = {
        "repo": "reuse-surface",
        "hygiene_signal": "empty_capability_scaffold",
    }
    gap_without_signal = {
        "repo": "activity-core",
        "hygiene_signal": "",
    }

    expanded = expand_rule_actions(
        [gated_rule],
        _Event(),
        {"gaps": [gap_with_signal, gap_without_signal]},
    )

    targets = [spec["target_repo"] for spec in expanded]
    assert targets == ["reuse-surface"]
    assert expanded[0]["labels"] == [
        "registry-hygiene",
        "empty_capability_scaffold",
    ]
def test_for_each_rejects_non_path_expression() -> None:
rules = [
{

View File

@@ -0,0 +1,167 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pytest
from temporalio.exceptions import ApplicationError
from activity_core.activities import resolve_context
from activity_core.context_resolvers import reuse_surface
from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY
class _Response:
def __init__(self, payload: Any) -> None:
self._payload = payload
def raise_for_status(self) -> None:
return None
def json(self) -> Any:
return self._payload
class _Completed:
returncode = 0
stderr = ""
def __init__(self, payload: dict[str, Any]) -> None:
self.stdout = json.dumps(payload)
def _write_rollout(path: Path) -> None:
path.write_text(
"""
domains:
reuse:
phase: active
repos:
- reuse-surface
- activity-core
parked:
phase: backlog
repos:
- ignored-repo
""".lstrip(),
encoding="utf-8",
)
def _write_cli_only_signals(path: Path) -> None:
path.write_text(
"""
signals:
empty_capability_scaffold:
enabled: true
registry_gap:
enabled: false
stale_scope:
enabled: false
stale_sbom:
enabled: false
publish_check_fail:
enabled: false
""".lstrip(),
encoding="utf-8",
)
def test_shell_resolver_emits_reuse_surface_gaps_and_advances_cursor(
    tmp_path,
    monkeypatch,
) -> None:
    """End-to-end check of the ``shell`` resolver for ``reuse_surface_report_gaps``.

    The State Hub request (``httpx.get``) and the reuse-surface CLI
    (``subprocess.run``) are replaced with fakes; the roster, signals file and
    repo roots all live under ``tmp_path``.
    """
    rollout = tmp_path / "rollout.yaml"
    _write_rollout(rollout)
    # signals.yml sits next to the roster, which is where the resolver looks
    # when no explicit ``signals`` param is passed.
    _write_cli_only_signals(tmp_path / "signals.yml")
    reuse_root = tmp_path / "reuse-surface"
    reuse_root.mkdir()
    (reuse_root / "SCOPE.md").write_text("fresh\n", encoding="utf-8")
    activity_root = tmp_path / "activity-core"
    activity_root.mkdir()
    # Must match the ``host_paths`` keys served by fake_get below.
    monkeypatch.setenv("KAIZEN_RUNNER_HOST", "runner")

    def fake_get(url: str, **kwargs: Any) -> _Response:
        # Stand-in for the State Hub ``/repos/`` listing.
        assert url.endswith("/repos/")
        return _Response(
            [
                {
                    "slug": "reuse-surface",
                    "host_paths": {"runner": str(reuse_root)},
                },
                {
                    "slug": "activity-core",
                    "host_paths": {"runner": str(activity_root)},
                },
            ]
        )

    def fake_run(cmd: list[str], **kwargs: Any) -> _Completed:
        # Stand-in for the CLI; also pins the exact command line.
        assert cmd == ["reuse-surface", "report", "gaps", "--format", "json"]
        return _Completed({"empty_scaffolds": ["reuse-surface"]})

    monkeypatch.setattr(reuse_surface.httpx, "get", fake_get)
    monkeypatch.setattr(reuse_surface.subprocess, "run", fake_run)
    # Importing the package runs the resolver registrations.
    import activity_core.context_resolvers  # noqa: F401

    # batch_size=1 with no state file yet selects the first active entry.
    result = CONTEXT_RESOLVER_REGISTRY["shell"]().resolve(
        "reuse_surface_report_gaps",
        None,
        {
            "roster": str(rollout),
            "batch_size": 1,
        },
    )
    assert result == {
        "gaps": [
            {
                "repo": "reuse-surface",
                "root": str(reuse_root),
                "signal": "empty_capability_scaffold",
                "hygiene_signal": "empty_capability_scaffold",
            }
        ]
    }
    # The cursor file is created next to the roster and points at the next entry.
    state = json.loads((tmp_path / "round-robin-state.json").read_text(encoding="utf-8"))
    assert state["cursor"] == 1
    assert state["last_batch"] == ["reuse-surface"]
def test_shell_resolver_keeps_kaizen_fallback_for_existing_queries() -> None:
    """A query the shell resolver does not own still resolves to ``{}``."""
    shell_resolver = CONTEXT_RESOLVER_REGISTRY["shell"]()
    outcome = shell_resolver.resolve("unknown_query", None, {})
    assert outcome == {}
@pytest.mark.asyncio
async def test_optional_reuse_surface_missing_roster_binds_empty_list(tmp_path) -> None:
    """An optional source whose roster file is missing binds an empty ``gaps`` list."""
    optional_source = {
        "type": "shell",
        "query": "reuse_surface_report_gaps",
        "params": {"roster": str(tmp_path / "missing.yaml")},
        "bind_to": "context.gaps",
    }
    bound = await resolve_context([optional_source])
    assert bound == {"gaps": []}
@pytest.mark.asyncio
async def test_required_reuse_surface_missing_roster_fails_visibly(tmp_path) -> None:
    """A required source whose roster file is missing raises ``ApplicationError``."""
    required_source = {
        "type": "shell",
        "query": "reuse_surface_report_gaps",
        "params": {"roster": str(tmp_path / "missing.yaml")},
        "bind_to": "context.gaps",
        "required": True,
    }
    with pytest.raises(ApplicationError, match="Required context resolver"):
        await resolve_context([required_source])

View File

@@ -0,0 +1,78 @@
---
id: ACTIVITY-WP-0013
type: workplan
title: "Reuse Surface Report Gaps Resolver"
domain: custodian
repo: activity-core
status: finished
owner: codex
topic_slug: activity-core
created: "2026-06-18"
updated: "2026-06-18"
state_hub_workstream_id: "01e68dfd-b146-4aef-a575-2d3b178ca5c2"
---
# Reuse Surface Report Gaps Resolver
Implement the R2 handoff from kaizen-agentic (`bffa224c`) so the
`reuse_surface_report_gaps` shell context source populates
`context.gaps` for the Coulomb daily registry hygiene sweep.
## Register Shell Resolver Query
```task
id: ACTIVITY-WP-0013-T01
status: done
priority: high
state_hub_task_id: "a6e1fc5c-7b42-436d-914e-4d605cb6f329"
```
Add a dedicated reuse-surface context resolver module and register
`reuse_surface_report_gaps` on the `shell` resolver path while preserving
the existing kaizen shell query behavior.
## Implement Batch And Signal Semantics
```task
id: ACTIVITY-WP-0013-T02
status: done
priority: high
state_hub_task_id: "229cf285-8388-471d-95fd-08400db1553e"
```
Load the Coulomb rollout roster, select active repos with a persisted
round-robin cursor, resolve repo roots from State Hub host paths, run
`reuse-surface report gaps --format json`, and emit gap records for the
enabled registry hygiene signals.
## Cover Required And Optional Failure Modes
```task
id: ACTIVITY-WP-0013-T03
status: done
priority: high
state_hub_task_id: "85b5c7d4-40e1-4945-8ada-1dff2363c194"
```
Ensure missing required dependencies fail visibly while optional resolver
sources bind an empty `context.gaps` list. Add unit coverage for fixture
rollout data, mocked CLI JSON, resolver binding, and `hygiene_signal`
rule gating.
## Smoke Real Coulomb Rollout
```task
id: ACTIVITY-WP-0013-T04
status: done
priority: medium
state_hub_task_id: "6a5446ed-b4ec-4693-b508-65415571d834"
```
Run a live resolver smoke against
`/home/worsch/coulomb-loop/loops/registry-hygiene/rollout.yaml` using a
temporary round-robin cursor. The real active rollout produced five gaps,
including one for `reuse-surface` with `hygiene_signal: stale_sbom`.
The smoke supplied `reuse_surface_bin:
/home/worsch/reuse-surface/.venv/bin/reuse-surface` and
`runner_host: bnt-lap001`; the worker environment or definition params must
provide equivalent values before enabling the production sweep.