generated from coulomb/repo-seed
1121 lines
44 KiB
Python
1121 lines
44 KiB
Python
"""Unit tests for the stateless / pure-function layer of consistency_check.py.
|
|
|
|
Covers:
|
|
- parse_frontmatter — YAML frontmatter splitting
|
|
- parse_task_blocks — ```task``` block extraction
|
|
- get_tasks_from_workplan — block vs. frontmatter fallback
|
|
- ConsistencyReport — issue accumulation and severity filtering
|
|
- render_text — text rendering smoke tests
|
|
- report_to_dict — serialisation shape
|
|
|
|
No network calls, no DB, no live API — these tests run fully offline.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
# Make scripts/ importable without installing
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
|
|
|
from consistency_check import (
|
|
ConsistencyReport,
|
|
Issue,
|
|
FILE_TO_DB_WORKSTREAM_STATUS,
|
|
RENORMALIZATION_RULES,
|
|
STATUS_ORDER,
|
|
_BACKGROUND_CHECKS,
|
|
_detect_behind_remote,
|
|
_git_pull,
|
|
_patch_frontmatter_field,
|
|
_patch_task_status_in_file,
|
|
_report_needs_action,
|
|
archive_closed_workplans,
|
|
canonical_workplan_filename,
|
|
check_repo,
|
|
consistency_exit_code,
|
|
fix_repo,
|
|
get_tasks_from_workplan,
|
|
iter_workplan_files,
|
|
normalise_workstream_status,
|
|
parse_frontmatter,
|
|
parse_task_blocks,
|
|
render_text,
|
|
render_renormalization_guide,
|
|
report_to_dict,
|
|
)
|
|
from api.workplan_status import ready_review_status
|
|
# _detect_behind_remote and _git_pull are re-exported from consistency_check
|
|
# for backward compat; their canonical implementations live in repo_sync.py.
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# parse_frontmatter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestParseFrontmatter:
    """parse_frontmatter returns a ``(meta, body)`` pair for a document."""

    def test_valid_frontmatter(self):
        """All keys of a well-formed block are parsed and the body is kept."""
        document = "---\nid: WP-001\ntitle: Test\nstatus: active\n---\n\n# Body"
        fields, remainder = parse_frontmatter(document)
        for key, expected in (("id", "WP-001"), ("title", "Test"), ("status", "active")):
            assert fields[key] == expected
        assert "# Body" in remainder

    def test_no_frontmatter_returns_empty_meta(self):
        """Without a leading ``---`` the whole text comes back as the body."""
        document = "# Just a heading\n\nSome body text."
        fields, remainder = parse_frontmatter(document)
        assert fields == {}
        assert remainder == document

    def test_empty_frontmatter(self):
        """An empty block still produces a dict for the metadata."""
        fields, remainder = parse_frontmatter("---\n---\n\nBody only.")
        assert isinstance(fields, dict)
        assert "Body only." in remainder

    def test_frontmatter_with_quoted_strings(self):
        """Quoted scalar values are returned without their quotes."""
        document = '---\nid: "WP-007"\ntitle: "My Workplan"\n---\nBody'
        fields, _ = parse_frontmatter(document)
        assert fields["id"] == "WP-007"
        assert fields["title"] == "My Workplan"

    def test_incomplete_frontmatter_delimiter(self):
        """Only one --- means no valid frontmatter block."""
        document = "---\nid: foo\nNo closing delimiter"
        fields, _ = parse_frontmatter(document)
        assert fields == {}

    def test_body_preserved_fully(self):
        """Every body line after the closing delimiter is retained."""
        _, remainder = parse_frontmatter("---\nid: X\n---\nLine1\nLine2\nLine3")
        for line in ("Line1", "Line2", "Line3"):
            assert line in remainder
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# parse_task_blocks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestParseTaskBlocks:
    """parse_task_blocks extracts the fenced ``task`` blocks of a body."""

    def test_single_task_block(self):
        """One fenced task block yields exactly one parsed task."""
        markdown = "## T01\n\n```task\nid: WP-T01\nstatus: done\npriority: high\n```\n"
        parsed = parse_task_blocks(markdown)
        assert len(parsed) == 1
        only = parsed[0]
        assert only["id"] == "WP-T01"
        assert only["status"] == "done"

    def test_multiple_task_blocks(self):
        """Tasks come back in document order."""
        first_section = "## T01\n```task\nid: T01\nstatus: done\n```\n\n"
        second_section = "## T02\n```task\nid: T02\nstatus: todo\n```\n"
        parsed = parse_task_blocks(first_section + second_section)
        assert len(parsed) == 2
        assert parsed[0]["id"] == "T01"
        assert parsed[1]["id"] == "T02"

    def test_no_task_blocks(self):
        """A body without task fences produces an empty list."""
        parsed = parse_task_blocks("# Workplan\n\nNo task blocks here.\n")
        assert parsed == []

    def test_task_block_with_all_fields(self):
        """Scalar fields, including a quoted id, are read from the block."""
        lines = [
            "```task\n",
            "id: CUST-WP-0008-T01\n",
            "status: done\n",
            "priority: critical\n",
            "assignee: custodian\n",
            'state_hub_task_id: "abc-123"\n',
            "```\n",
        ]
        parsed = parse_task_blocks("".join(lines))
        assert len(parsed) == 1
        task = parsed[0]
        assert task["id"] == "CUST-WP-0008-T01"
        assert task["status"] == "done"
        assert task["priority"] == "critical"
        assert task["state_hub_task_id"] == "abc-123"

    def test_ignores_non_task_fenced_blocks(self):
        """Fences tagged with another language are not treated as tasks."""
        markdown = (
            "```python\nprint('hello')\n```\n\n"
            "```task\nid: T01\nstatus: todo\n```\n"
            "```bash\necho hi\n```\n"
        )
        parsed = parse_task_blocks(markdown)
        assert len(parsed) == 1
        assert parsed[0]["id"] == "T01"

    def test_extracts_markdown_after_task_block_as_description(self):
        """Markdown following a task block, up to the next task, is its description."""
        markdown = (
            "### T01 — Configure registry\n\n"
            "```task\nid: T01\nstatus: todo\npriority: high\n```\n\n"
            "Update `app.ini` so packages are enabled.\n\n"
            "```ini\n[packages]\nENABLED = true\n```\n\n"
            "**Done when:** a rendered config contains the package settings.\n\n"
            "---\n\n"
            "### T02 — Next task\n\n"
            "```task\nid: T02\nstatus: todo\n```\n"
        )
        expected_description = (
            "Update `app.ini` so packages are enabled.\n\n"
            "```ini\n[packages]\nENABLED = true\n```\n\n"
            "**Done when:** a rendered config contains the package settings."
        )
        description = parse_task_blocks(markdown)[0]["description"]
        assert description == expected_description
        assert "Next task" not in description

    def test_keeps_nested_headings_inside_task_description(self):
        """A deeper heading stays in the description; a sibling heading ends it."""
        markdown = (
            "## T01\n\n"
            "```task\nid: T01\nstatus: todo\n```\n\n"
            "Intro.\n\n"
            "### Notes\n\n"
            "- Keep this nested heading in the task content.\n\n"
            "## T02\n\n"
            "```task\nid: T02\nstatus: todo\n```\n"
        )
        parsed = parse_task_blocks(markdown)
        assert "### Notes" in parsed[0]["description"]
        assert "## T02" not in parsed[0]["description"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_tasks_from_workplan
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetTasksFromWorkplan:
    """get_tasks_from_workplan: task blocks win, frontmatter is the fallback."""

    def test_prefers_task_blocks_over_frontmatter(self):
        """When the body has task blocks, frontmatter tasks are ignored."""
        meta = {"tasks": [{"id": "from-frontmatter", "status": "todo"}]}
        body = "```task\nid: from-block\nstatus: done\n```\n"
        tasks = get_tasks_from_workplan(meta, body)
        assert len(tasks) == 1
        assert tasks[0]["id"] == "from-block"

    def test_falls_back_to_frontmatter_tasks(self):
        """Without task blocks the frontmatter ``tasks`` list is returned."""
        meta = {"tasks": [{"id": "FM-01", "status": "todo"}, {"id": "FM-02", "status": "done"}]}
        body = "# No task blocks here\n"
        tasks = get_tasks_from_workplan(meta, body)
        assert len(tasks) == 2
        assert tasks[0]["id"] == "FM-01"
        assert tasks[1]["id"] == "FM-02"

    def test_returns_empty_when_no_tasks(self):
        """No blocks and no frontmatter tasks yields an empty list."""
        meta = {}
        body = "# Just prose\n"
        tasks = get_tasks_from_workplan(meta, body)
        assert tasks == []

    # Previously sat at the end of TestArchivedWorkplans; it exercises
    # get_tasks_from_workplan, so it is grouped with the other fallback tests.
    def test_ignores_non_list_frontmatter_tasks(self):
        """A frontmatter ``tasks`` value that is not a list is discarded."""
        meta = {"tasks": "not-a-list"}
        body = "# No blocks\n"
        tasks = get_tasks_from_workplan(meta, body)
        assert tasks == []


class TestArchivedWorkplans:
    """Archive naming, discovery of archived files, and the archive move."""

    def test_canonical_workplan_filename_strips_archive_date_prefix(self):
        """The ``YYMMDD-`` archive prefix is removed; plain names are unchanged."""
        assert canonical_workplan_filename(Path("260501-CUST-WP-0001-demo.md")) == "CUST-WP-0001-demo.md"
        assert canonical_workplan_filename(Path("CUST-WP-0001-demo.md")) == "CUST-WP-0001-demo.md"

    def test_iter_workplan_files_includes_archived_directory(self, tmp_path):
        """Files under ``workplans/archived`` are listed after the active ones."""
        workplans = tmp_path / "workplans"
        archived = workplans / "archived"
        archived.mkdir(parents=True)
        active_file = workplans / "CUST-WP-0001-active.md"
        archived_file = archived / "260501-CUST-WP-0000-old.md"
        active_file.write_text("---\nid: CUST-WP-0001\n---\n", encoding="utf-8")
        archived_file.write_text("---\nid: CUST-WP-0000\n---\n", encoding="utf-8")

        assert iter_workplan_files(workplans) == [active_file, archived_file]

    def test_archive_closed_workplans_moves_done_file_with_date_prefix(self, tmp_path):
        """A done workplan whose tasks are all done is moved into ``archived/``."""
        repo = tmp_path / "repo"
        workplans = repo / "workplans"
        workplans.mkdir(parents=True)
        wp = workplans / "CUST-WP-0001-demo.md"
        wp.write_text(
            "---\n"
            "id: CUST-WP-0001\n"
            "type: workplan\n"
            "title: Demo\n"
            "domain: custodian\n"
            "status: done\n"
            "owner: codex\n"
            "created: \"2026-05-01\"\n"
            "---\n"
            "```task\nid: CUST-WP-0001-T01\nstatus: done\npriority: medium\n```\n",
            encoding="utf-8",
        )

        moved = archive_closed_workplans(str(repo), completion_date="260501")

        assert moved == ["workplans/CUST-WP-0001-demo.md -> workplans/archived/260501-CUST-WP-0001-demo.md"]
        assert not wp.exists()
        assert (workplans / "archived" / "260501-CUST-WP-0001-demo.md").exists()

    def test_archive_closed_workplans_leaves_open_tasks_in_place(self, tmp_path):
        """A done workplan that still has a ``todo`` task is not archived."""
        repo = tmp_path / "repo"
        workplans = repo / "workplans"
        workplans.mkdir(parents=True)
        wp = workplans / "CUST-WP-0001-demo.md"
        wp.write_text(
            "---\nid: CUST-WP-0001\ntype: workplan\ntitle: Demo\ndomain: custodian\n"
            "status: done\nowner: codex\ncreated: \"2026-05-01\"\n---\n"
            "```task\nid: CUST-WP-0001-T01\nstatus: todo\npriority: medium\n```\n",
            encoding="utf-8",
        )

        assert archive_closed_workplans(str(repo), completion_date="260501") == []
        assert wp.exists()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ConsistencyReport — issue accumulation and severity filtering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestConsistencyReport:
    """Issue accumulation and the per-severity views of ConsistencyReport."""

    def _report(self):
        """Return a fresh report with no issues recorded."""
        return ConsistencyReport(repo_slug="test-repo", repo_path="/tmp/test-repo")

    def test_add_fail_issue(self):
        """A FAIL issue shows up in ``issues`` and ``failures`` only."""
        report = self._report()
        report.add(severity="FAIL", check_id="C-01", message="No workplans/ dir")
        assert len(report.issues) == 1
        assert len(report.failures) == 1
        assert len(report.warnings) == 0
        assert len(report.infos) == 0

    def test_add_warn_issue(self):
        """A WARN issue keeps the ``fixable`` flag it was added with."""
        report = self._report()
        report.add(severity="WARN", check_id="C-04", message="Status drift", fixable=True)
        assert len(report.warnings) == 1
        assert report.warnings[0].fixable is True

    def test_add_info_issue(self):
        """An INFO issue is exposed through ``infos``."""
        report = self._report()
        report.add(severity="INFO", check_id="C-08", message="Completed orphan")
        assert len(report.infos) == 1

    def test_mixed_severities(self):
        """Each severity view counts only its own issues."""
        report = self._report()
        additions = (
            ("FAIL", "C-01", "F1"),
            ("FAIL", "C-03", "F2"),
            ("WARN", "C-04", "W1"),
            ("INFO", "C-08", "I1"),
        )
        for severity, check_id, message in additions:
            report.add(severity=severity, check_id=check_id, message=message)
        assert len(report.failures) == 2
        assert len(report.warnings) == 1
        assert len(report.infos) == 1
        assert len(report.issues) == 4

    def test_add_returns_issue(self):
        """``add`` hands back the Issue it created."""
        created = self._report().add(severity="FAIL", check_id="C-01", message="msg")
        assert isinstance(created, Issue)
        assert created.severity == "FAIL"

    def test_empty_report(self):
        """A new report has empty lists for every view."""
        report = self._report()
        assert report.failures == []
        assert report.warnings == []
        assert report.infos == []
        assert report.issues == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# render_text
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestRenderText:
    """Smoke tests for the plain-text rendering of a report."""

    def _clean_report(self):
        """Return a report with no issues and no fixes."""
        return ConsistencyReport(repo_slug="my-repo", repo_path="/path/to/repo")

    def test_pass_result_shown(self):
        """A clean report renders PASS together with the repo slug."""
        rendered = render_text(self._clean_report())
        assert "PASS" in rendered
        assert "my-repo" in rendered

    def test_fail_result_shown(self):
        """A FAIL issue renders its severity, check id and message."""
        report = self._clean_report()
        report.add(severity="FAIL", check_id="C-01", message="No workplans/ directory")
        rendered = render_text(report)
        for fragment in ("FAIL", "C-01", "No workplans/ directory"):
            assert fragment in rendered

    def test_warn_result_shown(self):
        """A fixable WARN issue is tagged ``[fixable]`` in the output."""
        report = self._clean_report()
        report.add(severity="WARN", check_id="C-04", message="Status drift", fixable=True)
        rendered = render_text(report)
        assert "WARN" in rendered
        assert "[fixable]" in rendered

    def test_fix_applied_shown(self):
        """Entries in ``fixes_applied`` appear under a FIXES APPLIED heading."""
        report = self._clean_report()
        report.fixes_applied.append("C-04: updated status active→done for WP-001")
        rendered = render_text(report)
        assert "FIXES APPLIED" in rendered
        assert "C-04" in rendered

    def test_summary_counts_correct(self):
        """The summary line reports one issue per severity."""
        report = self._clean_report()
        for severity, check_id, message in (
            ("FAIL", "C-01", "f"),
            ("WARN", "C-04", "w"),
            ("INFO", "C-08", "i"),
        ):
            report.add(severity=severity, check_id=check_id, message=message)
        rendered = render_text(report)
        assert "1 fail" in rendered
        assert "1 warn" in rendered
        assert "1 info" in rendered

    def test_info_hidden_when_show_info_false(self):
        """``show_info=False`` suppresses INFO issues from the output."""
        report = self._clean_report()
        report.add(severity="INFO", check_id="C-08", message="Completed orphan")
        rendered = render_text(report, show_info=False)
        assert "C-08" not in rendered
|
|
|
|
|
|
class TestRepoLookupFailures:
    """A failed repo lookup is reported as C-00 and blocks any push."""

    @staticmethod
    def _refused_api_get(_api_base, _path, params=None, *, return_error=False):
        """Replacement for ``_api_get`` that answers as if the connection was refused."""
        assert params is None
        if return_error:
            return {"_error": "Connection refused"}
        return None

    def test_check_repo_reports_api_error_distinct_from_missing_repo(self, monkeypatch):
        """An API error must not be worded as 'repo not found in DB'."""
        monkeypatch.setattr("consistency_check._api_get", self._refused_api_get)

        report = check_repo("http://127.0.0.1:8000", "state-hub")

        assert report.repo_path == "(unknown)"
        assert len(report.failures) == 1
        failure = report.failures[0]
        assert failure.check_id == "C-00"
        assert "Could not query State Hub API" in failure.message
        assert "not found in state-hub DB" not in failure.message

    def test_fix_repo_does_not_push_when_repo_lookup_failed(self, monkeypatch):
        """fix_repo records C-00 and neither pushes nor applies fixes."""
        pushed_paths = []

        def recording_push(repo_path):
            # Record the call so the test can prove no push was attempted.
            pushed_paths.append(repo_path)
            return True, "pushed"

        monkeypatch.setattr("consistency_check._api_get", self._refused_api_get)
        monkeypatch.setattr("consistency_check._git_push", recording_push)

        report = fix_repo("http://127.0.0.1:8000", "state-hub")

        assert report.failures[0].check_id == "C-00"
        assert pushed_paths == []
        assert report.fixes_applied == []
|
|
|
|
|
|
class TestRenormalizationGuide:
    """The renormalization rule registry and the guide rendered from it."""

    def test_registry_contains_c23_rule(self):
        """RENORMALIZATION_RULES has a C-23 entry with the expected anchors."""
        c23 = next(entry for entry in RENORMALIZATION_RULES if entry.check_id == "C-23")
        assert "Planning-state" in c23.invariant
        assert "TestLifecycleRenormalization" in c23.test_anchor

    def test_guide_points_to_next_guard_pattern(self):
        """The guide mentions C-23 and the steps for adding another guard."""
        guide = render_renormalization_guide()
        expected_fragments = (
            "C-23",
            "Add The Next Guard",
            "Add detection in check_repo",
            "Add the repair branch in fix_repo",
        )
        for fragment in expected_fragments:
            assert fragment in guide
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# report_to_dict
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestReportToDict:
    """Shape of the dictionary produced by report_to_dict."""

    @staticmethod
    def _blank():
        """Return an empty report with placeholder slug and path."""
        return ConsistencyReport(repo_slug="r", repo_path="/p")

    def test_clean_report_result_is_pass(self):
        """No issues serialises as ``pass`` with zeroed counters."""
        payload = report_to_dict(self._blank())
        assert payload["result"] == "pass"
        assert payload["summary"] == {"fail": 0, "warn": 0, "info": 0}
        assert payload["issues"] == []

    def test_fail_result(self):
        """One FAIL issue gives result ``fail`` and a fail count of 1."""
        report = self._blank()
        report.add(severity="FAIL", check_id="C-01", message="missing")
        payload = report_to_dict(report)
        assert payload["result"] == "fail"
        assert payload["summary"]["fail"] == 1

    def test_warn_only_result(self):
        """Warnings alone give result ``warn`` and leave fail at 0."""
        report = self._blank()
        report.add(severity="WARN", check_id="C-04", message="drift")
        payload = report_to_dict(report)
        assert payload["result"] == "warn"
        assert payload["summary"]["warn"] == 1
        assert payload["summary"]["fail"] == 0

    def test_issue_fields_serialised(self):
        """Issue attributes are carried into the serialised issue entry."""
        report = self._blank()
        issue_kwargs = {
            "severity": "FAIL",
            "check_id": "C-03",
            "message": "stale ref",
            "file_path": "workplans/WP.md",
            "db_id": "abc-123",
            "file_value": "abc-123",
            "db_value": "",
        }
        report.add(**issue_kwargs)
        serialised = report_to_dict(report)["issues"][0]
        assert serialised["severity"] == "FAIL"
        assert serialised["check_id"] == "C-03"
        assert serialised["file_path"] == "workplans/WP.md"
        assert serialised["db_id"] == "abc-123"

    def test_fixes_applied_in_dict(self):
        """Applied fixes are listed under ``fixes_applied``."""
        report = self._blank()
        report.fixes_applied.append("C-04: status fixed")
        payload = report_to_dict(report)
        assert "C-04: status fixed" in payload["fixes_applied"]

    def test_repo_slug_and_path_preserved(self):
        """Slug and path are copied through unchanged."""
        report = ConsistencyReport(repo_slug="the-custodian", repo_path="/home/worsch/the-custodian")
        payload = report_to_dict(report)
        assert payload["repo_slug"] == "the-custodian"
        assert payload["repo_path"] == "/home/worsch/the-custodian"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Consistency exit contract
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestConsistencyExitContract:
    """Exit codes returned by consistency_exit_code for a list of reports."""

    def _report(self, severity: str | None = None) -> ConsistencyReport:
        """Build a report, optionally holding a single issue of ``severity``."""
        report = ConsistencyReport(repo_slug="r", repo_path="/p")
        if not severity:
            return report
        report.add(severity=severity, check_id="C-test", message="issue")
        return report

    def test_strict_cli_exit_code_clean_success(self):
        """A clean report exits 0."""
        clean = self._report()
        assert consistency_exit_code([clean]) == 0

    def test_strict_cli_exit_code_warning_only_is_two(self):
        """Warnings without failures exit 2."""
        warned = self._report("WARN")
        assert consistency_exit_code([warned]) == 2

    def test_strict_cli_exit_code_failure_is_one(self):
        """Any failure exits 1."""
        failed = self._report("FAIL")
        assert consistency_exit_code([failed]) == 1

    def test_remote_all_treats_warning_only_as_success(self):
        """With ``remote_all=True`` a warning-only run exits 0."""
        warned = self._report("WARN")
        assert consistency_exit_code([warned], remote_all=True) == 0
|
|
|
|
|
|
class TestConsistencyMakeTargets:
    """Run the Makefile consistency targets against a stub ``uv`` binary."""

    # (make target, extra command-line variables) pairs used for parametrization.
    CONSISTENCY_TARGETS = [
        ("check-consistency", ["REPO=state-hub"]),
        ("fix-consistency", ["REPO=state-hub"]),
        ("fix-consistency-remote", []),
        ("check-consistency-here", []),
        ("fix-consistency-here", []),
        ("check-consistency-all", []),
        ("fix-consistency-all", []),
    ]

    def _fake_uv(self, tmp_path: Path) -> Path:
        """Write an executable shell stub that exits with ``$FAKE_UV_EXIT`` (default 0)."""
        stub = tmp_path / "uv"
        stub.write_text('#!/bin/sh\nexit "${FAKE_UV_EXIT:-0}"\n', encoding="utf-8")
        stub.chmod(0o755)
        return stub

    def _run_make(self, tmp_path: Path, target: str, args: list[str], uv_exit: int):
        """Invoke ``make <target>`` from the repo root with UV pointed at the stub.

        Skips the test when ``make`` is not on PATH. Returns the completed
        process without raising on a non-zero exit.
        """
        if shutil.which("make") is None:
            pytest.skip("make is not installed")
        project_root = Path(__file__).resolve().parent.parent
        child_env = {**os.environ, "FAKE_UV_EXIT": str(uv_exit)}
        command = ["make", "--no-print-directory", "-f", "Makefile", target]
        command.extend(args)
        command.append(f"UV={self._fake_uv(tmp_path)}")
        return subprocess.run(
            command,
            cwd=project_root,
            env=child_env,
            text=True,
            capture_output=True,
            check=False,
        )

    def test_makefile_uv_resolver_checks_local_bin_for_non_login_shells(self):
        """The Makefile defines an overridable UV and looks in ``~/.local/bin``."""
        makefile_path = Path(__file__).resolve().parent.parent / "Makefile"
        contents = makefile_path.read_text(encoding="utf-8")
        assert "UV ?=" in contents
        assert "$$HOME/.local/bin/uv" in contents

    @pytest.mark.parametrize(("target", "args"), CONSISTENCY_TARGETS)
    def test_consistency_targets_treat_warning_exit_as_success(self, tmp_path, target, args):
        """Exit code 2 from the tool must not fail the make target."""
        outcome = self._run_make(tmp_path, target, args, uv_exit=2)
        assert outcome.returncode == 0, outcome.stdout + outcome.stderr

    def test_fix_consistency_target_treats_clean_exit_as_success(self, tmp_path):
        """Exit code 0 from the tool leaves the make target successful."""
        outcome = self._run_make(tmp_path, "fix-consistency", ["REPO=state-hub"], uv_exit=0)
        assert outcome.returncode == 0, outcome.stdout + outcome.stderr

    def test_fix_consistency_target_keeps_failure_non_zero(self, tmp_path):
        """Exit code 1 from the tool makes the make target fail."""
        outcome = self._run_make(tmp_path, "fix-consistency", ["REPO=state-hub"], uv_exit=1)
        assert outcome.returncode != 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status vocabulary normalisation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestNormaliseWorkstreamStatus:
    """Legacy workplan/API vocabulary maps to the canonical lifecycle model."""

    def test_done_maps_to_finished(self):
        """``done`` is a legacy alias of ``finished``."""
        normalised = normalise_workstream_status("done")
        assert normalised == "finished"

    def test_completed_maps_to_finished(self):
        """``completed`` is a legacy alias of ``finished``."""
        normalised = normalise_workstream_status("completed")
        assert normalised == "finished"

    def test_accepted_maps_to_finished(self):
        """``accepted`` is a legacy alias of ``finished``."""
        normalised = normalise_workstream_status("accepted")
        assert normalised == "finished"

    def test_todo_maps_to_ready_by_default(self):
        """Without ``has_started``, ``todo`` becomes ``ready``."""
        normalised = normalise_workstream_status("todo")
        assert normalised == "ready"

    def test_todo_maps_to_active_when_started(self):
        """With ``has_started=True``, ``todo`` becomes ``active``."""
        normalised = normalise_workstream_status("todo", has_started=True)
        assert normalised == "active"

    def test_active_is_identity(self):
        """``active`` is already canonical."""
        assert "active" == normalise_workstream_status("active")

    def test_blocked_is_identity(self):
        """``blocked`` is already canonical."""
        assert "blocked" == normalise_workstream_status("blocked")

    def test_archived_is_identity(self):
        """``archived`` is already canonical."""
        assert "archived" == normalise_workstream_status("archived")

    def test_unknown_value_returned_as_is(self):
        """Unexpected values pass through unchanged instead of raising."""
        assert "foobar" == normalise_workstream_status("foobar")

    def test_map_constant_covers_legacy_aliases(self):
        """The lookup table itself maps both legacy aliases to ``finished``."""
        for alias in ("done", "completed"):
            assert FILE_TO_DB_WORKSTREAM_STATUS[alias] == "finished"

    def test_c04_no_spurious_drift_when_done_vs_finished(self):
        """``done`` and ``completed`` normalise to the same value, so comparing
        normalised statuses does not produce spurious C-04 drift."""
        from_done = normalise_workstream_status("done")
        from_completed = normalise_workstream_status("completed")
        assert from_done == from_completed

    def test_c04_real_drift_still_detected(self):
        """done (file) vs active (DB) IS real drift and must be detected."""
        from_done = normalise_workstream_status("done")
        from_active = normalise_workstream_status("active")
        assert from_done != from_active
|
|
|
|
|
|
class TestReadyReviewStatus:
    """ready_review_status against a throwaway git repository."""

    @staticmethod
    def _git(repo, *args, **run_kwargs):
        """Run ``git -C <repo> <args>`` and raise if the command fails."""
        return subprocess.run(["git", "-C", str(repo), *args], check=True, **run_kwargs)

    def _commit_all(self, repo, message):
        """Stage everything in ``repo`` and commit it with ``message``."""
        self._git(repo, "add", ".", capture_output=True)
        self._git(repo, "commit", "-m", message, capture_output=True)

    def _repo_with_commit(self, tmp_path):
        """Create a repo with one committed file.

        Returns ``(repo_path, tracked_file, commit_sha)`` where ``commit_sha``
        is HEAD after the initial commit.
        """
        repo = tmp_path / "repo"
        repo.mkdir()
        self._git(repo, "init", capture_output=True)
        # A local identity is required for ``git commit`` to succeed.
        self._git(repo, "config", "user.email", "test@example.invalid")
        self._git(repo, "config", "user.name", "Test")
        tracked = repo / "src" / "app.py"
        tracked.parent.mkdir()
        tracked.write_text("print('one')\n", encoding="utf-8")
        self._commit_all(repo, "init")
        head = subprocess.check_output(["git", "-C", str(repo), "rev-parse", "HEAD"], text=True)
        return repo, tracked, head.strip()

    def test_same_commit_is_current(self, tmp_path):
        """Reviewing at the commit that is still HEAD needs no re-review."""
        repo, _tracked, base = self._repo_with_commit(tmp_path)

        outcome = ready_review_status(repo, base)

        assert outcome.needs_review is False

    def test_changed_context_path_needs_review(self, tmp_path):
        """A later commit touching a watched path triggers a review."""
        repo, tracked, base = self._repo_with_commit(tmp_path)
        tracked.write_text("print('two')\n", encoding="utf-8")
        self._commit_all(repo, "change app")

        outcome = ready_review_status(repo, base, ["src"])

        assert outcome.needs_review is True
        assert outcome.changed_paths == ("src/app.py",)

    def test_unrelated_context_path_does_not_need_review(self, tmp_path):
        """A later commit outside the watched paths does not trigger a review."""
        repo, _tracked, base = self._repo_with_commit(tmp_path)
        note = repo / "docs" / "note.md"
        note.parent.mkdir()
        note.write_text("note\n", encoding="utf-8")
        self._commit_all(repo, "docs")

        outcome = ready_review_status(repo, base, ["src"])

        assert outcome.needs_review is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# STATUS_ORDER / no-regress rule (T01 / C-15)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStatusOrder:
    """STATUS_ORDER drives the no-regress rule: DB-ahead wins, file-ahead syncs."""

    def test_todo_is_lowest(self):
        """``todo`` has rank 0."""
        todo_rank = STATUS_ORDER["todo"]
        assert todo_rank == 0

    def test_done_and_cancel_are_highest(self):
        """``done`` and ``cancel`` share rank 2."""
        assert (STATUS_ORDER["done"], STATUS_ORDER["cancel"]) == (2, 2)

    def test_progress_and_wait_are_mid(self):
        """``progress`` and ``wait`` share rank 1."""
        assert (STATUS_ORDER["progress"], STATUS_ORDER["wait"]) == (1, 1)

    def test_db_ahead_detected(self):
        """done (DB) vs todo (file) — DB is ahead."""
        db_rank, file_rank = STATUS_ORDER["done"], STATUS_ORDER["todo"]
        assert db_rank > file_rank

    def test_file_ahead_detected(self):
        """done (file) vs todo (DB) — file is ahead, should sync."""
        db_rank, file_rank = STATUS_ORDER["todo"], STATUS_ORDER["done"]
        assert db_rank < file_rank

    def test_same_rank_treated_as_db_ahead(self):
        """progress (DB) vs wait (file) — same rank, no regression."""
        db_rank, file_rank = STATUS_ORDER["progress"], STATUS_ORDER["wait"]
        assert db_rank >= file_rank

    def test_todo_to_done_is_regression(self):
        """Applying file=todo to DB=done would be a regression."""
        file_rank = STATUS_ORDER["todo"]
        db_rank = STATUS_ORDER["done"]
        # DB rank not below file rank → should emit C-15, not C-10
        assert db_rank >= file_rank
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _detect_behind_remote (T02 / C-16)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDetectBehindRemote:
    """_detect_behind_remote must return False on any error (best-effort)."""

    # Passed on every git invocation so commits succeed on machines (e.g. CI
    # runners) without a global git identity or with forced commit signing.
    _GIT_OVERRIDES = (
        "-c", "user.name=Test",
        "-c", "user.email=test@example.invalid",
        "-c", "commit.gpgsign=false",
    )

    @classmethod
    def _git(cls, *args):
        """Run a git fixture command and raise if it fails.

        Setup failures must be loud: a silently failed commit or push would
        let the "returns False" tests pass without exercising their scenario.
        """
        subprocess.run(
            ["git", *cls._GIT_OVERRIDES, *map(str, args)],
            check=True,
            capture_output=True,
        )

    def _empty_commit(self, repo, message):
        """Create an empty commit with ``message`` in ``repo``."""
        self._git("-C", repo, "commit", "--allow-empty", "-m", message)

    def _clone_with_pushed_commit(self, tmp_path):
        """Create a bare repo plus a clone whose single commit is pushed.

        Returns ``(bare_path, clone_path)``.
        """
        bare = tmp_path / "bare.git"
        clone = tmp_path / "clone"
        self._git("init", "--bare", bare)
        self._git("clone", bare, clone)
        self._empty_commit(clone, "init")
        self._git("-C", clone, "push", "origin", "HEAD")
        return bare, clone

    def test_returns_false_for_nonexistent_path(self):
        """A path that does not exist is reported as not behind."""
        assert _detect_behind_remote("/nonexistent/path/xyz") is False

    def test_returns_false_for_non_git_dir(self, tmp_path):
        """A plain directory is reported as not behind."""
        assert _detect_behind_remote(str(tmp_path)) is False

    def test_returns_false_for_repo_without_remote(self, tmp_path):
        """A local-only repo with no remote tracking branch → not behind."""
        self._git("init", tmp_path)
        self._empty_commit(tmp_path, "init")
        # No remote configured → rev-parse @{u} fails → best-effort returns False
        assert _detect_behind_remote(str(tmp_path)) is False

    def test_returns_false_when_up_to_date(self, tmp_path):
        """Clone from a local bare repo, no new commits → not behind."""
        _bare, clone = self._clone_with_pushed_commit(tmp_path)
        assert _detect_behind_remote(str(clone)) is False

    def test_returns_false_when_local_ahead(self, tmp_path):
        """Local has unpushed commits — NOT considered behind."""
        _bare, clone = self._clone_with_pushed_commit(tmp_path)
        # Add a local-only commit (not pushed)
        self._empty_commit(clone, "local only")
        assert _detect_behind_remote(str(clone)) is False

    def test_returns_true_when_behind(self, tmp_path):
        """Remote has a commit the clone doesn't → clone is behind."""
        # Initial commit pushed so there's an upstream ref
        bare, clone = self._clone_with_pushed_commit(tmp_path)

        # Advance the remote through a second clone (simulates remote-only progress)
        work = tmp_path / "work"
        self._git("clone", bare, work)
        self._empty_commit(work, "remote commit")
        self._git("-C", work, "push")

        # The first clone has not seen the new remote commit
        assert _detect_behind_remote(str(clone)) is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _patch_task_status_in_file (T03 writeback)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPatchTaskStatusInFile:
|
|
"""_patch_task_status_in_file must only change the status field in the
|
|
matching task block and leave everything else untouched."""
|
|
|
|
def _make_workplan(self, tmp_path, content: str) -> Path:
|
|
f = tmp_path / "workplan.md"
|
|
f.write_text(content, encoding="utf-8")
|
|
return f
|
|
|
|
def test_patches_matching_task_block(self, tmp_path):
|
|
content = (
|
|
"---\nid: WP-001\n---\n"
|
|
"## My Task\n\n"
|
|
"```task\n"
|
|
"id: T01\n"
|
|
"status: todo\n"
|
|
"priority: high\n"
|
|
"```\n"
|
|
)
|
|
f = self._make_workplan(tmp_path, content)
|
|
result = _patch_task_status_in_file(f, "T01", "done")
|
|
assert result is True
|
|
patched = f.read_text()
|
|
assert "status: done" in patched
|
|
assert "status: todo" not in patched
|
|
assert "priority: high" in patched # other fields untouched
|
|
|
|
def test_does_not_patch_non_matching_block(self, tmp_path):
|
|
content = (
|
|
"---\nid: WP-001\n---\n"
|
|
"```task\n"
|
|
"id: T02\n"
|
|
"status: todo\n"
|
|
"```\n"
|
|
)
|
|
f = self._make_workplan(tmp_path, content)
|
|
result = _patch_task_status_in_file(f, "T01", "done")
|
|
assert result is False
|
|
assert "status: todo" in f.read_text()
|
|
|
|
def test_patches_correct_block_among_multiple(self, tmp_path):
|
|
content = (
|
|
"---\nid: WP-001\n---\n"
|
|
"```task\nid: T01\nstatus: todo\n```\n"
|
|
"```task\nid: T02\nstatus: progress\n```\n"
|
|
)
|
|
f = self._make_workplan(tmp_path, content)
|
|
_patch_task_status_in_file(f, "T02", "done")
|
|
patched = f.read_text()
|
|
assert "id: T01\nstatus: todo" in patched
|
|
assert "id: T02\nstatus: done" in patched
|
|
|
|
def test_does_not_touch_non_status_fields(self, tmp_path):
|
|
content = (
|
|
"---\nid: WP\n---\n"
|
|
"```task\nid: T01\nstatus: todo\npriority: high\nstate_hub_task_id: \"abc\"\n```\n"
|
|
)
|
|
f = self._make_workplan(tmp_path, content)
|
|
_patch_task_status_in_file(f, "T01", "done")
|
|
patched = f.read_text()
|
|
assert "priority: high" in patched
|
|
assert 'state_hub_task_id: "abc"' in patched
|
|
|
|
def test_idempotent_when_already_correct(self, tmp_path):
    """Patching a task to the status it already has reports no change."""
    workplan = self._make_workplan(
        tmp_path, "---\nid: WP\n---\n```task\nid: T01\nstatus: done\n```\n"
    )

    changed = _patch_task_status_in_file(workplan, "T01", "done")

    # The file already says "done", so the helper is expected to return False.
    assert changed is False
|
|
|
|
|
|
class TestPatchFrontmatterField:
    """Tests for ``_patch_frontmatter_field`` applied to a ``status`` field."""

    def test_patches_existing_scalar_field(self, tmp_path):
        """Changing ``status`` from proposed to active returns True and keeps ``id``."""
        target = tmp_path / "workplan.md"
        target.write_text("---\nid: WP-001\nstatus: proposed\n---\nBody\n", encoding="utf-8")

        changed = _patch_frontmatter_field(target, "status", "active")
        assert changed is True

        after = target.read_text(encoding="utf-8")
        for expected in ("status: active", "id: WP-001"):
            assert expected in after

    def test_is_idempotent_when_field_already_matches(self, tmp_path):
        """Setting ``status`` to its current value returns False, no duplicate line."""
        target = tmp_path / "workplan.md"
        target.write_text("---\nid: WP-001\nstatus: active\n---\nBody\n", encoding="utf-8")

        changed = _patch_frontmatter_field(target, "status", "active")
        assert changed is False

        occurrences = target.read_text(encoding="utf-8").count("status: active")
        assert occurrences == 1
|
|
|
|
|
|
class TestLifecycleRenormalization:
    """Check and fix a workplan whose task is in progress while the plan is not active."""

    def _make_repo(self, tmp_path, status: str = "proposed") -> Path:
        """Create ``repo/workplans/STATE-WP-0001-demo.md`` and return the repo dir.

        The workplan's frontmatter ``status`` is *status*; its single task
        block (``STATE-WP-0001-T01``) has ``status: progress``.
        """
        repo_dir = tmp_path / "repo"
        workplan_dir = repo_dir / "workplans"
        workplan_dir.mkdir(parents=True)

        frontmatter = (
            "---\n"
            "id: STATE-WP-0001\n"
            "type: workplan\n"
            "title: Demo\n"
            "domain: custodian\n"
            "repo: state-hub\n"
            f"status: {status}\n"
            "owner: codex\n"
            "state_hub_workstream_id: \"ws-1\"\n"
            "---\n\n"
        )
        body = (
            "## Implement Demo\n\n"
            "```task\n"
            "id: STATE-WP-0001-T01\n"
            "status: progress\n"
            "priority: high\n"
            "state_hub_task_id: \"task-1\"\n"
            "```\n"
        )
        (workplan_dir / "STATE-WP-0001-demo.md").write_text(
            frontmatter + body, encoding="utf-8"
        )
        return repo_dir

    def _api_get_for_repo(self, repo: Path):
        """Return a stand-in for ``_api_get`` serving canned records for *repo*.

        The returned callable answers the repo lookup, the ``ws-1`` workstream,
        the ``task-1`` task and the two list queries that have data; every
        other request yields an empty list.
        """
        workstream = {
            "id": "ws-1",
            "repo_id": "repo-1",
            "topic_id": "topic-1",
            "slug": "state-wp-0001",
            "title": "Demo",
            "status": "proposed",
            "planning_priority": None,
            "planning_order": None,
        }
        task_record = {
            "id": "task-1",
            "title": "Implement Demo",
            "status": "progress",
            "description": None,
        }
        single_records = {
            "/workstreams/ws-1": workstream,
            "/tasks/task-1": task_record,
        }

        def fake_get(_api_base, path, params=None, **_kwargs):
            if path == "/repos/state-hub":
                import socket

                repo_str = str(repo)
                return {
                    "id": "repo-1",
                    "slug": "state-hub",
                    "local_path": repo_str,
                    "host_paths": {socket.gethostname(): repo_str},
                }
            if path in single_records:
                return single_records[path]
            if path == "/tasks" and params == {"workstream_id": "ws-1"}:
                return [task_record]
            if path == "/workstreams" and params == {"repo_id": "repo-1"}:
                return [workstream]
            # Everything else -- including "/workstreams/ws-1/dependencies" and
            # the "/workstreams" listing by topic_id -- has no data.
            return []

        return fake_get

    def test_active_task_in_planning_workplan_reports_c23_not_c04(self, tmp_path, monkeypatch):
        """A proposed workplan with a task in progress yields C-23 and no C-04."""
        repo = self._make_repo(tmp_path)
        monkeypatch.setattr("consistency_check._api_get", self._api_get_for_repo(repo))

        report = check_repo("http://unused", "state-hub")

        found_ids = [entry.check_id for entry in report.issues]
        assert "C-23" in found_ids
        assert "C-04" not in found_ids

        c23 = next(entry for entry in report.issues if entry.check_id == "C-23")
        assert c23.fixable is True
        assert c23.file_value == "proposed"

    def test_fix_repo_repairs_planning_workplan_with_active_task(self, tmp_path, monkeypatch):
        """``fix_repo`` patches ws-1 to active, rewrites the file and logs the fix."""
        repo = self._make_repo(tmp_path)
        workplan_file = repo / "workplans" / "STATE-WP-0001-demo.md"
        recorded_patches = []

        def fake_patch(_api_base, path, body):
            recorded_patches.append((path, body))
            return {"ok": True}

        # API and git helpers are replaced so the fix runs without network or git.
        replacements = [
            ("consistency_check._api_get", self._api_get_for_repo(repo)),
            ("consistency_check._api_patch", fake_patch),
            ("consistency_check._detect_behind_remote", lambda _repo_path: False),
            ("consistency_check._detect_ahead_of_remote", lambda _repo_path: 0),
            ("consistency_check._git_commit_writeback", lambda *args, **kwargs: True),
            ("consistency_check._write_custodian_brief", lambda *args, **kwargs: False),
            ("consistency_check._git_push", lambda _repo_path: (True, "pushed")),
        ]
        for target, stub in replacements:
            monkeypatch.setattr(target, stub)

        report = fix_repo("http://unused", "state-hub")

        assert ("/workstreams/ws-1", {"status": "active"}) in recorded_patches
        assert "status: active" in workplan_file.read_text(encoding="utf-8")
        assert any("C-23 fixed" in applied for applied in report.fixes_applied)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _git_pull (T02 remote fix helper)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGitPull:
    """Exercise ``_git_pull`` against a missing path and throwaway local repos."""

    # Per-invocation git settings so the fixture commits do not depend on the
    # machine's global git config (no configured identity, forced signing).
    _GIT_CONFIG = (
        "-c", "user.name=Test",
        "-c", "user.email=test@example.invalid",
        "-c", "commit.gpgsign=false",
    )

    def _git(self, *args, cwd=None):
        """Run ``git *args`` (inside *cwd* when given) and return the result.

        Raises ``subprocess.CalledProcessError`` when git exits non-zero, so a
        broken fixture step fails loudly at the step that broke instead of
        surfacing later as an unrelated assertion failure.
        """
        command = ["git", *self._GIT_CONFIG]
        if cwd is not None:
            command += ["-C", str(cwd)]
        command += args
        return subprocess.run(command, capture_output=True, text=True, check=True)

    def _seed_remote(self, tmp_path):
        """Create a bare remote and a clone that has pushed one empty commit.

        Returns ``(bare, clone)`` paths.
        """
        bare = tmp_path / "bare.git"
        clone = tmp_path / "clone"
        bare.mkdir()
        self._git("init", "--bare", str(bare))
        self._git("clone", str(bare), str(clone))
        self._git("commit", "--allow-empty", "-m", "init", cwd=clone)
        self._git("push", "origin", "HEAD", cwd=clone)
        return bare, clone

    def test_returns_false_for_nonexistent_path(self):
        """A path that does not exist yields ``ok is False`` and a non-empty message."""
        ok, msg = _git_pull("/nonexistent/path")
        assert ok is False
        assert msg

    def test_returns_true_for_up_to_date_repo(self, tmp_path):
        """Pulling a clone that already matches its remote succeeds."""
        _bare, clone = self._seed_remote(tmp_path)

        ok, _msg = _git_pull(str(clone))

        assert ok is True

    def test_pulls_new_remote_commit(self, tmp_path):
        """A commit pushed from a second clone becomes HEAD of the first after pull."""
        bare, clone = self._seed_remote(tmp_path)

        # Push a new commit from a second clone.
        work = tmp_path / "work"
        self._git("clone", str(bare), str(work))
        self._git("commit", "--allow-empty", "-m", "remote work", cwd=work)
        self._git("push", cwd=work)

        # Pull should succeed and bring in the new commit.
        ok, _msg = _git_pull(str(clone))
        assert ok is True

        head = self._git("log", "--oneline", "-1", cwd=clone).stdout
        assert "remote work" in head
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _report_needs_action (smart skip logic)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestReportNeedsAction:
|
|
def _make_report(self, issues: list[tuple[str, str, bool]]) -> ConsistencyReport:
    """Return a ``ConsistencyReport`` holding one issue per input tuple.

    Each tuple is ``(severity, check_id, fixable)``; every issue gets the
    message ``"test"``.
    """
    report = ConsistencyReport(repo_slug="test", repo_path="/tmp/test")
    for entry in issues:
        severity, check_id, fixable = entry
        report.add(
            severity=severity,
            check_id=check_id,
            message="test",
            fixable=fixable,
        )
    return report
|
|
|
|
def test_clean_report_not_behind_is_skipped(self):
    """An empty report on a repo that is not behind needs no action."""
    empty_report = self._make_report([])
    needs_action = _report_needs_action(empty_report, behind_remote=False)
    assert needs_action is False
|
|
|
|
def test_behind_remote_always_needs_action(self):
    """Being behind the remote needs action even with an empty report."""
    empty_report = self._make_report([])
    needs_action = _report_needs_action(empty_report, behind_remote=True)
    assert needs_action is True
|
|
|
|
def test_fail_always_needs_action(self):
    """A single non-fixable FAIL issue is enough to need action."""
    failing = self._make_report([("FAIL", "C-07", False)])
    needs_action = _report_needs_action(failing, behind_remote=False)
    assert needs_action is True
|
|
|
|
def test_c08_only_is_background_noise(self):
    """A report whose only issue is an INFO C-08 does not trigger a fix cycle."""
    c08_only = self._make_report([("INFO", "C-08", False)])
    needs_action = _report_needs_action(c08_only, behind_remote=False)
    assert needs_action is False
|
|
|
|
def test_c08_plus_actionable_warn_needs_action(self):
    """An INFO C-08 alongside a fixable WARN C-10 still needs action."""
    mixed = self._make_report(
        [
            ("INFO", "C-08", False),
            ("WARN", "C-10", True),
        ]
    )
    needs_action = _report_needs_action(mixed, behind_remote=False)
    assert needs_action is True
|
|
|
|
def test_fixable_warn_needs_action(self):
    """A fixable WARN issue needs action."""
    fixable_warn = self._make_report([("WARN", "C-15", True)])
    needs_action = _report_needs_action(fixable_warn, behind_remote=False)
    assert needs_action is True
|
|
|
|
def test_non_fixable_warn_needs_action(self):
    """A WARN that is not fixable (here C-14) still needs action."""
    unfixable_warn = self._make_report([("WARN", "C-14", False)])
    needs_action = _report_needs_action(unfixable_warn, behind_remote=False)
    assert needs_action is True
|
|
|
|
def test_background_checks_constant_contains_c08(self):
    """``"C-08"`` must be a member of ``_BACKGROUND_CHECKS``."""
    background = _BACKGROUND_CHECKS
    assert "C-08" in background
|
|
|
|
# --- ahead_of_remote (T04 push-seal integration) ---
|
|
|
|
def test_ahead_of_remote_triggers_action(self):
    """An empty report with ``ahead_of_remote=3`` still needs action."""
    empty_report = self._make_report([])
    needs_action = _report_needs_action(
        empty_report, behind_remote=False, ahead_of_remote=3
    )
    assert needs_action is True
|
|
|
|
def test_zero_ahead_clean_is_skipped(self):
    """No issues, not behind and ``ahead_of_remote=0`` means no action."""
    empty_report = self._make_report([])
    needs_action = _report_needs_action(
        empty_report, behind_remote=False, ahead_of_remote=0
    )
    assert needs_action is False
|
|
|
|
def test_ahead_default_is_zero(self):
    """Omitting ``ahead_of_remote`` on a clean, not-behind report means no action."""
    empty_report = self._make_report([])
    needs_action = _report_needs_action(empty_report, behind_remote=False)
    assert needs_action is False
|
|
|
|
def test_both_behind_and_ahead_triggers(self):
    """A repo that is both behind and 5 commits ahead needs action."""
    empty_report = self._make_report([])
    needs_action = _report_needs_action(
        empty_report, behind_remote=True, ahead_of_remote=5
    )
    assert needs_action is True
|