Files
kaizen-agentic/tests/test_engagement_promote.py
tegwick 0c2df43a2f
Some checks failed
ci / test (push) Failing after 13m58s
Add schedule promote for atomic cadence promotion
Orchestrates cadence.yml, activity-definitions, fleet schedule.yml,
and activity-core sync in one command. Supports --dry-run and
--fleet-only for repairing partial promotions.
2026-06-19 01:57:56 +02:00

162 lines
5.2 KiB
Python

"""Tests for atomic engagement cadence promotion."""
from __future__ import annotations
from pathlib import Path
import yaml
from click.testing import CliRunner
from kaizen_agentic.cli import cli
from kaizen_agentic.engagement_promote import promote_engagement
from kaizen_agentic.schedule import schedule_path
def _write_cadence(
    repo: Path,
    loop_dir: str,
    loop_id: str,
    phase: str,
    *,
    with_operate_target: bool = True,
) -> None:
    """Write ``loops/<loop_dir>/cadence.yml`` for one loop under *repo*.

    The document always carries ``loop``, ``phase`` and an approved
    ``regulator_approval``. When *phase* is ``"stabilize"`` and
    *with_operate_target* is true, the two loop ids this helper knows about
    additionally receive an ``operate_target`` section; any other loop id
    gets none. Missing parent directories are created.
    """
    # Built per call so every dumped document owns its own nested dicts.
    known_targets: dict = {
        "kaizen-improvement-stack": {
            "phase": "operate",
            "chain": {
                "metrics": "0 8 * * 1",
                "coach": "0 9 * * 1",
                "optimization": "0 10 * * 1",
            },
        },
        "registry-hygiene": {
            "phase": "operate",
            "cron": "0 9 * * 1",
            "batch_size": 2,
        },
    }

    payload: dict = {
        "loop": loop_id,
        "phase": phase,
        "regulator_approval": "approved",
    }
    wants_target = phase == "stabilize" and with_operate_target
    target = known_targets.get(loop_id) if wants_target else None
    if target is not None:
        payload["operate_target"] = target

    cadence_file = repo / "loops" / loop_dir / "cadence.yml"
    cadence_file.parent.mkdir(parents=True, exist_ok=True)
    # sort_keys=False keeps the insertion order above in the emitted YAML.
    rendered = yaml.safe_dump(payload, sort_keys=False)
    cadence_file.write_text(rendered, encoding="utf-8")
def _write_definition(repo: Path, name: str, cron: str, cadence: str) -> None:
    """Write a minimal cron-triggered activity definition into *repo*.

    The file is created at ``activity-definitions/<name>``; the directory is
    created on demand. The body is a YAML front-matter block followed by a
    one-line Markdown heading.

    Args:
        repo: Root directory of the engagement repository fixture.
        name: File name of the definition, e.g. ``daily-coach-orientation.md``.
        cron: Cron expression written as the trigger's ``cron_expression``.
        cadence: Value written as the ``cadence`` query parameter.
    """
    defs_dir = repo / "activity-definitions"
    defs_dir.mkdir(parents=True, exist_ok=True)
    # Slug comes from the stem so the file extension does not leak into the
    # generated id, display name and heading.
    stem = Path(name).stem.removeprefix("daily-")
    # NOTE(review): the nesting below (type/cron_expression under trigger;
    # query/params under the context_sources item) is assumed -- confirm
    # against the activity-definition schema.
    content = f"""---
id: coulomb-daily-{stem}
name: Daily test {stem}
enabled: true
trigger:
  type: cron
  cron_expression: "{cron}"
context_sources:
  - type: kaizen
    query: discover_kaizen_scheduled_repos
    params:
      cadence: {cadence}
---
# Daily {stem}
"""
    (defs_dir / name).write_text(content, encoding="utf-8")
def _engagement_fixture(tmp_path: Path) -> Path:
    """Build a throwaway engagement repository under *tmp_path*.

    Creates ``coulomb-loop`` with stabilize-phase cadence files for the
    ``kaizen-stack`` and ``registry-hygiene`` loops, four daily activity
    definitions, and a ``kaizen-stack`` roster whose single active entry
    points at a sibling ``pilot-a`` directory (also created here).

    Returns:
        The path of the ``coulomb-loop`` directory.
    """
    engagement_root = tmp_path / "coulomb-loop"
    engagement_root.mkdir()

    stabilizing_loops = (
        ("kaizen-stack", "kaizen-improvement-stack"),
        ("registry-hygiene", "registry-hygiene"),
    )
    for loop_dir, loop_id in stabilizing_loops:
        _write_cadence(engagement_root, loop_dir, loop_id, "stabilize")

    daily_definitions = (
        ("daily-coach-orientation.md", "0 9 * * *"),
        ("daily-metrics-optimize.md", "0 8 * * *"),
        ("daily-optimization-review.md", "0 10 * * *"),
        ("daily-registry-hygiene-sweep.md", "0 7 * * *"),
    )
    for file_name, cron_expr in daily_definitions:
        _write_definition(engagement_root, file_name, cron_expr, "daily")

    pilot_root = tmp_path / "pilot-a"
    pilot_entry = {
        "slug": "pilot-a",
        "root": str(pilot_root),
        "agents": ["coach", "optimization"],
    }
    roster_doc = {
        "version": "1",
        "loop": "kaizen-improvement-stack",
        "phase": "bootstrap",
        "active": [pilot_entry],
    }

    roster_file = engagement_root / "loops" / "kaizen-stack" / "roster.yaml"
    roster_file.parent.mkdir(parents=True, exist_ok=True)
    roster_file.write_text(
        yaml.safe_dump(roster_doc, sort_keys=False),
        encoding="utf-8",
    )

    # The roster references the pilot checkout, so it must exist on disk.
    pilot_root.mkdir()
    return engagement_root
class TestEngagementPromote:
    """Tests for ``promote_engagement`` and the ``schedule promote`` command."""

    def test_dry_run_lists_all_layers(self, tmp_path: Path) -> None:
        """A dry run reports cadence, definitions and fleet actions."""
        repo = _engagement_fixture(tmp_path)
        result = promote_engagement(
            repo,
            to_phase="operate",
            dry_run=True,
            skip_sync=True,
        )
        layers = {a.layer for a in result.actions}
        assert "cadence" in layers
        assert "definitions" in layers
        assert "fleet" in layers
        assert result.ok

    def test_promote_updates_cadence_and_definitions(self, tmp_path: Path) -> None:
        """Promotion rewrites cadence.yml and replaces the daily definition."""
        repo = _engagement_fixture(tmp_path)
        result = promote_engagement(
            repo,
            to_phase="operate",
            skip_fleet=True,
            skip_sync=True,
        )
        assert result.ok

        cadence_file = repo / "loops" / "kaizen-stack" / "cadence.yml"
        # Fixtures are written as UTF-8, so read them back the same way.
        cadence = yaml.safe_load(cadence_file.read_text(encoding="utf-8"))
        assert cadence["phase"] == "operate"
        assert "operate_target" not in cadence

        defs_dir = repo / "activity-definitions"
        assert (defs_dir / "weekly-coach-orientation.md").is_file()
        assert not (defs_dir / "daily-coach-orientation.md").is_file()

    def test_fleet_only_writes_schedule(self, tmp_path: Path) -> None:
        """With cadence and definitions skipped, the pilot schedule is written."""
        repo = _engagement_fixture(tmp_path)
        # Pre-promote cadence so fleet-only path applies
        prepared = promote_engagement(
            repo,
            to_phase="operate",
            skip_fleet=True,
            skip_sync=True,
        )
        # Surface a broken precondition here instead of as a confusing
        # failure in the fleet assertions below.
        assert prepared.ok

        result = promote_engagement(
            repo,
            to_phase="operate",
            skip_cadence=True,
            skip_definitions=True,
            skip_sync=True,
        )
        assert result.ok

        sched = schedule_path(tmp_path / "pilot-a")
        assert sched.is_file()
        assert "cadence: weekly" in sched.read_text(encoding="utf-8")

    def test_cli_fleet_only(self, tmp_path: Path) -> None:
        """``schedule promote --fleet-only`` exits 0 and prints a fleet line."""
        repo = _engagement_fixture(tmp_path)
        runner = CliRunner()
        result = runner.invoke(
            cli,
            [
                "schedule",
                "promote",
                "--engagement-repo",
                str(repo),
                "--fleet-only",
                "--skip-sync",
            ],
        )
        # Include the captured output so a non-zero exit is diagnosable.
        assert result.exit_code == 0, result.output
        assert "[fleet]" in result.output