"""Atomic cadence promotion across engagement contract layers (ADR-003). Orchestrates three layers that must stay aligned on promotion: 1. Customer policy — ``loops/*/cadence.yml`` in the engagement repo 2. Scheduler contract — ``activity-definitions/*.md`` → activity-core sync 3. Fleet opt-in — ``.kaizen/schedule.yml`` on roster target repos See coulomb-loop ADR-002 / ADR-003 and supplier customer-engagement-playbook. """ from __future__ import annotations import os import re import subprocess from dataclasses import dataclass, field from datetime import date from pathlib import Path from typing import Any, Callable import yaml from .schedule import ScheduleError, engagement_schedule_yaml, schedule_path PHASE_ORDER = ("bootstrap", "stabilize", "operate") PHASE_FILE_PREFIX = { "bootstrap": "hourly", "stabilize": "daily", "operate": "weekly", } CADENCE_ENUM = { "bootstrap": "daily", # hourly crons keep daily enum for resolver filter "stabilize": "daily", "operate": "weekly", } LOOP_DIR_BY_ID = { "kaizen-improvement-stack": "kaizen-stack", "quality-escalation": "quality-escalation", "registry-hygiene": "registry-hygiene", "loop-regulator": "regulator", } # Activity-definition stems per loop (without phase prefix). Crons come from # operate_target in cadence.yml when present, else these defaults. 
DEFAULT_OPERATE_CRONS: dict[str, dict[str, str]] = {
    "kaizen-improvement-stack": {
        "metrics-optimize": "0 8 * * 1",
        "coach-orientation": "0 9 * * 1",
        "optimization-review": "0 10 * * 1",
    },
    "registry-hygiene": {
        "registry-hygiene-sweep": "0 9 * * 1",
    },
    "quality-escalation": {
        "metrics-health-sweep": "0 6 * * 1",
    },
    "loop-regulator": {
        "loop-health-collector": "0 11 * * 1",
    },
}

# Event-triggered definitions (no phase prefix) whose text references the
# metrics-health-sweep fallback and must be patched on promotion.
EVENT_DEFINITIONS = frozenset({"low-success-rate-review"})

# (definition stem, key under ``operate_target.chain``) pairs for the
# kaizen-improvement-stack loop, in chain order. Shared by ``_operate_crons``
# and ``_apply_operate_target`` so both resolve defaults the same way.
_KAIZEN_CHAIN_KEYS = (
    ("metrics-optimize", "metrics"),
    ("coach-orientation", "coach"),
    ("optimization-review", "optimization"),
)


class PromoteError(Exception):
    """Raised when promotion cannot proceed."""


@dataclass
class PromoteAction:
    """One planned or applied change, recorded for reporting."""

    # Which layer the action belongs to; this module uses "cadence",
    # "definitions", "fleet" and "sync".
    layer: str
    # Human-readable summary of the change.
    description: str
    # Filesystem path the action touches, when there is one.
    path: str | None = None


@dataclass
class PromoteResult:
    """Accumulated actions and errors of one promotion run."""

    actions: list[PromoteAction] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        """Return True when no error has been recorded."""
        return not self.errors


def _next_phase(current: str) -> str | None:
    """Return the phase after *current* in ``PHASE_ORDER``.

    Returns None when *current* is unknown or already the last phase.
    """
    try:
        idx = PHASE_ORDER.index(current)
    except ValueError:
        return None
    if idx + 1 >= len(PHASE_ORDER):
        return None
    return PHASE_ORDER[idx + 1]


def _loop_paths(engagement_repo: Path) -> dict[str, Path]:
    """Map loop id → existing ``loops/<dir>/cadence.yml`` path.

    Raises:
        PromoteError: if ``loops/`` is missing or contains no cadence file
            for any loop listed in ``LOOP_DIR_BY_ID``.
    """
    loops_root = engagement_repo / "loops"
    if not loops_root.is_dir():
        raise PromoteError(f"loops/ not found under {engagement_repo}")
    mapping: dict[str, Path] = {}
    for loop_id, dirname in LOOP_DIR_BY_ID.items():
        path = loops_root / dirname / "cadence.yml"
        if path.is_file():
            mapping[loop_id] = path
    if not mapping:
        raise PromoteError(f"no loops/*/cadence.yml found under {engagement_repo}")
    return mapping


def _load_yaml(path: Path) -> dict[str, Any]:
    """Load *path* as YAML and return its top-level mapping.

    Raises:
        PromoteError: if the file is not valid YAML or its top level is not
            a mapping.
    """
    try:
        data = yaml.safe_load(path.read_text(encoding="utf-8"))
    except yaml.YAMLError as exc:
        # Surface parse failures through the module's own error type so
        # callers that collect PromoteError also report malformed files.
        raise PromoteError(f"invalid YAML in {path}: {exc}") from exc
    if not isinstance(data, dict):
        raise PromoteError(f"expected mapping in {path}")
    return data


def _write_yaml(path: Path, data: dict[str, Any]) -> None:
    """Serialize *data* to *path* as YAML, keeping key insertion order."""
    path.write_text(yaml.safe_dump(data, sort_keys=False), encoding="utf-8")


def _split_markdown_frontmatter(text: str) -> tuple[dict[str, Any], str]:
    """Split a markdown document into (frontmatter mapping, body).

    The document must start with ``---``. The frontmatter ends at the first
    ``---`` that begins a line; if there is none, the first ``---`` anywhere
    is used. Leading newlines are stripped from the body.

    Raises:
        PromoteError: if the frontmatter is missing, unterminated, not valid
            YAML, or not a mapping.
    """
    if not text.startswith("---"):
        raise PromoteError("activity definition missing YAML frontmatter")
    rest = text[3:]
    # Prefer a delimiter at line start so a "---" inside a frontmatter value
    # does not end the header early.
    close = rest.find("\n---")
    if close != -1:
        header, remainder = rest[:close], rest[close + 4:]
    else:
        close = rest.find("---")
        if close == -1:
            raise PromoteError("activity definition frontmatter not closed")
        header, remainder = rest[:close], rest[close + 3:]
    try:
        meta = yaml.safe_load(header)
    except yaml.YAMLError as exc:
        raise PromoteError(
            f"activity definition frontmatter is not valid YAML: {exc}"
        ) from exc
    if not isinstance(meta, dict):
        raise PromoteError("activity definition frontmatter must be a mapping")
    body = remainder.lstrip("\n")
    return meta, body


def _render_markdown(meta: dict[str, Any], body: str) -> str:
    """Render *meta* as YAML frontmatter followed by *body*."""
    header = yaml.safe_dump(meta, sort_keys=False)
    return f"---\n{header}---\n\n{body}"


def _roster_repos(engagement_repo: Path) -> list[tuple[str, Path, list[str]]]:
    """Return ``(slug, root, agents)`` for each usable ``active`` roster entry.

    Reads ``loops/kaizen-stack/roster.yaml``; returns an empty list when the
    file is absent. Entries that are not mappings, or that lack ``slug`` or
    ``root``, are skipped. ``agents`` defaults to coach + optimization.
    """
    roster_path = engagement_repo / "loops" / "kaizen-stack" / "roster.yaml"
    if not roster_path.is_file():
        return []
    data = _load_yaml(roster_path)
    active = data.get("active") or []
    if not isinstance(active, list):
        return []
    repos: list[tuple[str, Path, list[str]]] = []
    for entry in active:
        if not isinstance(entry, dict):
            continue
        slug = str(entry.get("slug", ""))
        root = entry.get("root")
        agents = entry.get("agents") or ["coach", "optimization"]
        if isinstance(agents, str):
            # A scalar ``agents: coach`` means one agent, not its letters.
            agents = [agents]
        if slug and root:
            repos.append((slug, Path(str(root)), list(agents)))
    return repos


def _operate_crons(loop_id: str, cadence: dict[str, Any]) -> dict[str, str]:
    """Return definition stem → cron for *loop_id* in the operate phase.

    Starts from ``DEFAULT_OPERATE_CRONS`` and overrides with values found in
    the cadence file's ``operate_target`` mapping, when present.
    """
    target = cadence.get("operate_target")
    crons = dict(DEFAULT_OPERATE_CRONS.get(loop_id, {}))
    if not isinstance(target, dict):
        return crons
    if loop_id == "kaizen-improvement-stack":
        chain = target.get("chain")
        if isinstance(chain, dict):
            for stem, cron_key in _KAIZEN_CHAIN_KEYS:
                if cron_key in chain:
                    crons[stem] = str(chain[cron_key])
    elif loop_id == "registry-hygiene" and "cron" in target:
        crons["registry-hygiene-sweep"] = str(target["cron"])
    elif loop_id == "quality-escalation" and "sweep_cron" in target:
        crons["metrics-health-sweep"] = str(target["sweep_cron"])
    elif loop_id == "loop-regulator":
        if "collector_cron" in target:
            crons["loop-health-collector"] = str(target["collector_cron"])
    return crons


def _apply_operate_target(
    cadence: dict[str, Any], loop_id: str, promoted_at: str
) -> None:
    """Rewrite *cadence* in place for the operate phase.

    Removes ``operate_target``, sets ``phase`` and ``promoted_at``, then
    copies the loop-specific schedule keys from the removed target, falling
    back to ``DEFAULT_OPERATE_CRONS`` for any cron that is not given.
    """
    target = cadence.pop("operate_target", None)
    cadence["phase"] = "operate"
    cadence["promoted_at"] = promoted_at
    if not isinstance(target, dict):
        target = {}
    defaults = DEFAULT_OPERATE_CRONS.get(loop_id, {})
    if loop_id == "kaizen-improvement-stack":
        chain = target.get("chain")
        if not isinstance(chain, dict):
            chain = {}
        # The defaults table is keyed by definition stem while the chain is
        # keyed by short name, so translate through the shared mapping.
        resolved = {
            key: chain.get(key, defaults[stem]) for stem, key in _KAIZEN_CHAIN_KEYS
        }
        cadence["cron"] = resolved["metrics"]
        cadence["chain"] = resolved
    elif loop_id == "registry-hygiene":
        cadence["cron"] = target.get("cron", defaults["registry-hygiene-sweep"])
        cadence["batch_size"] = target.get("batch_size", 2)
        cadence["domain_rotation"] = target.get("domain_rotation", "weekly")
    elif loop_id == "quality-escalation":
        cadence["cron"] = target.get("sweep_cron", defaults["metrics-health-sweep"])
        cadence["sweep_fallback"] = "weekly-metrics-health-sweep"
        cadence.pop("stabilize_target", None)
    elif loop_id == "loop-regulator":
        cron = target.get("collector_cron", defaults["loop-health-collector"])
        cadence["collector_cron"] = cron
        cadence["regulator_session_cron"] = target.get("regulator_session_cron", cron)


def _transform_definition(
    path: Path,
    from_prefix: str,
    to_prefix: str,
    cron: str,
    cadence_enum: str,
) -> Path:
    """Rewrite one activity definition for a new phase and rename the file.

    Updates ``id``, ``name``, a cron trigger's ``cron_expression`` and any
    ``context_sources[*].params.cadence``, replaces the prefix word in the
    body, writes ``<to_prefix>-<stem>.md`` next to *path* and deletes the
    old file when the name changed.

    Returns:
        The path of the written definition.
    """
    meta, body = _split_markdown_frontmatter(path.read_text(encoding="utf-8"))
    old_id = str(meta.get("id", ""))
    stem = path.stem.removeprefix(f"{from_prefix}-")
    if old_id.startswith("coulomb-"):
        new_id = old_id.replace(
            f"coulomb-{from_prefix}-", f"coulomb-{to_prefix}-", 1
        )
    else:
        new_id = f"coulomb-{to_prefix}-{stem}"
    meta["id"] = new_id

    # Whole-word, case-insensitive match of the old prefix.
    prefix_word = re.compile(rf"\b{re.escape(from_prefix)}\b", re.IGNORECASE)

    if isinstance(meta.get("name"), str):
        # Title-case occurrences first so they keep their capitalisation;
        # the regex then catches any remaining casing.
        name = meta["name"].replace(from_prefix.title(), to_prefix.title())
        meta["name"] = prefix_word.sub(to_prefix, name)

    trigger = meta.get("trigger")
    if isinstance(trigger, dict) and trigger.get("type") == "cron":
        trigger["cron_expression"] = cron

    for source in meta.get("context_sources") or []:
        if isinstance(source, dict):
            params = source.get("params")
            if isinstance(params, dict) and "cadence" in params:
                params["cadence"] = cadence_enum

    body = body.replace(f"{from_prefix}-", f"{to_prefix}-")
    body = prefix_word.sub(to_prefix, body)

    dest = path.with_name(f"{to_prefix}-{stem}.md")
    dest.write_text(_render_markdown(meta, body), encoding="utf-8")
    if dest != path:
        path.unlink()
    return dest


def _promote_definitions(
    engagement_repo: Path,
    from_phase: str,
    to_phase: str,
    loop_crons: dict[str, dict[str, str]],
    *,
    dry_run: bool,
    result: PromoteResult,
) -> None:
    """Promote activity-definition files from *from_phase* to *to_phase*.

    For every stem in *loop_crons* the ``<from_prefix>-<stem>.md`` file is
    transformed and renamed. A stem whose source is gone but whose target
    already exists is treated as already promoted. Event definitions listed
    in ``EVENT_DEFINITIONS`` get their sweep fallback reference and cron
    patched. With *dry_run* only the planned actions are recorded.

    Raises:
        PromoteError: if ``activity-definitions/`` does not exist.
    """
    from_prefix = PHASE_FILE_PREFIX[from_phase]
    to_prefix = PHASE_FILE_PREFIX[to_phase]
    cadence_enum = CADENCE_ENUM[to_phase]
    defs_dir = engagement_repo / "activity-definitions"
    if not defs_dir.is_dir():
        raise PromoteError(f"activity-definitions/ not found under {engagement_repo}")

    for crons in loop_crons.values():
        for stem, cron in crons.items():
            src = defs_dir / f"{from_prefix}-{stem}.md"
            dest = defs_dir / f"{to_prefix}-{stem}.md"
            if not src.is_file():
                if dest.is_file():
                    # Already promoted on an earlier run; re-running must
                    # not report this as a failure.
                    continue
                result.errors.append(f"missing definition file: {src}")
                continue
            result.actions.append(
                PromoteAction(
                    layer="definitions",
                    description=f"{src.name} → {dest.name} (cron {cron})",
                    path=str(src),
                )
            )
            if dry_run:
                continue
            _transform_definition(src, from_prefix, to_prefix, cron, cadence_enum)

    # Update event-definition fallback references
    sweep_cron = loop_crons.get("quality-escalation", {}).get(
        "metrics-health-sweep", "0 6 * * 1"
    )
    for name in EVENT_DEFINITIONS:
        path = defs_dir / f"{name}.md"
        if not path.is_file():
            continue
        text = path.read_text(encoding="utf-8")
        new_text = text.replace(
            f"{from_prefix}-metrics-health-sweep",
            f"{to_prefix}-metrics-health-sweep",
        )
        new_text = new_text.replace("`0 6 * * *`", f"`{sweep_cron}`")
        if new_text != text:
            result.actions.append(
                PromoteAction(
                    layer="definitions",
                    description=f"update fallback reference in {path.name}",
                    path=str(path),
                )
            )
            if not dry_run:
                path.write_text(new_text, encoding="utf-8")


def _promote_cadence_files(
    engagement_repo: Path,
    to_phase: str,
    *,
    loop_filter: str | None,
    dry_run: bool,
    result: PromoteResult,
) -> dict[str, dict[str, str]]:
    """Promote ``loops/*/cadence.yml`` files from stabilize to operate.

    Loops already at *to_phase* are left untouched but still contribute
    their crons. Loops in any other phase than ``stabilize`` are reported as
    errors. The kaizen roster phase is synced unless *loop_filter* selects a
    different loop.

    Returns:
        loop id → (definition stem → cron) for every loop that is at, or is
        being moved to, *to_phase*.

    Raises:
        PromoteError: if *to_phase* is not ``"operate"`` or no cadence files
            can be located.
    """
    if to_phase != "operate":
        raise PromoteError(f"promotion to '{to_phase}' not implemented yet")

    promoted_at = date.today().isoformat()
    loop_crons: dict[str, dict[str, str]] = {}

    for loop_id, path in _loop_paths(engagement_repo).items():
        if loop_filter and loop_filter != loop_id:
            continue
        data = _load_yaml(path)
        current = str(data.get("phase", ""))
        if current == to_phase:
            loop_crons[loop_id] = _operate_crons(loop_id, data)
            continue
        if current != "stabilize":
            result.errors.append(
                f"{loop_id}: cannot promote from phase '{current}' to '{to_phase}'"
            )
            continue
        # Read the crons before operate_target is popped by the rewrite.
        loop_crons[loop_id] = _operate_crons(loop_id, data)
        result.actions.append(
            PromoteAction(
                layer="cadence",
                description=f"{path.relative_to(engagement_repo)}: {current} → {to_phase}",
                path=str(path),
            )
        )
        if dry_run:
            continue
        _apply_operate_target(data, loop_id, promoted_at)
        _write_yaml(path, data)

    if loop_filter is None or loop_filter == "kaizen-improvement-stack":
        _sync_roster_phase(engagement_repo, to_phase, dry_run=dry_run, result=result)

    return loop_crons


def _sync_roster_phase(
    engagement_repo: Path,
    to_phase: str,
    *,
    dry_run: bool,
    result: PromoteResult,
) -> None:
    """Set ``phase`` in ``loops/kaizen-stack/roster.yaml`` to *to_phase*.

    Does nothing when the roster is absent or already at *to_phase*. The
    action is recorded at most once per *result*, so a dry run that reaches
    this helper from both the cadence and fleet steps lists it a single time.
    """
    roster_path = engagement_repo / "loops" / "kaizen-stack" / "roster.yaml"
    if not roster_path.is_file():
        return
    roster = _load_yaml(roster_path)
    if roster.get("phase") == to_phase:
        return
    description = f"roster.yaml phase → {to_phase}"
    already_planned = any(
        action.layer == "cadence"
        and action.path == str(roster_path)
        and action.description == description
        for action in result.actions
    )
    if not already_planned:
        result.actions.append(
            PromoteAction(
                layer="cadence",
                description=description,
                path=str(roster_path),
            )
        )
    if not dry_run:
        roster["phase"] = to_phase
        _write_yaml(roster_path, roster)


def _promote_fleet(
    engagement_repo: Path,
    engagement_slug: str,
    to_phase: str,
    *,
    dry_run: bool,
    result: PromoteResult,
) -> None:
    """Write a weekly ``.kaizen/schedule.yml`` into every roster repo.

    Only acts when *to_phase* is ``"operate"``. Roster entries whose root
    directory does not exist are reported as errors and skipped.
    """
    if to_phase != "operate":
        return
    _sync_roster_phase(engagement_repo, to_phase, dry_run=dry_run, result=result)
    for slug, root, agents in _roster_repos(engagement_repo):
        if not root.is_dir():
            result.errors.append(f"fleet repo root missing: {root} ({slug})")
            continue
        sched = schedule_path(root)
        result.actions.append(
            PromoteAction(
                layer="fleet",
                description=f"{slug}: .kaizen/schedule.yml → weekly ({', '.join(agents)})",
                path=str(sched),
            )
        )
        if dry_run:
            continue
        yaml_text = engagement_schedule_yaml(
            engagement_slug,
            agents=agents,
            bootstrap_cadence="weekly",
        )
        sched.parent.mkdir(parents=True, exist_ok=True)
        sched.write_text(yaml_text, encoding="utf-8")


def _sync_activity_core(
    engagement_repo: Path,
    activity_core_root: Path | None,
    *,
    dry_run: bool,
    result: PromoteResult,
) -> None:
    """Run activity-core's ``sync_activity_definitions`` for this repo.

    The activity-core checkout is *activity_core_root* or, when that is
    None, the ``ACTIVITY_CORE_ROOT`` environment variable. Missing root or
    missing ``ACTCORE_DB_URL`` is recorded as an error without running
    anything. With *dry_run* only the action is recorded.

    Raises:
        subprocess.CalledProcessError: if the sync command exits non-zero.
    """
    if activity_core_root is not None:
        ac_root: Path | None = activity_core_root
    else:
        # Path("") is Path(".") and Path objects are always truthy, so the
        # unset case has to be detected on the raw string; otherwise an
        # unset variable silently means "the current directory".
        env_root = os.environ.get("ACTIVITY_CORE_ROOT", "")
        ac_root = Path(env_root) if env_root else None
    if ac_root is None or not (ac_root / "src" / "activity_core").is_dir():
        result.errors.append(
            "activity-core root not found (set --activity-core or ACTIVITY_CORE_ROOT)"
        )
        return
    if not os.environ.get("ACTCORE_DB_URL"):
        result.errors.append("ACTCORE_DB_URL not set — skip sync or configure database")
        return
    cmd = ["uv", "run", "python", "-m", "activity_core.sync_activity_definitions"]
    result.actions.append(
        PromoteAction(
            layer="sync",
            description="activity-core sync_activity_definitions",
            path=str(ac_root),
        )
    )
    if dry_run:
        return
    env = os.environ.copy()
    env["ACTIVITY_DEFINITION_DIRS"] = str(engagement_repo.resolve())
    try:
        subprocess.run(cmd, cwd=ac_root, env=env, check=True)
    except FileNotFoundError as exc:
        # The launcher executable is not installed; report it like the other
        # precondition failures instead of propagating an OSError.
        result.errors.append(f"could not launch activity-core sync: {exc}")


def _infer_target_phase(engagement_repo: Path) -> str | None:
    """Return the next phase of the first loop that has one, else None."""
    for path in _loop_paths(engagement_repo).values():
        data = _load_yaml(path)
        nxt = _next_phase(str(data.get("phase", "bootstrap")))
        if nxt:
            return nxt
    return None


def promote_engagement(
    engagement_repo: Path,
    *,
    engagement_slug: str = "coulomb-loop",
    to_phase: str | None = None,
    loop: str | None = None,
    dry_run: bool = False,
    skip_cadence: bool = False,
    skip_definitions: bool = False,
    skip_fleet: bool = False,
    skip_sync: bool = False,
    activity_core_root: Path | None = None,
) -> PromoteResult:
    """Run atomic cadence promotion (or dry-run plan).

    Runs, in order and unless skipped: cadence files, activity definitions,
    fleet schedules, activity-core sync.

    Args:
        engagement_repo: Root of the engagement repository.
        engagement_slug: Slug passed to the fleet schedule generator.
        to_phase: Target phase; inferred from the first loop that has a next
            phase when None.
        loop: Restrict cadence/definition promotion to this loop id.
        dry_run: Record planned actions without writing anything.
        skip_cadence: Skip the cadence-file layer.
        skip_definitions: Skip the activity-definition layer.
        skip_fleet: Skip the fleet schedule layer.
        skip_sync: Skip the activity-core sync.
        activity_core_root: activity-core checkout for the sync step.

    Returns:
        A ``PromoteResult``. ``PromoteError``, ``ScheduleError`` and
        ``subprocess.CalledProcessError`` are captured in ``errors`` rather
        than raised.
    """
    engagement_repo = engagement_repo.resolve()
    result = PromoteResult()

    if not engagement_repo.is_dir():
        result.errors.append(f"engagement repo not found: {engagement_repo}")
        return result

    # Resolve target phase from first loop if not specified
    if to_phase is None:
        try:
            to_phase = _infer_target_phase(engagement_repo)
        except PromoteError as exc:
            # A missing loops/ tree or unreadable cadence file is a reported
            # failure like any other, not an exception for the caller.
            result.errors.append(str(exc))
            return result
        if to_phase is None:
            result.errors.append("could not infer target phase (all loops at operate?)")
            return result

    loop_crons: dict[str, dict[str, str]] = {}
    from_phase = (
        PHASE_ORDER[PHASE_ORDER.index(to_phase) - 1]
        if to_phase in PHASE_ORDER[1:]
        else "stabilize"
    )

    try:
        if not skip_cadence:
            loop_crons = _promote_cadence_files(
                engagement_repo,
                to_phase,
                loop_filter=loop,
                dry_run=dry_run,
                result=result,
            )
        else:
            # Definitions still need the crons even when cadence is skipped.
            for loop_id, path in _loop_paths(engagement_repo).items():
                if loop and loop != loop_id:
                    continue
                loop_crons[loop_id] = _operate_crons(loop_id, _load_yaml(path))

        if (
            not skip_definitions
            and from_phase in PHASE_FILE_PREFIX
            and to_phase in PHASE_FILE_PREFIX
            and from_phase != to_phase
        ):
            _promote_definitions(
                engagement_repo,
                from_phase,
                to_phase,
                loop_crons,
                dry_run=dry_run,
                result=result,
            )

        if not skip_fleet:
            _promote_fleet(
                engagement_repo,
                engagement_slug,
                to_phase,
                dry_run=dry_run,
                result=result,
            )

        if not skip_sync:
            _sync_activity_core(
                engagement_repo,
                activity_core_root,
                dry_run=dry_run,
                result=result,
            )
    except (PromoteError, ScheduleError, subprocess.CalledProcessError) as exc:
        result.errors.append(str(exc))

    return result