chore: add local consistency sync cli

This commit is contained in:
2026-07-02 00:15:16 +02:00
parent 1f61008837
commit a361ce8731
15 changed files with 422 additions and 33 deletions

View File

@@ -92,12 +92,12 @@ curl -s -X PATCH "http://127.0.0.1:8000/tasks/<task_id>" \
**Close:**
1. Update workplan file task statuses to reflect progress
2. Log: `POST /progress/` with a summary of what changed
3. Note for the custodian operator: after workplan file changes, run from
`~/state-hub`:
3. After workplan file changes, run:
```bash
make fix-consistency REPO=state-hub
statehub fix-consistency
```
This syncs task status from files into the hub DB.
Coding agents should run this directly; ask the operator only if the CLI or
State Hub API is unavailable. This syncs task status from files into the hub DB.
---
@@ -215,5 +215,5 @@ Status progression: `todo` → `progress` → `done`; use `wait` for waiting/blo
To create a new workplan:
1. Write the file following the format above
2. Notify the custodian operator to run `make fix-consistency REPO=state-hub`
(or send a message to the hub agent via `POST /messages/`)
2. Run `statehub fix-consistency` locally; ask the operator only if the CLI or
State Hub API is unavailable.

View File

@@ -365,6 +365,57 @@ def cmd_ingest_sbom(args: argparse.Namespace) -> None:
sys.exit(result.returncode)
def cmd_fix_consistency(args: argparse.Namespace) -> None:
"""Run ADR-001 consistency repair from any registered repo checkout."""
checker = STATE_HUB_DIR / "scripts" / "consistency_check.py"
if not checker.exists():
print(f"ERROR: consistency checker not found at {checker}")
print(" Run this command from an editable state-hub install or the state-hub repo.")
sys.exit(1)
if args.remote and not (args.repo or args.all):
print("ERROR: --remote requires --repo or --all.")
print(" From a local checkout, run: statehub fix-consistency")
print(" For pull-before-fix, run: statehub fix-consistency --repo <slug> --remote")
sys.exit(1)
cmd = [sys.executable, str(checker)]
if args.all:
cmd.append("--all")
elif args.repo:
cmd.extend(["--repo", args.repo])
if args.repo_path:
cmd.extend(["--repo-path", str(Path(args.repo_path).expanduser().resolve())])
else:
cmd.append("--here")
if args.path:
cmd.append(str(Path(args.path).expanduser().resolve()))
cmd.append("--fix")
if args.remote:
cmd.append("--remote")
if args.no_writeback:
cmd.append("--no-writeback")
if args.archive_closed:
cmd.append("--archive-closed")
if args.archive_workplan:
cmd.extend(["--archive-workplan", args.archive_workplan])
if args.archive_date:
cmd.extend(["--archive-date", args.archive_date])
if args.api_base:
cmd.extend(["--api-base", args.api_base])
if args.as_json:
cmd.append("--json")
if args.max_seconds is not None:
cmd.extend(["--max-seconds", str(args.max_seconds)])
result = subprocess.run(cmd)
exit_code = result.returncode
if exit_code == 2 and not args.strict_warnings:
exit_code = 0
sys.exit(exit_code)
def cmd_create_workstream(args: argparse.Namespace) -> None:
"""Create a workstream under a domain's topic."""
_api_get("/state/health")
@@ -582,6 +633,30 @@ def main() -> None:
ing.add_argument("--slug", default=None, help="Repo slug (auto-detected from path if omitted)")
ing.add_argument("--dry-run", action="store_true", help="Parse lockfiles but do not submit to API")
# fix-consistency
fix = sub.add_parser(
"fix-consistency",
help="Reconcile workplan files with State Hub from the current repo",
)
target = fix.add_mutually_exclusive_group()
target.add_argument("--repo", default=None, help="Registered repo slug; defaults to inferring from --path")
target.add_argument("--all", action="store_true", help="Fix all registered repos with a visible path")
fix.add_argument("--path", default=os.getcwd(), help="Repo checkout to infer from (defaults to cwd)")
fix.add_argument("--repo-path", default=None, help="Override repo path when using --repo")
fix.add_argument("--remote", action="store_true", help="Pull before fixing; requires --repo or --all")
fix.add_argument("--max-seconds", type=int, default=None, help="Wall-clock budget for --remote --all")
fix.add_argument("--no-writeback", action="store_true", help="Disable DB-to-file status writeback")
fix.add_argument("--archive-closed", action="store_true", help="Archive closed root workplans after fixing")
fix.add_argument("--archive-workplan", default=None, help="Archive only the matching workplan id or filename")
fix.add_argument("--archive-date", default=None, help="YYMMDD archive prefix for --archive-closed")
fix.add_argument("--api-base", default=API_BASE, help="State Hub API base URL")
fix.add_argument("--json", action="store_true", dest="as_json", help="Output JSON from the checker")
fix.add_argument(
"--strict-warnings",
action="store_true",
help="Preserve checker exit code 2 for warnings-only runs",
)
# create-workstream
cws = sub.add_parser("create-workstream", help="Create a workstream under a domain topic")
cws.add_argument("--domain", required=True, help="Domain slug to create the workstream under")
@@ -644,6 +719,8 @@ def main() -> None:
cmd_register(args)
elif args.command == "ingest-sbom":
cmd_ingest_sbom(args)
elif args.command == "fix-consistency":
cmd_fix_consistency(args)
elif args.command == "create-workstream":
cmd_create_workstream(args)
elif args.command == "create-task":

View File

@@ -85,5 +85,5 @@ Use the **Add Repo** form or:
# 1. Author classification file in the repo
# 2. Register / reclassify
make register-from-classification PATH=/path/to/repo
make fix-consistency REPO=<slug>
statehub fix-consistency
```

View File

@@ -82,7 +82,7 @@ Invalidation). The practical consequence:
| `uv.lock`, `package-lock.json`, etc. | SBOM entries + licence risk | `make ingest-sbom REPO=` |
| `tpsc.yaml` | Third-party service declarations + GDPR warnings | `make ingest-tpsc REPO=` |
| `SCOPE.md` capability blocks | Capability catalog | `make ingest-capabilities REPO=` |
| `workplans/*.md` | Workstream + task status | `make fix-consistency REPO=` |
| `workplans/*.md` | Workstream + task status | `statehub fix-consistency` |
| Repo files + DB records | DoI compliance tier | Fingerprint cache, auto-refreshed on read |
---

View File

@@ -17,7 +17,7 @@ keeps the underlying scripts; only the *scheduling* moves.
| - | ------------------- | -------------------------------------------------------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------- |
| 1 | activity-core cron | every 15 min (Railiance01) | `POST /consistency/sweep/remote-all``consistency_check.py --remote --all` | Pull every registered repo, reconcile workplan files ↔ DB, run C-15 writeback + C-16 pull gate |
| 2 | manual / daily cron | `make cleanup-stale` (suggested `0 3 * * *`) | `scripts/cleanup_stale_tasks.py` | Cancel tasks still open in finished/archived workstreams; emits `org.statehub.task.stale` |
| 3 | git post-commit | every commit in a registered repo | `make fix-consistency REPO=<slug>` | Per-repo workplan ↔ DB sync immediately after a commit |
| 3 | git post-commit | every commit in a registered repo | `statehub fix-consistency` | Per-repo workplan ↔ DB sync immediately after a commit |
Honourable mentions (not currently scheduled, on-demand only — listed for
completeness so they don't get mistakenly picked up):

View File

@@ -90,6 +90,6 @@ or secret-looking JSON fields. Payloads over 64 KiB are rejected.
2. Run statehub outbox status on each host that may have queued writes.
3. Run statehub outbox replay until no due queued envelopes remain.
4. Review conflict envelopes manually.
5. Run make fix-consistency REPO=state-hub so file-backed workplan/task state
5. Run `statehub fix-consistency` so file-backed workplan/task state
remains canonical after replay.
6. Record a progress note with non-secret replay counts.

View File

@@ -31,7 +31,15 @@ build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["api", "mcp_server", "task_flow_engine"]
artifacts = ["custodian_cli.py", "statehub_register.py"]
artifacts = [
"custodian_cli.py",
"statehub_register.py",
"scripts/consistency_check.py",
"scripts/repo_sync.py",
"scripts/mcp_registration.py",
"scripts/project_claude_md.template",
"scripts/project_rules/*.template",
]
[tool.uv.sources]
llm-connect = { path = "/home/worsch/llm-connect", editable = true }

View File

@@ -16,7 +16,7 @@ Checks:
C-09 workstream-repo-mismatch FAIL Yes DB workstream repo_id != file location
C-10 task-status-drift WARN Yes Task status differs between file and DB
C-11 task-unlinked WARN Yes Task block has no state_hub_task_id
C-12 orphan-db-task WARN No DB task in workstream has no file backing
C-12 orphan-db-task WARN Yes DB task in workstream has no file backing unless terminal in a closed workstream
C-13 workstream-auto-complete WARN Yes All DB tasks done but workstream still active
C-14 ghost-duplicate WARN No Active topic workstream with no repo_id matches a file-backed title — probable ghost from premature create_workstream() call
C-15 task-db-ahead WARN Yes DB task status is ahead of file — regression prevented; writeback syncs file
@@ -1204,9 +1204,14 @@ def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = N
ws_finished = normalise_workstream_status(ws_status) in CLOSED_WORKSTREAM_STATUSES
for db_t in db_tasks:
if db_t["id"] not in file_task_sh_ids:
db_t_status = db_t.get("status", "")
db_t_status = normalise_task_status(db_t.get("status", "todo"))
open_task = db_t_status not in TERMINAL_TASK_STATUSES
# Auto-cancel fixable when workstream is finished and task is open
# Closed workstreams can legitimately retain terminal historical
# DB tasks from earlier duplicates. The public task DELETE route
# is a cancel operation, so these are not further actionable.
if ws_finished and not open_task:
continue
# Auto-cancel fixable when workstream is finished and task is open.
fixable = ws_finished and open_task
report.add(
severity="WARN", check_id="C-12",

View File

@@ -7,8 +7,9 @@
# ./install_hooks.sh --repo <slug> --remove # remove hook from one repo
# ./install_hooks.sh --all --remove # remove hook from all repos
#
# The hook runs `make fix-consistency REPO=<slug>` in the state-hub after each
# commit, keeping the hub in sync with workplan file changes automatically.
# The hook runs `statehub fix-consistency --repo <slug>` after each commit,
# keeping the hub in sync with workplan file changes automatically. It falls
# back to the state-hub Make target when the CLI is not installed.
#
# Idempotent: the hook block is guarded by a marker comment. Running twice is safe.
@@ -79,7 +80,11 @@ install_hook() {
hook_block=$(cat <<BLOCK
${MARKER} — managed by custodian, do not edit this block
if curl -sf ${API_BASE}/state/health >/dev/null 2>&1; then
(cd "${STATEHUB_DIR}" && make fix-consistency REPO=${slug} >/dev/null 2>&1 &)
if command -v statehub >/dev/null 2>&1; then
(cd "${repo_path}" && statehub fix-consistency --repo ${slug} >/dev/null 2>&1 &)
else
(cd "${STATEHUB_DIR}" && make fix-consistency REPO=${slug} >/dev/null 2>&1 &)
fi
fi
${MARKER}-end
BLOCK

View File

@@ -98,12 +98,12 @@ curl -s -X PATCH "http://127.0.0.1:8000/tasks/<task_id>" \
**Close:**
1. Update workplan file task statuses to reflect progress
2. Log: `POST /progress/` with a summary of what changed
3. Note for the custodian operator: after workplan file changes, run from
`~/state-hub`:
3. After workplan file changes, run:
```bash
make fix-consistency REPO={REPO_SLUG}
statehub fix-consistency
```
This syncs task status from files into the hub DB.
Coding agents should run this directly; ask the operator only if the CLI or
State Hub API is unavailable. This syncs task status from files into the hub DB.
---
@@ -172,5 +172,5 @@ Status progression: `todo` → `progress` → `done`; use `wait` for waiting/blo
To create a new workplan:
1. Write the file following the format above
2. Notify the custodian operator to run `make fix-consistency REPO={REPO_SLUG}`
(or send a message to the hub agent via `POST /messages/`)
2. Run `statehub fix-consistency` locally; ask the operator only if the CLI or
State Hub API is unavailable.

View File

@@ -70,15 +70,16 @@ curl -s -X POST http://127.0.0.1:8000/progress/ \
-H "Content-Type: application/json" \
-d '{"topic_id":"{TOPIC_ID}","workstream_id":"<uuid>","event_type":"note","summary":"what changed","author":"codex"}'
```
If workplan files were modified, ensure the local copy is up to date first:
If workplan files were modified, ensure the local copy is up to date first,
then sync from the repo checkout:
```bash
git -C <repo_path> pull --ff-only
cd ~/state-hub && make fix-consistency REPO={REPO_SLUG}
git pull --ff-only
statehub fix-consistency
```
For repos where implementation runs on a remote machine (e.g. CoulombCore),
use the combined target which pulls before fixing:
use the pull-before-fix mode from any shell with the State Hub CLI:
```bash
cd ~/state-hub && make fix-consistency-remote REPO={REPO_SLUG}
statehub fix-consistency --repo {REPO_SLUG} --remote
```
**C-15** (DB task ahead of file) is normal in multi-machine workflows — writeback
will sync the file to match DB. **C-16** (repo behind remote) blocks all writes

View File

@@ -132,8 +132,8 @@ def run_register(args: argparse.Namespace) -> None:
print(f" Repo ID: {repo.get('id', '(existing)') if isinstance(repo, dict) else '(unknown)'}")
print()
print("Next:")
print(f" cd {STATE_HUB_DIR}")
print(f" make fix-consistency REPO={repo_slug}")
print(f" statehub fix-consistency --repo {repo_slug}")
print(" # or from the repo checkout: statehub fix-consistency")
def collect_repo_snapshot(project_path: Path) -> RepoSnapshot:
@@ -584,10 +584,11 @@ priority: medium
```
Create the first implementation workplan for the repository's most important
next change. After workplan file updates, run from `~/state-hub`:
next change. After workplan file updates, run the sync locally from this repo
checkout:
```bash
make fix-consistency REPO={repo_slug}
statehub fix-consistency
```
"""
)

View File

@@ -1016,6 +1016,124 @@ class TestLifecycleRenormalization:
class TestC12OrphanDbTasks:
def _make_repo(self, tmp_path: Path, status: str = "finished") -> Path:
repo = tmp_path / "repo"
workplans = repo / "workplans"
workplans.mkdir(parents=True)
(workplans / "STATE-WP-0001-demo.md").write_text(
"---\n"
"id: STATE-WP-0001\n"
"type: workplan\n"
"title: Demo\n"
"domain: infotech\n"
"repo: state-hub\n"
f"status: {status}\n"
"owner: codex\n"
"state_hub_workstream_id: \"ws-1\"\n"
"---\n\n"
"## Keep Task\n\n"
"```task\n"
"id: STATE-WP-0001-T01\n"
"status: done\n"
"priority: high\n"
"state_hub_task_id: \"task-linked\"\n"
"```\n",
encoding="utf-8",
)
return repo
def _api_get_for_repo(self, repo: Path, orphan_status: str):
ws = {
"id": "ws-1",
"repo_id": "repo-1",
"topic_id": "topic-1",
"slug": "state-wp-0001",
"title": "Demo",
"status": "finished",
"planning_priority": None,
"planning_order": None,
}
linked = {
"id": "task-linked",
"title": "Keep Task",
"status": "done",
"description": None,
}
orphan = {
"id": "task-orphan",
"title": "Legacy Duplicate",
"status": orphan_status,
"description": None,
}
def fake_get(_api_base, path, params=None, **_kwargs):
if path == "/repos/state-hub":
import socket
return {
"id": "repo-1",
"slug": "state-hub",
"domain_slug": "infotech",
"local_path": str(repo),
"host_paths": {socket.gethostname(): str(repo)},
}
if path == "/workstreams/ws-1":
return ws
if path == "/tasks/task-linked":
return linked
if path == "/tasks" and params == {"workstream_id": "ws-1"}:
return [linked, orphan]
if path == "/workstreams/ws-1/dependencies":
return []
if path == "/workstreams" and params == {"repo_id": "repo-1"}:
return [ws]
if path == "/workstreams" and params and params.get("topic_id") == "topic-1":
return []
return []
return fake_get
def _quiet_classification(self, monkeypatch):
monkeypatch.setattr("consistency_check.load_classification_file", lambda _repo_dir: ({}, [], []))
def test_closed_workstream_suppresses_terminal_orphan_task(self, tmp_path, monkeypatch):
repo = self._make_repo(tmp_path)
self._quiet_classification(monkeypatch)
monkeypatch.setattr("consistency_check._api_get", self._api_get_for_repo(repo, "cancel"))
report = check_repo("http://unused", "state-hub")
assert [issue for issue in report.issues if issue.check_id == "C-12"] == []
def test_closed_workstream_reports_open_orphan_task_as_fixable(self, tmp_path, monkeypatch):
repo = self._make_repo(tmp_path)
self._quiet_classification(monkeypatch)
monkeypatch.setattr("consistency_check._api_get", self._api_get_for_repo(repo, "todo"))
report = check_repo("http://unused", "state-hub")
issue = next(issue for issue in report.issues if issue.check_id == "C-12")
assert issue.db_id == "task-orphan"
assert issue.fixable is True
def test_fix_repo_cancels_open_orphan_task_in_closed_workstream(self, tmp_path, monkeypatch):
repo = self._make_repo(tmp_path)
patches = []
self._quiet_classification(monkeypatch)
monkeypatch.setattr("consistency_check._api_get", self._api_get_for_repo(repo, "todo"))
monkeypatch.setattr("consistency_check._api_patch", lambda _api_base, path, body: patches.append((path, body)) or {"ok": True})
monkeypatch.setattr("consistency_check._detect_behind_remote", lambda _repo_path: False)
monkeypatch.setattr("consistency_check._detect_ahead_of_remote", lambda _repo_path: 0)
monkeypatch.setattr("consistency_check._write_custodian_brief", lambda *args, **kwargs: False)
monkeypatch.setattr("consistency_check._git_push", lambda _repo_path: (True, "pushed"))
report = fix_repo("http://unused", "state-hub")
assert ("/tasks/task-orphan", {"status": "cancel"}) in patches
assert any("C-12 fixed: orphan task task-or" in fix for fix in report.fixes_applied)
class TestC20DependencyDetection:
def test_canonical_dependency_fields_satisfy_workplan_dependency(self, tmp_path, monkeypatch):
repo = tmp_path / "repo"

View File

@@ -2,8 +2,13 @@ from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
import pytest
import custodian_cli
from custodian_cli import cmd_fix_consistency
from statehub_register import (
RegisterInference,
_invoke_llm,
@@ -91,7 +96,7 @@ def test_write_registration_files_primes_codex_repo(tmp_path: Path):
workplan = (tmp_path / "workplans" / "DEMO-WP-0001-statehub-bootstrap.md").read_text()
assert "id: DEMO-WP-0001" in workplan
assert "id: DEMO-WP-0001-T01" in workplan
assert "make fix-consistency REPO=demo-service" in workplan
assert "statehub fix-consistency" in workplan
def test_write_registration_files_is_idempotent_without_force(tmp_path: Path):
@@ -111,3 +116,129 @@ def test_write_registration_files_is_idempotent_without_force(tmp_path: Path):
assert write_registration_files(**kwargs)
assert write_registration_files(**kwargs) == []
def _fix_args(**overrides):
values = {
"repo": None,
"all": False,
"path": None,
"repo_path": None,
"remote": False,
"max_seconds": None,
"no_writeback": False,
"archive_closed": False,
"archive_workplan": None,
"archive_date": None,
"api_base": "http://statehub.test",
"as_json": False,
"strict_warnings": False,
}
values.update(overrides)
return argparse.Namespace(**values)
def _install_fake_checker(monkeypatch, tmp_path: Path) -> Path:
checker = tmp_path / "scripts" / "consistency_check.py"
checker.parent.mkdir()
checker.write_text("#!/usr/bin/env python3\n", encoding="utf-8")
monkeypatch.setattr(custodian_cli, "STATE_HUB_DIR", tmp_path)
return checker
def test_fix_consistency_defaults_to_here_and_normalises_warning_exit(monkeypatch, tmp_path: Path):
checker = _install_fake_checker(monkeypatch, tmp_path)
repo = tmp_path / "repo"
repo.mkdir()
calls = []
def fake_run(cmd):
calls.append(cmd)
return argparse.Namespace(returncode=2)
monkeypatch.setattr(custodian_cli.subprocess, "run", fake_run)
with pytest.raises(SystemExit) as exc:
cmd_fix_consistency(_fix_args(path=str(repo)))
assert exc.value.code == 0
assert calls == [[
sys.executable,
str(checker),
"--here",
str(repo.resolve()),
"--fix",
"--api-base",
"http://statehub.test",
]]
def test_fix_consistency_strict_warnings_preserves_exit_two(monkeypatch, tmp_path: Path):
_install_fake_checker(monkeypatch, tmp_path)
repo = tmp_path / "repo"
repo.mkdir()
monkeypatch.setattr(
custodian_cli.subprocess,
"run",
lambda _cmd: argparse.Namespace(returncode=2),
)
with pytest.raises(SystemExit) as exc:
cmd_fix_consistency(_fix_args(path=str(repo), strict_warnings=True))
assert exc.value.code == 2
def test_fix_consistency_repo_remote_passes_pull_before_fix_options(monkeypatch, tmp_path: Path):
checker = _install_fake_checker(monkeypatch, tmp_path)
repo = tmp_path / "repo"
repo.mkdir()
calls = []
def fake_run(cmd):
calls.append(cmd)
return argparse.Namespace(returncode=0)
monkeypatch.setattr(custodian_cli.subprocess, "run", fake_run)
with pytest.raises(SystemExit) as exc:
cmd_fix_consistency(
_fix_args(
repo="demo-service",
repo_path=str(repo),
remote=True,
no_writeback=True,
as_json=True,
max_seconds=12,
)
)
assert exc.value.code == 0
assert calls == [[
sys.executable,
str(checker),
"--repo",
"demo-service",
"--repo-path",
str(repo.resolve()),
"--fix",
"--remote",
"--no-writeback",
"--api-base",
"http://statehub.test",
"--json",
"--max-seconds",
"12",
]]
def test_fix_consistency_remote_requires_explicit_repo_or_all(monkeypatch, tmp_path: Path):
_install_fake_checker(monkeypatch, tmp_path)
calls = []
monkeypatch.setattr(custodian_cli.subprocess, "run", lambda cmd: calls.append(cmd))
with pytest.raises(SystemExit) as exc:
cmd_fix_consistency(_fix_args(remote=True, path=str(tmp_path)))
assert exc.value.code == 1
assert calls == []

View File

@@ -0,0 +1,43 @@
---
id: ADHOC-2026-07-01
type: workplan
title: "Ad hoc fixes - 2026-07-01"
domain: infotech
repo: state-hub
status: finished
owner: codex
topic_slug: custodian
created: "2026-07-01"
updated: "2026-07-01"
state_hub_workstream_id: "1cdb288a-9bcb-4e53-8c4f-cb43e3abe9c5"
---
# Ad hoc fixes - 2026-07-01
## Add Local State Hub Consistency CLI
```task
id: ADHOC-2026-07-01-T01
status: done
priority: medium
state_hub_task_id: "060e6e63-b456-418e-83c5-7660fa206800"
```
Add `statehub fix-consistency` so agents can reconcile ADR-001 workplan files
from the attached repo checkout instead of asking the operator to run
`make fix-consistency REPO=<slug>` inside the State Hub repo. Update generated
agent instructions to make the CLI sync a normal agent close-out step.
## Clear Closed-Workstream C-12 Orphan Warnings
```task
id: ADHOC-2026-07-01-T02
status: done
priority: medium
state_hub_task_id: "076b97bd-fb97-47f9-9733-c8eac6cd6355"
```
Treat terminal DB-only tasks in closed workstreams as non-actionable historical
cache rows in the consistency checker. Open orphan tasks in closed workstreams
remain fixable and are still auto-canceled, but already-terminal duplicates no
longer leave permanent C-12 warnings.