generated from coulomb/repo-seed
172 lines
4.8 KiB
Python
172 lines
4.8 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
import socket
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
from api.models.managed_repo import ManagedRepo
|
|
|
|
|
|
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class WorkplanFileRef:
|
|
repo_path: Path
|
|
path: Path
|
|
archived: bool
|
|
|
|
@property
|
|
def relative_path(self) -> str:
|
|
return str(self.path.relative_to(self.repo_path))
|
|
|
|
|
|
def resolve_repo_path(repo: ManagedRepo | None) -> Path | None:
|
|
if repo is None:
|
|
return None
|
|
hostname = socket.gethostname()
|
|
host_paths = repo.host_paths or {}
|
|
candidates = [host_paths.get(hostname), repo.local_path]
|
|
for raw in candidates:
|
|
if not raw:
|
|
continue
|
|
path = Path(raw).expanduser()
|
|
if path.is_dir():
|
|
return path
|
|
return None
|
|
|
|
|
|
def find_workplan_for_workstream(
|
|
repo: ManagedRepo | None,
|
|
workstream_id: uuid.UUID,
|
|
) -> WorkplanFileRef | None:
|
|
repo_path = resolve_repo_path(repo)
|
|
if repo_path is None:
|
|
return None
|
|
workplans_dir = repo_path / "workplans"
|
|
for directory, archived in (
|
|
(workplans_dir, False),
|
|
(workplans_dir / "archived", True),
|
|
):
|
|
if not directory.is_dir():
|
|
continue
|
|
for path in sorted(directory.glob("*.md")):
|
|
meta = _frontmatter(path)
|
|
if str(meta.get("state_hub_workstream_id", "")).strip().strip('"') == str(workstream_id):
|
|
return WorkplanFileRef(repo_path=repo_path, path=path, archived=archived)
|
|
return None
|
|
|
|
|
|
def task_block_linked(path: Path, task_id: uuid.UUID) -> bool:
|
|
return _task_block_for_task(path, task_id) is not None
|
|
|
|
|
|
def workplan_status(path: Path) -> str | None:
|
|
status = _frontmatter(path).get("status")
|
|
return str(status).strip() if status is not None else None
|
|
|
|
|
|
def task_block_status(path: Path, task_id: uuid.UUID) -> str | None:
|
|
meta = _task_block_for_task(path, task_id)
|
|
if meta is None:
|
|
return None
|
|
status = meta.get("status")
|
|
return str(status).strip() if status is not None else None
|
|
|
|
|
|
def patch_workplan_status(path: Path, status: str) -> bool:
|
|
return _patch_frontmatter_field(path, "status", status)
|
|
|
|
|
|
def patch_task_status(path: Path, task_id: uuid.UUID, status: str) -> bool:
|
|
text = path.read_text(encoding="utf-8")
|
|
|
|
def _replace(match: re.Match) -> str:
|
|
block = match.group(0)
|
|
meta = _parse_task_block(match.group(1))
|
|
if str(meta.get("state_hub_task_id", "")).strip().strip('"') != str(task_id):
|
|
return block
|
|
replaced = re.sub(
|
|
r"^(status:\s*)\S+",
|
|
rf"\g<1>{status}",
|
|
block,
|
|
count=1,
|
|
flags=re.MULTILINE,
|
|
)
|
|
if replaced != block:
|
|
return replaced
|
|
return block.replace("\n```", f"\nstatus: {status}\n```", 1)
|
|
|
|
new_text = _TASK_BLOCK_RE.sub(_replace, text)
|
|
if new_text == text:
|
|
return False
|
|
path.write_text(new_text, encoding="utf-8")
|
|
return True
|
|
|
|
|
|
def _frontmatter(path: Path) -> dict[str, Any]:
|
|
try:
|
|
text = path.read_text(encoding="utf-8")
|
|
except OSError:
|
|
return {}
|
|
if not text.startswith("---"):
|
|
return {}
|
|
parts = text.split("---", 2)
|
|
if len(parts) < 3:
|
|
return {}
|
|
try:
|
|
return yaml.safe_load(parts[1].strip()) or {}
|
|
except yaml.YAMLError:
|
|
return {}
|
|
|
|
|
|
def _patch_frontmatter_field(path: Path, key: str, value: str) -> bool:
|
|
text = path.read_text(encoding="utf-8")
|
|
if not text.startswith("---"):
|
|
return False
|
|
lines = text.split("\n")
|
|
close_idx = None
|
|
for i, line in enumerate(lines[1:], 1):
|
|
if line.strip() == "---":
|
|
close_idx = i
|
|
break
|
|
if close_idx is None:
|
|
return False
|
|
|
|
new_line = f"{key}: {value}"
|
|
for i in range(1, close_idx):
|
|
if re.match(rf"^\s*{re.escape(key)}\s*:", lines[i]):
|
|
if lines[i] == new_line:
|
|
return False
|
|
lines[i] = new_line
|
|
path.write_text("\n".join(lines), encoding="utf-8")
|
|
return True
|
|
|
|
lines.insert(close_idx, new_line)
|
|
path.write_text("\n".join(lines), encoding="utf-8")
|
|
return True
|
|
|
|
|
|
def _task_block_for_task(path: Path, task_id: uuid.UUID) -> dict[str, Any] | None:
|
|
try:
|
|
text = path.read_text(encoding="utf-8")
|
|
except OSError:
|
|
return None
|
|
for match in _TASK_BLOCK_RE.finditer(text):
|
|
meta = _parse_task_block(match.group(1))
|
|
if str(meta.get("state_hub_task_id", "")).strip().strip('"') == str(task_id):
|
|
return meta
|
|
return None
|
|
|
|
|
|
def _parse_task_block(raw: str) -> dict[str, Any]:
|
|
try:
|
|
return yaml.safe_load(raw.strip()) or {}
|
|
except yaml.YAMLError:
|
|
return {}
|