from __future__ import annotations

import fnmatch
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# Canonical lifecycle statuses; every stored status normalizes to one of these.
CANONICAL_WORKPLAN_STATUSES: tuple[str, ...] = (
    "proposed",
    "ready",
    "active",
    "blocked",
    "backlog",
    "finished",
    "archived",
)

# Older status spellings that are still accepted, mapped to their canonical
# replacement.
LEGACY_WORKPLAN_STATUS_ALIASES: dict[str, str] = {
    "todo": "ready",
    "done": "finished",
    "completed": "finished",
    "accepted": "finished",
}

# Everything accepted on input: canonical values plus the legacy spellings.
SUPPORTED_WORKPLAN_STATUSES: tuple[str, ...] = (
    *CANONICAL_WORKPLAN_STATUSES,
    *LEGACY_WORKPLAN_STATUS_ALIASES.keys(),
)

# Named groupings of canonical statuses.
OPEN_WORKPLAN_STATUSES: tuple[str, ...] = ("ready", "active", "blocked")
CURRENT_WORKPLAN_STATUSES: tuple[str, ...] = ("active", "blocked")
CLOSED_WORKPLAN_STATUSES: tuple[str, ...] = ("finished", "archived")
PLANNING_WORKPLAN_STATUSES: tuple[str, ...] = ("proposed", "ready", "backlog")

# Legacy aliases (workstream terminology)
CANONICAL_WORKSTREAM_STATUSES = CANONICAL_WORKPLAN_STATUSES
LEGACY_WORKSTREAM_STATUS_ALIASES = LEGACY_WORKPLAN_STATUS_ALIASES
SUPPORTED_WORKSTREAM_STATUSES = SUPPORTED_WORKPLAN_STATUSES
OPEN_WORKSTREAM_STATUSES = OPEN_WORKPLAN_STATUSES
CURRENT_WORKSTREAM_STATUSES = CURRENT_WORKPLAN_STATUSES
CLOSED_WORKSTREAM_STATUSES = CLOSED_WORKPLAN_STATUSES
PLANNING_WORKSTREAM_STATUSES = PLANNING_WORKPLAN_STATUSES


@dataclass(frozen=True)
class ReadyReviewStatus:
    """Outcome of checking a ready workplan against the current repository."""

    # True when the plan should be re-reviewed before work starts.
    needs_review: bool
    # Human-readable explanation; empty when no review is needed.
    reason: str = ""
    # Changed paths that matched the supplied context paths, if any.
    changed_paths: tuple[str, ...] = ()


def normalize_workplan_status(status: Any, *, has_started: bool | None = None) -> str:
    """Return the canonical lifecycle status for a stored or legacy value.

    A legacy ``"todo"`` maps to ``"active"`` when ``has_started`` is truthy and
    to ``"ready"`` otherwise. Values that are neither canonical nor a known
    alias are returned lower-cased and stripped, but otherwise unchanged.
    """
    value = _status_value(status)
    if value == "todo" and has_started:
        return "active"
    return LEGACY_WORKPLAN_STATUS_ALIASES.get(value, value)


normalize_workstream_status = normalize_workplan_status


def is_canonical_workplan_status(status: Any) -> bool:
    """Return True when *status* is one of the canonical statuses."""
    return _status_value(status) in CANONICAL_WORKPLAN_STATUSES


is_canonical_workstream_status = is_canonical_workplan_status


def is_supported_workplan_status(status: Any) -> bool:
    """Return True when *status* is canonical or a known legacy alias."""
    return _status_value(status) in SUPPORTED_WORKPLAN_STATUSES


is_supported_workstream_status = is_supported_workplan_status


def workplan_has_started(task_statuses: list[Any] | tuple[Any, ...]) -> bool:
    """Return True when any task status is something other than empty or "todo"."""
    return any(_status_value(status) not in {"", "todo"} for status in task_statuses)


workstream_has_started = workplan_has_started


def ready_review_status(
    repo_dir: str | Path,
    reviewed_against_commit: Any,
    context_paths: Any = None,
) -> ReadyReviewStatus:
    """Return whether a ready workplan needs review against current repo state.

    An empty ``reviewed_against_commit`` means no review was recorded and is
    reported as "no review needed". When context paths are supplied, only
    changes under those paths matter. A repository whose HEAD cannot be read,
    a review commit that is not available, or a failed diff is treated
    conservatively as needs review.
    """
    # Tolerate values that arrive wrapped in quotes.
    reviewed = str(reviewed_against_commit or "").strip().strip("\"'")
    if not reviewed:
        return ReadyReviewStatus(False)

    repo = Path(repo_dir)
    head = _git_output(repo, ["rev-parse", "HEAD"])
    if not head:
        return ReadyReviewStatus(True, "could not determine repository HEAD")
    if reviewed == head:
        return ReadyReviewStatus(False)
    if not _git_commit_exists(repo, reviewed):
        return ReadyReviewStatus(True, f"review commit {reviewed[:12]} is not available")

    # The recorded value may be an abbreviated SHA, tag or branch; resolve it
    # so a revision that points at HEAD is not reported as stale.
    resolved = _git_output(
        repo, ["rev-parse", "--verify", "--quiet", f"{reviewed}^{{commit}}"]
    )
    if resolved == head:
        return ReadyReviewStatus(False)

    patterns = _as_list(context_paths)
    if not patterns:
        # No scoping information: any new commit invalidates the review.
        return ReadyReviewStatus(
            True,
            f"reviewed against {reviewed[:12]}, current HEAD is {head[:12]}",
        )

    changed = _changed_paths_since(repo, reviewed)
    if changed is None:
        return ReadyReviewStatus(True, "could not compare reviewed commit with HEAD")

    matching = tuple(path for path in changed if _matches_any_context(path, patterns))
    if not matching:
        return ReadyReviewStatus(False)
    return ReadyReviewStatus(
        True,
        f"{len(matching)} context path(s) changed since {reviewed[:12]}",
        matching,
    )


def _status_value(status: Any) -> str:
    """Return *status* as a stripped, lower-cased string ("" for falsy input).

    Objects exposing a ``value`` attribute (for example enum members) are
    unwrapped first; plain strings and bytes-like values are not.
    """
    if hasattr(status, "value") and not isinstance(status, (str, bytes, bytearray)):
        status = status.value
    return str(status or "").strip().lower()


def _as_list(value: Any) -> list[str]:
    """Coerce *value* into a list of forward-slash path strings.

    ``None`` gives an empty list, a list/tuple/set is converted item by item,
    a string is split on commas, and anything else becomes a one-item list.
    Blank entries of collections and comma-separated strings are dropped.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple, set)):
        return [str(item).strip().replace("\\", "/") for item in value if str(item).strip()]
    if isinstance(value, str):
        return [item.strip().replace("\\", "/") for item in value.split(",") if item.strip()]
    return [str(value).strip().replace("\\", "/")]


def _git_output(repo: Path, args: list[str]) -> str | None:
    """Run ``git -C <repo> <args>`` and return stripped stdout, or None on failure."""
    try:
        return subprocess.check_output(
            ["git", "-C", str(repo), *args],
            stderr=subprocess.DEVNULL,
            text=True,
        ).strip()
    except (subprocess.CalledProcessError, FileNotFoundError, OSError):
        return None


def _git_commit_exists(repo: Path, commit: str) -> bool:
    """Return True when *commit* resolves to a commit object in *repo*."""
    try:
        subprocess.run(
            ["git", "-C", str(repo), "cat-file", "-e", f"{commit}^{{commit}}"],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return True
    except (subprocess.CalledProcessError, FileNotFoundError, OSError):
        return False


def _changed_paths_since(repo: Path, commit: str) -> tuple[str, ...] | None:
    """Return the paths that differ between *commit* and HEAD, or None on failure."""
    output = _git_output(repo, ["diff", "--name-only", f"{commit}..HEAD", "--"])
    if output is None:
        return None
    return tuple(path.strip().replace("\\", "/") for path in output.splitlines() if path.strip())


def _normalize_relative_path(value: str) -> str:
    """Return *value* with forward slashes and without leading "./" or "/".

    Only whole ``./`` prefixes and leading slashes are removed, so dot-prefixed
    names such as ``.github`` and parent references such as ``../x`` keep their
    leading dots. A bare ``"."`` normalizes to the empty string.
    """
    text = value.strip().replace("\\", "/")
    while True:
        if text.startswith("./"):
            text = text[2:]
        elif text.startswith("/"):
            text = text.lstrip("/")
        else:
            break
    return "" if text == "." else text


def _matches_any_context(path: str, patterns: list[str]) -> bool:
    """Return True when *path* falls under any of the context *patterns*.

    Patterns containing ``*``, ``?``, ``[`` or ``]`` are matched with
    ``fnmatch``; any other pattern matches the path itself or anything beneath
    it as a directory prefix. Empty patterns are ignored.
    """
    norm_path = _normalize_relative_path(path)
    for raw_pattern in patterns:
        pattern = _normalize_relative_path(raw_pattern)
        if not pattern:
            continue
        if any(char in pattern for char in "*?[]"):
            if fnmatch.fnmatch(norm_path, pattern):
                return True
        elif norm_path == pattern or norm_path.startswith(f"{pattern.rstrip('/')}/"):
            return True
    return False