From 0c2df43a2fc3e6b869d41e4872cef04aead41180 Mon Sep 17 00:00:00 2001 From: tegwick Date: Fri, 19 Jun 2026 01:57:56 +0200 Subject: [PATCH] Add schedule promote for atomic cadence promotion Orchestrates cadence.yml, activity-definitions, fleet schedule.yml, and activity-core sync in one command. Supports --dry-run and --fleet-only for repairing partial promotions. --- .kaizen/schedule.yml | 18 +- .../customer-engagement-playbook.md | 16 +- src/kaizen_agentic/cli.py | 100 ++++ src/kaizen_agentic/engagement_promote.py | 555 ++++++++++++++++++ tests/test_engagement_promote.py | 161 +++++ uv.lock | 15 +- 6 files changed, 853 insertions(+), 12 deletions(-) create mode 100644 src/kaizen_agentic/engagement_promote.py create mode 100644 tests/test_engagement_promote.py diff --git a/.kaizen/schedule.yml b/.kaizen/schedule.yml index e33ef14..cf2d279 100644 --- a/.kaizen/schedule.yml +++ b/.kaizen/schedule.yml @@ -1,17 +1,15 @@ -# Kaizen scheduled agent execution (ADR-005) -# Engagement: coulomb-loop — stabilize phase (daily crons per ADR-003) -# Promoted 2026-06-18 after 3/3 bootstrap E2E cycles +# Kaizen scheduled agent execution manifest (ADR-005) +# Engagement: coulomb-loop bootstrap — weekly cadence +# Regulator promotes cadence per customer engagement policy (ADR-003). +# Validate with: kaizen-agentic schedule validate version: '1' timezone: Europe/Berlin agents: coach: - cadence: daily - cron: "0 9 * * *" + cadence: weekly + cron: 0 9 * * 1 enabled: true optimization: - cadence: daily - cron: "0 10 * * *" + cadence: weekly + cron: 0 10 * * 1 enabled: true - tdd-workflow: - cadence: monthly - enabled: false diff --git a/docs/integrations/customer-engagement-playbook.md b/docs/integrations/customer-engagement-playbook.md index 738c5a2..f4f18a0 100644 --- a/docs/integrations/customer-engagement-playbook.md +++ b/docs/integrations/customer-engagement-playbook.md @@ -65,7 +65,21 @@ Requires `pip install 'kaizen-agentic[events]'` for `--emit-event`. ## Cadence promotion -Customer regulator (LOOP-WP-0004) approves promotion. Re-init schedules: +Customer regulator (LOOP-WP-0004) approves promotion. Use atomic promote to +align all three layers (cadence.yml, activity-definitions, fleet schedule.yml, +activity-core sync): + +```bash +kaizen-agentic schedule promote \ + --engagement-repo /path/to/customer-loop \ + --engagement \ + --to-phase operate \ + --activity-core /path/to/activity-core +``` + +Dry-run: `--dry-run`. Repair fleet drift after a partial promotion: `--fleet-only`. + +Legacy per-layer commands still work: ```bash kaizen-agentic schedule init --engagement --bootstrap-cadence daily --force diff --git a/src/kaizen_agentic/cli.py b/src/kaizen_agentic/cli.py index 3ed74a2..ed0c2a2 100644 --- a/src/kaizen_agentic/cli.py +++ b/src/kaizen_agentic/cli.py @@ -23,6 +23,7 @@ from .integrations.event_bus import ( from .integrations.helix import HelixCorrelationAdapter, enrich_helix_correlation from .metrics import MetricsStore, OptimizerStore, performance_summary_markdown from .optimization import OptimizationLoop, MIN_SAMPLES_FOR_RECOMMENDATIONS +from .engagement_promote import promote_engagement from .schedule import ( ScheduleError, default_schedule_yaml, @@ -1534,6 +1535,105 @@ def schedule_init( click.echo(" Validate with: kaizen-agentic schedule validate") +@schedule.command("promote") +@click.option( + "--engagement-repo", + "-e", + required=True, + type=click.Path(exists=True, file_okay=False, path_type=Path), + help="Customer engagement repo (e.g. coulomb-loop)", +) +@click.option( + "--engagement", + default="coulomb-loop", + show_default=True, + help="Engagement slug written into fleet schedule.yml headers", +) +@click.option( + "--to-phase", + type=click.Choice(["stabilize", "operate"]), + default=None, + help="Target phase (default: next phase after current)", +) +@click.option("--loop", default=None, help="Promote a single loop id only") +@click.option("--dry-run", is_flag=True, help="Print planned actions without writing") +@click.option("--skip-cadence", is_flag=True, help="Skip loops/*/cadence.yml updates") +@click.option( + "--skip-definitions", is_flag=True, help="Skip activity-definitions transforms" +) +@click.option("--skip-fleet", is_flag=True, help="Skip .kaizen/schedule.yml on roster") +@click.option("--skip-sync", is_flag=True, help="Skip activity-core definition sync") +@click.option( + "--fleet-only", + is_flag=True, + help="Only update fleet schedule.yml (+ sync unless --skip-sync)", +) +@click.option( + "--activity-core", + type=click.Path(exists=True, file_okay=False, path_type=Path), + default=None, + help="activity-core repo root (or ACTIVITY_CORE_ROOT env)", +) +def schedule_promote( + engagement_repo: Path, + engagement: str, + to_phase: Optional[str], + loop: Optional[str], + dry_run: bool, + skip_cadence: bool, + skip_definitions: bool, + skip_fleet: bool, + skip_sync: bool, + fleet_only: bool, + activity_core: Optional[Path], +): + """Atomically promote cadence across cadence.yml, definitions, fleet, and sync.""" + if fleet_only: + skip_cadence = True + skip_definitions = True + if to_phase is None: + to_phase = "operate" + + result = promote_engagement( + engagement_repo, + engagement_slug=engagement, + to_phase=to_phase, + loop=loop, + dry_run=dry_run, + skip_cadence=skip_cadence, + skip_definitions=skip_definitions, + skip_fleet=skip_fleet, + skip_sync=skip_sync, + activity_core_root=activity_core, + ) + + if dry_run: + click.echo("Dry run — planned actions:") + else: + click.echo("Promotion complete — actions:") + + by_layer: dict[str, list[str]] = {} + for action in result.actions: + by_layer.setdefault(action.layer, []).append(action.description) + + for layer in ("cadence", "definitions", "fleet", "sync"): + items = by_layer.get(layer) + if items: + click.echo(f"\n[{layer}]") + for item in items: + click.echo(f" • {item}") + + if result.errors: + click.echo("\nWarnings / errors:", err=True) + for err in result.errors: + click.echo(f" ! {err}", err=True) + if not result.actions: + sys.exit(1) + + if not result.actions and not result.errors: + click.echo("Nothing to do — layers already aligned.") + + @schedule.command("list") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option("--all", "show_all", is_flag=True, help="Include disabled entries") diff --git a/src/kaizen_agentic/engagement_promote.py b/src/kaizen_agentic/engagement_promote.py new file mode 100644 index 0000000..ee0d3ad --- /dev/null +++ b/src/kaizen_agentic/engagement_promote.py @@ -0,0 +1,555 @@ +"""Atomic cadence promotion across engagement contract layers (ADR-003). + +Orchestrates three layers that must stay aligned on promotion: + +1. Customer policy — ``loops/*/cadence.yml`` in the engagement repo +2. Scheduler contract — ``activity-definitions/*.md`` → activity-core sync +3. Fleet opt-in — ``.kaizen/schedule.yml`` on roster target repos + +See coulomb-loop ADR-002 / ADR-003 and supplier customer-engagement-playbook. +""" + +from __future__ import annotations + +import os +import re +import subprocess +from dataclasses import dataclass, field +from datetime import date +from pathlib import Path +from typing import Any, Callable + +import yaml + +from .schedule import ScheduleError, engagement_schedule_yaml, schedule_path + +PHASE_ORDER = ("bootstrap", "stabilize", "operate") +PHASE_FILE_PREFIX = { + "bootstrap": "hourly", + "stabilize": "daily", + "operate": "weekly", +} +CADENCE_ENUM = { + "bootstrap": "daily", # hourly crons keep daily enum for resolver filter + "stabilize": "daily", + "operate": "weekly", +} + +LOOP_DIR_BY_ID = { + "kaizen-improvement-stack": "kaizen-stack", + "quality-escalation": "quality-escalation", + "registry-hygiene": "registry-hygiene", + "loop-regulator": "regulator", +} + +# Activity-definition stems per loop (without phase prefix). Crons come from +# operate_target in cadence.yml when present, else these defaults. +DEFAULT_OPERATE_CRONS: dict[str, dict[str, str]] = { + "kaizen-improvement-stack": { + "metrics-optimize": "0 8 * * 1", + "coach-orientation": "0 9 * * 1", + "optimization-review": "0 10 * * 1", + }, + "registry-hygiene": { + "registry-hygiene-sweep": "0 9 * * 1", + }, + "quality-escalation": { + "metrics-health-sweep": "0 6 * * 1", + }, + "loop-regulator": { + "loop-health-collector": "0 11 * * 1", + }, +} + +EVENT_DEFINITIONS = frozenset({"low-success-rate-review"}) + + +class PromoteError(Exception): + """Raised when promotion cannot proceed.""" + + +@dataclass +class PromoteAction: + layer: str + description: str + path: str | None = None + + +@dataclass +class PromoteResult: + actions: list[PromoteAction] = field(default_factory=list) + errors: list[str] = field(default_factory=list) + + @property + def ok(self) -> bool: + return not self.errors + + +def _next_phase(current: str) -> str | None: + try: + idx = PHASE_ORDER.index(current) + except ValueError: + return None + if idx + 1 >= len(PHASE_ORDER): + return None + return PHASE_ORDER[idx + 1] + + +def _loop_paths(engagement_repo: Path) -> dict[str, Path]: + loops_root = engagement_repo / "loops" + if not loops_root.is_dir(): + raise PromoteError(f"loops/ not found under {engagement_repo}") + mapping: dict[str, Path] = {} + for loop_id, dirname in LOOP_DIR_BY_ID.items(): + path = loops_root / dirname / "cadence.yml" + if path.is_file(): + mapping[loop_id] = path + if not mapping: + raise PromoteError(f"no loops/*/cadence.yml found under {engagement_repo}") + return mapping + + +def _load_yaml(path: Path) -> dict[str, Any]: + data = yaml.safe_load(path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise PromoteError(f"expected mapping in {path}") + return data + + +def _write_yaml(path: Path, data: dict[str, Any]) -> None: + path.write_text(yaml.safe_dump(data, sort_keys=False), encoding="utf-8") + + +def _split_markdown_frontmatter(text: str) -> tuple[dict[str, Any], str]: + if not text.startswith("---"): + raise PromoteError("activity definition missing YAML frontmatter") + parts = text.split("---", 2) + if len(parts) < 3: + raise PromoteError("activity definition frontmatter not closed") + meta = yaml.safe_load(parts[1]) + if not isinstance(meta, dict): + raise PromoteError("activity definition frontmatter must be a mapping") + body = parts[2].lstrip("\n") + return meta, body + + +def _render_markdown(meta: dict[str, Any], body: str) -> str: + header = yaml.safe_dump(meta, sort_keys=False) + return f"---\n{header}---\n\n{body}" + + +def _roster_repos(engagement_repo: Path) -> list[tuple[str, Path, list[str]]]: + roster_path = engagement_repo / "loops" / "kaizen-stack" / "roster.yaml" + if not roster_path.is_file(): + return [] + data = _load_yaml(roster_path) + active = data.get("active") or [] + repos: list[tuple[str, Path, list[str]]] = [] + for entry in active: + if not isinstance(entry, dict): + continue + slug = str(entry.get("slug", "")) + root = entry.get("root") + agents = entry.get("agents") or ["coach", "optimization"] + if slug and root: + repos.append((slug, Path(str(root)), list(agents))) + return repos + + +def _operate_crons(loop_id: str, cadence: dict[str, Any]) -> dict[str, str]: + target = cadence.get("operate_target") + crons = dict(DEFAULT_OPERATE_CRONS.get(loop_id, {})) + if not isinstance(target, dict): + return crons + if loop_id == "kaizen-improvement-stack": + chain = target.get("chain") + if isinstance(chain, dict): + for stem, cron_key in [ + ("metrics-optimize", "metrics"), + ("coach-orientation", "coach"), + ("optimization-review", "optimization"), + ]: + if cron_key in chain: + crons[stem] = str(chain[cron_key]) + elif loop_id == "registry-hygiene" and "cron" in target: + crons["registry-hygiene-sweep"] = str(target["cron"]) + elif loop_id == "quality-escalation" and "sweep_cron" in target: + crons["metrics-health-sweep"] = str(target["sweep_cron"]) + elif loop_id == "loop-regulator": + if "collector_cron" in target: + crons["loop-health-collector"] = str(target["collector_cron"]) + return crons + + +def _apply_operate_target( + cadence: dict[str, Any], loop_id: str, promoted_at: str +) -> None: + target = cadence.pop("operate_target", None) + cadence["phase"] = "operate" + cadence["promoted_at"] = promoted_at + if not isinstance(target, dict): + target = {} + + if loop_id == "kaizen-improvement-stack": + chain = target.get("chain") or DEFAULT_OPERATE_CRONS[loop_id] + cadence["cron"] = chain.get("metrics", "0 8 * * 1") + cadence["chain"] = { + "metrics": chain.get("metrics", "0 8 * * 1"), + "coach": chain.get("coach", "0 9 * * 1"), + "optimization": chain.get("optimization", "0 10 * * 1"), + } + elif loop_id == "registry-hygiene": + cadence["cron"] = target.get("cron", "0 9 * * 1") + cadence["batch_size"] = target.get("batch_size", 2) + cadence["domain_rotation"] = target.get("domain_rotation", "weekly") + elif loop_id == "quality-escalation": + cadence["cron"] = target.get("sweep_cron", "0 6 * * 1") + cadence["sweep_fallback"] = "weekly-metrics-health-sweep" + cadence.pop("stabilize_target", None) + elif loop_id == "loop-regulator": + cron = target.get("collector_cron", "0 11 * * 1") + cadence["collector_cron"] = cron + cadence["regulator_session_cron"] = target.get("regulator_session_cron", cron) + + +def _transform_definition( + path: Path, + from_prefix: str, + to_prefix: str, + cron: str, + cadence_enum: str, +) -> Path: + meta, body = _split_markdown_frontmatter(path.read_text(encoding="utf-8")) + old_id = str(meta.get("id", "")) + stem = path.stem.removeprefix(f"{from_prefix}-") + new_id = f"coulomb-{to_prefix}-{stem.replace('-', '-')}" + if old_id.startswith("coulomb-"): + new_id = old_id.replace(f"coulomb-{from_prefix}-", f"coulomb-{to_prefix}-", 1) + + meta["id"] = new_id + if isinstance(meta.get("name"), str): + meta["name"] = meta["name"].replace(from_prefix.title(), to_prefix.title()) + meta["name"] = re.sub( + rf"\b{from_prefix}\b", to_prefix, meta["name"], flags=re.IGNORECASE + ) + + trigger = meta.get("trigger") + if isinstance(trigger, dict) and trigger.get("type") == "cron": + trigger["cron_expression"] = cron + + for source in meta.get("context_sources") or []: + if isinstance(source, dict): + params = source.get("params") + if isinstance(params, dict) and "cadence" in params: + params["cadence"] = cadence_enum + + body = body.replace(f"{from_prefix}-", f"{to_prefix}-") + body = re.sub(rf"\b{from_prefix}\b", to_prefix, body, flags=re.IGNORECASE) + + dest = path.with_name(f"{to_prefix}-{stem}.md") + dest.write_text(_render_markdown(meta, body), encoding="utf-8") + if dest != path: + path.unlink() + return dest + + +def _promote_definitions( + engagement_repo: Path, + from_phase: str, + to_phase: str, + loop_crons: dict[str, dict[str, str]], + *, + dry_run: bool, + result: PromoteResult, +) -> None: + from_prefix = PHASE_FILE_PREFIX[from_phase] + to_prefix = PHASE_FILE_PREFIX[to_phase] + cadence_enum = CADENCE_ENUM[to_phase] + defs_dir = engagement_repo / "activity-definitions" + if not defs_dir.is_dir(): + raise PromoteError(f"activity-definitions/ not found under {engagement_repo}") + + for loop_id, crons in loop_crons.items(): + for stem, cron in crons.items(): + src = defs_dir / f"{from_prefix}-{stem}.md" + if not src.is_file(): + result.errors.append(f"missing definition file: {src}") + continue + dest = defs_dir / f"{to_prefix}-{stem}.md" + result.actions.append( + PromoteAction( + layer="definitions", + description=f"{src.name} → {dest.name} (cron {cron})", + path=str(src), + ) + ) + if dry_run: + continue + _transform_definition(src, from_prefix, to_prefix, cron, cadence_enum) + + # Update event-definition fallback references + for name in EVENT_DEFINITIONS: + path = defs_dir / f"{name}.md" + if not path.is_file(): + continue + text = path.read_text(encoding="utf-8") + new_text = text.replace( + f"{from_prefix}-metrics-health-sweep", + f"{to_prefix}-metrics-health-sweep", + ) + new_text = new_text.replace( + f"`0 6 * * *`", + f"`{loop_crons.get('quality-escalation', {}).get('metrics-health-sweep', '0 6 * * 1')}`", + ) + if new_text != text: + result.actions.append( + PromoteAction( + layer="definitions", + description=f"update fallback reference in {path.name}", + path=str(path), + ) + ) + if not dry_run: + path.write_text(new_text, encoding="utf-8") + + +def _promote_cadence_files( + engagement_repo: Path, + to_phase: str, + *, + loop_filter: str | None, + dry_run: bool, + result: PromoteResult, +) -> dict[str, dict[str, str]]: + if to_phase != "operate": + raise PromoteError(f"promotion to '{to_phase}' not implemented yet") + + promoted_at = date.today().isoformat() + loop_crons: dict[str, dict[str, str]] = {} + for loop_id, path in _loop_paths(engagement_repo).items(): + if loop_filter and loop_filter != loop_id: + continue + data = _load_yaml(path) + current = str(data.get("phase", "")) + if current == to_phase: + loop_crons[loop_id] = _operate_crons(loop_id, data) + continue + if current != "stabilize": + result.errors.append( + f"{loop_id}: cannot promote from phase '{current}' to '{to_phase}'" + ) + continue + loop_crons[loop_id] = _operate_crons(loop_id, data) + result.actions.append( + PromoteAction( + layer="cadence", + description=f"{path.relative_to(engagement_repo)}: {current} → {to_phase}", + path=str(path), + ) + ) + if dry_run: + continue + _apply_operate_target(data, loop_id, promoted_at) + _write_yaml(path, data) + + roster_path = engagement_repo / "loops" / "kaizen-stack" / "roster.yaml" + if roster_path.is_file() and ( + loop_filter is None or loop_filter == "kaizen-improvement-stack" + ): + roster = _load_yaml(roster_path) + if roster.get("phase") != to_phase: + result.actions.append( + PromoteAction( + layer="cadence", + description=f"roster.yaml phase → {to_phase}", + path=str(roster_path), + ) + ) + if not dry_run: + roster["phase"] = to_phase + _write_yaml(roster_path, roster) + + return loop_crons + + +def _sync_roster_phase( + engagement_repo: Path, + to_phase: str, + *, + dry_run: bool, + result: PromoteResult, +) -> None: + roster_path = engagement_repo / "loops" / "kaizen-stack" / "roster.yaml" + if not roster_path.is_file(): + return + roster = _load_yaml(roster_path) + if roster.get("phase") == to_phase: + return + result.actions.append( + PromoteAction( + layer="cadence", + description=f"roster.yaml phase → {to_phase}", + path=str(roster_path), + ) + ) + if not dry_run: + roster["phase"] = to_phase + _write_yaml(roster_path, roster) + + +def _promote_fleet( + engagement_repo: Path, + engagement_slug: str, + to_phase: str, + *, + dry_run: bool, + result: PromoteResult, +) -> None: + if to_phase != "operate": + return + _sync_roster_phase(engagement_repo, to_phase, dry_run=dry_run, result=result) + for slug, root, agents in _roster_repos(engagement_repo): + if not root.is_dir(): + result.errors.append(f"fleet repo root missing: {root} ({slug})") + continue + sched = schedule_path(root) + result.actions.append( + PromoteAction( + layer="fleet", + description=f"{slug}: .kaizen/schedule.yml → weekly ({', '.join(agents)})", + path=str(sched), + ) + ) + if dry_run: + continue + yaml_text = engagement_schedule_yaml( + engagement_slug, + agents=agents, + bootstrap_cadence="weekly", + ) + sched.parent.mkdir(parents=True, exist_ok=True) + sched.write_text(yaml_text, encoding="utf-8") + + +def _sync_activity_core( + engagement_repo: Path, + activity_core_root: Path | None, + *, + dry_run: bool, + result: PromoteResult, +) -> None: + ac_root = activity_core_root or Path(os.environ.get("ACTIVITY_CORE_ROOT", "")) + if not ac_root or not (ac_root / "src" / "activity_core").is_dir(): + result.errors.append( + "activity-core root not found (set --activity-core or ACTIVITY_CORE_ROOT)" + ) + return + if not os.environ.get("ACTCORE_DB_URL"): + result.errors.append("ACTCORE_DB_URL not set — skip sync or configure database") + return + + cmd = ["uv", "run", "python", "-m", "activity_core.sync_activity_definitions"] + result.actions.append( + PromoteAction( + layer="sync", + description="activity-core sync_activity_definitions", + path=str(ac_root), + ) + ) + if dry_run: + return + env = os.environ.copy() + env["ACTIVITY_DEFINITION_DIRS"] = str(engagement_repo.resolve()) + subprocess.run(cmd, cwd=ac_root, env=env, check=True) + + +def promote_engagement( + engagement_repo: Path, + *, + engagement_slug: str = "coulomb-loop", + to_phase: str | None = None, + loop: str | None = None, + dry_run: bool = False, + skip_cadence: bool = False, + skip_definitions: bool = False, + skip_fleet: bool = False, + skip_sync: bool = False, + activity_core_root: Path | None = None, +) -> PromoteResult: + """Run atomic cadence promotion (or dry-run plan).""" + engagement_repo = engagement_repo.resolve() + result = PromoteResult() + + if not engagement_repo.is_dir(): + result.errors.append(f"engagement repo not found: {engagement_repo}") + return result + + # Resolve target phase from first loop if not specified + if to_phase is None: + for path in _loop_paths(engagement_repo).values(): + data = _load_yaml(path) + nxt = _next_phase(str(data.get("phase", "bootstrap"))) + if nxt: + to_phase = nxt + break + if to_phase is None: + result.errors.append("could not infer target phase (all loops at operate?)") + return result + + loop_crons: dict[str, dict[str, str]] = {} + from_phase = ( + PHASE_ORDER[PHASE_ORDER.index(to_phase) - 1] + if to_phase in PHASE_ORDER[1:] + else "stabilize" + ) + + try: + if not skip_cadence: + loop_crons = _promote_cadence_files( + engagement_repo, + to_phase, + loop_filter=loop, + dry_run=dry_run, + result=result, + ) + else: + for loop_id, path in _loop_paths(engagement_repo).items(): + if loop and loop != loop_id: + continue + loop_crons[loop_id] = _operate_crons(loop_id, _load_yaml(path)) + + if ( + not skip_definitions + and from_phase in PHASE_FILE_PREFIX + and to_phase in PHASE_FILE_PREFIX + ): + if from_phase != to_phase: + _promote_definitions( + engagement_repo, + from_phase, + to_phase, + loop_crons, + dry_run=dry_run, + result=result, + ) + + if not skip_fleet: + _promote_fleet( + engagement_repo, + engagement_slug, + to_phase, + dry_run=dry_run, + result=result, + ) + + if not skip_sync: + _sync_activity_core( + engagement_repo, + activity_core_root, + dry_run=dry_run, + result=result, + ) + except (PromoteError, ScheduleError, subprocess.CalledProcessError) as exc: + result.errors.append(str(exc)) + + return result diff --git a/tests/test_engagement_promote.py b/tests/test_engagement_promote.py new file mode 100644 index 0000000..2c9e5ba --- /dev/null +++ b/tests/test_engagement_promote.py @@ -0,0 +1,161 @@ +"""Tests for atomic engagement cadence promotion.""" + +from __future__ import annotations + +from pathlib import Path + +import yaml +from click.testing import CliRunner + +from kaizen_agentic.cli import cli +from kaizen_agentic.engagement_promote import promote_engagement +from kaizen_agentic.schedule import schedule_path + + +def _write_cadence( + repo: Path, + loop_dir: str, + loop_id: str, + phase: str, + *, + with_operate_target: bool = True, +) -> None: + data: dict = {"loop": loop_id, "phase": phase, "regulator_approval": "approved"} + if phase == "stabilize" and with_operate_target: + if loop_id == "kaizen-improvement-stack": + data["operate_target"] = { + "phase": "operate", + "chain": { + "metrics": "0 8 * * 1", + "coach": "0 9 * * 1", + "optimization": "0 10 * * 1", + }, + } + elif loop_id == "registry-hygiene": + data["operate_target"] = { + "phase": "operate", + "cron": "0 9 * * 1", + "batch_size": 2, + } + path = repo / "loops" / loop_dir / "cadence.yml" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(yaml.safe_dump(data, sort_keys=False), encoding="utf-8") + + +def _write_definition(repo: Path, name: str, cron: str, cadence: str) -> None: + defs_dir = repo / "activity-definitions" + defs_dir.mkdir(parents=True, exist_ok=True) + stem = name.removeprefix("daily-") + content = f"""--- +id: coulomb-daily-{stem} +name: Daily test {stem} +enabled: true +trigger: + type: cron + cron_expression: "{cron}" +context_sources: + - type: kaizen + query: discover_kaizen_scheduled_repos + params: + cadence: {cadence} +--- + +# Daily {stem} +""" + (repo / "activity-definitions" / name).write_text(content, encoding="utf-8") + + +def _engagement_fixture(tmp_path: Path) -> Path: + repo = tmp_path / "coulomb-loop" + repo.mkdir() + _write_cadence(repo, "kaizen-stack", "kaizen-improvement-stack", "stabilize") + _write_cadence(repo, "registry-hygiene", "registry-hygiene", "stabilize") + _write_definition(repo, "daily-coach-orientation.md", "0 9 * * *", "daily") + _write_definition(repo, "daily-metrics-optimize.md", "0 8 * * *", "daily") + _write_definition(repo, "daily-optimization-review.md", "0 10 * * *", "daily") + _write_definition(repo, "daily-registry-hygiene-sweep.md", "0 7 * * *", "daily") + + roster = { + "version": "1", + "loop": "kaizen-improvement-stack", + "phase": "bootstrap", + "active": [ + { + "slug": "pilot-a", + "root": str(tmp_path / "pilot-a"), + "agents": ["coach", "optimization"], + } + ], + } + roster_path = repo / "loops" / "kaizen-stack" / "roster.yaml" + roster_path.parent.mkdir(parents=True, exist_ok=True) + roster_path.write_text(yaml.safe_dump(roster, sort_keys=False), encoding="utf-8") + (tmp_path / "pilot-a").mkdir() + return repo + + +class TestEngagementPromote: + def test_dry_run_lists_all_layers(self, tmp_path: Path) -> None: + repo = _engagement_fixture(tmp_path) + result = promote_engagement( + repo, + to_phase="operate", + dry_run=True, + skip_sync=True, + ) + layers = {a.layer for a in result.actions} + assert "cadence" in layers + assert "definitions" in layers + assert "fleet" in layers + assert result.ok + + def test_promote_updates_cadence_and_definitions(self, tmp_path: Path) -> None: + repo = _engagement_fixture(tmp_path) + result = promote_engagement( + repo, + to_phase="operate", + skip_fleet=True, + skip_sync=True, + ) + assert result.ok + cadence = yaml.safe_load( + (repo / "loops" / "kaizen-stack" / "cadence.yml").read_text() + ) + assert cadence["phase"] == "operate" + assert "operate_target" not in cadence + assert (repo / "activity-definitions" / "weekly-coach-orientation.md").is_file() + assert not ( + repo / "activity-definitions" / "daily-coach-orientation.md" + ).is_file() + + def test_fleet_only_writes_schedule(self, tmp_path: Path) -> None: + repo = _engagement_fixture(tmp_path) + # Pre-promote cadence so fleet-only path applies + promote_engagement(repo, to_phase="operate", skip_fleet=True, skip_sync=True) + result = promote_engagement( + repo, + to_phase="operate", + skip_cadence=True, + skip_definitions=True, + skip_sync=True, + ) + sched = schedule_path(tmp_path / "pilot-a") + assert sched.is_file() + assert "cadence: weekly" in sched.read_text() + + def test_cli_fleet_only(self, tmp_path: Path) -> None: + repo = _engagement_fixture(tmp_path) + runner = CliRunner() + result = runner.invoke( + cli, + [ + "schedule", + "promote", + "--engagement-repo", + str(repo), + "--fleet-only", + "--skip-sync", + ], + ) + assert result.exit_code == 0, result.output + assert "[fleet]" in result.output diff --git a/uv.lock b/uv.lock index fc054cc..57e5506 100644 --- a/uv.lock +++ b/uv.lock @@ -805,7 +805,7 @@ wheels = [ [[package]] name = "kaizen-agentic" -version = "1.3.0" +version = "1.4.0" source = { editable = "." } dependencies = [ { name = "click", version = "8.1.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, @@ -835,6 +835,9 @@ dev = [ { name = "pytest-cov", version = "5.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, { name = "pytest-cov", version = "7.1.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, ] +events = [ + { name = "nats-py" }, +] test = [ { name = "pytest", version = "8.3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, { name = "pytest", version = "8.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version == '3.9.*'" }, @@ -852,6 +855,7 @@ requires-dist = [ { name = "click", specifier = ">=8.0.0" }, { name = "flake8", marker = "extra == 'dev'", specifier = ">=5.0.0" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.0.0" }, + { name = "nats-py", marker = "extra == 'events'", specifier = ">=2.6.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=2.20.0" }, { name = "pydantic", specifier = ">=2.0.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=6.0.0" }, @@ -1149,6 +1153,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/79/7b/2c79738432f5c924bef5071f933bcc9efd0473bac3b4aa584a6f7c1c8df8/mypy_extensions-1.1.0-py3-none-any.whl", hash = "sha256:1be4cccdb0f2482337c4743e60421de3a356cd97508abadd57d47403e94f5505", size = 4963 }, ] +[[package]] +name = "nats-py" +version = "2.15.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/02/f0/fc5e93f2b0dd14a202590ad9d30eda1955ea872039b5204357348d0f4b1e/nats_py-2.15.0.tar.gz", hash = "sha256:6622c547d9a7d2313d9c147d46c386188f4ec2c7b5c9f9a0438a4d1b55f54a93", size = 75995 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/db/a8/b55606c7c621fb813c8ec78baf201d2c78bf6051091ec0c7ada572999e95/nats_py-2.15.0-py3-none-any.whl", hash = "sha256:9f8d36aa52a9926a88b8f1d70cf1fdce0ad387941479b500ee9ab3e51073cefd", size = 90334 }, +] + [[package]] name = "nodeenv" version = "1.10.0"