generated from coulomb/repo-seed
Add reconciliation file write-through
This commit is contained in:
155
api/services/workplan_files.py
Normal file
155
api/services/workplan_files.py
Normal file
@@ -0,0 +1,155 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import socket
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from api.models.managed_repo import ManagedRepo
|
||||
|
||||
|
||||
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class WorkplanFileRef:
|
||||
repo_path: Path
|
||||
path: Path
|
||||
archived: bool
|
||||
|
||||
@property
|
||||
def relative_path(self) -> str:
|
||||
return str(self.path.relative_to(self.repo_path))
|
||||
|
||||
|
||||
def resolve_repo_path(repo: ManagedRepo | None) -> Path | None:
|
||||
if repo is None:
|
||||
return None
|
||||
hostname = socket.gethostname()
|
||||
host_paths = repo.host_paths or {}
|
||||
candidates = [host_paths.get(hostname), repo.local_path]
|
||||
for raw in candidates:
|
||||
if not raw:
|
||||
continue
|
||||
path = Path(raw).expanduser()
|
||||
if path.is_dir():
|
||||
return path
|
||||
return None
|
||||
|
||||
|
||||
def find_workplan_for_workstream(
|
||||
repo: ManagedRepo | None,
|
||||
workstream_id: uuid.UUID,
|
||||
) -> WorkplanFileRef | None:
|
||||
repo_path = resolve_repo_path(repo)
|
||||
if repo_path is None:
|
||||
return None
|
||||
workplans_dir = repo_path / "workplans"
|
||||
for directory, archived in (
|
||||
(workplans_dir, False),
|
||||
(workplans_dir / "archived", True),
|
||||
):
|
||||
if not directory.is_dir():
|
||||
continue
|
||||
for path in sorted(directory.glob("*.md")):
|
||||
meta = _frontmatter(path)
|
||||
if str(meta.get("state_hub_workstream_id", "")).strip().strip('"') == str(workstream_id):
|
||||
return WorkplanFileRef(repo_path=repo_path, path=path, archived=archived)
|
||||
return None
|
||||
|
||||
|
||||
def task_block_linked(path: Path, task_id: uuid.UUID) -> bool:
|
||||
return _task_block_for_task(path, task_id) is not None
|
||||
|
||||
|
||||
def patch_workplan_status(path: Path, status: str) -> bool:
|
||||
return _patch_frontmatter_field(path, "status", status)
|
||||
|
||||
|
||||
def patch_task_status(path: Path, task_id: uuid.UUID, status: str) -> bool:
|
||||
text = path.read_text(encoding="utf-8")
|
||||
|
||||
def _replace(match: re.Match) -> str:
|
||||
block = match.group(0)
|
||||
meta = _parse_task_block(match.group(1))
|
||||
if str(meta.get("state_hub_task_id", "")).strip().strip('"') != str(task_id):
|
||||
return block
|
||||
replaced = re.sub(
|
||||
r"^(status:\s*)\S+",
|
||||
rf"\g<1>{status}",
|
||||
block,
|
||||
count=1,
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
if replaced != block:
|
||||
return replaced
|
||||
return block.replace("\n```", f"\nstatus: {status}\n```", 1)
|
||||
|
||||
new_text = _TASK_BLOCK_RE.sub(_replace, text)
|
||||
if new_text == text:
|
||||
return False
|
||||
path.write_text(new_text, encoding="utf-8")
|
||||
return True
|
||||
|
||||
|
||||
def _frontmatter(path: Path) -> dict[str, Any]:
|
||||
try:
|
||||
text = path.read_text(encoding="utf-8")
|
||||
except OSError:
|
||||
return {}
|
||||
if not text.startswith("---"):
|
||||
return {}
|
||||
parts = text.split("---", 2)
|
||||
if len(parts) < 3:
|
||||
return {}
|
||||
try:
|
||||
return yaml.safe_load(parts[1].strip()) or {}
|
||||
except yaml.YAMLError:
|
||||
return {}
|
||||
|
||||
|
||||
def _patch_frontmatter_field(path: Path, key: str, value: str) -> bool:
|
||||
text = path.read_text(encoding="utf-8")
|
||||
if not text.startswith("---"):
|
||||
return False
|
||||
lines = text.split("\n")
|
||||
close_idx = None
|
||||
for i, line in enumerate(lines[1:], 1):
|
||||
if line.strip() == "---":
|
||||
close_idx = i
|
||||
break
|
||||
if close_idx is None:
|
||||
return False
|
||||
|
||||
new_line = f"{key}: {value}"
|
||||
for i in range(1, close_idx):
|
||||
if re.match(rf"^\s*{re.escape(key)}\s*:", lines[i]):
|
||||
if lines[i] == new_line:
|
||||
return False
|
||||
lines[i] = new_line
|
||||
path.write_text("\n".join(lines), encoding="utf-8")
|
||||
return True
|
||||
|
||||
lines.insert(close_idx, new_line)
|
||||
path.write_text("\n".join(lines), encoding="utf-8")
|
||||
return True
|
||||
|
||||
|
||||
def _task_block_for_task(path: Path, task_id: uuid.UUID) -> dict[str, Any] | None:
|
||||
text = path.read_text(encoding="utf-8")
|
||||
for match in _TASK_BLOCK_RE.finditer(text):
|
||||
meta = _parse_task_block(match.group(1))
|
||||
if str(meta.get("state_hub_task_id", "")).strip().strip('"') == str(task_id):
|
||||
return meta
|
||||
return None
|
||||
|
||||
|
||||
def _parse_task_block(raw: str) -> dict[str, Any]:
|
||||
try:
|
||||
return yaml.safe_load(raw.strip()) or {}
|
||||
except yaml.YAMLError:
|
||||
return {}
|
||||
Reference in New Issue
Block a user