diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index 6fe6357..4ec54e7 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -149,6 +149,8 @@ async def resolve_context( query = source.get("query", "") params = source.get("params") or {} required = bool(source.get("required") or params.get("required", False)) + resolver_params = dict(params) + resolver_params["required"] = required raw_bind = source.get("bind_to") or source.get("name") or source_type # Strip the 'context.' namespace prefix so evaluator can find the key. bind_key = raw_bind.removeprefix("context.") if raw_bind.startswith("context.") else raw_bind @@ -172,7 +174,7 @@ async def resolve_context( continue try: - resolved = resolver_cls().resolve(query, event_envelope, params) + resolved = resolver_cls().resolve(query, event_envelope, resolver_params) snapshot[bind_key] = _bind_resolver_result(bind_key, resolved) except Exception as exc: if required: diff --git a/src/activity_core/context_resolvers/__init__.py b/src/activity_core/context_resolvers/__init__.py index 9391ffe..2886e84 100644 --- a/src/activity_core/context_resolvers/__init__.py +++ b/src/activity_core/context_resolvers/__init__.py @@ -4,4 +4,5 @@ from activity_core.context_resolvers import ( # noqa: F401 ops_inventory, repo_scoping, state_hub, + reuse_surface, ) diff --git a/src/activity_core/context_resolvers/reuse_surface.py b/src/activity_core/context_resolvers/reuse_surface.py new file mode 100644 index 0000000..31a1ebf --- /dev/null +++ b/src/activity_core/context_resolvers/reuse_surface.py @@ -0,0 +1,516 @@ +"""Reuse-surface registry hygiene context adapter. + +Registered as source type ``reuse-surface`` and as the ``shell`` resolver +dispatcher for the ``reuse_surface_report_gaps`` query. Other shell queries +continue to delegate to the kaizen resolver for backward compatibility. +""" + +from __future__ import annotations + +import json +import logging +import os +import socket +import subprocess +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import httpx +import yaml + +from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, ContextResolver +from activity_core.context_resolvers.kaizen import KaizenContextResolver +from activity_core.context_resolvers.state_hub import StateHubContextResolver + +logger = logging.getLogger(__name__) + +_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" +_REPORT_TIMEOUT_SECONDS = 60 +_STATE_HUB_TIMEOUT_SECONDS = 10.0 +_KNOWN_SIGNALS = frozenset( + { + "registry_gap", + "empty_capability_scaffold", + "stale_scope", + "stale_sbom", + "publish_check_fail", + } +) + + +@dataclass(frozen=True) +class RosterEntry: + slug: str + domain: str | None = None + publish_check: str | None = None + + +def _base_url() -> str: + return os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL).rstrip("/") + + +def _runner_host(params: dict[str, Any]) -> str: + return str( + params.get("runner_host") + or os.environ.get("KAIZEN_RUNNER_HOST") + or socket.gethostname() + ) + + +def _as_required(params: dict[str, Any]) -> bool: + return bool(params.get("required", False)) + + +def reuse_surface_report_gaps(params: dict[str, Any]) -> dict[str, Any]: + """Resolve registry-hygiene gaps for the next rollout batch. + + Missing operational dependencies are visible failures for required sources + and graceful empty lists for optional sources so definitions can opt into + either behavior without changing rule logic. + """ + try: + return _resolve_reuse_surface_report_gaps(params) + except Exception as exc: + if _as_required(params): + raise + logger.warning("reuse_surface_report_gaps unavailable: %s", exc) + return {"gaps": []} + + +def _resolve_reuse_surface_report_gaps(params: dict[str, Any]) -> dict[str, Any]: + roster_path = _roster_path(params) + entries = _load_active_roster_entries(roster_path) + if not entries: + return {"gaps": []} + + state_path = _round_robin_state_path(params, roster_path) + selected, next_cursor = _select_round_robin_batch( + entries, + _batch_size(params), + state_path, + ) + if not selected: + return {"gaps": []} + + signals = _enabled_signals(_signals_path(params, roster_path)) + roots = _resolve_repo_roots(selected, _runner_host(params)) + report = _reuse_surface_report(params, signals) + gaps = _gap_records(selected, roots, signals, report) + + _write_round_robin_state(state_path, next_cursor, selected) + return {"gaps": gaps} + + +def _roster_path(params: dict[str, Any]) -> Path: + raw = params.get("roster") + if not raw: + raise ValueError("reuse_surface_report_gaps requires params.roster") + path = Path(str(raw)).expanduser() + if not path.is_file(): + raise FileNotFoundError(f"reuse_surface_report_gaps roster not found: {path}") + return path + + +def _batch_size(params: dict[str, Any]) -> int: + try: + return max(1, int(params.get("batch_size", 3))) + except (TypeError, ValueError): + return 3 + + +def _round_robin_state_path(params: dict[str, Any], roster_path: Path) -> Path: + raw = params.get("round_robin_state") + if raw: + return Path(str(raw)).expanduser() + return roster_path.with_name("round-robin-state.json") + + +def _signals_path(params: dict[str, Any], roster_path: Path) -> Path: + raw = params.get("signals") + if raw: + return Path(str(raw)).expanduser() + return roster_path.with_name("signals.yml") + + +def _load_active_roster_entries(path: Path) -> list[RosterEntry]: + data = yaml.safe_load(path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise ValueError(f"reuse_surface rollout roster is not a mapping: {path}") + + entries: dict[str, RosterEntry] = {} + for domain, block in _iter_domain_blocks(data): + if _domain_phase(block) != "active": + continue + for item in _repo_items(block): + entry = _entry_from_item(item, domain, block) + if entry and entry.slug not in entries: + entries[entry.slug] = entry + return list(entries.values()) + + +def _iter_domain_blocks(data: dict[str, Any]) -> list[tuple[str | None, dict[str, Any]]]: + domains = data.get("domains") + if isinstance(domains, dict): + return [ + (str(name), block) + for name, block in domains.items() + if isinstance(block, dict) + ] + if isinstance(domains, list): + return [ + (str(block.get("name") or block.get("domain") or ""), block) + for block in domains + if isinstance(block, dict) + ] + if isinstance(data.get("active"), list): + return [(None, {"phase": "active", "repos": data["active"]})] + return [ + (str(name), block) + for name, block in data.items() + if isinstance(block, dict) and ("phase" in block or "repos" in block) + ] + + +def _domain_phase(block: dict[str, Any]) -> str: + return str(block.get("phase") or block.get("status") or "").lower() + + +def _repo_items(block: dict[str, Any]) -> list[Any]: + repos = ( + block.get("repos") + or block.get("repo_slugs") + or block.get("repositories") + or block.get("slugs") + or [] + ) + if isinstance(repos, dict): + items: list[Any] = [] + for slug, config in repos.items(): + if isinstance(config, dict): + item = dict(config) + item.setdefault("slug", slug) + items.append(item) + else: + items.append(str(slug)) + return items + if isinstance(repos, list): + return repos + return [] + + +def _entry_from_item( + item: Any, + domain: str | None, + block: dict[str, Any], +) -> RosterEntry | None: + publish_check = block.get("publish_check") + if isinstance(item, str): + slug = item + elif isinstance(item, dict): + slug = item.get("slug") or item.get("repo") or item.get("name") + publish_check = item.get("publish_check", publish_check) + else: + return None + if not slug: + return None + return RosterEntry( + slug=str(slug), + domain=domain or None, + publish_check=str(publish_check).lower() if publish_check is not None else None, + ) + + +def _select_round_robin_batch( + entries: list[RosterEntry], + batch_size: int, + state_path: Path, +) -> tuple[list[RosterEntry], int]: + if not entries: + return [], 0 + cursor = _read_round_robin_cursor(state_path) % len(entries) + size = min(batch_size, len(entries)) + selected = [entries[(cursor + offset) % len(entries)] for offset in range(size)] + next_cursor = (cursor + size) % len(entries) + return selected, next_cursor + + +def _read_round_robin_cursor(path: Path) -> int: + if not path.is_file(): + return 0 + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return 0 + if not isinstance(data, dict): + return 0 + try: + return int(data.get("cursor", 0)) + except (TypeError, ValueError): + return 0 + + +def _write_round_robin_state( + path: Path, + cursor: int, + selected: list[RosterEntry], +) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "cursor": cursor, + "last_batch": [entry.slug for entry in selected], + "updated_at": datetime.now(timezone.utc).isoformat(), + } + path.write_text( + json.dumps(payload, indent=2, sort_keys=True) + "\n", + encoding="utf-8", + ) + + +def _enabled_signals(path: Path) -> set[str]: + if not path.is_file(): + return set(_KNOWN_SIGNALS) + data = yaml.safe_load(path.read_text(encoding="utf-8")) + node = data.get("signals") if isinstance(data, dict) else data + enabled: set[str] = set() + saw_known_signal = False + + if isinstance(node, dict): + for name, config in node.items(): + if str(name) not in _KNOWN_SIGNALS: + continue + saw_known_signal = True + if isinstance(config, dict) and config.get("enabled") is False: + continue + if config is False: + continue + enabled.add(str(name)) + elif isinstance(node, list): + for item in node: + if isinstance(item, str) and item in _KNOWN_SIGNALS: + saw_known_signal = True + enabled.add(item) + elif isinstance(item, dict): + name = item.get("id") or item.get("signal") or item.get("name") + if str(name) in _KNOWN_SIGNALS and item.get("enabled", True) is not False: + saw_known_signal = True + enabled.add(str(name)) + + return enabled if saw_known_signal else set(_KNOWN_SIGNALS) + + +def _resolve_repo_roots( + entries: list[RosterEntry], + runner_host: str, +) -> dict[str, Path]: + requested = {entry.slug for entry in entries} + roots: dict[str, Path] = {} + for repo in _fetch_repos(): + slug = str(repo.get("slug") or "") + if slug not in requested: + continue + raw = _repo_path_for_host(repo, runner_host) + if raw: + roots[slug] = Path(raw) + return roots + + +def _fetch_repos() -> list[dict[str, Any]]: + url = f"{_base_url()}/repos/" + try: + resp = httpx.get(url, timeout=_STATE_HUB_TIMEOUT_SECONDS) + resp.raise_for_status() + except httpx.HTTPError as exc: + raise RuntimeError(f"State Hub unreachable at {url}: {exc}") from exc + payload = resp.json() + if not isinstance(payload, list): + raise RuntimeError(f"State Hub /repos/ returned non-list: {type(payload)!r}") + return [repo for repo in payload if isinstance(repo, dict)] + + +def _repo_path_for_host(repo: dict[str, Any], runner_host: str) -> str | None: + host_paths = repo.get("host_paths") or {} + raw = None + if isinstance(host_paths, dict): + raw = host_paths.get(runner_host) + raw = raw or repo.get("local_path") + if not raw or raw == "(unknown)": + return None + return str(raw) + + +def _reuse_surface_report(params: dict[str, Any], signals: set[str]) -> dict[str, Any]: + if not (signals & {"registry_gap", "empty_capability_scaffold"}): + return {} + binary = str(params.get("reuse_surface_bin") or "reuse-surface") + try: + completed = subprocess.run( + [binary, "report", "gaps", "--format", "json"], + capture_output=True, + check=False, + text=True, + timeout=_REPORT_TIMEOUT_SECONDS, + ) + except FileNotFoundError as exc: + raise RuntimeError(f"reuse-surface CLI not found: {binary}") from exc + except subprocess.TimeoutExpired as exc: + raise RuntimeError("reuse-surface report gaps timed out") from exc + + if completed.returncode != 0: + detail = completed.stderr.strip() or completed.stdout.strip() + raise RuntimeError(f"reuse-surface report gaps failed: {detail}") + try: + payload = json.loads(completed.stdout or "{}") + except json.JSONDecodeError as exc: + raise RuntimeError("reuse-surface report gaps returned invalid JSON") from exc + if not isinstance(payload, dict): + raise RuntimeError("reuse-surface report gaps returned non-object JSON") + return payload + + +def _gap_records( + entries: list[RosterEntry], + roots: dict[str, Path], + signals: set[str], + report: dict[str, Any], +) -> list[dict[str, Any]]: + empty_scaffolds = _repo_set(report, {"empty_scaffolds", "empty_scaffold"}) + publish_fail = _repo_set( + report, + {"publish_fail", "publish_fails", "publish_failures"}, + ) + gaps: list[dict[str, Any]] = [] + seen: set[tuple[str, str]] = set() + + for entry in entries: + root = roots.get(entry.slug) + if root is None: + logger.info("reuse_surface repo_unreachable slug=%s", entry.slug) + continue + + if ( + signals & {"registry_gap", "empty_capability_scaffold"} + and entry.slug in empty_scaffolds + ): + _append_gap(gaps, seen, entry.slug, root, "empty_capability_scaffold") + + if "registry_gap" in signals and entry.slug in publish_fail: + _append_gap(gaps, seen, entry.slug, root, "registry_gap") + + if "publish_check_fail" in signals and entry.publish_check == "fail": + _append_gap(gaps, seen, entry.slug, root, "publish_check_fail") + + if "stale_scope" in signals and _scope_is_stale(root): + _append_gap(gaps, seen, entry.slug, root, "stale_scope") + + if "stale_sbom" in signals and _sbom_is_stale(entry.slug): + _append_gap(gaps, seen, entry.slug, root, "stale_sbom") + + return gaps + + +def _append_gap( + gaps: list[dict[str, Any]], + seen: set[tuple[str, str]], + slug: str, + root: Path, + signal: str, +) -> None: + key = (slug, signal) + if key in seen: + return + seen.add(key) + gaps.append( + { + "repo": slug, + "root": str(root), + "signal": signal, + "hygiene_signal": signal, + } + ) + + +def _scope_is_stale(root: Path) -> bool: + scope = root / "SCOPE.md" + if not scope.is_file(): + return True + age_seconds = datetime.now(timezone.utc).timestamp() - scope.stat().st_mtime + return age_seconds > 90 * 24 * 60 * 60 + + +def _sbom_is_stale(slug: str) -> bool: + payload = StateHubContextResolver().resolve( + "repo_sbom_status", + None, + {"repo_slug": slug}, + ) + if not isinstance(payload, dict): + return False + try: + return int(payload.get("sbom_age_days", 0)) > 30 + except (TypeError, ValueError): + return False + + +def _repo_set(report: dict[str, Any], keys: set[str]) -> set[str]: + slugs: set[str] = set() + for value in _values_for_keys(report, keys): + slugs.update(_slugs_from_value(value)) + return slugs + + +def _values_for_keys(value: Any, keys: set[str]) -> list[Any]: + values: list[Any] = [] + if isinstance(value, dict): + for key, nested in value.items(): + if key in keys: + values.append(nested) + values.extend(_values_for_keys(nested, keys)) + elif isinstance(value, list): + for item in value: + values.extend(_values_for_keys(item, keys)) + return values + + +def _slugs_from_value(value: Any) -> set[str]: + if isinstance(value, str): + return {value} + if isinstance(value, list): + slugs: set[str] = set() + for item in value: + slugs.update(_slugs_from_value(item)) + return slugs + if isinstance(value, dict): + for key in ("repo", "repo_slug", "slug", "name"): + if value.get(key): + return {str(value[key])} + slugs: set[str] = set() + for key, nested in value.items(): + if nested is True or isinstance(nested, (dict, list)): + slugs.add(str(key)) + slugs.update(_slugs_from_value(nested)) + return slugs + return set() + + +class ReuseSurfaceContextResolver(ContextResolver): + """Resolves reuse-surface registry hygiene gap reports.""" + + def resolve(self, query: str, event: Any, params: dict[str, Any]) -> dict[str, Any]: + if query == "reuse_surface_report_gaps": + return reuse_surface_report_gaps(params) + return {} + + +class ShellContextResolver(ContextResolver): + """Dispatch shell-backed context queries without breaking kaizen aliases.""" + + def resolve(self, query: str, event: Any, params: dict[str, Any]) -> dict[str, Any]: + if query == "reuse_surface_report_gaps": + return reuse_surface_report_gaps(params) + return KaizenContextResolver().resolve(query, event, params) + + +CONTEXT_RESOLVER_REGISTRY["reuse-surface"] = ReuseSurfaceContextResolver +CONTEXT_RESOLVER_REGISTRY["shell"] = ShellContextResolver diff --git a/tests/rules/test_actions.py b/tests/rules/test_actions.py index c4b48e0..010e309 100644 --- a/tests/rules/test_actions.py +++ b/tests/rules/test_actions.py @@ -88,6 +88,43 @@ def test_for_each_binds_each_list_item_before_condition_and_action_rendering() - ] +def test_for_each_can_gate_registry_hygiene_gaps_on_signal() -> None: + rules = [ + { + "id": "flag-registry-hygiene-gap", + "for_each": "context.gaps", + "bind_as": "g", + "condition": 'context.g.hygiene_signal != ""', + "action": { + "task_template": "Close registry hygiene gap for {context.g.repo}", + "target_repo": "context.g.repo", + "priority": "medium", + "labels": ["registry-hygiene", "{context.g.hygiene_signal}"], + }, + } + ] + context = { + "gaps": [ + { + "repo": "reuse-surface", + "hygiene_signal": "empty_capability_scaffold", + }, + { + "repo": "activity-core", + "hygiene_signal": "", + }, + ] + } + + specs = expand_rule_actions(rules, _Event(), context) + + assert [spec["target_repo"] for spec in specs] == ["reuse-surface"] + assert specs[0]["labels"] == [ + "registry-hygiene", + "empty_capability_scaffold", + ] + + def test_for_each_rejects_non_path_expression() -> None: rules = [ { diff --git a/tests/test_reuse_surface_context_resolver.py b/tests/test_reuse_surface_context_resolver.py new file mode 100644 index 0000000..2d03af6 --- /dev/null +++ b/tests/test_reuse_surface_context_resolver.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import pytest +from temporalio.exceptions import ApplicationError + +from activity_core.activities import resolve_context +from activity_core.context_resolvers import reuse_surface +from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY + + +class _Response: + def __init__(self, payload: Any) -> None: + self._payload = payload + + def raise_for_status(self) -> None: + return None + + def json(self) -> Any: + return self._payload + + +class _Completed: + returncode = 0 + stderr = "" + + def __init__(self, payload: dict[str, Any]) -> None: + self.stdout = json.dumps(payload) + + +def _write_rollout(path: Path) -> None: + path.write_text( + """ +domains: + reuse: + phase: active + repos: + - reuse-surface + - activity-core + parked: + phase: backlog + repos: + - ignored-repo +""".lstrip(), + encoding="utf-8", + ) + + +def _write_cli_only_signals(path: Path) -> None: + path.write_text( + """ +signals: + empty_capability_scaffold: + enabled: true + registry_gap: + enabled: false + stale_scope: + enabled: false + stale_sbom: + enabled: false + publish_check_fail: + enabled: false +""".lstrip(), + encoding="utf-8", + ) + + +def test_shell_resolver_emits_reuse_surface_gaps_and_advances_cursor( + tmp_path, + monkeypatch, +) -> None: + rollout = tmp_path / "rollout.yaml" + _write_rollout(rollout) + _write_cli_only_signals(tmp_path / "signals.yml") + reuse_root = tmp_path / "reuse-surface" + reuse_root.mkdir() + (reuse_root / "SCOPE.md").write_text("fresh\n", encoding="utf-8") + activity_root = tmp_path / "activity-core" + activity_root.mkdir() + + monkeypatch.setenv("KAIZEN_RUNNER_HOST", "runner") + + def fake_get(url: str, **kwargs: Any) -> _Response: + assert url.endswith("/repos/") + return _Response( + [ + { + "slug": "reuse-surface", + "host_paths": {"runner": str(reuse_root)}, + }, + { + "slug": "activity-core", + "host_paths": {"runner": str(activity_root)}, + }, + ] + ) + + def fake_run(cmd: list[str], **kwargs: Any) -> _Completed: + assert cmd == ["reuse-surface", "report", "gaps", "--format", "json"] + return _Completed({"empty_scaffolds": ["reuse-surface"]}) + + monkeypatch.setattr(reuse_surface.httpx, "get", fake_get) + monkeypatch.setattr(reuse_surface.subprocess, "run", fake_run) + + import activity_core.context_resolvers # noqa: F401 + + result = CONTEXT_RESOLVER_REGISTRY["shell"]().resolve( + "reuse_surface_report_gaps", + None, + { + "roster": str(rollout), + "batch_size": 1, + }, + ) + + assert result == { + "gaps": [ + { + "repo": "reuse-surface", + "root": str(reuse_root), + "signal": "empty_capability_scaffold", + "hygiene_signal": "empty_capability_scaffold", + } + ] + } + state = json.loads((tmp_path / "round-robin-state.json").read_text(encoding="utf-8")) + assert state["cursor"] == 1 + assert state["last_batch"] == ["reuse-surface"] + + +def test_shell_resolver_keeps_kaizen_fallback_for_existing_queries() -> None: + assert CONTEXT_RESOLVER_REGISTRY["shell"]().resolve("unknown_query", None, {}) == {} + + +@pytest.mark.asyncio +async def test_optional_reuse_surface_missing_roster_binds_empty_list(tmp_path) -> None: + snapshot = await resolve_context( + [ + { + "type": "shell", + "query": "reuse_surface_report_gaps", + "params": {"roster": str(tmp_path / "missing.yaml")}, + "bind_to": "context.gaps", + } + ] + ) + + assert snapshot == {"gaps": []} + + +@pytest.mark.asyncio +async def test_required_reuse_surface_missing_roster_fails_visibly(tmp_path) -> None: + with pytest.raises(ApplicationError, match="Required context resolver"): + await resolve_context( + [ + { + "type": "shell", + "query": "reuse_surface_report_gaps", + "params": {"roster": str(tmp_path / "missing.yaml")}, + "bind_to": "context.gaps", + "required": True, + } + ] + ) diff --git a/workplans/ACTIVITY-WP-0013-reuse-surface-report-gaps-resolver.md b/workplans/ACTIVITY-WP-0013-reuse-surface-report-gaps-resolver.md new file mode 100644 index 0000000..bb2f369 --- /dev/null +++ b/workplans/ACTIVITY-WP-0013-reuse-surface-report-gaps-resolver.md @@ -0,0 +1,78 @@ +--- +id: ACTIVITY-WP-0013 +type: workplan +title: "Reuse Surface Report Gaps Resolver" +domain: custodian +repo: activity-core +status: finished +owner: codex +topic_slug: activity-core +created: "2026-06-18" +updated: "2026-06-18" +state_hub_workstream_id: "01e68dfd-b146-4aef-a575-2d3b178ca5c2" +--- + +# Reuse Surface Report Gaps Resolver + +Implement the R2 handoff from kaizen-agentic (`bffa224c`) so the +`reuse_surface_report_gaps` shell context source populates +`context.gaps` for the Coulomb daily registry hygiene sweep. + +## Register Shell Resolver Query + +```task +id: ACTIVITY-WP-0013-T01 +status: done +priority: high +state_hub_task_id: "a6e1fc5c-7b42-436d-914e-4d605cb6f329" +``` + +Add a dedicated reuse-surface context resolver module and register +`reuse_surface_report_gaps` on the `shell` resolver path while preserving +the existing kaizen shell query behavior. + +## Implement Batch And Signal Semantics + +```task +id: ACTIVITY-WP-0013-T02 +status: done +priority: high +state_hub_task_id: "229cf285-8388-471d-95fd-08400db1553e" +``` + +Load the Coulomb rollout roster, select active repos with a persisted +round-robin cursor, resolve repo roots from State Hub host paths, run +`reuse-surface report gaps --format json`, and emit gap records for the +enabled registry hygiene signals. + +## Cover Required And Optional Failure Modes + +```task +id: ACTIVITY-WP-0013-T03 +status: done +priority: high +state_hub_task_id: "85b5c7d4-40e1-4945-8ada-1dff2363c194" +``` + +Ensure missing required dependencies fail visibly while optional resolver +sources bind an empty `context.gaps` list. Add unit coverage for fixture +rollout data, mocked CLI JSON, resolver binding, and `hygiene_signal` +rule gating. + +## Smoke Real Coulomb Rollout + +```task +id: ACTIVITY-WP-0013-T04 +status: done +priority: medium +state_hub_task_id: "6a5446ed-b4ec-4693-b508-65415571d834" +``` + +Run a live resolver smoke against +`/home/worsch/coulomb-loop/loops/registry-hygiene/rollout.yaml` using a +temporary round-robin cursor. The real active rollout produced five gaps, +including one for `reuse-surface` with `hygiene_signal: stale_sbom`. +The smoke supplied `reuse_surface_bin: +/home/worsch/reuse-surface/.venv/bin/reuse-surface` and +`runner_host: bnt-lap001`; the worker environment or definition params must +provide equivalent values before enabling the production sweep.