generated from coulomb/repo-seed
170 lines
5.4 KiB
Python
170 lines
5.4 KiB
Python
from __future__ import annotations
|
|
|
|
import fnmatch
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
CANONICAL_WORKSTREAM_STATUSES: tuple[str, ...] = (
|
|
"proposed",
|
|
"ready",
|
|
"active",
|
|
"blocked",
|
|
"backlog",
|
|
"finished",
|
|
"archived",
|
|
)
|
|
|
|
LEGACY_WORKSTREAM_STATUS_ALIASES: dict[str, str] = {
|
|
"todo": "ready",
|
|
"done": "finished",
|
|
"completed": "finished",
|
|
"accepted": "finished",
|
|
}
|
|
|
|
SUPPORTED_WORKSTREAM_STATUSES: tuple[str, ...] = (
|
|
*CANONICAL_WORKSTREAM_STATUSES,
|
|
*LEGACY_WORKSTREAM_STATUS_ALIASES.keys(),
|
|
)
|
|
|
|
OPEN_WORKSTREAM_STATUSES: tuple[str, ...] = ("ready", "active", "blocked")
|
|
CURRENT_WORKSTREAM_STATUSES: tuple[str, ...] = ("active", "blocked")
|
|
CLOSED_WORKSTREAM_STATUSES: tuple[str, ...] = ("finished", "archived")
|
|
PLANNING_WORKSTREAM_STATUSES: tuple[str, ...] = ("proposed", "ready", "backlog")
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ReadyReviewStatus:
|
|
needs_review: bool
|
|
reason: str = ""
|
|
changed_paths: tuple[str, ...] = ()
|
|
|
|
|
|
def normalize_workstream_status(status: Any, *, has_started: bool | None = None) -> str:
|
|
"""Return the canonical lifecycle status for a stored or legacy value."""
|
|
value = _status_value(status)
|
|
if value == "todo" and has_started:
|
|
return "active"
|
|
return LEGACY_WORKSTREAM_STATUS_ALIASES.get(value, value)
|
|
|
|
|
|
def is_canonical_workstream_status(status: Any) -> bool:
|
|
return _status_value(status) in CANONICAL_WORKSTREAM_STATUSES
|
|
|
|
|
|
def is_supported_workstream_status(status: Any) -> bool:
|
|
return _status_value(status) in SUPPORTED_WORKSTREAM_STATUSES
|
|
|
|
|
|
def workstream_has_started(task_statuses: list[Any] | tuple[Any, ...]) -> bool:
|
|
return any(_status_value(status) not in {"", "todo"} for status in task_statuses)
|
|
|
|
|
|
def ready_review_status(
|
|
repo_dir: str | Path,
|
|
reviewed_against_commit: Any,
|
|
context_paths: Any = None,
|
|
) -> ReadyReviewStatus:
|
|
"""Return whether a ready workplan needs review against current repo state.
|
|
|
|
When context paths are supplied, only changes under those paths matter.
|
|
Missing or invalid git metadata is treated conservatively as needs review.
|
|
"""
|
|
reviewed = str(reviewed_against_commit or "").strip().strip("\"'")
|
|
if not reviewed:
|
|
return ReadyReviewStatus(False)
|
|
|
|
repo = Path(repo_dir)
|
|
head = _git_output(repo, ["rev-parse", "HEAD"])
|
|
if not head:
|
|
return ReadyReviewStatus(True, "could not determine repository HEAD")
|
|
if reviewed == head:
|
|
return ReadyReviewStatus(False)
|
|
|
|
if not _git_commit_exists(repo, reviewed):
|
|
return ReadyReviewStatus(True, f"review commit {reviewed[:12]} is not available")
|
|
|
|
patterns = _as_list(context_paths)
|
|
if not patterns:
|
|
return ReadyReviewStatus(
|
|
True,
|
|
f"reviewed against {reviewed[:12]}, current HEAD is {head[:12]}",
|
|
)
|
|
|
|
changed = _changed_paths_since(repo, reviewed)
|
|
if changed is None:
|
|
return ReadyReviewStatus(True, "could not compare reviewed commit with HEAD")
|
|
|
|
matching = tuple(path for path in changed if _matches_any_context(path, patterns))
|
|
if not matching:
|
|
return ReadyReviewStatus(False)
|
|
|
|
return ReadyReviewStatus(
|
|
True,
|
|
f"{len(matching)} context path(s) changed since {reviewed[:12]}",
|
|
matching,
|
|
)
|
|
|
|
|
|
def _status_value(status: Any) -> str:
|
|
if hasattr(status, "value") and not isinstance(status, (str, bytes, bytearray)):
|
|
status = status.value
|
|
return str(status or "").strip().lower()
|
|
|
|
|
|
def _as_list(value: Any) -> list[str]:
|
|
if value is None:
|
|
return []
|
|
if isinstance(value, (list, tuple, set)):
|
|
return [str(item).strip().replace("\\", "/") for item in value if str(item).strip()]
|
|
if isinstance(value, str):
|
|
return [item.strip().replace("\\", "/") for item in value.split(",") if item.strip()]
|
|
return [str(value).strip().replace("\\", "/")]
|
|
|
|
|
|
def _git_output(repo: Path, args: list[str]) -> str | None:
|
|
try:
|
|
return subprocess.check_output(
|
|
["git", "-C", str(repo), *args],
|
|
stderr=subprocess.DEVNULL,
|
|
text=True,
|
|
).strip()
|
|
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
|
|
return None
|
|
|
|
|
|
def _git_commit_exists(repo: Path, commit: str) -> bool:
|
|
try:
|
|
subprocess.run(
|
|
["git", "-C", str(repo), "cat-file", "-e", f"{commit}^{{commit}}"],
|
|
check=True,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
return True
|
|
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
|
|
return False
|
|
|
|
|
|
def _changed_paths_since(repo: Path, commit: str) -> tuple[str, ...] | None:
|
|
output = _git_output(repo, ["diff", "--name-only", f"{commit}..HEAD", "--"])
|
|
if output is None:
|
|
return None
|
|
return tuple(path.strip().replace("\\", "/") for path in output.splitlines() if path.strip())
|
|
|
|
|
|
def _matches_any_context(path: str, patterns: list[str]) -> bool:
|
|
norm_path = path.strip().replace("\\", "/").lstrip("./")
|
|
for raw_pattern in patterns:
|
|
pattern = raw_pattern.strip().replace("\\", "/").lstrip("./")
|
|
if not pattern:
|
|
continue
|
|
if any(char in pattern for char in "*?[]"):
|
|
if fnmatch.fnmatch(norm_path, pattern):
|
|
return True
|
|
elif norm_path == pattern or norm_path.startswith(f"{pattern.rstrip('/')}/"):
|
|
return True
|
|
return False
|