From 517bf9c13390887975df5c670828bbf6e6b215ec Mon Sep 17 00:00:00 2001 From: tegwick Date: Thu, 18 Jun 2026 07:46:46 +0200 Subject: [PATCH] Add kaizen context resolver for scheduled agent fleet discovery. Implement discover_kaizen_scheduled_repos and discover_kaizen_projects per kaizen-agentic ADR-005 contract: State Hub roster, roster.yaml filter, schedule validation, and prepare_command emission. Register kaizen/resolver/shell source types with unit tests and runbook dry-run instructions. --- .env.example | 3 +- docs/runbook.md | 59 +++- pyproject.toml | 1 + .../context_resolvers/__init__.py | 2 +- src/activity_core/context_resolvers/kaizen.py | 305 ++++++++++++++++++ tests/test_kaizen_context_resolver.py | 195 +++++++++++ 6 files changed, 560 insertions(+), 5 deletions(-) create mode 100644 src/activity_core/context_resolvers/kaizen.py create mode 100644 tests/test_kaizen_context_resolver.py diff --git a/.env.example b/.env.example index 100dc2c..f58b2d4 100644 --- a/.env.example +++ b/.env.example @@ -25,7 +25,8 @@ ISSUE_SINK_TYPE=rest # ── Activity definitions ─────────────────────────────────────────────────────── # Colon-separated paths to additional activity-definitions/ directories. # The local activity-definitions/ directory is always scanned. -ACTIVITY_DEFINITION_DIRS= +# Coulomb-loop kaizen engagement definitions (colon-separated for more roots). +ACTIVITY_DEFINITION_DIRS=/home/worsch/coulomb-loop # ── Observability ───────────────────────────────────────────────────────────── # Prometheus metrics bind address (Temporal SDK metrics). diff --git a/docs/runbook.md b/docs/runbook.md index 0e4b0f4..3617bd0 100644 --- a/docs/runbook.md +++ b/docs/runbook.md @@ -159,14 +159,34 @@ repos, and emits one automated task per stale repo through explicit `weekly-coding-retro` follows the same cron -> context resolver -> per-repo task pattern for coding-session retrospection. It runs Saturdays at 19:00 Europe/Berlin and resolves the latest State Hub `/progress/` item with -`event_type=coding_retro` into `context.retro.suggestions`. Each positive-score -suggestion emits one task to `context.s.repo` with labels -`coding-retro`, `improvement`, and `automated`. +`event_type=coding_retro` and a matching `window_days` into +`context.retro.suggestions`. Each positive-score suggestion emits one task to +`context.s.repo` with labels `coding-retro`, `improvement`, and `automated`. +The weekly schedule intentionally ignores broader retro windows such as 30-day +catch-up reports. Keep `weekly-coding-retro` disabled until Helix Forge publishes the `coding_retro` read model and a smoke run confirms the resolver returns a non-empty suggestion set with no duplicate target tasks on re-run. +## Ops inventory evidence posture + +The current accepted live backend for activity-core ops inventory probes is +State Hub progress with `event_type=ops_inventory_probe`. + +Inter-Hub / ops-hub per-entity submission remains intentionally deferred until +all of these are true: + +- `OPS_HUB_KEY` is provisioned through an operator-owned secret path, never Git, + chat, or State Hub detail. +- Widget or capability mapping is configured for the target ops-hub entities. +- Production Inter-Hub intake is deployed and smoke-tested for the relevant + authenticated routes. + +Until then, missing Inter-Hub configuration should produce an explicit skipped +sink result, not a failed probe. This posture was recorded in State Hub decision +`7c235bbb-ee6f-4c3e-b1dd-74717eac9082`. + --- ## Temporal UI — filtering by activity @@ -342,6 +362,14 @@ uv run alembic history # show full migration history ## Railiance Deployment +### Production API access posture + +The FastAPI admin surface remains ClusterIP-only in production. Do not publish +it through an external ingress until a separate access-policy work item chooses +the hostname, authentication layer, allowed users/agents, and audit +expectations. This posture was recorded in State Hub decision +`9ffaf7a9-227a-4e39-92e3-cd93d8cda1f2`. + ### Pre-requisites - Docker ≥ 24 with Compose v2 (`docker compose` not `docker-compose`) - ≥ 4 GB RAM available (Temporal server takes ~1 GB) @@ -412,6 +440,31 @@ make railiance-up --- +## Kaizen fleet resolver (coulomb-loop) + +Dry-run scheduled agent discovery against State Hub + pilot roster: + +```bash +export STATE_HUB_URL=http://127.0.0.1:8000 +export KAIZEN_RUNNER_HOST=$(hostname) +export ACTIVITY_DEFINITION_DIRS=/home/worsch/coulomb-loop + +uv run python -c " +from activity_core.context_resolvers.kaizen import discover_kaizen_scheduled_repos +print(discover_kaizen_scheduled_repos({ + 'roster': '/home/worsch/coulomb-loop/loops/kaizen-stack/roster.yaml', + 'cadence': 'daily', +})) +" + +make sync-activity-definitions # requires ACTCORE_DB_URL + stack up +``` + +Source types: `kaizen`, `resolver`, or `shell` (alias). Queries: +`discover_kaizen_scheduled_repos`, `discover_kaizen_projects`. + +--- + ## Wipe and restart dev stack ```bash diff --git a/pyproject.toml b/pyproject.toml index 0731351..3dc11ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "alembic>=1.14", "nats-py>=2.7", "httpx>=0.27", + "pyyaml>=6.0", ] [project.optional-dependencies] diff --git a/src/activity_core/context_resolvers/__init__.py b/src/activity_core/context_resolvers/__init__.py index d76fb80..002d059 100644 --- a/src/activity_core/context_resolvers/__init__.py +++ b/src/activity_core/context_resolvers/__init__.py @@ -1 +1 @@ -from activity_core.context_resolvers import ops_inventory, repo_scoping, state_hub # noqa: F401 +from activity_core.context_resolvers import kaizen, ops_inventory, repo_scoping, state_hub # noqa: F401 diff --git a/src/activity_core/context_resolvers/kaizen.py b/src/activity_core/context_resolvers/kaizen.py new file mode 100644 index 0000000..6d09959 --- /dev/null +++ b/src/activity_core/context_resolvers/kaizen.py @@ -0,0 +1,305 @@ +"""Kaizen-agentic fleet context adapter. + +Registered as source types ``kaizen`` and ``resolver`` (alias for ADR-005 drafts). + +Supported queries: + - discover_kaizen_scheduled_repos: hub roster ∩ valid ``.kaizen/schedule.yml`` + - discover_kaizen_projects: repos with ``.kaizen/metrics`` marker (+ optional roster) + +Contract: kaizen-agentic ``docs/integrations/discover-kaizen-scheduled-repos.md`` +""" + +from __future__ import annotations + +import json +import logging +import os +import socket +from pathlib import Path +from typing import Any + +import httpx +import yaml + +from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, ContextResolver + +logger = logging.getLogger(__name__) + +_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" +_TIMEOUT_SECONDS = 10.0 +_SCHEDULE_VERSION = "1" +_VALID_CADENCES = frozenset({"daily", "weekly", "monthly"}) +_PREPARE_BIN = os.environ.get("KAIZEN_AGENTIC_BIN", "kaizen-agentic") + + +def _base_url() -> str: + return os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL).rstrip("/") + + +def _runner_host() -> str: + return os.environ.get("KAIZEN_RUNNER_HOST", socket.gethostname()) + + +def _fetch_repos(domain: str | None) -> list[dict[str, Any]]: + url = f"{_base_url()}/repos/" + try: + resp = httpx.get(url, timeout=_TIMEOUT_SECONDS) + resp.raise_for_status() + except httpx.HTTPError as exc: + raise RuntimeError(f"State Hub unreachable at {url}: {exc}") from exc + payload = resp.json() + if not isinstance(payload, list): + raise RuntimeError(f"State Hub /repos/ returned non-list: {type(payload)!r}") + if domain: + payload = [r for r in payload if r.get("domain_slug") == domain] + return payload + + +def _repo_root(repo: dict[str, Any]) -> Path | None: + host_paths = repo.get("host_paths") or {} + host = _runner_host() + raw = host_paths.get(host) or repo.get("local_path") + if not raw or raw == "(unknown)": + return None + path = Path(raw) + return path if path.is_dir() else None + + +def _load_roster(params: dict[str, Any]) -> dict[str, dict[str, Any]] | None: + """Return slug -> roster entry for active repos, or None if no roster param.""" + roster_path = params.get("roster") + if not roster_path: + return None + path = Path(roster_path) + if not path.is_file(): + logger.warning("kaizen roster file not found: %s", path) + return {} + data = yaml.safe_load(path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + logger.warning("kaizen roster invalid (not a mapping): %s", path) + return {} + entries: dict[str, dict[str, Any]] = {} + for item in data.get("active") or []: + if isinstance(item, dict) and item.get("slug"): + slug = str(item["slug"]) + if item.get("status", "active") == "saturated": + continue + entries[slug] = item + return entries + + +def _validate_schedule_file(path: Path) -> list[str]: + """Structural validation aligned with kaizen-agentic schedule validate.""" + errors: list[str] = [] + try: + raw = yaml.safe_load(path.read_text(encoding="utf-8")) + except yaml.YAMLError as exc: + return [f"invalid YAML: {exc}"] + + if not isinstance(raw, dict): + return ["schedule.yml must be a YAML mapping at the top level"] + + version = raw.get("version") + if version is None: + errors.append("missing required key: version") + elif str(version) != _SCHEDULE_VERSION: + errors.append(f"unsupported version '{version}' (expected '{_SCHEDULE_VERSION}')") + + agents = raw.get("agents", {}) + if not isinstance(agents, dict): + errors.append("agents must be a mapping") + return errors + if not agents: + errors.append("no agents declared under 'agents:'") + + seen: set[str] = set() + for name, settings in agents.items(): + if settings is None: + settings = {} + if not isinstance(settings, dict): + errors.append(f"agent '{name}' settings must be a mapping") + continue + if name in seen: + errors.append(f"duplicate agent entry: {name}") + seen.add(name) + cadence = str(settings.get("cadence", "")) + if cadence not in _VALID_CADENCES: + errors.append( + f"agent '{name}': invalid cadence '{cadence}' " + f"(expected one of {', '.join(sorted(_VALID_CADENCES))})" + ) + cron = settings.get("cron") + if cron is not None and not isinstance(cron, str): + errors.append(f"agent '{name}' cron must be a string") + + return errors + + +def _parse_schedule(path: Path) -> dict[str, Any] | None: + errors = _validate_schedule_file(path) + if errors: + return None + raw = yaml.safe_load(path.read_text(encoding="utf-8")) + return raw if isinstance(raw, dict) else None + + +def _prepare_command(agent: str, root: Path) -> str: + return f"{_PREPARE_BIN} schedule prepare {agent} --target {root}" + + +def discover_kaizen_scheduled_repos(params: dict[str, Any]) -> dict[str, Any]: + domain = params.get("domain") + cadence_filter = params.get("cadence") + roster = _load_roster(params) + runs: list[dict[str, Any]] = [] + + for repo in _fetch_repos(domain): + slug = repo.get("slug", "") + if not slug: + continue + if roster is not None and slug not in roster: + continue + + root = _repo_root(repo) + if root is None: + logger.info("kaizen repo_unreachable slug=%s host=%s", slug, _runner_host()) + continue + + schedule_path = root / ".kaizen" / "schedule.yml" + if not schedule_path.is_file(): + continue + + errors = _validate_schedule_file(schedule_path) + if errors: + logger.warning( + "kaizen schedule_invalid slug=%s path=%s errors=%s", + slug, + schedule_path, + "; ".join(errors), + ) + continue + + schedule = _parse_schedule(schedule_path) + if schedule is None: + continue + + timezone = schedule.get("timezone") or "Europe/Berlin" + roster_agents = roster.get(slug, {}).get("agents") if roster else None + agents = schedule.get("agents") or {} + + for agent_name, settings in agents.items(): + if not isinstance(settings, dict): + continue + if not bool(settings.get("enabled", True)): + continue + cadence = str(settings.get("cadence", "")) + if cadence_filter and cadence != cadence_filter: + continue + if roster_agents and agent_name not in roster_agents: + continue + cron = settings.get("cron") + runs.append( + { + "repo": slug, + "root": str(root), + "agent": agent_name, + "cadence": cadence, + "cron": cron, + "timezone": timezone, + "enabled": True, + "prepare_command": _prepare_command(agent_name, root), + } + ) + + return {"scheduled_runs": runs} + + +def _read_metrics_summary(metrics_dir: Path) -> dict[str, Any]: + summary_path = metrics_dir / "summary.json" + if not summary_path.is_file(): + return {} + try: + data = json.loads(summary_path.read_text(encoding="utf-8")) + return data if isinstance(data, dict) else {} + except (json.JSONDecodeError, OSError): + return {} + + +def discover_kaizen_projects(params: dict[str, Any]) -> dict[str, Any]: + """Discover repos with ``.kaizen/metrics`` (optional per-agent summaries).""" + domain = params.get("domain") + marker = params.get("marker", ".kaizen/metrics") + roster = _load_roster(params) + in_roster_key = "in_pilot_roster" + projects: list[dict[str, Any]] = [] + + for repo in _fetch_repos(domain): + slug = repo.get("slug", "") + if not slug: + continue + in_pilot = roster is None or slug in roster + if roster is not None and slug not in roster: + continue + + root = _repo_root(repo) + if root is None: + continue + + metrics_root = root / Path(marker) + if not metrics_root.is_dir(): + continue + + has_metrics = any(metrics_root.iterdir()) if metrics_root.is_dir() else False + if not has_metrics: + continue + + roster_entry = roster.get(slug, {}) if roster else {} + agent_filter = roster_entry.get("agents") + + for agent_dir in sorted(metrics_root.iterdir()): + if not agent_dir.is_dir() or agent_dir.name == "optimizer": + continue + agent = agent_dir.name + if agent_filter and agent not in agent_filter: + continue + summary = _read_metrics_summary(agent_dir) + projects.append( + { + "repo": slug, + "root": str(root), + "agent": agent, + "has_metrics": True, + in_roster_key: in_pilot, + "summary": summary, + } + ) + + if not any(p["repo"] == slug for p in projects): + projects.append( + { + "repo": slug, + "root": str(root), + "agent": None, + "has_metrics": has_metrics, + in_roster_key: in_pilot, + "summary": {}, + } + ) + + return {"projects": projects} + + +class KaizenContextResolver(ContextResolver): + """Resolves kaizen fleet scheduling and project metrics discovery.""" + + def resolve(self, query: str, event: Any, params: dict[str, Any]) -> dict[str, Any]: + if query == "discover_kaizen_scheduled_repos": + return discover_kaizen_scheduled_repos(params) + if query == "discover_kaizen_projects": + return discover_kaizen_projects(params) + return {} + + +CONTEXT_RESOLVER_REGISTRY["kaizen"] = KaizenContextResolver +CONTEXT_RESOLVER_REGISTRY["resolver"] = KaizenContextResolver +CONTEXT_RESOLVER_REGISTRY["shell"] = KaizenContextResolver \ No newline at end of file diff --git a/tests/test_kaizen_context_resolver.py b/tests/test_kaizen_context_resolver.py new file mode 100644 index 0000000..3f2d594 --- /dev/null +++ b/tests/test_kaizen_context_resolver.py @@ -0,0 +1,195 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import httpx +import pytest +import yaml + +from activity_core.context_resolvers.kaizen import ( + KaizenContextResolver, + discover_kaizen_scheduled_repos, +) + + +class DummyResponse: + def __init__(self, payload: Any, status_error: Exception | None = None) -> None: + self.payload = payload + self.status_error = status_error + + def raise_for_status(self) -> None: + if self.status_error is not None: + raise self.status_error + + def json(self) -> Any: + return self.payload + + +def _write_schedule(path: Path, agents: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + yaml.safe_dump( + {"version": "1", "timezone": "Europe/Berlin", "agents": agents}, + sort_keys=False, + ), + encoding="utf-8", + ) + + +def test_discover_scheduled_repos_emits_enabled_coach(tmp_path, monkeypatch) -> None: + repo_root = tmp_path / "pilot-repo" + repo_root.mkdir() + _write_schedule( + repo_root / ".kaizen" / "schedule.yml", + {"coach": {"cadence": "daily", "cron": "15 * * * *", "enabled": True}}, + ) + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + return DummyResponse( + [ + { + "slug": "pilot-repo", + "domain_slug": "custodian", + "host_paths": {"testhost": str(repo_root)}, + } + ] + ) + + monkeypatch.setenv("STATE_HUB_URL", "http://hub.test") + monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost") + monkeypatch.setattr(httpx, "get", fake_get) + + result = discover_kaizen_scheduled_repos({}) + + assert len(result["scheduled_runs"]) == 1 + run = result["scheduled_runs"][0] + assert run["repo"] == "pilot-repo" + assert run["agent"] == "coach" + assert run["enabled"] is True + assert "schedule prepare coach" in run["prepare_command"] + + +def test_discover_scheduled_repos_skips_disabled_coach(tmp_path, monkeypatch) -> None: + repo_root = tmp_path / "pilot-repo" + repo_root.mkdir() + _write_schedule( + repo_root / ".kaizen" / "schedule.yml", + {"coach": {"cadence": "daily", "enabled": False}}, + ) + + monkeypatch.setenv("STATE_HUB_URL", "http://hub.test") + monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost") + monkeypatch.setattr( + httpx, + "get", + lambda url, **kwargs: DummyResponse( + [{"slug": "pilot-repo", "host_paths": {"testhost": str(repo_root)}}] + ), + ) + + result = discover_kaizen_scheduled_repos({}) + assert result["scheduled_runs"] == [] + + +def test_discover_scheduled_repos_skips_missing_schedule(tmp_path, monkeypatch) -> None: + repo_root = tmp_path / "no-schedule" + repo_root.mkdir() + + monkeypatch.setenv("STATE_HUB_URL", "http://hub.test") + monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost") + monkeypatch.setattr( + httpx, + "get", + lambda url, **kwargs: DummyResponse( + [{"slug": "no-schedule", "host_paths": {"testhost": str(repo_root)}}] + ), + ) + + result = discover_kaizen_scheduled_repos({}) + assert result["scheduled_runs"] == [] + + +def test_discover_scheduled_repos_skips_invalid_schedule(tmp_path, monkeypatch) -> None: + repo_root = tmp_path / "bad-schedule" + schedule = repo_root / ".kaizen" / "schedule.yml" + schedule.parent.mkdir(parents=True) + schedule.write_text("version: '2'\nagents: {}\n", encoding="utf-8") + + monkeypatch.setenv("STATE_HUB_URL", "http://hub.test") + monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost") + monkeypatch.setattr( + httpx, + "get", + lambda url, **kwargs: DummyResponse( + [{"slug": "bad-schedule", "host_paths": {"testhost": str(repo_root)}}] + ), + ) + + result = discover_kaizen_scheduled_repos({}) + assert result["scheduled_runs"] == [] + + +def test_discover_scheduled_repos_filters_by_roster_and_cadence( + tmp_path, monkeypatch +) -> None: + repo_a = tmp_path / "kaizen-agentic" + repo_b = tmp_path / "other-repo" + for root in (repo_a, repo_b): + _write_schedule( + root / ".kaizen" / "schedule.yml", + { + "coach": {"cadence": "daily", "enabled": True}, + "optimization": {"cadence": "weekly", "enabled": True}, + }, + ) + + roster = tmp_path / "roster.yaml" + roster.write_text( + yaml.safe_dump( + { + "active": [ + {"slug": "kaizen-agentic", "agents": ["coach"], "status": "active"} + ] + } + ), + encoding="utf-8", + ) + + monkeypatch.setenv("STATE_HUB_URL", "http://hub.test") + monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost") + monkeypatch.setattr( + httpx, + "get", + lambda url, **kwargs: DummyResponse( + [ + {"slug": "kaizen-agentic", "host_paths": {"testhost": str(repo_a)}}, + {"slug": "other-repo", "host_paths": {"testhost": str(repo_b)}}, + ] + ), + ) + + result = discover_kaizen_scheduled_repos( + {"roster": str(roster), "cadence": "daily"} + ) + agents = {r["agent"] for r in result["scheduled_runs"]} + repos = {r["repo"] for r in result["scheduled_runs"]} + assert repos == {"kaizen-agentic"} + assert agents == {"coach"} + + +def test_hub_unreachable_raises(monkeypatch) -> None: + monkeypatch.setenv("STATE_HUB_URL", "http://hub.test") + + def fail_get(url: str, **kwargs: Any) -> DummyResponse: + raise httpx.ConnectError("down") + + monkeypatch.setattr(httpx, "get", fail_get) + + with pytest.raises(RuntimeError, match="State Hub unreachable"): + discover_kaizen_scheduled_repos({}) + + +def test_resolver_registry_alias() -> None: + resolver = KaizenContextResolver() + assert resolver.resolve("unknown_query", None, {}) == {} \ No newline at end of file