Files
state-hub/tests/test_consistency_check.py

1121 lines
44 KiB
Python

"""Unit tests for the stateless / pure-function layer of consistency_check.py.
Covers:
- parse_frontmatter — YAML frontmatter splitting
- parse_task_blocks — ```task``` block extraction
- get_tasks_from_workplan — block vs. frontmatter fallback
- ConsistencyReport — issue accumulation and severity filtering
- render_text — text rendering smoke tests
- report_to_dict — serialisation shape
No network calls, no DB, no live API — these tests run fully offline.
"""
from __future__ import annotations
import os
import shutil
import subprocess
import sys
from pathlib import Path
import pytest
# Make scripts/ importable without installing
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from consistency_check import (
ConsistencyReport,
Issue,
FILE_TO_DB_WORKSTREAM_STATUS,
RENORMALIZATION_RULES,
STATUS_ORDER,
_BACKGROUND_CHECKS,
_detect_behind_remote,
_git_pull,
_patch_frontmatter_field,
_patch_task_status_in_file,
_report_needs_action,
archive_closed_workplans,
canonical_workplan_filename,
check_repo,
consistency_exit_code,
fix_repo,
get_tasks_from_workplan,
iter_workplan_files,
normalise_workstream_status,
parse_frontmatter,
parse_task_blocks,
render_text,
render_renormalization_guide,
report_to_dict,
)
from api.workplan_status import ready_review_status
# _detect_behind_remote and _git_pull are re-exported from consistency_check
# for backward compat; their canonical implementations live in repo_sync.py.
# ---------------------------------------------------------------------------
# parse_frontmatter
# ---------------------------------------------------------------------------
class TestParseFrontmatter:
def test_valid_frontmatter(self):
text = "---\nid: WP-001\ntitle: Test\nstatus: active\n---\n\n# Body"
meta, body = parse_frontmatter(text)
assert meta["id"] == "WP-001"
assert meta["title"] == "Test"
assert meta["status"] == "active"
assert "# Body" in body
def test_no_frontmatter_returns_empty_meta(self):
text = "# Just a heading\n\nSome body text."
meta, body = parse_frontmatter(text)
assert meta == {}
assert body == text
def test_empty_frontmatter(self):
text = "---\n---\n\nBody only."
meta, body = parse_frontmatter(text)
assert isinstance(meta, dict)
assert "Body only." in body
def test_frontmatter_with_quoted_strings(self):
text = '---\nid: "WP-007"\ntitle: "My Workplan"\n---\nBody'
meta, _ = parse_frontmatter(text)
assert meta["id"] == "WP-007"
assert meta["title"] == "My Workplan"
def test_incomplete_frontmatter_delimiter(self):
"""Only one --- means no valid frontmatter block."""
text = "---\nid: foo\nNo closing delimiter"
meta, body = parse_frontmatter(text)
assert meta == {}
def test_body_preserved_fully(self):
text = "---\nid: X\n---\nLine1\nLine2\nLine3"
_, body = parse_frontmatter(text)
assert "Line1" in body
assert "Line2" in body
assert "Line3" in body
# ---------------------------------------------------------------------------
# parse_task_blocks
# ---------------------------------------------------------------------------
class TestParseTaskBlocks:
def test_single_task_block(self):
body = "## T01\n\n```task\nid: WP-T01\nstatus: done\npriority: high\n```\n"
tasks = parse_task_blocks(body)
assert len(tasks) == 1
assert tasks[0]["id"] == "WP-T01"
assert tasks[0]["status"] == "done"
def test_multiple_task_blocks(self):
body = (
"## T01\n```task\nid: T01\nstatus: done\n```\n\n"
"## T02\n```task\nid: T02\nstatus: todo\n```\n"
)
tasks = parse_task_blocks(body)
assert len(tasks) == 2
assert tasks[0]["id"] == "T01"
assert tasks[1]["id"] == "T02"
def test_no_task_blocks(self):
body = "# Workplan\n\nNo task blocks here.\n"
tasks = parse_task_blocks(body)
assert tasks == []
def test_task_block_with_all_fields(self):
body = (
"```task\n"
"id: CUST-WP-0008-T01\n"
"status: done\n"
"priority: critical\n"
"assignee: custodian\n"
'state_hub_task_id: "abc-123"\n'
"```\n"
)
tasks = parse_task_blocks(body)
assert len(tasks) == 1
t = tasks[0]
assert t["id"] == "CUST-WP-0008-T01"
assert t["status"] == "done"
assert t["priority"] == "critical"
assert t["state_hub_task_id"] == "abc-123"
def test_ignores_non_task_fenced_blocks(self):
body = (
"```python\nprint('hello')\n```\n\n"
"```task\nid: T01\nstatus: todo\n```\n"
"```bash\necho hi\n```\n"
)
tasks = parse_task_blocks(body)
assert len(tasks) == 1
assert tasks[0]["id"] == "T01"
def test_extracts_markdown_after_task_block_as_description(self):
body = (
"### T01 — Configure registry\n\n"
"```task\nid: T01\nstatus: todo\npriority: high\n```\n\n"
"Update `app.ini` so packages are enabled.\n\n"
"```ini\n[packages]\nENABLED = true\n```\n\n"
"**Done when:** a rendered config contains the package settings.\n\n"
"---\n\n"
"### T02 — Next task\n\n"
"```task\nid: T02\nstatus: todo\n```\n"
)
tasks = parse_task_blocks(body)
assert tasks[0]["description"] == (
"Update `app.ini` so packages are enabled.\n\n"
"```ini\n[packages]\nENABLED = true\n```\n\n"
"**Done when:** a rendered config contains the package settings."
)
assert "Next task" not in tasks[0]["description"]
def test_keeps_nested_headings_inside_task_description(self):
body = (
"## T01\n\n"
"```task\nid: T01\nstatus: todo\n```\n\n"
"Intro.\n\n"
"### Notes\n\n"
"- Keep this nested heading in the task content.\n\n"
"## T02\n\n"
"```task\nid: T02\nstatus: todo\n```\n"
)
tasks = parse_task_blocks(body)
assert "### Notes" in tasks[0]["description"]
assert "## T02" not in tasks[0]["description"]
# ---------------------------------------------------------------------------
# get_tasks_from_workplan
# ---------------------------------------------------------------------------
class TestGetTasksFromWorkplan:
def test_prefers_task_blocks_over_frontmatter(self):
meta = {"tasks": [{"id": "from-frontmatter", "status": "todo"}]}
body = "```task\nid: from-block\nstatus: done\n```\n"
tasks = get_tasks_from_workplan(meta, body)
assert len(tasks) == 1
assert tasks[0]["id"] == "from-block"
def test_falls_back_to_frontmatter_tasks(self):
meta = {"tasks": [{"id": "FM-01", "status": "todo"}, {"id": "FM-02", "status": "done"}]}
body = "# No task blocks here\n"
tasks = get_tasks_from_workplan(meta, body)
assert len(tasks) == 2
assert tasks[0]["id"] == "FM-01"
assert tasks[1]["id"] == "FM-02"
def test_returns_empty_when_no_tasks(self):
meta = {}
body = "# Just prose\n"
tasks = get_tasks_from_workplan(meta, body)
assert tasks == []
class TestArchivedWorkplans:
def test_canonical_workplan_filename_strips_archive_date_prefix(self):
assert canonical_workplan_filename(Path("260501-CUST-WP-0001-demo.md")) == "CUST-WP-0001-demo.md"
assert canonical_workplan_filename(Path("CUST-WP-0001-demo.md")) == "CUST-WP-0001-demo.md"
def test_iter_workplan_files_includes_archived_directory(self, tmp_path):
workplans = tmp_path / "workplans"
archived = workplans / "archived"
archived.mkdir(parents=True)
active_file = workplans / "CUST-WP-0001-active.md"
archived_file = archived / "260501-CUST-WP-0000-old.md"
active_file.write_text("---\nid: CUST-WP-0001\n---\n", encoding="utf-8")
archived_file.write_text("---\nid: CUST-WP-0000\n---\n", encoding="utf-8")
assert iter_workplan_files(workplans) == [active_file, archived_file]
def test_archive_closed_workplans_moves_done_file_with_date_prefix(self, tmp_path):
repo = tmp_path / "repo"
workplans = repo / "workplans"
workplans.mkdir(parents=True)
wp = workplans / "CUST-WP-0001-demo.md"
wp.write_text(
"---\n"
"id: CUST-WP-0001\n"
"type: workplan\n"
"title: Demo\n"
"domain: custodian\n"
"status: done\n"
"owner: codex\n"
"created: \"2026-05-01\"\n"
"---\n"
"```task\nid: CUST-WP-0001-T01\nstatus: done\npriority: medium\n```\n",
encoding="utf-8",
)
moved = archive_closed_workplans(str(repo), completion_date="260501")
assert moved == ["workplans/CUST-WP-0001-demo.md -> workplans/archived/260501-CUST-WP-0001-demo.md"]
assert not wp.exists()
assert (workplans / "archived" / "260501-CUST-WP-0001-demo.md").exists()
def test_archive_closed_workplans_leaves_open_tasks_in_place(self, tmp_path):
repo = tmp_path / "repo"
workplans = repo / "workplans"
workplans.mkdir(parents=True)
wp = workplans / "CUST-WP-0001-demo.md"
wp.write_text(
"---\nid: CUST-WP-0001\ntype: workplan\ntitle: Demo\ndomain: custodian\n"
"status: done\nowner: codex\ncreated: \"2026-05-01\"\n---\n"
"```task\nid: CUST-WP-0001-T01\nstatus: todo\npriority: medium\n```\n",
encoding="utf-8",
)
assert archive_closed_workplans(str(repo), completion_date="260501") == []
assert wp.exists()
def test_ignores_non_list_frontmatter_tasks(self):
meta = {"tasks": "not-a-list"}
body = "# No blocks\n"
tasks = get_tasks_from_workplan(meta, body)
assert tasks == []
# ---------------------------------------------------------------------------
# ConsistencyReport — issue accumulation and severity filtering
# ---------------------------------------------------------------------------
class TestConsistencyReport:
def _report(self):
return ConsistencyReport(repo_slug="test-repo", repo_path="/tmp/test-repo")
def test_add_fail_issue(self):
r = self._report()
r.add(severity="FAIL", check_id="C-01", message="No workplans/ dir")
assert len(r.issues) == 1
assert len(r.failures) == 1
assert len(r.warnings) == 0
assert len(r.infos) == 0
def test_add_warn_issue(self):
r = self._report()
r.add(severity="WARN", check_id="C-04", message="Status drift", fixable=True)
assert len(r.warnings) == 1
assert r.warnings[0].fixable is True
def test_add_info_issue(self):
r = self._report()
r.add(severity="INFO", check_id="C-08", message="Completed orphan")
assert len(r.infos) == 1
def test_mixed_severities(self):
r = self._report()
r.add(severity="FAIL", check_id="C-01", message="F1")
r.add(severity="FAIL", check_id="C-03", message="F2")
r.add(severity="WARN", check_id="C-04", message="W1")
r.add(severity="INFO", check_id="C-08", message="I1")
assert len(r.failures) == 2
assert len(r.warnings) == 1
assert len(r.infos) == 1
assert len(r.issues) == 4
def test_add_returns_issue(self):
r = self._report()
issue = r.add(severity="FAIL", check_id="C-01", message="msg")
assert isinstance(issue, Issue)
assert issue.severity == "FAIL"
def test_empty_report(self):
r = self._report()
assert r.failures == []
assert r.warnings == []
assert r.infos == []
assert r.issues == []
# ---------------------------------------------------------------------------
# render_text
# ---------------------------------------------------------------------------
class TestRenderText:
def _clean_report(self):
return ConsistencyReport(repo_slug="my-repo", repo_path="/path/to/repo")
def test_pass_result_shown(self):
r = self._clean_report()
text = render_text(r)
assert "PASS" in text
assert "my-repo" in text
def test_fail_result_shown(self):
r = self._clean_report()
r.add(severity="FAIL", check_id="C-01", message="No workplans/ directory")
text = render_text(r)
assert "FAIL" in text
assert "C-01" in text
assert "No workplans/ directory" in text
def test_warn_result_shown(self):
r = self._clean_report()
r.add(severity="WARN", check_id="C-04", message="Status drift", fixable=True)
text = render_text(r)
assert "WARN" in text
assert "[fixable]" in text
def test_fix_applied_shown(self):
r = self._clean_report()
r.fixes_applied.append("C-04: updated status active→done for WP-001")
text = render_text(r)
assert "FIXES APPLIED" in text
assert "C-04" in text
def test_summary_counts_correct(self):
r = self._clean_report()
r.add(severity="FAIL", check_id="C-01", message="f")
r.add(severity="WARN", check_id="C-04", message="w")
r.add(severity="INFO", check_id="C-08", message="i")
text = render_text(r)
assert "1 fail" in text
assert "1 warn" in text
assert "1 info" in text
def test_info_hidden_when_show_info_false(self):
r = self._clean_report()
r.add(severity="INFO", check_id="C-08", message="Completed orphan")
text = render_text(r, show_info=False)
assert "C-08" not in text
class TestRepoLookupFailures:
def test_check_repo_reports_api_error_distinct_from_missing_repo(self, monkeypatch):
def fake_get(_api_base, _path, params=None, *, return_error=False):
assert params is None
return {"_error": "Connection refused"} if return_error else None
monkeypatch.setattr("consistency_check._api_get", fake_get)
report = check_repo("http://127.0.0.1:8000", "state-hub")
assert report.repo_path == "(unknown)"
assert len(report.failures) == 1
assert report.failures[0].check_id == "C-00"
assert "Could not query State Hub API" in report.failures[0].message
assert "not found in state-hub DB" not in report.failures[0].message
def test_fix_repo_does_not_push_when_repo_lookup_failed(self, monkeypatch):
pushed_paths = []
def fake_get(_api_base, _path, params=None, *, return_error=False):
assert params is None
return {"_error": "Connection refused"} if return_error else None
def fake_push(repo_path):
pushed_paths.append(repo_path)
return True, "pushed"
monkeypatch.setattr("consistency_check._api_get", fake_get)
monkeypatch.setattr("consistency_check._git_push", fake_push)
report = fix_repo("http://127.0.0.1:8000", "state-hub")
assert report.failures[0].check_id == "C-00"
assert pushed_paths == []
assert report.fixes_applied == []
class TestRenormalizationGuide:
def test_registry_contains_c23_rule(self):
rule = next(rule for rule in RENORMALIZATION_RULES if rule.check_id == "C-23")
assert "Planning-state" in rule.invariant
assert "TestLifecycleRenormalization" in rule.test_anchor
def test_guide_points_to_next_guard_pattern(self):
text = render_renormalization_guide()
assert "C-23" in text
assert "Add The Next Guard" in text
assert "Add detection in check_repo" in text
assert "Add the repair branch in fix_repo" in text
# ---------------------------------------------------------------------------
# report_to_dict
# ---------------------------------------------------------------------------
class TestReportToDict:
def test_clean_report_result_is_pass(self):
r = ConsistencyReport(repo_slug="r", repo_path="/p")
d = report_to_dict(r)
assert d["result"] == "pass"
assert d["summary"] == {"fail": 0, "warn": 0, "info": 0}
assert d["issues"] == []
def test_fail_result(self):
r = ConsistencyReport(repo_slug="r", repo_path="/p")
r.add(severity="FAIL", check_id="C-01", message="missing")
d = report_to_dict(r)
assert d["result"] == "fail"
assert d["summary"]["fail"] == 1
def test_warn_only_result(self):
r = ConsistencyReport(repo_slug="r", repo_path="/p")
r.add(severity="WARN", check_id="C-04", message="drift")
d = report_to_dict(r)
assert d["result"] == "warn"
assert d["summary"]["warn"] == 1
assert d["summary"]["fail"] == 0
def test_issue_fields_serialised(self):
r = ConsistencyReport(repo_slug="r", repo_path="/p")
r.add(
severity="FAIL",
check_id="C-03",
message="stale ref",
file_path="workplans/WP.md",
db_id="abc-123",
file_value="abc-123",
db_value="",
)
d = report_to_dict(r)
issue = d["issues"][0]
assert issue["severity"] == "FAIL"
assert issue["check_id"] == "C-03"
assert issue["file_path"] == "workplans/WP.md"
assert issue["db_id"] == "abc-123"
def test_fixes_applied_in_dict(self):
r = ConsistencyReport(repo_slug="r", repo_path="/p")
r.fixes_applied.append("C-04: status fixed")
d = report_to_dict(r)
assert "C-04: status fixed" in d["fixes_applied"]
def test_repo_slug_and_path_preserved(self):
r = ConsistencyReport(repo_slug="the-custodian", repo_path="/home/worsch/the-custodian")
d = report_to_dict(r)
assert d["repo_slug"] == "the-custodian"
assert d["repo_path"] == "/home/worsch/the-custodian"
# ---------------------------------------------------------------------------
# Consistency exit contract
# ---------------------------------------------------------------------------
class TestConsistencyExitContract:
def _report(self, severity: str | None = None) -> ConsistencyReport:
r = ConsistencyReport(repo_slug="r", repo_path="/p")
if severity:
r.add(severity=severity, check_id="C-test", message="issue")
return r
def test_strict_cli_exit_code_clean_success(self):
assert consistency_exit_code([self._report()]) == 0
def test_strict_cli_exit_code_warning_only_is_two(self):
assert consistency_exit_code([self._report("WARN")]) == 2
def test_strict_cli_exit_code_failure_is_one(self):
assert consistency_exit_code([self._report("FAIL")]) == 1
def test_remote_all_treats_warning_only_as_success(self):
assert consistency_exit_code([self._report("WARN")], remote_all=True) == 0
class TestConsistencyMakeTargets:
CONSISTENCY_TARGETS = [
("check-consistency", ["REPO=state-hub"]),
("fix-consistency", ["REPO=state-hub"]),
("fix-consistency-remote", []),
("check-consistency-here", []),
("fix-consistency-here", []),
("check-consistency-all", []),
("fix-consistency-all", []),
]
def _fake_uv(self, tmp_path: Path) -> Path:
fake_uv = tmp_path / "uv"
fake_uv.write_text('#!/bin/sh\nexit "${FAKE_UV_EXIT:-0}"\n', encoding="utf-8")
fake_uv.chmod(0o755)
return fake_uv
def _run_make(self, tmp_path: Path, target: str, args: list[str], uv_exit: int):
if shutil.which("make") is None:
pytest.skip("make is not installed")
repo_root = Path(__file__).resolve().parent.parent
env = os.environ.copy()
env["FAKE_UV_EXIT"] = str(uv_exit)
return subprocess.run(
[
"make",
"--no-print-directory",
"-f",
"Makefile",
target,
*args,
f"UV={self._fake_uv(tmp_path)}",
],
cwd=repo_root,
env=env,
text=True,
capture_output=True,
check=False,
)
def test_makefile_uv_resolver_checks_local_bin_for_non_login_shells(self):
repo_root = Path(__file__).resolve().parent.parent
makefile = (repo_root / "Makefile").read_text(encoding="utf-8")
assert "UV ?=" in makefile
assert "$$HOME/.local/bin/uv" in makefile
@pytest.mark.parametrize(("target", "args"), CONSISTENCY_TARGETS)
def test_consistency_targets_treat_warning_exit_as_success(self, tmp_path, target, args):
result = self._run_make(tmp_path, target, args, uv_exit=2)
assert result.returncode == 0, result.stdout + result.stderr
def test_fix_consistency_target_treats_clean_exit_as_success(self, tmp_path):
result = self._run_make(tmp_path, "fix-consistency", ["REPO=state-hub"], uv_exit=0)
assert result.returncode == 0, result.stdout + result.stderr
def test_fix_consistency_target_keeps_failure_non_zero(self, tmp_path):
result = self._run_make(tmp_path, "fix-consistency", ["REPO=state-hub"], uv_exit=1)
assert result.returncode != 0
# ---------------------------------------------------------------------------
# Status vocabulary normalisation
# ---------------------------------------------------------------------------
class TestNormaliseWorkstreamStatus:
"""Legacy workplan/API vocabulary maps to the canonical lifecycle model."""
def test_done_maps_to_finished(self):
assert normalise_workstream_status("done") == "finished"
def test_completed_maps_to_finished(self):
assert normalise_workstream_status("completed") == "finished"
def test_accepted_maps_to_finished(self):
assert normalise_workstream_status("accepted") == "finished"
def test_todo_maps_to_ready_by_default(self):
assert normalise_workstream_status("todo") == "ready"
def test_todo_maps_to_active_when_started(self):
assert normalise_workstream_status("todo", has_started=True) == "active"
def test_active_is_identity(self):
assert normalise_workstream_status("active") == "active"
def test_blocked_is_identity(self):
assert normalise_workstream_status("blocked") == "blocked"
def test_archived_is_identity(self):
assert normalise_workstream_status("archived") == "archived"
def test_unknown_value_returned_as_is(self):
# Don't crash on unexpected values — return them unchanged
assert normalise_workstream_status("foobar") == "foobar"
def test_map_constant_covers_legacy_aliases(self):
assert FILE_TO_DB_WORKSTREAM_STATUS["done"] == "finished"
assert FILE_TO_DB_WORKSTREAM_STATUS["completed"] == "finished"
def test_c04_no_spurious_drift_when_done_vs_finished(self):
"""done (file) vs finished (DB) must NOT be reported as C-04 drift."""
assert normalise_workstream_status("done") == normalise_workstream_status("completed")
def test_c04_real_drift_still_detected(self):
"""done (file) vs active (DB) IS real drift and must be detected."""
assert normalise_workstream_status("done") != normalise_workstream_status("active")
class TestReadyReviewStatus:
def _repo_with_commit(self, tmp_path):
import subprocess
repo = tmp_path / "repo"
repo.mkdir()
subprocess.run(["git", "-C", str(repo), "init"], check=True, capture_output=True)
subprocess.run(["git", "-C", str(repo), "config", "user.email", "test@example.invalid"], check=True)
subprocess.run(["git", "-C", str(repo), "config", "user.name", "Test"], check=True)
tracked = repo / "src" / "app.py"
tracked.parent.mkdir()
tracked.write_text("print('one')\n", encoding="utf-8")
subprocess.run(["git", "-C", str(repo), "add", "."], check=True, capture_output=True)
subprocess.run(["git", "-C", str(repo), "commit", "-m", "init"], check=True, capture_output=True)
base = subprocess.check_output(["git", "-C", str(repo), "rev-parse", "HEAD"], text=True).strip()
return repo, tracked, base
def test_same_commit_is_current(self, tmp_path):
repo, _tracked, base = self._repo_with_commit(tmp_path)
result = ready_review_status(repo, base)
assert result.needs_review is False
def test_changed_context_path_needs_review(self, tmp_path):
import subprocess
repo, tracked, base = self._repo_with_commit(tmp_path)
tracked.write_text("print('two')\n", encoding="utf-8")
subprocess.run(["git", "-C", str(repo), "add", "."], check=True, capture_output=True)
subprocess.run(["git", "-C", str(repo), "commit", "-m", "change app"], check=True, capture_output=True)
result = ready_review_status(repo, base, ["src"])
assert result.needs_review is True
assert result.changed_paths == ("src/app.py",)
def test_unrelated_context_path_does_not_need_review(self, tmp_path):
import subprocess
repo, _tracked, base = self._repo_with_commit(tmp_path)
docs = repo / "docs" / "note.md"
docs.parent.mkdir()
docs.write_text("note\n", encoding="utf-8")
subprocess.run(["git", "-C", str(repo), "add", "."], check=True, capture_output=True)
subprocess.run(["git", "-C", str(repo), "commit", "-m", "docs"], check=True, capture_output=True)
result = ready_review_status(repo, base, ["src"])
assert result.needs_review is False
# ---------------------------------------------------------------------------
# STATUS_ORDER / no-regress rule (T01 / C-15)
# ---------------------------------------------------------------------------
class TestStatusOrder:
"""STATUS_ORDER drives the no-regress rule: DB-ahead wins, file-ahead syncs."""
def test_todo_is_lowest(self):
assert STATUS_ORDER["todo"] == 0
def test_done_and_cancel_are_highest(self):
assert STATUS_ORDER["done"] == STATUS_ORDER["cancel"] == 2
def test_progress_and_wait_are_mid(self):
assert STATUS_ORDER["progress"] == STATUS_ORDER["wait"] == 1
def test_db_ahead_detected(self):
"""done (DB) vs todo (file) — DB is ahead."""
assert STATUS_ORDER["done"] > STATUS_ORDER["todo"]
def test_file_ahead_detected(self):
"""done (file) vs todo (DB) — file is ahead, should sync."""
assert STATUS_ORDER["todo"] < STATUS_ORDER["done"]
def test_same_rank_treated_as_db_ahead(self):
"""progress (DB) vs wait (file) — same rank, no regression."""
assert STATUS_ORDER["progress"] >= STATUS_ORDER["wait"]
def test_todo_to_done_is_regression(self):
"""Applying file=todo to DB=done would be a regression."""
db_rank = STATUS_ORDER["done"]
file_rank = STATUS_ORDER["todo"]
assert db_rank >= file_rank # → should emit C-15, not C-10
# ---------------------------------------------------------------------------
# _detect_behind_remote (T02 / C-16)
# ---------------------------------------------------------------------------
class TestDetectBehindRemote:
"""_detect_behind_remote must return False on any error (best-effort)."""
def test_returns_false_for_nonexistent_path(self):
assert _detect_behind_remote("/nonexistent/path/xyz") is False
def test_returns_false_for_non_git_dir(self, tmp_path):
assert _detect_behind_remote(str(tmp_path)) is False
def test_returns_false_for_repo_without_remote(self, tmp_path):
"""A local-only repo with no remote tracking branch → not behind."""
import subprocess
subprocess.run(["git", "init", str(tmp_path)], capture_output=True)
subprocess.run(
["git", "-C", str(tmp_path), "commit", "--allow-empty", "-m", "init"],
capture_output=True,
)
# No remote configured → rev-parse @{u} fails → best-effort returns False
assert _detect_behind_remote(str(tmp_path)) is False
def test_returns_false_when_up_to_date(self, tmp_path):
"""Clone from a local bare repo, no new commits → not behind."""
import subprocess
bare = tmp_path / "bare.git"
clone = tmp_path / "clone"
bare.mkdir()
subprocess.run(["git", "init", "--bare", str(bare)], capture_output=True)
subprocess.run(["git", "clone", str(bare), str(clone)], capture_output=True)
subprocess.run(
["git", "-C", str(clone), "commit", "--allow-empty", "-m", "init"],
capture_output=True,
)
subprocess.run(["git", "-C", str(clone), "push", "origin", "HEAD"], capture_output=True)
assert _detect_behind_remote(str(clone)) is False
def test_returns_false_when_local_ahead(self, tmp_path):
"""Local has unpushed commits — NOT considered behind."""
import subprocess
bare = tmp_path / "bare.git"
clone = tmp_path / "clone"
bare.mkdir()
subprocess.run(["git", "init", "--bare", str(bare)], capture_output=True)
subprocess.run(["git", "clone", str(bare), str(clone)], capture_output=True)
subprocess.run(
["git", "-C", str(clone), "commit", "--allow-empty", "-m", "init"],
capture_output=True,
)
subprocess.run(["git", "-C", str(clone), "push", "origin", "HEAD"], capture_output=True)
# Add a local-only commit (not pushed)
subprocess.run(
["git", "-C", str(clone), "commit", "--allow-empty", "-m", "local only"],
capture_output=True,
)
assert _detect_behind_remote(str(clone)) is False
def test_returns_true_when_behind(self, tmp_path):
"""Remote has a commit the clone doesn't → clone is behind."""
import subprocess
bare = tmp_path / "bare.git"
clone = tmp_path / "clone"
bare.mkdir()
subprocess.run(["git", "init", "--bare", str(bare)], capture_output=True)
subprocess.run(["git", "clone", str(bare), str(clone)], capture_output=True)
# Initial commit pushed so there's an upstream ref
subprocess.run(
["git", "-C", str(clone), "commit", "--allow-empty", "-m", "init"],
capture_output=True,
)
subprocess.run(["git", "-C", str(clone), "push", "origin", "HEAD"], capture_output=True)
# Add a commit directly in the bare repo (simulates remote-only progress)
work = tmp_path / "work"
subprocess.run(["git", "clone", str(bare), str(work)], capture_output=True)
subprocess.run(
["git", "-C", str(work), "commit", "--allow-empty", "-m", "remote commit"],
capture_output=True,
)
subprocess.run(["git", "-C", str(work), "push"], capture_output=True)
# After fetch the clone should appear behind
assert _detect_behind_remote(str(clone)) is True
# ---------------------------------------------------------------------------
# _patch_task_status_in_file (T03 writeback)
# ---------------------------------------------------------------------------
class TestPatchTaskStatusInFile:
"""_patch_task_status_in_file must only change the status field in the
matching task block and leave everything else untouched."""
def _make_workplan(self, tmp_path, content: str) -> Path:
f = tmp_path / "workplan.md"
f.write_text(content, encoding="utf-8")
return f
def test_patches_matching_task_block(self, tmp_path):
content = (
"---\nid: WP-001\n---\n"
"## My Task\n\n"
"```task\n"
"id: T01\n"
"status: todo\n"
"priority: high\n"
"```\n"
)
f = self._make_workplan(tmp_path, content)
result = _patch_task_status_in_file(f, "T01", "done")
assert result is True
patched = f.read_text()
assert "status: done" in patched
assert "status: todo" not in patched
assert "priority: high" in patched # other fields untouched
def test_does_not_patch_non_matching_block(self, tmp_path):
content = (
"---\nid: WP-001\n---\n"
"```task\n"
"id: T02\n"
"status: todo\n"
"```\n"
)
f = self._make_workplan(tmp_path, content)
result = _patch_task_status_in_file(f, "T01", "done")
assert result is False
assert "status: todo" in f.read_text()
def test_patches_correct_block_among_multiple(self, tmp_path):
content = (
"---\nid: WP-001\n---\n"
"```task\nid: T01\nstatus: todo\n```\n"
"```task\nid: T02\nstatus: progress\n```\n"
)
f = self._make_workplan(tmp_path, content)
_patch_task_status_in_file(f, "T02", "done")
patched = f.read_text()
assert "id: T01\nstatus: todo" in patched
assert "id: T02\nstatus: done" in patched
def test_does_not_touch_non_status_fields(self, tmp_path):
content = (
"---\nid: WP\n---\n"
"```task\nid: T01\nstatus: todo\npriority: high\nstate_hub_task_id: \"abc\"\n```\n"
)
f = self._make_workplan(tmp_path, content)
_patch_task_status_in_file(f, "T01", "done")
patched = f.read_text()
assert "priority: high" in patched
assert 'state_hub_task_id: "abc"' in patched
def test_idempotent_when_already_correct(self, tmp_path):
content = "---\nid: WP\n---\n```task\nid: T01\nstatus: done\n```\n"
f = self._make_workplan(tmp_path, content)
result = _patch_task_status_in_file(f, "T01", "done")
# status matches, re.sub produces same text → no write → False
assert result is False
class TestPatchFrontmatterField:
def test_patches_existing_scalar_field(self, tmp_path):
f = tmp_path / "workplan.md"
f.write_text("---\nid: WP-001\nstatus: proposed\n---\nBody\n", encoding="utf-8")
assert _patch_frontmatter_field(f, "status", "active") is True
patched = f.read_text(encoding="utf-8")
assert "status: active" in patched
assert "id: WP-001" in patched
def test_is_idempotent_when_field_already_matches(self, tmp_path):
f = tmp_path / "workplan.md"
f.write_text("---\nid: WP-001\nstatus: active\n---\nBody\n", encoding="utf-8")
assert _patch_frontmatter_field(f, "status", "active") is False
assert f.read_text(encoding="utf-8").count("status: active") == 1
class TestLifecycleRenormalization:
def _make_repo(self, tmp_path, status: str = "proposed") -> Path:
repo = tmp_path / "repo"
workplans = repo / "workplans"
workplans.mkdir(parents=True)
(workplans / "STATE-WP-0001-demo.md").write_text(
"---\n"
"id: STATE-WP-0001\n"
"type: workplan\n"
"title: Demo\n"
"domain: custodian\n"
"repo: state-hub\n"
f"status: {status}\n"
"owner: codex\n"
"state_hub_workstream_id: \"ws-1\"\n"
"---\n\n"
"## Implement Demo\n\n"
"```task\n"
"id: STATE-WP-0001-T01\n"
"status: progress\n"
"priority: high\n"
"state_hub_task_id: \"task-1\"\n"
"```\n",
encoding="utf-8",
)
return repo
def _api_get_for_repo(self, repo: Path):
ws = {
"id": "ws-1",
"repo_id": "repo-1",
"topic_id": "topic-1",
"slug": "state-wp-0001",
"title": "Demo",
"status": "proposed",
"planning_priority": None,
"planning_order": None,
}
task = {
"id": "task-1",
"title": "Implement Demo",
"status": "progress",
"description": None,
}
def fake_get(_api_base, path, params=None, **_kwargs):
if path == "/repos/state-hub":
import socket
return {
"id": "repo-1",
"slug": "state-hub",
"local_path": str(repo),
"host_paths": {socket.gethostname(): str(repo)},
}
if path == "/workstreams/ws-1":
return ws
if path == "/tasks/task-1":
return task
if path == "/tasks" and params == {"workstream_id": "ws-1"}:
return [task]
if path == "/workstreams/ws-1/dependencies":
return []
if path == "/workstreams" and params == {"repo_id": "repo-1"}:
return [ws]
if path == "/workstreams" and params and params.get("topic_id") == "topic-1":
return []
return []
return fake_get
def test_active_task_in_planning_workplan_reports_c23_not_c04(self, tmp_path, monkeypatch):
repo = self._make_repo(tmp_path)
monkeypatch.setattr("consistency_check._api_get", self._api_get_for_repo(repo))
report = check_repo("http://unused", "state-hub")
check_ids = [issue.check_id for issue in report.issues]
assert "C-23" in check_ids
assert "C-04" not in check_ids
issue = next(issue for issue in report.issues if issue.check_id == "C-23")
assert issue.fixable is True
assert issue.file_value == "proposed"
def test_fix_repo_repairs_planning_workplan_with_active_task(self, tmp_path, monkeypatch):
repo = self._make_repo(tmp_path)
wp = repo / "workplans" / "STATE-WP-0001-demo.md"
patches = []
def fake_patch(_api_base, path, body):
patches.append((path, body))
return {"ok": True}
monkeypatch.setattr("consistency_check._api_get", self._api_get_for_repo(repo))
monkeypatch.setattr("consistency_check._api_patch", fake_patch)
monkeypatch.setattr("consistency_check._detect_behind_remote", lambda _repo_path: False)
monkeypatch.setattr("consistency_check._detect_ahead_of_remote", lambda _repo_path: 0)
monkeypatch.setattr("consistency_check._git_commit_writeback", lambda *args, **kwargs: True)
monkeypatch.setattr("consistency_check._write_custodian_brief", lambda *args, **kwargs: False)
monkeypatch.setattr("consistency_check._git_push", lambda _repo_path: (True, "pushed"))
report = fix_repo("http://unused", "state-hub")
assert ("/workstreams/ws-1", {"status": "active"}) in patches
assert "status: active" in wp.read_text(encoding="utf-8")
assert any("C-23 fixed" in fix for fix in report.fixes_applied)
# ---------------------------------------------------------------------------
# _git_pull (T02 remote fix helper)
# ---------------------------------------------------------------------------
class TestGitPull:
def test_returns_false_for_nonexistent_path(self):
ok, msg = _git_pull("/nonexistent/path")
assert ok is False
assert msg
def test_returns_true_for_up_to_date_repo(self, tmp_path):
import subprocess
bare = tmp_path / "bare.git"
clone = tmp_path / "clone"
bare.mkdir()
subprocess.run(["git", "init", "--bare", str(bare)], capture_output=True)
subprocess.run(["git", "clone", str(bare), str(clone)], capture_output=True)
subprocess.run(
["git", "-C", str(clone), "commit", "--allow-empty", "-m", "init"],
capture_output=True,
)
subprocess.run(["git", "-C", str(clone), "push", "origin", "HEAD"], capture_output=True)
ok, msg = _git_pull(str(clone))
assert ok is True
def test_pulls_new_remote_commit(self, tmp_path):
import subprocess
bare = tmp_path / "bare.git"
clone = tmp_path / "clone"
work = tmp_path / "work"
bare.mkdir()
subprocess.run(["git", "init", "--bare", str(bare)], capture_output=True)
subprocess.run(["git", "clone", str(bare), str(clone)], capture_output=True)
subprocess.run(
["git", "-C", str(clone), "commit", "--allow-empty", "-m", "init"],
capture_output=True,
)
subprocess.run(["git", "-C", str(clone), "push", "origin", "HEAD"], capture_output=True)
# Push a new commit from a second clone
subprocess.run(["git", "clone", str(bare), str(work)], capture_output=True)
subprocess.run(
["git", "-C", str(work), "commit", "--allow-empty", "-m", "remote work"],
capture_output=True,
)
subprocess.run(["git", "-C", str(work), "push"], capture_output=True)
# Pull should succeed and bring in the new commit
ok, msg = _git_pull(str(clone))
assert ok is True
head = subprocess.run(
["git", "-C", str(clone), "log", "--oneline", "-1"],
capture_output=True, text=True,
).stdout
assert "remote work" in head
# ---------------------------------------------------------------------------
# _report_needs_action (smart skip logic)
# ---------------------------------------------------------------------------
class TestReportNeedsAction:
def _make_report(self, issues: list[tuple[str, str, bool]]) -> ConsistencyReport:
"""Build a report from (severity, check_id, fixable) tuples."""
r = ConsistencyReport(repo_slug="test", repo_path="/tmp/test")
for severity, check_id, fixable in issues:
r.add(severity=severity, check_id=check_id, message="test", fixable=fixable)
return r
def test_clean_report_not_behind_is_skipped(self):
r = self._make_report([])
assert _report_needs_action(r, behind_remote=False) is False
def test_behind_remote_always_needs_action(self):
r = self._make_report([])
assert _report_needs_action(r, behind_remote=True) is True
def test_fail_always_needs_action(self):
r = self._make_report([("FAIL", "C-07", False)])
assert _report_needs_action(r, behind_remote=False) is True
def test_c08_only_is_background_noise(self):
"""C-08 (legacy archived workstream) alone should not trigger a fix cycle."""
r = self._make_report([("INFO", "C-08", False)])
assert _report_needs_action(r, behind_remote=False) is False
def test_c08_plus_actionable_warn_needs_action(self):
r = self._make_report([("INFO", "C-08", False), ("WARN", "C-10", True)])
assert _report_needs_action(r, behind_remote=False) is True
def test_fixable_warn_needs_action(self):
r = self._make_report([("WARN", "C-15", True)])
assert _report_needs_action(r, behind_remote=False) is True
def test_non_fixable_warn_needs_action(self):
"""Non-fixable WARNs (e.g. C-07 ghost) still warrant attention."""
r = self._make_report([("WARN", "C-14", False)])
assert _report_needs_action(r, behind_remote=False) is True
def test_background_checks_constant_contains_c08(self):
assert "C-08" in _BACKGROUND_CHECKS
# --- ahead_of_remote (T04 push-seal integration) ---
def test_ahead_of_remote_triggers_action(self):
"""Unpushed commits from a prior failed push must re-trigger a fix cycle."""
r = self._make_report([])
assert _report_needs_action(r, behind_remote=False, ahead_of_remote=3) is True
def test_zero_ahead_clean_is_skipped(self):
"""Fully in-sync repo with no issues is skipped."""
r = self._make_report([])
assert _report_needs_action(r, behind_remote=False, ahead_of_remote=0) is False
def test_ahead_default_is_zero(self):
"""Default value of ahead_of_remote=0 preserves backward compat."""
r = self._make_report([])
assert _report_needs_action(r, behind_remote=False) is False
def test_both_behind_and_ahead_triggers(self):
"""Diverged repo (both behind and ahead) needs action."""
r = self._make_report([])
assert _report_needs_action(r, behind_remote=True, ahead_of_remote=5) is True