Harden reconciliation conflict handling

This commit is contained in:
2026-05-23 18:18:44 +02:00
parent 7831673820
commit 706b360736
8 changed files with 445 additions and 22 deletions

View File

@@ -14,6 +14,7 @@ from api.schemas.reconciliation import StateChangeRequest, StateChangeResponse
from api.services.lifecycle import status_value
from api.services.reconciliation import (
ReconciliationClass,
StateChangeClassification,
classify_task_status_change,
classify_workstream_status_change,
)
@@ -21,7 +22,10 @@ from api.services.workplan_files import (
find_workplan_for_workstream,
patch_task_status,
patch_workplan_status,
resolve_repo_path,
task_block_status,
task_block_linked,
workplan_status,
)
from api.workplan_status import normalize_workstream_status
@@ -32,6 +36,14 @@ def _bool_or_default(value: bool | None, default: bool) -> bool:
return default if value is None else value
def _conflict(reason: str, follow_up: str) -> StateChangeClassification:
return StateChangeClassification(
ReconciliationClass.DEFERRED,
reason,
follow_up,
)
async def _workstream_tasks_terminal(session: AsyncSession, workstream_id: uuid.UUID) -> bool:
result = await session.execute(select(Task.status).where(Task.workstream_id == workstream_id))
statuses = [status_value(row[0]) for row in result.all()]
@@ -47,6 +59,7 @@ def _deferred_message(
reason: str,
follow_up: str,
workplan_path: str | None,
conflict: bool = False,
) -> AgentMessage:
subject = f"Reconcile {body.target_type} state change: {current_status} -> {target_status}"
lines = [
@@ -56,9 +69,11 @@ def _deferred_message(
f"target_id: {body.target_id}",
f"actor: {body.actor}",
f"intent: {body.intent or ''}",
f"expected_current_status: {body.expected_current_status or ''}",
f"current_status: {current_status}",
f"target_status: {target_status}",
f"reconciliation_class: {classification.value}",
f"conflict: {str(conflict).lower()}",
f"reason: {reason}",
f"follow_up: {follow_up}",
f"workplan_path: {workplan_path or ''}",
@@ -82,7 +97,8 @@ async def classify_state_change(
raise HTTPException(status_code=404, detail="Workstream not found")
repo = await session.get(ManagedRepo, ws.repo_id) if ws.repo_id else None
workplan_ref = find_workplan_for_workstream(repo, ws.id)
repo_path = resolve_repo_path(repo)
workplan_ref = find_workplan_for_workstream(repo, ws.id) if repo_path else None
actual_file_backed = workplan_ref is not None
actual_archived_file = bool(workplan_ref and workplan_ref.archived)
file_backed = (
@@ -111,13 +127,56 @@ async def classify_state_change(
)
write_result = "not_attempted"
reconciliation_record_id = None
conflict = False
if body.apply:
if classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH and workplan_ref:
patch_workplan_status(workplan_ref.path, target_status)
ws.status = target_status
await session.commit()
write_result = "applied"
else:
expected_status = (
normalize_workstream_status(body.expected_current_status)
if body.expected_current_status is not None
else None
)
if expected_status is not None and expected_status != current_status:
classification = _conflict(
f"cached workstream status changed from expected {expected_status!r} to {current_status!r}",
"refresh the dashboard and retry the state change if it is still intended",
)
conflict = True
elif repo is not None and repo_path is None:
classification = _conflict(
"repo host path is unavailable for this State Hub host",
"register a host path for this machine or retry from a host with the repo checkout",
)
conflict = True
elif classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH and workplan_ref:
file_status = normalize_workstream_status(workplan_status(workplan_ref.path))
if file_status and file_status != current_status:
classification = _conflict(
f"workplan file status {file_status!r} differs from cached DB status {current_status!r}",
"run consistency repair or refresh State Hub from files before retrying",
)
conflict = True
else:
try:
patch_workplan_status(workplan_ref.path, target_status)
patched_status = normalize_workstream_status(workplan_status(workplan_ref.path))
except OSError as exc:
classification = _conflict(
f"workplan file write failed: {exc}",
"fix repo file access and retry the reconciliation",
)
conflict = True
else:
if patched_status != target_status:
classification = _conflict(
f"workplan file status could not be patched to {target_status!r}",
"inspect the workplan frontmatter format before retrying",
)
conflict = True
else:
ws.status = target_status
await session.commit()
write_result = "applied"
if write_result != "applied":
msg = _deferred_message(
body=body,
current_status=current_status,
@@ -126,6 +185,7 @@ async def classify_state_change(
reason=classification.reason,
follow_up=classification.follow_up,
workplan_path=workplan_ref.relative_path if workplan_ref else None,
conflict=conflict,
)
session.add(msg)
await session.commit()
@@ -148,6 +208,7 @@ async def classify_state_change(
write_through_result=write_result,
workplan_path=workplan_ref.relative_path if workplan_ref else None,
reconciliation_record_id=reconciliation_record_id,
conflict=conflict,
)
task = await session.get(Task, body.target_id)
@@ -156,7 +217,8 @@ async def classify_state_change(
ws = await session.get(Workstream, task.workstream_id)
repo = await session.get(ManagedRepo, ws.repo_id) if ws and ws.repo_id else None
workplan_ref = find_workplan_for_workstream(repo, ws.id) if ws else None
repo_path = resolve_repo_path(repo)
workplan_ref = find_workplan_for_workstream(repo, ws.id) if ws and repo_path else None
actual_file_backed = workplan_ref is not None
actual_archived_file = bool(workplan_ref and workplan_ref.archived)
file_backed = (
@@ -187,19 +249,62 @@ async def classify_state_change(
)
write_result = "not_attempted"
reconciliation_record_id = None
conflict = False
if body.apply:
if (
expected_status = (
status_value(body.expected_current_status)
if body.expected_current_status is not None
else None
)
if expected_status is not None and expected_status != current_status:
classification = _conflict(
f"cached task status changed from expected {expected_status!r} to {current_status!r}",
"refresh the dashboard and retry the state change if it is still intended",
)
conflict = True
elif repo is not None and repo_path is None:
classification = _conflict(
"repo host path is unavailable for this State Hub host",
"register a host path for this machine or retry from a host with the repo checkout",
)
conflict = True
elif (
classification.reconciliation_class == ReconciliationClass.WRITE_THROUGH
and workplan_ref
and actual_task_linked
):
patch_task_status(workplan_ref.path, task.id, target_status)
task.status = TaskStatus(target_status)
if body.blocking_reason is not None:
task.blocking_reason = body.blocking_reason
await session.commit()
write_result = "applied"
else:
file_status = status_value(task_block_status(workplan_ref.path, task.id))
if file_status and file_status != current_status:
classification = _conflict(
f"workplan task status {file_status!r} differs from cached DB status {current_status!r}",
"run consistency repair or refresh State Hub from files before retrying",
)
conflict = True
else:
try:
patch_task_status(workplan_ref.path, task.id, target_status)
patched_status = status_value(task_block_status(workplan_ref.path, task.id))
except OSError as exc:
classification = _conflict(
f"workplan task write failed: {exc}",
"fix repo file access and retry the reconciliation",
)
conflict = True
else:
if patched_status != target_status:
classification = _conflict(
f"workplan task block could not be patched to {target_status!r}",
"inspect the task block format before retrying",
)
conflict = True
else:
task.status = TaskStatus(target_status)
if body.blocking_reason is not None:
task.blocking_reason = body.blocking_reason
await session.commit()
write_result = "applied"
if write_result != "applied":
msg = _deferred_message(
body=body,
current_status=current_status,
@@ -208,6 +313,7 @@ async def classify_state_change(
reason=classification.reason,
follow_up=classification.follow_up,
workplan_path=workplan_ref.relative_path if workplan_ref else None,
conflict=conflict,
)
session.add(msg)
await session.commit()
@@ -230,4 +336,5 @@ async def classify_state_change(
write_through_result=write_result,
workplan_path=workplan_ref.relative_path if workplan_ref else None,
reconciliation_record_id=reconciliation_record_id,
conflict=conflict,
)

View File

@@ -15,6 +15,7 @@ class StateChangeRequest(BaseModel):
target_status: str
actor: str = "dashboard"
intent: str | None = None
expected_current_status: str | None = None
file_backed: bool | None = None
archived_file: bool | None = None
task_linked: bool | None = None
@@ -40,3 +41,4 @@ class StateChangeResponse(BaseModel):
write_through_result: Literal["not_attempted", "applied", "not_applicable"] = "not_attempted"
workplan_path: str | None = None
reconciliation_record_id: uuid.UUID | None = None
conflict: bool = False

View File

@@ -66,6 +66,19 @@ def task_block_linked(path: Path, task_id: uuid.UUID) -> bool:
return _task_block_for_task(path, task_id) is not None
def workplan_status(path: Path) -> str | None:
status = _frontmatter(path).get("status")
return str(status).strip() if status is not None else None
def task_block_status(path: Path, task_id: uuid.UUID) -> str | None:
meta = _task_block_for_task(path, task_id)
if meta is None:
return None
status = meta.get("status")
return str(status).strip() if status is not None else None
def patch_workplan_status(path: Path, status: str) -> bool:
return _patch_frontmatter_field(path, "status", status)
@@ -140,7 +153,10 @@ def _patch_frontmatter_field(path: Path, key: str, value: str) -> bool:
def _task_block_for_task(path: Path, task_id: uuid.UUID) -> dict[str, Any] | None:
text = path.read_text(encoding="utf-8")
try:
text = path.read_text(encoding="utf-8")
except OSError:
return None
for match in _TASK_BLOCK_RE.finditer(text):
meta = _parse_task_block(match.group(1))
if str(meta.get("state_hub_task_id", "")).strip().strip('"') == str(task_id):

View File

@@ -5,6 +5,7 @@
"type": "module",
"scripts": {
"dev": "observable preview",
"test": "node --test test/*.test.mjs",
"build": "observable build",
"clean": "rm -rf dist"
},

View File

@@ -65,7 +65,13 @@ async function readError(response) {
}
}
async function reconcileStatusChange({entity, type, nextStatus, blockingReason = null}) {
async function reconcileStatusChange({
entity,
type,
currentStatus,
nextStatus,
blockingReason = null,
}) {
const response = await apiFetch("/reconciliation/state-change", {
method: "POST",
headers: {"Content-Type": "application/json"},
@@ -75,6 +81,7 @@ async function reconcileStatusChange({entity, type, nextStatus, blockingReason =
target_status: nextStatus,
actor: "dashboard",
intent: `${type} status change via dashboard`,
expected_current_status: currentStatus,
blocking_reason: blockingReason,
apply: true,
}),
@@ -84,6 +91,7 @@ async function reconcileStatusChange({entity, type, nextStatus, blockingReason =
}
function messageForReconciliation(result) {
if (result.conflict) return {text: "out of sync", kind: "review"};
if (result.write_through_result === "applied") return {text: "synced", kind: "ok"};
if (result.reconciliation_class === "human_confirmation") return {text: "needs review", kind: "review"};
if (result.reconciliation_class === "deferred") return {text: "queued", kind: ""};
@@ -155,7 +163,13 @@ export function statusControl({
select.disabled = true;
setMessage("saving");
try {
const result = await reconcileStatusChange({entity, type, nextStatus, blockingReason});
const result = await reconcileStatusChange({
entity,
type,
currentStatus,
nextStatus,
blockingReason,
});
const messageResult = messageForReconciliation(result);
if (result.write_through_result === "applied") {
Object.assign(entity, {status: result.target_status});

View File

@@ -0,0 +1,165 @@
import assert from "node:assert/strict";
import test from "node:test";
class FakeClassList {
constructor() {
this.values = new Set();
}
toggle(name, enabled) {
if (enabled) {
this.values.add(name);
} else {
this.values.delete(name);
}
}
}
class FakeElement {
constructor(tagName) {
this.tagName = tagName;
this.children = [];
this.listeners = {};
this.classList = new FakeClassList();
this.className = "";
this.textContent = "";
this.value = "";
this.disabled = false;
}
setAttribute(name, value) {
this[name] = value;
}
append(...children) {
this.children.push(...children);
}
addEventListener(type, listener) {
this.listeners[type] = listener;
}
}
function installDom() {
const styles = new Map();
globalThis.document = {
head: {
append(element) {
if (element.id) styles.set(element.id, element);
},
},
getElementById(id) {
return styles.get(id) ?? null;
},
createElement(tagName) {
return new FakeElement(tagName);
},
};
globalThis.window = {
confirm: () => true,
prompt: () => "",
};
globalThis.setTimeout = () => 0;
}
installDom();
const {statusControl} = await import("../src/components/status-control.js");
function okResponse(overrides = {}) {
return {
target_type: "task",
target_id: "00000000-0000-0000-0000-000000000001",
actor: "dashboard",
current_status: "todo",
target_status: "in_progress",
file_backed: true,
archived_file: false,
task_linked: true,
reconciliation_class: "write_through",
reason: "task status can be represented in the workplan task block",
follow_up: "patch task block status and sync the DB from file",
write_through_result: "applied",
workplan_path: "workplans/STATE-WP-9999-demo.md",
reconciliation_record_id: null,
conflict: false,
...overrides,
};
}
test("status control posts dashboard changes through reconciliation", async () => {
const requests = [];
globalThis.fetch = async (url, options) => {
requests.push({url, options, body: JSON.parse(options.body)});
return {
ok: true,
json: async () => okResponse(),
};
};
const entity = {id: "00000000-0000-0000-0000-000000000001", status: "todo"};
let saved = null;
const root = statusControl({
entity,
type: "task",
statuses: ["todo", "in_progress"],
onSaved: (updated, result) => {
saved = {updated, result};
},
});
const [select, message] = root.children;
select.value = "in_progress";
await select.listeners.change();
assert.equal(requests.length, 1);
assert.equal(requests[0].url, "http://127.0.0.1:8000/reconciliation/state-change");
assert.equal(requests[0].body.target_type, "task");
assert.equal(requests[0].body.target_status, "in_progress");
assert.equal(requests[0].body.expected_current_status, "todo");
assert.equal(requests[0].body.apply, true);
assert.equal(entity.status, "in_progress");
assert.equal(message.textContent, "synced");
assert.equal(saved.result.write_through_result, "applied");
});
test("status control keeps local state on reconciliation conflicts", async () => {
const requests = [];
globalThis.fetch = async (url, options) => {
requests.push({url, options, body: JSON.parse(options.body)});
return {
ok: true,
json: async () => okResponse({
current_status: "done",
target_status: "in_progress",
reconciliation_class: "deferred",
reason: "cached task status changed from expected 'todo' to 'done'",
follow_up: "refresh the dashboard and retry the state change if it is still intended",
write_through_result: "not_applicable",
reconciliation_record_id: "00000000-0000-0000-0000-000000000002",
conflict: true,
}),
};
};
const entity = {id: "00000000-0000-0000-0000-000000000001", status: "todo"};
let saved = null;
const root = statusControl({
entity,
type: "task",
statuses: ["todo", "in_progress"],
onSaved: () => {
saved = true;
},
});
const [select, message] = root.children;
select.value = "in_progress";
await select.listeners.change();
assert.equal(requests.length, 1);
assert.equal(entity.status, "todo");
assert.equal(select.value, "todo");
assert.equal(message.textContent, "out of sync");
assert.equal(saved, null);
});

View File

@@ -647,3 +647,110 @@ class TestReconciliationEndpoints:
messages = r.json()
assert len(messages) == 1
assert "blocking reason" in messages[0]["body"]
async def test_apply_workstream_stale_expected_status_creates_conflict_message(self, client, tmp_path):
await _create_domain(client)
repo_root = tmp_path / "repo"
workplans = repo_root / "workplans"
workplans.mkdir(parents=True)
repo = await _create_repo(client, local_path=repo_root)
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
wp = workplans / "STATE-WP-9999-demo.md"
wp.write_text(
"---\n"
"id: STATE-WP-9999\n"
"type: workplan\n"
"title: Demo\n"
"domain: custodian\n"
"repo: state-hub\n"
"status: active\n"
f"state_hub_workstream_id: \"{ws['id']}\"\n"
"---\n",
encoding="utf-8",
)
await client.patch(f"/workstreams/{ws['id']}", json={"status": "ready"})
r = await client.post("/reconciliation/state-change", json={
"target_type": "workstream",
"target_id": ws["id"],
"target_status": "backlog",
"expected_current_status": "active",
"apply": True,
})
assert r.status_code == 200, r.text
body = r.json()
assert body["conflict"] is True
assert body["write_through_result"] == "not_applicable"
assert body["current_status"] == "ready"
assert "expected" in body["reason"]
assert "status: active" in wp.read_text(encoding="utf-8")
r = await client.get(f"/workstreams/{ws['id']}")
assert r.json()["status"] == "ready"
async def test_apply_workstream_file_status_drift_creates_conflict_message(self, client, tmp_path):
await _create_domain(client)
repo_root = tmp_path / "repo"
workplans = repo_root / "workplans"
workplans.mkdir(parents=True)
repo = await _create_repo(client, local_path=repo_root)
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
wp = workplans / "STATE-WP-9999-demo.md"
wp.write_text(
"---\n"
"id: STATE-WP-9999\n"
"type: workplan\n"
"title: Demo\n"
"domain: custodian\n"
"repo: state-hub\n"
"status: ready\n"
f"state_hub_workstream_id: \"{ws['id']}\"\n"
"---\n",
encoding="utf-8",
)
r = await client.post("/reconciliation/state-change", json={
"target_type": "workstream",
"target_id": ws["id"],
"target_status": "backlog",
"expected_current_status": "active",
"apply": True,
})
assert r.status_code == 200, r.text
body = r.json()
assert body["conflict"] is True
assert body["write_through_result"] == "not_applicable"
assert "differs from cached DB status" in body["reason"]
assert "status: ready" in wp.read_text(encoding="utf-8")
r = await client.get(f"/workstreams/{ws['id']}")
assert r.json()["status"] == "active"
async def test_apply_task_unavailable_host_path_creates_conflict_message(self, client, tmp_path):
await _create_domain(client)
repo = await _create_repo(client, local_path=tmp_path / "missing-repo")
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
task = await _create_task(client, ws["id"])
r = await client.post("/reconciliation/state-change", json={
"target_type": "task",
"target_id": task["id"],
"target_status": "in_progress",
"expected_current_status": "todo",
"apply": True,
})
assert r.status_code == 200, r.text
body = r.json()
assert body["conflict"] is True
assert body["write_through_result"] == "not_applicable"
assert "host path" in body["reason"]
assert body["reconciliation_record_id"]
r = await client.get(f"/tasks/{task['id']}")
assert r.json()["status"] == "todo"

View File

@@ -4,7 +4,7 @@ type: workplan
title: "UI State Change Reconciliation"
domain: custodian
repo: state-hub
status: active
status: finished
owner: codex
topic_slug: custodian
planning_priority: high
@@ -159,7 +159,7 @@ human-confirmation cases.
```task
id: STATE-WP-0048-T06
status: todo
status: done
priority: high
state_hub_task_id: "b1769ce0-de21-4faf-9db4-75ebc8506044"
```
@@ -170,11 +170,16 @@ missing, the workplan is archived, or the State Hub host path is unavailable.
Done when reconciliation failures produce clear, actionable records instead of
partially applied state.
Result 2026-05-23: apply-mode reconciliation now blocks stale UI requests,
repo file/cache drift, missing host-path access, and failed file patch attempts
before DB mutation. These cases create reconciliation messages with actionable
reasons and return `conflict: true` for dashboard display.
## T07 - Tests And Consistency Integration
```task
id: STATE-WP-0048-T07
status: todo
status: done
priority: high
state_hub_task_id: "7d7e36e8-783d-494f-9691-5213e35c7539"
```
@@ -191,6 +196,12 @@ Progress 2026-05-23: routed dashboard status controls through the
reconciliation API so UI-originated changes exercise the same write-through and
deferred-record path as API clients.
Result 2026-05-23: added API coverage for stale expected status, workplan
file/cache drift, and unavailable host paths. Added a dependency-free dashboard
component test that verifies status controls post the reconciliation contract,
include `expected_current_status`, keep local state on conflicts, and surface
`out of sync` to the user.
## Acceptance Criteria
- Dashboard state changes never create silent DB/file divergence.