from __future__ import annotations import json from pathlib import Path from typing import Any import pytest from temporalio.exceptions import ApplicationError from activity_core.activities import resolve_context from activity_core.context_resolvers import reuse_surface from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY class _Response: def __init__(self, payload: Any) -> None: self._payload = payload def raise_for_status(self) -> None: return None def json(self) -> Any: return self._payload class _Completed: returncode = 0 stderr = "" def __init__(self, payload: dict[str, Any]) -> None: self.stdout = json.dumps(payload) def _write_rollout(path: Path) -> None: path.write_text( """ domains: reuse: phase: active repos: - reuse-surface - activity-core parked: phase: backlog repos: - ignored-repo """.lstrip(), encoding="utf-8", ) def _write_cli_only_signals(path: Path) -> None: path.write_text( """ signals: empty_capability_scaffold: enabled: true registry_gap: enabled: false stale_scope: enabled: false stale_sbom: enabled: false publish_check_fail: enabled: false """.lstrip(), encoding="utf-8", ) def test_shell_resolver_emits_reuse_surface_gaps_and_advances_cursor( tmp_path, monkeypatch, ) -> None: rollout = tmp_path / "rollout.yaml" _write_rollout(rollout) _write_cli_only_signals(tmp_path / "signals.yml") reuse_root = tmp_path / "reuse-surface" reuse_root.mkdir() (reuse_root / "SCOPE.md").write_text("fresh\n", encoding="utf-8") activity_root = tmp_path / "activity-core" activity_root.mkdir() monkeypatch.setenv("KAIZEN_RUNNER_HOST", "runner") def fake_get(url: str, **kwargs: Any) -> _Response: assert url.endswith("/repos/") return _Response( [ { "slug": "reuse-surface", "host_paths": {"runner": str(reuse_root)}, }, { "slug": "activity-core", "host_paths": {"runner": str(activity_root)}, }, ] ) def fake_run(cmd: list[str], **kwargs: Any) -> _Completed: assert cmd == ["reuse-surface", "report", "gaps", "--format", "json"] return _Completed({"empty_scaffolds": ["reuse-surface"]}) monkeypatch.setattr(reuse_surface.httpx, "get", fake_get) monkeypatch.setattr(reuse_surface.subprocess, "run", fake_run) import activity_core.context_resolvers # noqa: F401 result = CONTEXT_RESOLVER_REGISTRY["shell"]().resolve( "reuse_surface_report_gaps", None, { "roster": str(rollout), "batch_size": 1, }, ) assert result == { "gaps": [ { "repo": "reuse-surface", "root": str(reuse_root), "signal": "empty_capability_scaffold", "hygiene_signal": "empty_capability_scaffold", } ] } state = json.loads((tmp_path / "round-robin-state.json").read_text(encoding="utf-8")) assert state["cursor"] == 1 assert state["last_batch"] == ["reuse-surface"] def test_shell_resolver_keeps_kaizen_fallback_for_existing_queries() -> None: assert CONTEXT_RESOLVER_REGISTRY["shell"]().resolve("unknown_query", None, {}) == {} @pytest.mark.asyncio async def test_optional_reuse_surface_missing_roster_binds_empty_list(tmp_path) -> None: snapshot = await resolve_context( [ { "type": "shell", "query": "reuse_surface_report_gaps", "params": {"roster": str(tmp_path / "missing.yaml")}, "bind_to": "context.gaps", } ] ) assert snapshot == {"gaps": []} @pytest.mark.asyncio async def test_required_reuse_surface_missing_roster_fails_visibly(tmp_path) -> None: with pytest.raises(ApplicationError, match="Required context resolver"): await resolve_context( [ { "type": "shell", "query": "reuse_surface_report_gaps", "params": {"roster": str(tmp_path / "missing.yaml")}, "bind_to": "context.gaps", "required": True, } ] )