diff --git a/api/routers/reconciliation.py b/api/routers/reconciliation.py index 2637b4c..ddab071 100644 --- a/api/routers/reconciliation.py +++ b/api/routers/reconciliation.py @@ -5,14 +5,23 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from api.database import get_session +from api.models.managed_repo import ManagedRepo from api.models.task import Task +from api.models.task import TaskStatus from api.models.workstream import Workstream from api.schemas.reconciliation import StateChangeRequest, StateChangeResponse from api.services.lifecycle import status_value from api.services.reconciliation import ( + ReconciliationClass, classify_task_status_change, classify_workstream_status_change, ) +from api.services.workplan_files import ( + find_workplan_for_workstream, + patch_task_status, + patch_workplan_status, + task_block_linked, +) from api.workplan_status import normalize_workstream_status router = APIRouter(prefix="/reconciliation", tags=["reconciliation"]) @@ -38,33 +47,58 @@ async def classify_state_change( if ws is None: raise HTTPException(status_code=404, detail="Workstream not found") - file_backed = _bool_or_default(body.file_backed, ws.repo_id is not None) - archived_file = _bool_or_default(body.archived_file, False) + repo = await session.get(ManagedRepo, ws.repo_id) if ws.repo_id else None + workplan_ref = find_workplan_for_workstream(repo, ws.id) + actual_file_backed = workplan_ref is not None + actual_archived_file = bool(workplan_ref and workplan_ref.archived) + file_backed = ( + actual_file_backed + if body.apply + else _bool_or_default(body.file_backed, actual_file_backed) + ) + archived_file = ( + actual_archived_file + if body.apply + else _bool_or_default(body.archived_file, actual_archived_file) + ) tasks_terminal = ( body.tasks_terminal if body.tasks_terminal is not None else await _workstream_tasks_terminal(session, ws.id) ) + current_status = normalize_workstream_status(ws.status) + target_status = normalize_workstream_status(body.target_status) classification = classify_workstream_status_change( - current_status=ws.status, - target_status=body.target_status, + current_status=current_status, + target_status=target_status, file_backed=file_backed, archived_file=archived_file, tasks_terminal=tasks_terminal, ) + write_result = "not_attempted" + if body.apply: + if classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH and workplan_ref: + patch_workplan_status(workplan_ref.path, target_status) + ws.status = target_status + await session.commit() + write_result = "applied" + else: + write_result = "not_applicable" return StateChangeResponse( target_type=body.target_type, target_id=body.target_id, actor=body.actor, intent=body.intent, - current_status=normalize_workstream_status(ws.status), - target_status=normalize_workstream_status(body.target_status), + current_status=current_status, + target_status=target_status, file_backed=file_backed, archived_file=archived_file, tasks_terminal=tasks_terminal, reconciliation_class=classification.reconciliation_class, reason=classification.reason, follow_up=classification.follow_up, + write_through_result=write_result, + workplan_path=workplan_ref.relative_path if workplan_ref else None, ) task = await session.get(Task, body.target_id) @@ -72,28 +106,64 @@ async def classify_state_change( raise HTTPException(status_code=404, detail="Task not found") ws = await session.get(Workstream, task.workstream_id) - file_backed = _bool_or_default(body.file_backed, bool(ws and ws.repo_id)) - archived_file = _bool_or_default(body.archived_file, False) - task_linked = _bool_or_default(body.task_linked, True) + repo = await session.get(ManagedRepo, ws.repo_id) if ws and ws.repo_id else None + workplan_ref = find_workplan_for_workstream(repo, ws.id) if ws else None + actual_file_backed = workplan_ref is not None + actual_archived_file = bool(workplan_ref and workplan_ref.archived) + file_backed = ( + actual_file_backed + if body.apply + else _bool_or_default(body.file_backed, actual_file_backed) + ) + archived_file = ( + actual_archived_file + if body.apply + else _bool_or_default(body.archived_file, actual_archived_file) + ) + actual_task_linked = bool(workplan_ref and task_block_linked(workplan_ref.path, task.id)) + task_linked = ( + actual_task_linked + if body.apply + else _bool_or_default(body.task_linked, actual_task_linked) + ) + current_status = status_value(task.status) + target_status = status_value(body.target_status) classification = classify_task_status_change( - current_status=task.status, - target_status=body.target_status, + current_status=current_status, + target_status=target_status, file_backed=file_backed, archived_file=archived_file, task_linked=task_linked, blocking_reason=body.blocking_reason, ) + write_result = "not_attempted" + if body.apply: + if ( + classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH + and workplan_ref + and actual_task_linked + ): + patch_task_status(workplan_ref.path, task.id, target_status) + task.status = TaskStatus(target_status) + if body.blocking_reason is not None: + task.blocking_reason = body.blocking_reason + await session.commit() + write_result = "applied" + else: + write_result = "not_applicable" return StateChangeResponse( target_type=body.target_type, target_id=body.target_id, actor=body.actor, intent=body.intent, - current_status=status_value(task.status), - target_status=status_value(body.target_status), + current_status=current_status, + target_status=target_status, file_backed=file_backed, archived_file=archived_file, task_linked=task_linked, reconciliation_class=classification.reconciliation_class, reason=classification.reason, follow_up=classification.follow_up, + write_through_result=write_result, + workplan_path=workplan_ref.relative_path if workplan_ref else None, ) diff --git a/api/schemas/reconciliation.py b/api/schemas/reconciliation.py index 47e8067..d2c4b6d 100644 --- a/api/schemas/reconciliation.py +++ b/api/schemas/reconciliation.py @@ -20,6 +20,7 @@ class StateChangeRequest(BaseModel): task_linked: bool | None = None tasks_terminal: bool | None = None blocking_reason: str | None = None + apply: bool = False class StateChangeResponse(BaseModel): @@ -36,4 +37,5 @@ class StateChangeResponse(BaseModel): reconciliation_class: ReconciliationClass reason: str follow_up: str - write_through_result: Literal["not_attempted"] = "not_attempted" + write_through_result: Literal["not_attempted", "applied", "not_applicable"] = "not_attempted" + workplan_path: str | None = None diff --git a/api/services/workplan_files.py b/api/services/workplan_files.py new file mode 100644 index 0000000..9130f59 --- /dev/null +++ b/api/services/workplan_files.py @@ -0,0 +1,155 @@ +from __future__ import annotations + +import re +import socket +import uuid +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import yaml + +from api.models.managed_repo import ManagedRepo + + +_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL) + + +@dataclass(frozen=True) +class WorkplanFileRef: + repo_path: Path + path: Path + archived: bool + + @property + def relative_path(self) -> str: + return str(self.path.relative_to(self.repo_path)) + + +def resolve_repo_path(repo: ManagedRepo | None) -> Path | None: + if repo is None: + return None + hostname = socket.gethostname() + host_paths = repo.host_paths or {} + candidates = [host_paths.get(hostname), repo.local_path] + for raw in candidates: + if not raw: + continue + path = Path(raw).expanduser() + if path.is_dir(): + return path + return None + + +def find_workplan_for_workstream( + repo: ManagedRepo | None, + workstream_id: uuid.UUID, +) -> WorkplanFileRef | None: + repo_path = resolve_repo_path(repo) + if repo_path is None: + return None + workplans_dir = repo_path / "workplans" + for directory, archived in ( + (workplans_dir, False), + (workplans_dir / "archived", True), + ): + if not directory.is_dir(): + continue + for path in sorted(directory.glob("*.md")): + meta = _frontmatter(path) + if str(meta.get("state_hub_workstream_id", "")).strip().strip('"') == str(workstream_id): + return WorkplanFileRef(repo_path=repo_path, path=path, archived=archived) + return None + + +def task_block_linked(path: Path, task_id: uuid.UUID) -> bool: + return _task_block_for_task(path, task_id) is not None + + +def patch_workplan_status(path: Path, status: str) -> bool: + return _patch_frontmatter_field(path, "status", status) + + +def patch_task_status(path: Path, task_id: uuid.UUID, status: str) -> bool: + text = path.read_text(encoding="utf-8") + + def _replace(match: re.Match) -> str: + block = match.group(0) + meta = _parse_task_block(match.group(1)) + if str(meta.get("state_hub_task_id", "")).strip().strip('"') != str(task_id): + return block + replaced = re.sub( + r"^(status:\s*)\S+", + rf"\g<1>{status}", + block, + count=1, + flags=re.MULTILINE, + ) + if replaced != block: + return replaced + return block.replace("\n```", f"\nstatus: {status}\n```", 1) + + new_text = _TASK_BLOCK_RE.sub(_replace, text) + if new_text == text: + return False + path.write_text(new_text, encoding="utf-8") + return True + + +def _frontmatter(path: Path) -> dict[str, Any]: + try: + text = path.read_text(encoding="utf-8") + except OSError: + return {} + if not text.startswith("---"): + return {} + parts = text.split("---", 2) + if len(parts) < 3: + return {} + try: + return yaml.safe_load(parts[1].strip()) or {} + except yaml.YAMLError: + return {} + + +def _patch_frontmatter_field(path: Path, key: str, value: str) -> bool: + text = path.read_text(encoding="utf-8") + if not text.startswith("---"): + return False + lines = text.split("\n") + close_idx = None + for i, line in enumerate(lines[1:], 1): + if line.strip() == "---": + close_idx = i + break + if close_idx is None: + return False + + new_line = f"{key}: {value}" + for i in range(1, close_idx): + if re.match(rf"^\s*{re.escape(key)}\s*:", lines[i]): + if lines[i] == new_line: + return False + lines[i] = new_line + path.write_text("\n".join(lines), encoding="utf-8") + return True + + lines.insert(close_idx, new_line) + path.write_text("\n".join(lines), encoding="utf-8") + return True + + +def _task_block_for_task(path: Path, task_id: uuid.UUID) -> dict[str, Any] | None: + text = path.read_text(encoding="utf-8") + for match in _TASK_BLOCK_RE.finditer(text): + meta = _parse_task_block(match.group(1)) + if str(meta.get("state_hub_task_id", "")).strip().strip('"') == str(task_id): + return meta + return None + + +def _parse_task_block(raw: str) -> dict[str, Any]: + try: + return yaml.safe_load(raw.strip()) or {} + except yaml.YAMLError: + return {} diff --git a/tests/test_routers_core.py b/tests/test_routers_core.py index ad2c3dc..243a3df 100644 --- a/tests/test_routers_core.py +++ b/tests/test_routers_core.py @@ -28,10 +28,12 @@ async def _create_topic(client, domain_slug="testdomain", slug="testtopic", titl return r.json() -async def _create_workstream(client, topic_id, slug="test-ws", title="Test WS", status="active"): - r = await client.post("/workstreams/", json={ +async def _create_workstream(client, topic_id, slug="test-ws", title="Test WS", status="active", **extra): + payload = { "topic_id": topic_id, "slug": slug, "title": title, "status": status, - }) + } + payload.update(extra) + r = await client.post("/workstreams/", json=payload) assert r.status_code == 201, r.text return r.json() @@ -44,6 +46,17 @@ async def _create_task(client, workstream_id, title="Test task"): return r.json() +async def _create_repo(client, domain_slug="testdomain", slug="test-repo", local_path=None): + r = await client.post("/repos/", json={ + "domain_slug": domain_slug, + "slug": slug, + "name": "Test Repo", + "local_path": str(local_path) if local_path else None, + }) + assert r.status_code == 201, r.text + return r.json() + + # --------------------------------------------------------------------------- # Domain tests # --------------------------------------------------------------------------- @@ -464,3 +477,113 @@ class TestReconciliationEndpoints: }) assert r.status_code == 404 + + async def test_apply_workstream_write_through_patches_file_then_db(self, client, tmp_path): + await _create_domain(client) + repo_root = tmp_path / "repo" + workplans = repo_root / "workplans" + workplans.mkdir(parents=True) + repo = await _create_repo(client, local_path=repo_root) + topic = await _create_topic(client) + ws = await _create_workstream(client, topic["id"], repo_id=repo["id"]) + wp = workplans / "STATE-WP-9999-demo.md" + wp.write_text( + "---\n" + "id: STATE-WP-9999\n" + "type: workplan\n" + "title: Demo\n" + "domain: custodian\n" + "repo: state-hub\n" + "status: active\n" + f"state_hub_workstream_id: \"{ws['id']}\"\n" + "---\n", + encoding="utf-8", + ) + + r = await client.post("/reconciliation/state-change", json={ + "target_type": "workstream", + "target_id": ws["id"], + "target_status": "backlog", + "actor": "dashboard", + "apply": True, + }) + + assert r.status_code == 200, r.text + body = r.json() + assert body["reconciliation_class"] == "write_through" + assert body["write_through_result"] == "applied" + assert body["workplan_path"] == "workplans/STATE-WP-9999-demo.md" + assert "status: backlog" in wp.read_text(encoding="utf-8") + + r = await client.get(f"/workstreams/{ws['id']}") + assert r.json()["status"] == "backlog" + + async def test_apply_workstream_without_file_does_not_mutate_db(self, client): + await _create_domain(client) + topic = await _create_topic(client) + ws = await _create_workstream(client, topic["id"]) + + r = await client.post("/reconciliation/state-change", json={ + "target_type": "workstream", + "target_id": ws["id"], + "target_status": "backlog", + "file_backed": True, + "apply": True, + }) + + assert r.status_code == 200, r.text + body = r.json() + assert body["file_backed"] is False + assert body["reconciliation_class"] == "deferred" + assert body["write_through_result"] == "not_applicable" + + r = await client.get(f"/workstreams/{ws['id']}") + assert r.json()["status"] == "active" + + async def test_apply_task_write_through_patches_task_block_then_db(self, client, tmp_path): + await _create_domain(client) + repo_root = tmp_path / "repo" + workplans = repo_root / "workplans" + workplans.mkdir(parents=True) + repo = await _create_repo(client, local_path=repo_root) + topic = await _create_topic(client) + ws = await _create_workstream(client, topic["id"], repo_id=repo["id"]) + task = await _create_task(client, ws["id"]) + wp = workplans / "STATE-WP-9999-demo.md" + wp.write_text( + "---\n" + "id: STATE-WP-9999\n" + "type: workplan\n" + "title: Demo\n" + "domain: custodian\n" + "repo: state-hub\n" + "status: active\n" + f"state_hub_workstream_id: \"{ws['id']}\"\n" + "---\n\n" + "## Demo Task\n\n" + "```task\n" + "id: STATE-WP-9999-T01\n" + "status: todo\n" + "priority: high\n" + f"state_hub_task_id: \"{task['id']}\"\n" + "```\n", + encoding="utf-8", + ) + + r = await client.post("/reconciliation/state-change", json={ + "target_type": "task", + "target_id": task["id"], + "target_status": "in_progress", + "actor": "dashboard", + "apply": True, + }) + + assert r.status_code == 200, r.text + body = r.json() + assert body["reconciliation_class"] == "write_through" + assert body["write_through_result"] == "applied" + assert body["workplan_path"] == "workplans/STATE-WP-9999-demo.md" + assert "status: in_progress" in wp.read_text(encoding="utf-8") + + r = await client.get(f"/tasks/{task['id']}") + assert r.json()["status"] == "in_progress" diff --git a/workplans/STATE-WP-0048-ui-state-change-reconciliation.md b/workplans/STATE-WP-0048-ui-state-change-reconciliation.md index 724cf17..866c255 100644 --- a/workplans/STATE-WP-0048-ui-state-change-reconciliation.md +++ b/workplans/STATE-WP-0048-ui-state-change-reconciliation.md @@ -72,7 +72,7 @@ pin the write-through, deferred, and human-confirmation decisions. ```task id: STATE-WP-0048-T02 -status: in_progress +status: done priority: high state_hub_task_id: "50c20ddf-f039-418b-a763-7a8f581be5b0" ``` @@ -90,11 +90,15 @@ current/target status, file-backed flags, reconciliation class, reason, follow-up action, and `write_through_result: not_attempted`. Dashboard wiring and write-through execution remain for the next slice. +Result 2026-05-23: the API contract now also supports `apply: true` for safe +write-through changes. Unsafe classifications return `not_applicable` instead +of mutating files or DB state. + ## T03 - Implement File Write-Through For Safe Changes ```task id: STATE-WP-0048-T03 -status: todo +status: done priority: high state_hub_task_id: "c0a4e976-81fb-4fe3-a8a9-b8262e2c1c85" ``` @@ -106,6 +110,11 @@ the repo file, then sync the DB. Preserve formatting and existing Done when common dashboard transitions such as `active -> backlog` or task status updates can update the file representation deterministically. +Result 2026-05-23: added workplan-file helpers and write-through support for +file-backed workstream status and linked task status changes. The endpoint +patches the workplan file first, then updates the DB status, and returns the +relative workplan path plus `write_through_result: applied`. + ## T04 - Queue Reconciliation For Deferred Changes ```task