Files
state-hub/api/doi_engine.py
tegwick 5eeeeeb6c4 feat(doi): Repository DoI automated gate and dashboard integration (CUST-WP-0024)
Implements the 14-criterion DoI checklist as a runnable gate with API,
MCP tools, CLI script, and dashboard integration.

Core components:
- api/doi_engine.py — async engine evaluating all 14 criteria (asyncio.to_thread
  for non-blocking HTTP self-calls), shared by API and CLI
- api/schemas/doi.py — DoICriterion, DoIReport, DoISummaryEntry schemas
- api/routers/repos.py — GET /repos/{slug}/doi + GET /repos/doi/summary
- scripts/check_doi.py — CLI: make check-doi REPO=<slug> / check-doi-all
- mcp_server/server.py — check_repo_doi(), get_doi_summary() tools

Dashboard (repos.md):
- DoI tier badge per repo (None/Core/Standard/Full) colour-coded red→green
- Domain block shows lowest DoI tier across its repos
- DoI KPI card in summary row
- DoI filter in All Repos Table
- Link to Repository DoI policy page

Also fixes: TPSC snapshots 500 error (missing nested selectinload for
catalog_entry relationship in list_snapshots endpoint).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 01:08:18 +01:00

306 lines
13 KiB
Python

"""DoI engine — evaluates all 14 Repository Definition of Integrated criteria.
Shared by the API endpoint (async) and the CLI check script (asyncio.run).
All checks use only the repo dict from /repos/{slug}, HTTP calls to the API,
local filesystem reads, and (for C7/C13) a subprocess run of
scripts/consistency_check.py. No direct DB access.
"""
from __future__ import annotations
import asyncio
import json
import socket
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
# Allowed outcome of a single criterion check.
CriterionStatus = Literal["pass", "fail", "warn", "skip"]
# Integration tier a repo can reach; evaluate() requires each tier to pass
# before the next one can ("core" -> "standard" -> "full").
Tier = Literal["none", "core", "standard", "full"]
# Criterion IDs grouped by tier. These are sets, so they carry no ordering;
# they are only iterated to test whether every criterion of a tier passed.
CORE_IDS = {"C1", "C2", "C3", "C4"}
STANDARD_IDS = {"C5", "C6", "C7", "C8", "C9"}
FULL_IDS = {"C10", "C11", "C12", "C13", "C14"}
@dataclass
class CriterionResult:
    """Outcome of evaluating one DoI criterion."""

    id: str  # criterion identifier, e.g. "C1"
    label: str  # short human-readable name of the criterion
    tier: str  # tier the criterion belongs to: "core", "standard" or "full"
    status: CriterionStatus
    detail: str = ""  # optional explanation accompanying the status
@dataclass
class DoIReport:
    """Aggregated DoI evaluation for a single repository."""

    repo_slug: str
    tier: Tier  # highest tier whose criteria (and all lower tiers') passed
    core_pass: bool
    standard_pass: bool  # evaluate() only sets this when core_pass is also true
    full_pass: bool  # evaluate() only sets this when standard_pass is also true
    criteria: list[CriterionResult] = field(default_factory=list)
    # ISO-8601 UTC timestamp taken when the report object is constructed.
    checked_at: str = field(default_factory=lambda: datetime.now(tz=timezone.utc).isoformat())
def _resolve_path(repo: dict) -> str:
hostname = socket.gethostname()
host_paths = repo.get("host_paths") or {}
candidates = []
if host_paths.get(hostname):
candidates.append(host_paths[hostname])
if repo.get("local_path"):
candidates.append(repo["local_path"])
for raw in candidates:
p = Path(raw).expanduser()
if p.is_dir():
return str(p)
return ""
def _get_sync(api_base: str, path: str, params: dict | None = None) -> object:
url = f"{api_base}{path}"
if params:
q = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
if q:
url = f"{url}?{q}"
req = urllib.request.Request(url, headers={"Accept": "application/json"})
try:
with urllib.request.urlopen(req, timeout=5) as r:
return json.loads(r.read())
except Exception:
return None
async def _get(api_base: str, path: str, params: dict | None = None) -> object:
    """Fetch JSON from the API without blocking the event loop.

    Hands the synchronous ``_get_sync`` call to a worker thread through
    ``asyncio.to_thread`` and returns its result unchanged.
    """
    payload = await asyncio.to_thread(_get_sync, api_base, path, params)
    return payload
async def _run_consistency(repo_slug: str, api_base: str) -> tuple[int, int, int]:
"""Run consistency_check.py and return (fail, warn, info) counts."""
script = Path(__file__).parent.parent / "scripts" / "consistency_check.py"
proc = await asyncio.create_subprocess_exec(
"uv", "run", "python", str(script),
"--repo", repo_slug,
"--api-base", api_base,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(Path(__file__).parent.parent),
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
text = stdout.decode()
fail = warn = info = 0
for line in text.splitlines():
if "Summary:" in line:
parts = line.split("|")
for p in parts:
p = p.strip()
if "fail" in p:
try: fail = int(p.split()[0])
except ValueError: pass
elif "warn" in p:
try: warn = int(p.split()[0])
except ValueError: pass
elif "info" in p:
try: info = int(p.split()[0])
except ValueError: pass
return fail, warn, info
async def evaluate(repo: dict, api_base: str = "http://127.0.0.1:8000") -> DoIReport:
    """Evaluate all 14 DoI criteria for *repo* and return the resulting report.

    Args:
        repo: Repo record as a dict. Keys read here: ``slug``, ``domain_slug``,
            ``local_path``, ``host_paths``, ``remote_url``, ``last_sbom_at``.
        api_base: Base URL of the API used for domain, TPSC snapshot and
            repo-goal lookups, and forwarded to the consistency check.

    Returns:
        A DoIReport holding one CriterionResult per criterion (C1..C14, in
        that order) plus the derived tier. A tier passes when each of its
        criteria is "pass" or "warn" and every lower tier passed; "fail" and
        "skip" both block the tier.
    """
    slug = repo.get("slug", "unknown")
    results: list[CriterionResult] = []

    def _r(cid: str, label: str, tier: str, status: CriterionStatus, detail: str = "") -> CriterionResult:
        """Record one criterion outcome (in check order) and return it."""
        outcome = CriterionResult(id=cid, label=label, tier=tier, status=status, detail=detail)
        results.append(outcome)
        return outcome

    # ── Tier 1: Core ─────────────────────────────────────────────────────────
    # C1: registered — being handed a repo record is the criterion itself.
    _r("C1", "Registered in state-hub", "core", "pass", "Repo record exists")

    # C2: domain assigned and active
    domain_slug = repo.get("domain_slug") or ""
    if not domain_slug:
        _r("C2", "Domain assigned", "core", "fail", "No domain_slug on repo record")
    else:
        domain = await _get(api_base, f"/domains/{domain_slug}/")
        if domain and domain.get("status") == "active":
            _r("C2", "Domain assigned", "core", "pass", f"domain: {domain_slug}")
        elif domain:
            _r("C2", "Domain assigned", "core", "warn", f"Domain '{domain_slug}' status: {domain.get('status')}")
        else:
            _r("C2", "Domain assigned", "core", "fail", f"Domain '{domain_slug}' not found")

    # C3: local path resolves
    repo_path = _resolve_path(repo)
    root = Path(repo_path) if repo_path else None
    if root is not None:
        _r("C3", "Local path resolves", "core", "pass", repo_path)
    else:
        raw = repo.get("local_path") or "(none)"
        _r("C3", "Local path resolves", "core", "fail", f"Path not accessible: {raw}")

    # C4: remote URL set
    remote = repo.get("remote_url") or ""
    if remote.strip():
        _r("C4", "Remote URL set", "core", "pass", remote)
    else:
        _r("C4", "Remote URL set", "core", "fail", "remote_url is empty")

    # ── Tier 2: Standard ─────────────────────────────────────────────────────
    # C5: SCOPE.md
    if root is None:
        _r("C5", "SCOPE.md present", "standard", "skip", "Local path unavailable")
    elif (root / "SCOPE.md").exists():
        _r("C5", "SCOPE.md present", "standard", "pass")
    else:
        _r("C5", "SCOPE.md present", "standard", "fail", "SCOPE.md not found at repo root")

    # C6: CLAUDE.md
    if root is None:
        _r("C6", "CLAUDE.md present", "standard", "skip", "Local path unavailable")
    elif (root / "CLAUDE.md").exists():
        _r("C6", "CLAUDE.md present", "standard", "pass")
    else:
        _r("C6", "CLAUDE.md present", "standard", "fail", "CLAUDE.md not found at repo root")

    # C7: workplan convention — consistency check 0 FAIL.
    # The consistency script is a subprocess with a 30 s budget, so it is run
    # once here and the same outcome is reused for C13 below.
    consistency_error: Exception | None = None
    fail = warn = 0
    try:
        fail, warn, _ = await _run_consistency(slug, api_base)
    except Exception as e:
        consistency_error = e
    if consistency_error is not None:
        _r("C7", "Workplan convention (0 FAIL)", "standard", "skip",
           f"Could not run consistency check: {consistency_error}")
    else:
        _r("C7", "Workplan convention (0 FAIL)", "standard",
           "pass" if fail == 0 else "fail",
           f"consistency: {fail} fail / {warn} warn")

    # C8: SBOM ingested
    last_sbom = repo.get("last_sbom_at")
    if last_sbom:
        # str() keeps the date slice working if the value is not already a string.
        _r("C8", "SBOM ingested", "standard", "pass", f"last ingested: {str(last_sbom)[:10]}")
    else:
        _r("C8", "SBOM ingested", "standard", "fail", "last_sbom_at not set — run make ingest-sbom")

    # C9: TPSC declared (tpsc.yaml present + snapshot exists)
    if root is None:
        _r("C9", "TPSC declared", "standard", "skip", "Local path unavailable")
    elif not (root / "tpsc.yaml").exists():
        _r("C9", "TPSC declared", "standard", "fail", "tpsc.yaml missing at repo root")
    else:
        # Snapshots only matter once tpsc.yaml exists, so fetch them lazily.
        tpsc_snaps = await _get(api_base, "/tpsc/snapshots/", {"repo_slug": slug}) or []
        if len(tpsc_snaps) > 0:
            _r("C9", "TPSC declared", "standard", "pass", f"{len(tpsc_snaps)} snapshot(s)")
        else:
            _r("C9", "TPSC declared", "standard", "warn",
               "tpsc.yaml exists but not yet ingested — run make ingest-tpsc")

    # ── Tier 3: Full ─────────────────────────────────────────────────────────
    # C10: active repo goal
    goals = await _get(api_base, "/repo-goals/", {"repo_slug": slug}) or []
    active_goals = [g for g in goals if g.get("status") == "active"]
    if active_goals:
        _r("C10", "Active repo goal", "full", "pass", f"{len(active_goals)} active goal(s)")
    else:
        _r("C10", "Active repo goal", "full", "fail", "No active repo goal — create one with create_repo_goal()")

    # C11: Provided Capabilities declared in SCOPE.md
    if root is None:
        _r("C11", "Provided Capabilities declared", "full", "skip", "Local path unavailable")
    else:
        scope = root / "SCOPE.md"
        if not scope.exists():
            _r("C11", "Provided Capabilities declared", "full", "skip", "SCOPE.md absent")
        else:
            # Explicit encoding + replacement: a stray byte must not abort the report.
            text = scope.read_text(encoding="utf-8", errors="replace")
            has_heading = "## Provided Capabilities" in text
            # Heuristic: look for an explicit "none" in the first 200 characters
            # after the (last) section heading.
            tail = text.lower().split("## provided capabilities")[-1][:200]
            has_none_explicit = has_heading and ("none" in tail or "no capabilities" in tail)
            if "```capability" in text:
                _r("C11", "Provided Capabilities declared", "full", "pass", "capability block(s) found in SCOPE.md")
            elif has_none_explicit:
                _r("C11", "Provided Capabilities declared", "full", "pass", "Explicitly declared none in SCOPE.md")
            elif has_heading:
                _r("C11", "Provided Capabilities declared", "full", "warn",
                   "Section present but no capability block or explicit none — add blocks or state 'none'")
            else:
                _r("C11", "Provided Capabilities declared", "full", "fail",
                   "No '## Provided Capabilities' section in SCOPE.md")

    # C12: agents template applied (CLAUDE.md mentions kaizen)
    if root is None:
        _r("C12", "Agents template applied", "full", "skip", "Local path unavailable")
    else:
        claude_md = root / "CLAUDE.md"
        if not claude_md.exists():
            _r("C12", "Agents template applied", "full", "skip", "CLAUDE.md absent")
        else:
            text = claude_md.read_text(encoding="utf-8", errors="replace")
            # Case-insensitive "kaizen" also covers the literal "get_kaizen_agent".
            if "kaizen" in text.lower():
                _r("C12", "Agents template applied", "full", "pass")
            else:
                _r("C12", "Agents template applied", "full", "fail",
                   "CLAUDE.md has no kaizen agent reference")

    # C13: consistency check clean (0 FAIL, 0 WARN), reusing the C7 run.
    # NOTE(review): the C-12 exemption is not computed; warns are reported raw
    # and the detail text only hints that legacy tasks may be exempt.
    if consistency_error is not None:
        _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "skip",
           f"Could not run: {consistency_error}")
    elif fail == 0 and warn == 0:
        _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "pass")
    elif fail == 0:
        _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "warn",
           f"{warn} warn(s) — C-12 legacy tasks may be exempt")
    else:
        _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "fail",
           f"{fail} fail(s), {warn} warn(s)")

    # C14: host paths registered
    host_paths = repo.get("host_paths") or {}
    if host_paths:
        _r("C14", "Host paths registered", "full", "pass",
           f"{len(host_paths)} host(s): {', '.join(host_paths.keys())}")
    else:
        _r("C14", "Host paths registered", "full", "fail",
           "host_paths empty — run update_repo_path() for each active machine")

    # ── Compute tier ─────────────────────────────────────────────────────────
    by_id = {r.id: r for r in results}

    def _tier_pass(ids: set[str]) -> bool:
        """True when every recorded criterion in *ids* is "pass" or "warn"."""
        return all(by_id[i].status in ("pass", "warn") for i in ids if i in by_id)

    # Tiers are cumulative: a higher tier only counts if the lower ones passed.
    core_pass = _tier_pass(CORE_IDS)
    standard_pass = core_pass and _tier_pass(STANDARD_IDS)
    full_pass = standard_pass and _tier_pass(FULL_IDS)

    if full_pass:
        tier: Tier = "full"
    elif standard_pass:
        tier = "standard"
    elif core_pass:
        tier = "core"
    else:
        tier = "none"

    return DoIReport(
        repo_slug=slug,
        tier=tier,
        core_pass=core_pass,
        standard_pass=standard_pass,
        full_pass=full_pass,
        criteria=results,
    )