diff --git a/api/routers/reconciliation.py b/api/routers/reconciliation.py index bcda7d3..092bbcf 100644 --- a/api/routers/reconciliation.py +++ b/api/routers/reconciliation.py @@ -14,6 +14,7 @@ from api.schemas.reconciliation import StateChangeRequest, StateChangeResponse from api.services.lifecycle import status_value from api.services.reconciliation import ( ReconciliationClass, + StateChangeClassification, classify_task_status_change, classify_workstream_status_change, ) @@ -21,7 +22,10 @@ from api.services.workplan_files import ( find_workplan_for_workstream, patch_task_status, patch_workplan_status, + resolve_repo_path, + task_block_status, task_block_linked, + workplan_status, ) from api.workplan_status import normalize_workstream_status @@ -32,6 +36,14 @@ def _bool_or_default(value: bool | None, default: bool) -> bool: return default if value is None else value +def _conflict(reason: str, follow_up: str) -> StateChangeClassification: + return StateChangeClassification( + ReconciliationClass.DEFERRED, + reason, + follow_up, + ) + + async def _workstream_tasks_terminal(session: AsyncSession, workstream_id: uuid.UUID) -> bool: result = await session.execute(select(Task.status).where(Task.workstream_id == workstream_id)) statuses = [status_value(row[0]) for row in result.all()] @@ -47,6 +59,7 @@ def _deferred_message( reason: str, follow_up: str, workplan_path: str | None, + conflict: bool = False, ) -> AgentMessage: subject = f"Reconcile {body.target_type} state change: {current_status} -> {target_status}" lines = [ @@ -56,9 +69,11 @@ def _deferred_message( f"target_id: {body.target_id}", f"actor: {body.actor}", f"intent: {body.intent or ''}", + f"expected_current_status: {body.expected_current_status or ''}", f"current_status: {current_status}", f"target_status: {target_status}", f"reconciliation_class: {classification.value}", + f"conflict: {str(conflict).lower()}", f"reason: {reason}", f"follow_up: {follow_up}", f"workplan_path: {workplan_path or ''}", @@ -82,7 +97,8 @@ async def classify_state_change( raise HTTPException(status_code=404, detail="Workstream not found") repo = await session.get(ManagedRepo, ws.repo_id) if ws.repo_id else None - workplan_ref = find_workplan_for_workstream(repo, ws.id) + repo_path = resolve_repo_path(repo) + workplan_ref = find_workplan_for_workstream(repo, ws.id) if repo_path else None actual_file_backed = workplan_ref is not None actual_archived_file = bool(workplan_ref and workplan_ref.archived) file_backed = ( @@ -111,13 +127,56 @@ async def classify_state_change( ) write_result = "not_attempted" reconciliation_record_id = None + conflict = False if body.apply: - if classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH and workplan_ref: - patch_workplan_status(workplan_ref.path, target_status) - ws.status = target_status - await session.commit() - write_result = "applied" - else: + expected_status = ( + normalize_workstream_status(body.expected_current_status) + if body.expected_current_status is not None + else None + ) + if expected_status is not None and expected_status != current_status: + classification = _conflict( + f"cached workstream status changed from expected {expected_status!r} to {current_status!r}", + "refresh the dashboard and retry the state change if it is still intended", + ) + conflict = True + elif repo is not None and repo_path is None: + classification = _conflict( + "repo host path is unavailable for this State Hub host", + "register a host path for this machine or retry from a host with the repo checkout", + ) + conflict = True + elif classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH and workplan_ref: + file_status = normalize_workstream_status(workplan_status(workplan_ref.path)) + if file_status and file_status != current_status: + classification = _conflict( + f"workplan file status {file_status!r} differs from cached DB status {current_status!r}", + "run consistency repair or refresh State Hub from files before retrying", + ) + conflict = True + else: + try: + patch_workplan_status(workplan_ref.path, target_status) + patched_status = normalize_workstream_status(workplan_status(workplan_ref.path)) + except OSError as exc: + classification = _conflict( + f"workplan file write failed: {exc}", + "fix repo file access and retry the reconciliation", + ) + conflict = True + else: + if patched_status != target_status: + classification = _conflict( + f"workplan file status could not be patched to {target_status!r}", + "inspect the workplan frontmatter format before retrying", + ) + conflict = True + else: + ws.status = target_status + await session.commit() + write_result = "applied" + + if write_result != "applied": msg = _deferred_message( body=body, current_status=current_status, @@ -126,6 +185,7 @@ async def classify_state_change( reason=classification.reason, follow_up=classification.follow_up, workplan_path=workplan_ref.relative_path if workplan_ref else None, + conflict=conflict, ) session.add(msg) await session.commit() @@ -148,6 +208,7 @@ async def classify_state_change( write_through_result=write_result, workplan_path=workplan_ref.relative_path if workplan_ref else None, reconciliation_record_id=reconciliation_record_id, + conflict=conflict, ) task = await session.get(Task, body.target_id) @@ -156,7 +217,8 @@ async def classify_state_change( ws = await session.get(Workstream, task.workstream_id) repo = await session.get(ManagedRepo, ws.repo_id) if ws and ws.repo_id else None - workplan_ref = find_workplan_for_workstream(repo, ws.id) if ws else None + repo_path = resolve_repo_path(repo) + workplan_ref = find_workplan_for_workstream(repo, ws.id) if ws and repo_path else None actual_file_backed = workplan_ref is not None actual_archived_file = bool(workplan_ref and workplan_ref.archived) file_backed = ( @@ -187,19 +249,62 @@ async def classify_state_change( ) write_result = "not_attempted" reconciliation_record_id = None + conflict = False if body.apply: - if ( + expected_status = ( + status_value(body.expected_current_status) + if body.expected_current_status is not None + else None + ) + if expected_status is not None and expected_status != current_status: + classification = _conflict( + f"cached task status changed from expected {expected_status!r} to {current_status!r}", + "refresh the dashboard and retry the state change if it is still intended", + ) + conflict = True + elif repo is not None and repo_path is None: + classification = _conflict( + "repo host path is unavailable for this State Hub host", + "register a host path for this machine or retry from a host with the repo checkout", + ) + conflict = True + elif ( classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH and workplan_ref and actual_task_linked ): - patch_task_status(workplan_ref.path, task.id, target_status) - task.status = TaskStatus(target_status) - if body.blocking_reason is not None: - task.blocking_reason = body.blocking_reason - await session.commit() - write_result = "applied" - else: + file_status = status_value(task_block_status(workplan_ref.path, task.id)) + if file_status and file_status != current_status: + classification = _conflict( + f"workplan task status {file_status!r} differs from cached DB status {current_status!r}", + "run consistency repair or refresh State Hub from files before retrying", + ) + conflict = True + else: + try: + patch_task_status(workplan_ref.path, task.id, target_status) + patched_status = status_value(task_block_status(workplan_ref.path, task.id)) + except OSError as exc: + classification = _conflict( + f"workplan task write failed: {exc}", + "fix repo file access and retry the reconciliation", + ) + conflict = True + else: + if patched_status != target_status: + classification = _conflict( + f"workplan task block could not be patched to {target_status!r}", + "inspect the task block format before retrying", + ) + conflict = True + else: + task.status = TaskStatus(target_status) + if body.blocking_reason is not None: + task.blocking_reason = body.blocking_reason + await session.commit() + write_result = "applied" + + if write_result != "applied": msg = _deferred_message( body=body, current_status=current_status, @@ -208,6 +313,7 @@ async def classify_state_change( reason=classification.reason, follow_up=classification.follow_up, workplan_path=workplan_ref.relative_path if workplan_ref else None, + conflict=conflict, ) session.add(msg) await session.commit() @@ -230,4 +336,5 @@ async def classify_state_change( write_through_result=write_result, workplan_path=workplan_ref.relative_path if workplan_ref else None, reconciliation_record_id=reconciliation_record_id, + conflict=conflict, ) diff --git a/api/schemas/reconciliation.py b/api/schemas/reconciliation.py index 07ea096..702d5ef 100644 --- a/api/schemas/reconciliation.py +++ b/api/schemas/reconciliation.py @@ -15,6 +15,7 @@ class StateChangeRequest(BaseModel): target_status: str actor: str = "dashboard" intent: str | None = None + expected_current_status: str | None = None file_backed: bool | None = None archived_file: bool | None = None task_linked: bool | None = None @@ -40,3 +41,4 @@ class StateChangeResponse(BaseModel): write_through_result: Literal["not_attempted", "applied", "not_applicable"] = "not_attempted" workplan_path: str | None = None reconciliation_record_id: uuid.UUID | None = None + conflict: bool = False diff --git a/api/services/workplan_files.py b/api/services/workplan_files.py index 9130f59..3abd034 100644 --- a/api/services/workplan_files.py +++ b/api/services/workplan_files.py @@ -66,6 +66,19 @@ def task_block_linked(path: Path, task_id: uuid.UUID) -> bool: return _task_block_for_task(path, task_id) is not None +def workplan_status(path: Path) -> str | None: + status = _frontmatter(path).get("status") + return str(status).strip() if status is not None else None + + +def task_block_status(path: Path, task_id: uuid.UUID) -> str | None: + meta = _task_block_for_task(path, task_id) + if meta is None: + return None + status = meta.get("status") + return str(status).strip() if status is not None else None + + def patch_workplan_status(path: Path, status: str) -> bool: return _patch_frontmatter_field(path, "status", status) @@ -140,7 +153,10 @@ def _patch_frontmatter_field(path: Path, key: str, value: str) -> bool: def _task_block_for_task(path: Path, task_id: uuid.UUID) -> dict[str, Any] | None: - text = path.read_text(encoding="utf-8") + try: + text = path.read_text(encoding="utf-8") + except OSError: + return None for match in _TASK_BLOCK_RE.finditer(text): meta = _parse_task_block(match.group(1)) if str(meta.get("state_hub_task_id", "")).strip().strip('"') == str(task_id): diff --git a/dashboard/package.json b/dashboard/package.json index b5487b4..cbec21c 100644 --- a/dashboard/package.json +++ b/dashboard/package.json @@ -5,6 +5,7 @@ "type": "module", "scripts": { "dev": "observable preview", + "test": "node --test test/*.test.mjs", "build": "observable build", "clean": "rm -rf dist" }, diff --git a/dashboard/src/components/status-control.js b/dashboard/src/components/status-control.js index a6900a9..009df1e 100644 --- a/dashboard/src/components/status-control.js +++ b/dashboard/src/components/status-control.js @@ -65,7 +65,13 @@ async function readError(response) { } } -async function reconcileStatusChange({entity, type, nextStatus, blockingReason = null}) { +async function reconcileStatusChange({ + entity, + type, + currentStatus, + nextStatus, + blockingReason = null, +}) { const response = await apiFetch("/reconciliation/state-change", { method: "POST", headers: {"Content-Type": "application/json"}, @@ -75,6 +81,7 @@ async function reconcileStatusChange({entity, type, nextStatus, blockingReason = target_status: nextStatus, actor: "dashboard", intent: `${type} status change via dashboard`, + expected_current_status: currentStatus, blocking_reason: blockingReason, apply: true, }), @@ -84,6 +91,7 @@ async function reconcileStatusChange({entity, type, nextStatus, blockingReason = } function messageForReconciliation(result) { + if (result.conflict) return {text: "out of sync", kind: "review"}; if (result.write_through_result === "applied") return {text: "synced", kind: "ok"}; if (result.reconciliation_class === "human_confirmation") return {text: "needs review", kind: "review"}; if (result.reconciliation_class === "deferred") return {text: "queued", kind: ""}; @@ -155,7 +163,13 @@ export function statusControl({ select.disabled = true; setMessage("saving"); try { - const result = await reconcileStatusChange({entity, type, nextStatus, blockingReason}); + const result = await reconcileStatusChange({ + entity, + type, + currentStatus, + nextStatus, + blockingReason, + }); const messageResult = messageForReconciliation(result); if (result.write_through_result === "applied") { Object.assign(entity, {status: result.target_status}); diff --git a/dashboard/test/status-control.test.mjs b/dashboard/test/status-control.test.mjs new file mode 100644 index 0000000..1f035fc --- /dev/null +++ b/dashboard/test/status-control.test.mjs @@ -0,0 +1,165 @@ +import assert from "node:assert/strict"; +import test from "node:test"; + +class FakeClassList { + constructor() { + this.values = new Set(); + } + + toggle(name, enabled) { + if (enabled) { + this.values.add(name); + } else { + this.values.delete(name); + } + } +} + +class FakeElement { + constructor(tagName) { + this.tagName = tagName; + this.children = []; + this.listeners = {}; + this.classList = new FakeClassList(); + this.className = ""; + this.textContent = ""; + this.value = ""; + this.disabled = false; + } + + setAttribute(name, value) { + this[name] = value; + } + + append(...children) { + this.children.push(...children); + } + + addEventListener(type, listener) { + this.listeners[type] = listener; + } +} + +function installDom() { + const styles = new Map(); + globalThis.document = { + head: { + append(element) { + if (element.id) styles.set(element.id, element); + }, + }, + getElementById(id) { + return styles.get(id) ?? null; + }, + createElement(tagName) { + return new FakeElement(tagName); + }, + }; + globalThis.window = { + confirm: () => true, + prompt: () => "", + }; + globalThis.setTimeout = () => 0; +} + +installDom(); + +const {statusControl} = await import("../src/components/status-control.js"); + +function okResponse(overrides = {}) { + return { + target_type: "task", + target_id: "00000000-0000-0000-0000-000000000001", + actor: "dashboard", + current_status: "todo", + target_status: "in_progress", + file_backed: true, + archived_file: false, + task_linked: true, + reconciliation_class: "write_through", + reason: "task status can be represented in the workplan task block", + follow_up: "patch task block status and sync the DB from file", + write_through_result: "applied", + workplan_path: "workplans/STATE-WP-9999-demo.md", + reconciliation_record_id: null, + conflict: false, + ...overrides, + }; +} + +test("status control posts dashboard changes through reconciliation", async () => { + const requests = []; + globalThis.fetch = async (url, options) => { + requests.push({url, options, body: JSON.parse(options.body)}); + return { + ok: true, + json: async () => okResponse(), + }; + }; + + const entity = {id: "00000000-0000-0000-0000-000000000001", status: "todo"}; + let saved = null; + const root = statusControl({ + entity, + type: "task", + statuses: ["todo", "in_progress"], + onSaved: (updated, result) => { + saved = {updated, result}; + }, + }); + const [select, message] = root.children; + + select.value = "in_progress"; + await select.listeners.change(); + + assert.equal(requests.length, 1); + assert.equal(requests[0].url, "http://127.0.0.1:8000/reconciliation/state-change"); + assert.equal(requests[0].body.target_type, "task"); + assert.equal(requests[0].body.target_status, "in_progress"); + assert.equal(requests[0].body.expected_current_status, "todo"); + assert.equal(requests[0].body.apply, true); + assert.equal(entity.status, "in_progress"); + assert.equal(message.textContent, "synced"); + assert.equal(saved.result.write_through_result, "applied"); +}); + +test("status control keeps local state on reconciliation conflicts", async () => { + const requests = []; + globalThis.fetch = async (url, options) => { + requests.push({url, options, body: JSON.parse(options.body)}); + return { + ok: true, + json: async () => okResponse({ + current_status: "done", + target_status: "in_progress", + reconciliation_class: "deferred", + reason: "cached task status changed from expected 'todo' to 'done'", + follow_up: "refresh the dashboard and retry the state change if it is still intended", + write_through_result: "not_applicable", + reconciliation_record_id: "00000000-0000-0000-0000-000000000002", + conflict: true, + }), + }; + }; + + const entity = {id: "00000000-0000-0000-0000-000000000001", status: "todo"}; + let saved = null; + const root = statusControl({ + entity, + type: "task", + statuses: ["todo", "in_progress"], + onSaved: () => { + saved = true; + }, + }); + const [select, message] = root.children; + + select.value = "in_progress"; + await select.listeners.change(); + + assert.equal(requests.length, 1); + assert.equal(entity.status, "todo"); + assert.equal(select.value, "todo"); + assert.equal(message.textContent, "out of sync"); + assert.equal(saved, null); +}); diff --git a/tests/test_routers_core.py b/tests/test_routers_core.py index 0e07806..528a99a 100644 --- a/tests/test_routers_core.py +++ b/tests/test_routers_core.py @@ -647,3 +647,110 @@ class TestReconciliationEndpoints: messages = r.json() assert len(messages) == 1 assert "blocking reason" in messages[0]["body"] + + async def test_apply_workstream_stale_expected_status_creates_conflict_message(self, client, tmp_path): + await _create_domain(client) + repo_root = tmp_path / "repo" + workplans = repo_root / "workplans" + workplans.mkdir(parents=True) + repo = await _create_repo(client, local_path=repo_root) + topic = await _create_topic(client) + ws = await _create_workstream(client, topic["id"], repo_id=repo["id"]) + wp = workplans / "STATE-WP-9999-demo.md" + wp.write_text( + "---\n" + "id: STATE-WP-9999\n" + "type: workplan\n" + "title: Demo\n" + "domain: custodian\n" + "repo: state-hub\n" + "status: active\n" + f"state_hub_workstream_id: \"{ws['id']}\"\n" + "---\n", + encoding="utf-8", + ) + await client.patch(f"/workstreams/{ws['id']}", json={"status": "ready"}) + + r = await client.post("/reconciliation/state-change", json={ + "target_type": "workstream", + "target_id": ws["id"], + "target_status": "backlog", + "expected_current_status": "active", + "apply": True, + }) + + assert r.status_code == 200, r.text + body = r.json() + assert body["conflict"] is True + assert body["write_through_result"] == "not_applicable" + assert body["current_status"] == "ready" + assert "expected" in body["reason"] + assert "status: active" in wp.read_text(encoding="utf-8") + + r = await client.get(f"/workstreams/{ws['id']}") + assert r.json()["status"] == "ready" + + async def test_apply_workstream_file_status_drift_creates_conflict_message(self, client, tmp_path): + await _create_domain(client) + repo_root = tmp_path / "repo" + workplans = repo_root / "workplans" + workplans.mkdir(parents=True) + repo = await _create_repo(client, local_path=repo_root) + topic = await _create_topic(client) + ws = await _create_workstream(client, topic["id"], repo_id=repo["id"]) + wp = workplans / "STATE-WP-9999-demo.md" + wp.write_text( + "---\n" + "id: STATE-WP-9999\n" + "type: workplan\n" + "title: Demo\n" + "domain: custodian\n" + "repo: state-hub\n" + "status: ready\n" + f"state_hub_workstream_id: \"{ws['id']}\"\n" + "---\n", + encoding="utf-8", + ) + + r = await client.post("/reconciliation/state-change", json={ + "target_type": "workstream", + "target_id": ws["id"], + "target_status": "backlog", + "expected_current_status": "active", + "apply": True, + }) + + assert r.status_code == 200, r.text + body = r.json() + assert body["conflict"] is True + assert body["write_through_result"] == "not_applicable" + assert "differs from cached DB status" in body["reason"] + assert "status: ready" in wp.read_text(encoding="utf-8") + + r = await client.get(f"/workstreams/{ws['id']}") + assert r.json()["status"] == "active" + + async def test_apply_task_unavailable_host_path_creates_conflict_message(self, client, tmp_path): + await _create_domain(client) + repo = await _create_repo(client, local_path=tmp_path / "missing-repo") + topic = await _create_topic(client) + ws = await _create_workstream(client, topic["id"], repo_id=repo["id"]) + task = await _create_task(client, ws["id"]) + + r = await client.post("/reconciliation/state-change", json={ + "target_type": "task", + "target_id": task["id"], + "target_status": "in_progress", + "expected_current_status": "todo", + "apply": True, + }) + + assert r.status_code == 200, r.text + body = r.json() + assert body["conflict"] is True + assert body["write_through_result"] == "not_applicable" + assert "host path" in body["reason"] + assert body["reconciliation_record_id"] + + r = await client.get(f"/tasks/{task['id']}") + assert r.json()["status"] == "todo" diff --git a/workplans/STATE-WP-0048-ui-state-change-reconciliation.md b/workplans/STATE-WP-0048-ui-state-change-reconciliation.md index a6d08ec..7b90e98 100644 --- a/workplans/STATE-WP-0048-ui-state-change-reconciliation.md +++ b/workplans/STATE-WP-0048-ui-state-change-reconciliation.md @@ -4,7 +4,7 @@ type: workplan title: "UI State Change Reconciliation" domain: custodian repo: state-hub -status: active +status: finished owner: codex topic_slug: custodian planning_priority: high @@ -159,7 +159,7 @@ human-confirmation cases. ```task id: STATE-WP-0048-T06 -status: todo +status: done priority: high state_hub_task_id: "b1769ce0-de21-4faf-9db4-75ebc8506044" ``` @@ -170,11 +170,16 @@ missing, the workplan is archived, or the State Hub host path is unavailable. Done when reconciliation failures produce clear, actionable records instead of partially applied state. +Result 2026-05-23: apply-mode reconciliation now blocks stale UI requests, +repo file/cache drift, missing host-path access, and failed file patch attempts +before DB mutation. These cases create reconciliation messages with actionable +reasons and return `conflict: true` for dashboard display. + ## T07 - Tests And Consistency Integration ```task id: STATE-WP-0048-T07 -status: todo +status: done priority: high state_hub_task_id: "7d7e36e8-783d-494f-9691-5213e35c7539" ``` @@ -191,6 +196,12 @@ Progress 2026-05-23: routed dashboard status controls through the reconciliation API so UI-originated changes exercise the same write-through and deferred-record path as API clients. +Result 2026-05-23: added API coverage for stale expected status, workplan +file/cache drift, and unavailable host paths. Added a dependency-free dashboard +component test that verifies status controls post the reconciliation contract, +include `expected_current_status`, keep local state on conflicts, and surface +`out of sync` to the user. + ## Acceptance Criteria - Dashboard state changes never create silent DB/file divergence.