from __future__ import annotations import re import socket import uuid from dataclasses import dataclass from pathlib import Path from typing import Any import yaml from api.models.managed_repo import ManagedRepo _TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL) @dataclass(frozen=True) class WorkplanFileRef: repo_path: Path path: Path archived: bool @property def relative_path(self) -> str: return str(self.path.relative_to(self.repo_path)) def resolve_repo_path(repo: ManagedRepo | None) -> Path | None: if repo is None: return None hostname = socket.gethostname() host_paths = repo.host_paths or {} candidates = [host_paths.get(hostname), repo.local_path] for raw in candidates: if not raw: continue path = Path(raw).expanduser() if path.is_dir(): return path return None def find_workplan_for_workstream( repo: ManagedRepo | None, workstream_id: uuid.UUID, ) -> WorkplanFileRef | None: repo_path = resolve_repo_path(repo) if repo_path is None: return None workplans_dir = repo_path / "workplans" for directory, archived in ( (workplans_dir, False), (workplans_dir / "archived", True), ): if not directory.is_dir(): continue for path in sorted(directory.glob("*.md")): meta = _frontmatter(path) if str(meta.get("state_hub_workstream_id", "")).strip().strip('"') == str(workstream_id): return WorkplanFileRef(repo_path=repo_path, path=path, archived=archived) return None def task_block_linked(path: Path, task_id: uuid.UUID) -> bool: return _task_block_for_task(path, task_id) is not None def workplan_status(path: Path) -> str | None: status = _frontmatter(path).get("status") return str(status).strip() if status is not None else None def task_block_status(path: Path, task_id: uuid.UUID) -> str | None: meta = _task_block_for_task(path, task_id) if meta is None: return None status = meta.get("status") return str(status).strip() if status is not None else None def patch_workplan_status(path: Path, status: str) -> bool: return _patch_frontmatter_field(path, "status", status) def patch_task_status(path: Path, task_id: uuid.UUID, status: str) -> bool: text = path.read_text(encoding="utf-8") def _replace(match: re.Match) -> str: block = match.group(0) meta = _parse_task_block(match.group(1)) if str(meta.get("state_hub_task_id", "")).strip().strip('"') != str(task_id): return block replaced = re.sub( r"^(status:\s*)\S+", rf"\g<1>{status}", block, count=1, flags=re.MULTILINE, ) if replaced != block: return replaced return block.replace("\n```", f"\nstatus: {status}\n```", 1) new_text = _TASK_BLOCK_RE.sub(_replace, text) if new_text == text: return False path.write_text(new_text, encoding="utf-8") return True def _frontmatter(path: Path) -> dict[str, Any]: try: text = path.read_text(encoding="utf-8") except OSError: return {} if not text.startswith("---"): return {} parts = text.split("---", 2) if len(parts) < 3: return {} try: return yaml.safe_load(parts[1].strip()) or {} except yaml.YAMLError: return {} def _patch_frontmatter_field(path: Path, key: str, value: str) -> bool: text = path.read_text(encoding="utf-8") if not text.startswith("---"): return False lines = text.split("\n") close_idx = None for i, line in enumerate(lines[1:], 1): if line.strip() == "---": close_idx = i break if close_idx is None: return False new_line = f"{key}: {value}" for i in range(1, close_idx): if re.match(rf"^\s*{re.escape(key)}\s*:", lines[i]): if lines[i] == new_line: return False lines[i] = new_line path.write_text("\n".join(lines), encoding="utf-8") return True lines.insert(close_idx, new_line) path.write_text("\n".join(lines), encoding="utf-8") return True def _task_block_for_task(path: Path, task_id: uuid.UUID) -> dict[str, Any] | None: try: text = path.read_text(encoding="utf-8") except OSError: return None for match in _TASK_BLOCK_RE.finditer(text): meta = _parse_task_block(match.group(1)) if str(meta.get("state_hub_task_id", "")).strip().strip('"') == str(task_id): return meta return None def _parse_task_block(raw: str) -> dict[str, Any]: try: return yaml.safe_load(raw.strip()) or {} except yaml.YAMLError: return {}