Add kaizen context resolver for scheduled agent fleet discovery.

Implement discover_kaizen_scheduled_repos and discover_kaizen_projects per
kaizen-agentic ADR-005 contract: State Hub roster, roster.yaml filter, schedule
validation, and prepare_command emission. Register kaizen/resolver/shell source
types with unit tests and runbook dry-run instructions.
This commit is contained in:
2026-06-18 07:46:46 +02:00
parent 29bf87a44c
commit 517bf9c133
6 changed files with 560 additions and 5 deletions

View File

@@ -25,7 +25,8 @@ ISSUE_SINK_TYPE=rest
# ── Activity definitions ───────────────────────────────────────────────────────
# Colon-separated paths to additional activity-definitions/ directories.
# The local activity-definitions/ directory is always scanned.
ACTIVITY_DEFINITION_DIRS=
# Coulomb-loop kaizen engagement definitions (colon-separated for more roots).
ACTIVITY_DEFINITION_DIRS=/home/worsch/coulomb-loop
# ── Observability ─────────────────────────────────────────────────────────────
# Prometheus metrics bind address (Temporal SDK metrics).

View File

@@ -159,14 +159,34 @@ repos, and emits one automated task per stale repo through explicit
`weekly-coding-retro` follows the same cron -> context resolver -> per-repo task
pattern for coding-session retrospection. It runs Saturdays at 19:00
Europe/Berlin and resolves the latest State Hub `/progress/` item with
`event_type=coding_retro` into `context.retro.suggestions`. Each positive-score
suggestion emits one task to `context.s.repo` with labels
`coding-retro`, `improvement`, and `automated`.
`event_type=coding_retro` and a matching `window_days` into
`context.retro.suggestions`. Each positive-score suggestion emits one task to
`context.s.repo` with labels `coding-retro`, `improvement`, and `automated`.
The weekly schedule intentionally ignores broader retro windows such as 30-day
catch-up reports.
Keep `weekly-coding-retro` disabled until Helix Forge publishes the
`coding_retro` read model and a smoke run confirms the resolver returns a
non-empty suggestion set with no duplicate target tasks on re-run.
## Ops inventory evidence posture
The current accepted live backend for activity-core ops inventory probes is
State Hub progress with `event_type=ops_inventory_probe`.
Inter-Hub / ops-hub per-entity submission remains intentionally deferred until
all of these are true:
- `OPS_HUB_KEY` is provisioned through an operator-owned secret path, never Git,
chat, or State Hub detail.
- Widget or capability mapping is configured for the target ops-hub entities.
- Production Inter-Hub intake is deployed and smoke-tested for the relevant
authenticated routes.
Until then, missing Inter-Hub configuration should produce an explicit skipped
sink result, not a failed probe. This posture was recorded in State Hub decision
`7c235bbb-ee6f-4c3e-b1dd-74717eac9082`.
---
## Temporal UI — filtering by activity
@@ -342,6 +362,14 @@ uv run alembic history # show full migration history
## Railiance Deployment
### Production API access posture
The FastAPI admin surface remains ClusterIP-only in production. Do not publish
it through an external ingress until a separate access-policy work item chooses
the hostname, authentication layer, allowed users/agents, and audit
expectations. This posture was recorded in State Hub decision
`9ffaf7a9-227a-4e39-92e3-cd93d8cda1f2`.
### Pre-requisites
- Docker ≥ 24 with Compose v2 (`docker compose` not `docker-compose`)
- ≥ 4 GB RAM available (Temporal server takes ~1 GB)
@@ -412,6 +440,31 @@ make railiance-up
---
## Kaizen fleet resolver (coulomb-loop)
Dry-run scheduled agent discovery against State Hub + pilot roster:
```bash
export STATE_HUB_URL=http://127.0.0.1:8000
export KAIZEN_RUNNER_HOST=$(hostname)
export ACTIVITY_DEFINITION_DIRS=/home/worsch/coulomb-loop
uv run python -c "
from activity_core.context_resolvers.kaizen import discover_kaizen_scheduled_repos
print(discover_kaizen_scheduled_repos({
'roster': '/home/worsch/coulomb-loop/loops/kaizen-stack/roster.yaml',
'cadence': 'daily',
}))
"
make sync-activity-definitions # requires ACTCORE_DB_URL + stack up
```
Source types: `kaizen`, `resolver`, or `shell` (alias). Queries:
`discover_kaizen_scheduled_repos`, `discover_kaizen_projects`.
---
## Wipe and restart dev stack
```bash

View File

@@ -12,6 +12,7 @@ dependencies = [
"alembic>=1.14",
"nats-py>=2.7",
"httpx>=0.27",
"pyyaml>=6.0",
]
[project.optional-dependencies]

View File

@@ -1 +1 @@
from activity_core.context_resolvers import ops_inventory, repo_scoping, state_hub # noqa: F401
from activity_core.context_resolvers import kaizen, ops_inventory, repo_scoping, state_hub # noqa: F401

View File

@@ -0,0 +1,305 @@
"""Kaizen-agentic fleet context adapter.
Registered as source types ``kaizen`` and ``resolver`` (alias for ADR-005 drafts).
Supported queries:
- discover_kaizen_scheduled_repos: hub roster ∩ valid ``.kaizen/schedule.yml``
- discover_kaizen_projects: repos with ``.kaizen/metrics`` marker (+ optional roster)
Contract: kaizen-agentic ``docs/integrations/discover-kaizen-scheduled-repos.md``
"""
from __future__ import annotations
import json
import logging
import os
import socket
from pathlib import Path
from typing import Any
import httpx
import yaml
from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, ContextResolver
logger = logging.getLogger(__name__)
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
_TIMEOUT_SECONDS = 10.0
_SCHEDULE_VERSION = "1"
_VALID_CADENCES = frozenset({"daily", "weekly", "monthly"})
_PREPARE_BIN = os.environ.get("KAIZEN_AGENTIC_BIN", "kaizen-agentic")
def _base_url() -> str:
return os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL).rstrip("/")
def _runner_host() -> str:
return os.environ.get("KAIZEN_RUNNER_HOST", socket.gethostname())
def _fetch_repos(domain: str | None) -> list[dict[str, Any]]:
url = f"{_base_url()}/repos/"
try:
resp = httpx.get(url, timeout=_TIMEOUT_SECONDS)
resp.raise_for_status()
except httpx.HTTPError as exc:
raise RuntimeError(f"State Hub unreachable at {url}: {exc}") from exc
payload = resp.json()
if not isinstance(payload, list):
raise RuntimeError(f"State Hub /repos/ returned non-list: {type(payload)!r}")
if domain:
payload = [r for r in payload if r.get("domain_slug") == domain]
return payload
def _repo_root(repo: dict[str, Any]) -> Path | None:
host_paths = repo.get("host_paths") or {}
host = _runner_host()
raw = host_paths.get(host) or repo.get("local_path")
if not raw or raw == "(unknown)":
return None
path = Path(raw)
return path if path.is_dir() else None
def _load_roster(params: dict[str, Any]) -> dict[str, dict[str, Any]] | None:
"""Return slug -> roster entry for active repos, or None if no roster param."""
roster_path = params.get("roster")
if not roster_path:
return None
path = Path(roster_path)
if not path.is_file():
logger.warning("kaizen roster file not found: %s", path)
return {}
data = yaml.safe_load(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
logger.warning("kaizen roster invalid (not a mapping): %s", path)
return {}
entries: dict[str, dict[str, Any]] = {}
for item in data.get("active") or []:
if isinstance(item, dict) and item.get("slug"):
slug = str(item["slug"])
if item.get("status", "active") == "saturated":
continue
entries[slug] = item
return entries
def _validate_schedule_file(path: Path) -> list[str]:
"""Structural validation aligned with kaizen-agentic schedule validate."""
errors: list[str] = []
try:
raw = yaml.safe_load(path.read_text(encoding="utf-8"))
except yaml.YAMLError as exc:
return [f"invalid YAML: {exc}"]
if not isinstance(raw, dict):
return ["schedule.yml must be a YAML mapping at the top level"]
version = raw.get("version")
if version is None:
errors.append("missing required key: version")
elif str(version) != _SCHEDULE_VERSION:
errors.append(f"unsupported version '{version}' (expected '{_SCHEDULE_VERSION}')")
agents = raw.get("agents", {})
if not isinstance(agents, dict):
errors.append("agents must be a mapping")
return errors
if not agents:
errors.append("no agents declared under 'agents:'")
seen: set[str] = set()
for name, settings in agents.items():
if settings is None:
settings = {}
if not isinstance(settings, dict):
errors.append(f"agent '{name}' settings must be a mapping")
continue
if name in seen:
errors.append(f"duplicate agent entry: {name}")
seen.add(name)
cadence = str(settings.get("cadence", ""))
if cadence not in _VALID_CADENCES:
errors.append(
f"agent '{name}': invalid cadence '{cadence}' "
f"(expected one of {', '.join(sorted(_VALID_CADENCES))})"
)
cron = settings.get("cron")
if cron is not None and not isinstance(cron, str):
errors.append(f"agent '{name}' cron must be a string")
return errors
def _parse_schedule(path: Path) -> dict[str, Any] | None:
errors = _validate_schedule_file(path)
if errors:
return None
raw = yaml.safe_load(path.read_text(encoding="utf-8"))
return raw if isinstance(raw, dict) else None
def _prepare_command(agent: str, root: Path) -> str:
return f"{_PREPARE_BIN} schedule prepare {agent} --target {root}"
def discover_kaizen_scheduled_repos(params: dict[str, Any]) -> dict[str, Any]:
domain = params.get("domain")
cadence_filter = params.get("cadence")
roster = _load_roster(params)
runs: list[dict[str, Any]] = []
for repo in _fetch_repos(domain):
slug = repo.get("slug", "")
if not slug:
continue
if roster is not None and slug not in roster:
continue
root = _repo_root(repo)
if root is None:
logger.info("kaizen repo_unreachable slug=%s host=%s", slug, _runner_host())
continue
schedule_path = root / ".kaizen" / "schedule.yml"
if not schedule_path.is_file():
continue
errors = _validate_schedule_file(schedule_path)
if errors:
logger.warning(
"kaizen schedule_invalid slug=%s path=%s errors=%s",
slug,
schedule_path,
"; ".join(errors),
)
continue
schedule = _parse_schedule(schedule_path)
if schedule is None:
continue
timezone = schedule.get("timezone") or "Europe/Berlin"
roster_agents = roster.get(slug, {}).get("agents") if roster else None
agents = schedule.get("agents") or {}
for agent_name, settings in agents.items():
if not isinstance(settings, dict):
continue
if not bool(settings.get("enabled", True)):
continue
cadence = str(settings.get("cadence", ""))
if cadence_filter and cadence != cadence_filter:
continue
if roster_agents and agent_name not in roster_agents:
continue
cron = settings.get("cron")
runs.append(
{
"repo": slug,
"root": str(root),
"agent": agent_name,
"cadence": cadence,
"cron": cron,
"timezone": timezone,
"enabled": True,
"prepare_command": _prepare_command(agent_name, root),
}
)
return {"scheduled_runs": runs}
def _read_metrics_summary(metrics_dir: Path) -> dict[str, Any]:
summary_path = metrics_dir / "summary.json"
if not summary_path.is_file():
return {}
try:
data = json.loads(summary_path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except (json.JSONDecodeError, OSError):
return {}
def discover_kaizen_projects(params: dict[str, Any]) -> dict[str, Any]:
"""Discover repos with ``.kaizen/metrics`` (optional per-agent summaries)."""
domain = params.get("domain")
marker = params.get("marker", ".kaizen/metrics")
roster = _load_roster(params)
in_roster_key = "in_pilot_roster"
projects: list[dict[str, Any]] = []
for repo in _fetch_repos(domain):
slug = repo.get("slug", "")
if not slug:
continue
in_pilot = roster is None or slug in roster
if roster is not None and slug not in roster:
continue
root = _repo_root(repo)
if root is None:
continue
metrics_root = root / Path(marker)
if not metrics_root.is_dir():
continue
has_metrics = any(metrics_root.iterdir()) if metrics_root.is_dir() else False
if not has_metrics:
continue
roster_entry = roster.get(slug, {}) if roster else {}
agent_filter = roster_entry.get("agents")
for agent_dir in sorted(metrics_root.iterdir()):
if not agent_dir.is_dir() or agent_dir.name == "optimizer":
continue
agent = agent_dir.name
if agent_filter and agent not in agent_filter:
continue
summary = _read_metrics_summary(agent_dir)
projects.append(
{
"repo": slug,
"root": str(root),
"agent": agent,
"has_metrics": True,
in_roster_key: in_pilot,
"summary": summary,
}
)
if not any(p["repo"] == slug for p in projects):
projects.append(
{
"repo": slug,
"root": str(root),
"agent": None,
"has_metrics": has_metrics,
in_roster_key: in_pilot,
"summary": {},
}
)
return {"projects": projects}
class KaizenContextResolver(ContextResolver):
"""Resolves kaizen fleet scheduling and project metrics discovery."""
def resolve(self, query: str, event: Any, params: dict[str, Any]) -> dict[str, Any]:
if query == "discover_kaizen_scheduled_repos":
return discover_kaizen_scheduled_repos(params)
if query == "discover_kaizen_projects":
return discover_kaizen_projects(params)
return {}
CONTEXT_RESOLVER_REGISTRY["kaizen"] = KaizenContextResolver
CONTEXT_RESOLVER_REGISTRY["resolver"] = KaizenContextResolver
CONTEXT_RESOLVER_REGISTRY["shell"] = KaizenContextResolver

View File

@@ -0,0 +1,195 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import httpx
import pytest
import yaml
from activity_core.context_resolvers.kaizen import (
KaizenContextResolver,
discover_kaizen_scheduled_repos,
)
class DummyResponse:
def __init__(self, payload: Any, status_error: Exception | None = None) -> None:
self.payload = payload
self.status_error = status_error
def raise_for_status(self) -> None:
if self.status_error is not None:
raise self.status_error
def json(self) -> Any:
return self.payload
def _write_schedule(path: Path, agents: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
yaml.safe_dump(
{"version": "1", "timezone": "Europe/Berlin", "agents": agents},
sort_keys=False,
),
encoding="utf-8",
)
def test_discover_scheduled_repos_emits_enabled_coach(tmp_path, monkeypatch) -> None:
repo_root = tmp_path / "pilot-repo"
repo_root.mkdir()
_write_schedule(
repo_root / ".kaizen" / "schedule.yml",
{"coach": {"cadence": "daily", "cron": "15 * * * *", "enabled": True}},
)
def fake_get(url: str, **kwargs: Any) -> DummyResponse:
return DummyResponse(
[
{
"slug": "pilot-repo",
"domain_slug": "custodian",
"host_paths": {"testhost": str(repo_root)},
}
]
)
monkeypatch.setenv("STATE_HUB_URL", "http://hub.test")
monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost")
monkeypatch.setattr(httpx, "get", fake_get)
result = discover_kaizen_scheduled_repos({})
assert len(result["scheduled_runs"]) == 1
run = result["scheduled_runs"][0]
assert run["repo"] == "pilot-repo"
assert run["agent"] == "coach"
assert run["enabled"] is True
assert "schedule prepare coach" in run["prepare_command"]
def test_discover_scheduled_repos_skips_disabled_coach(tmp_path, monkeypatch) -> None:
repo_root = tmp_path / "pilot-repo"
repo_root.mkdir()
_write_schedule(
repo_root / ".kaizen" / "schedule.yml",
{"coach": {"cadence": "daily", "enabled": False}},
)
monkeypatch.setenv("STATE_HUB_URL", "http://hub.test")
monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost")
monkeypatch.setattr(
httpx,
"get",
lambda url, **kwargs: DummyResponse(
[{"slug": "pilot-repo", "host_paths": {"testhost": str(repo_root)}}]
),
)
result = discover_kaizen_scheduled_repos({})
assert result["scheduled_runs"] == []
def test_discover_scheduled_repos_skips_missing_schedule(tmp_path, monkeypatch) -> None:
repo_root = tmp_path / "no-schedule"
repo_root.mkdir()
monkeypatch.setenv("STATE_HUB_URL", "http://hub.test")
monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost")
monkeypatch.setattr(
httpx,
"get",
lambda url, **kwargs: DummyResponse(
[{"slug": "no-schedule", "host_paths": {"testhost": str(repo_root)}}]
),
)
result = discover_kaizen_scheduled_repos({})
assert result["scheduled_runs"] == []
def test_discover_scheduled_repos_skips_invalid_schedule(tmp_path, monkeypatch) -> None:
repo_root = tmp_path / "bad-schedule"
schedule = repo_root / ".kaizen" / "schedule.yml"
schedule.parent.mkdir(parents=True)
schedule.write_text("version: '2'\nagents: {}\n", encoding="utf-8")
monkeypatch.setenv("STATE_HUB_URL", "http://hub.test")
monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost")
monkeypatch.setattr(
httpx,
"get",
lambda url, **kwargs: DummyResponse(
[{"slug": "bad-schedule", "host_paths": {"testhost": str(repo_root)}}]
),
)
result = discover_kaizen_scheduled_repos({})
assert result["scheduled_runs"] == []
def test_discover_scheduled_repos_filters_by_roster_and_cadence(
tmp_path, monkeypatch
) -> None:
repo_a = tmp_path / "kaizen-agentic"
repo_b = tmp_path / "other-repo"
for root in (repo_a, repo_b):
_write_schedule(
root / ".kaizen" / "schedule.yml",
{
"coach": {"cadence": "daily", "enabled": True},
"optimization": {"cadence": "weekly", "enabled": True},
},
)
roster = tmp_path / "roster.yaml"
roster.write_text(
yaml.safe_dump(
{
"active": [
{"slug": "kaizen-agentic", "agents": ["coach"], "status": "active"}
]
}
),
encoding="utf-8",
)
monkeypatch.setenv("STATE_HUB_URL", "http://hub.test")
monkeypatch.setenv("KAIZEN_RUNNER_HOST", "testhost")
monkeypatch.setattr(
httpx,
"get",
lambda url, **kwargs: DummyResponse(
[
{"slug": "kaizen-agentic", "host_paths": {"testhost": str(repo_a)}},
{"slug": "other-repo", "host_paths": {"testhost": str(repo_b)}},
]
),
)
result = discover_kaizen_scheduled_repos(
{"roster": str(roster), "cadence": "daily"}
)
agents = {r["agent"] for r in result["scheduled_runs"]}
repos = {r["repo"] for r in result["scheduled_runs"]}
assert repos == {"kaizen-agentic"}
assert agents == {"coach"}
def test_hub_unreachable_raises(monkeypatch) -> None:
monkeypatch.setenv("STATE_HUB_URL", "http://hub.test")
def fail_get(url: str, **kwargs: Any) -> DummyResponse:
raise httpx.ConnectError("down")
monkeypatch.setattr(httpx, "get", fail_get)
with pytest.raises(RuntimeError, match="State Hub unreachable"):
discover_kaizen_scheduled_repos({})
def test_resolver_registry_alias() -> None:
resolver = KaizenContextResolver()
assert resolver.resolve("unknown_query", None, {}) == {}