Files
state-hub/api/doi_engine.py
tegwick 27e755815f perf(doi): 13x speedup for /repos/doi/summary (108s → ~6s)
Two fixes:
1. skip_consistency=True in summary mode — omits C7/C13 subprocess calls
   (consistency_check.py) which were the main bottleneck (32 spawns for 16 repos).
   Full check still available per-repo via GET /repos/{slug}/doi.
2. asyncio.gather — all repos evaluated in parallel instead of sequentially.

Also: rename Repositories page title from "Repos" to "Repositories".

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 01:29:27 +01:00

313 lines
13 KiB
Python

"""DoI engine — evaluates all 14 Repository Definition of Integrated criteria.
Shared by the API endpoint (async) and the CLI check script (asyncio.run).
All checks use only the repo dict from /repos/{slug} + HTTP calls to the API
+ local filesystem reads. No direct DB access.
"""
from __future__ import annotations
import asyncio
import json
import socket
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
# Outcome of a single criterion check.
CriterionStatus = Literal["pass", "fail", "warn", "skip"]
# Highest integration tier a repository has reached ("none" = not even core).
Tier = Literal["none", "core", "standard", "full"]

# Criterion ids grouped by tier. These are plain sets, used for membership
# only; they carry no ordering.
CORE_IDS = {f"C{n}" for n in range(1, 5)}        # C1 .. C4
STANDARD_IDS = {f"C{n}" for n in range(5, 10)}   # C5 .. C9
FULL_IDS = {f"C{n}" for n in range(10, 15)}      # C10 .. C14
@dataclass
class CriterionResult:
    """Outcome of evaluating one Definition-of-Integrated criterion."""

    # Criterion identifier such as "C1" or "C14".
    id: str
    # Short human-readable name of the criterion.
    label: str
    # Tier the criterion belongs to: "core", "standard" or "full".
    tier: str
    # One of "pass", "fail", "warn" or "skip".
    status: CriterionStatus
    # Optional free-text explanation; empty when there is nothing to add.
    detail: str = field(default="")
def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with offset."""
    return datetime.now(timezone.utc).isoformat()


@dataclass
class DoIReport:
    """Aggregated Definition-of-Integrated result for one repository."""

    # Slug of the repository the report describes.
    repo_slug: str
    # Highest tier reached: "none", "core", "standard" or "full".
    tier: Tier
    # Per-tier verdicts as computed by the evaluator.
    core_pass: bool
    standard_pass: bool
    full_pass: bool
    # Individual criterion results; every report gets its own list.
    criteria: list[CriterionResult] = field(default_factory=list)
    # Timezone-aware UTC creation timestamp, filled in at construction time.
    checked_at: str = field(default_factory=_utc_now_iso)
def _resolve_path(repo: dict) -> str:
    """Return the first existing local directory for *repo*, or ``""``.

    The path registered for the current hostname in ``repo["host_paths"]`` is
    tried first, then ``repo["local_path"]``. ``~`` is expanded before the
    directory check; empty or missing entries are ignored.
    """
    per_host = repo.get("host_paths") or {}
    wanted = (per_host.get(socket.gethostname()), repo.get("local_path"))
    # Lazy on purpose: the fallback path is only touched if the first misses.
    expanded = (Path(entry).expanduser() for entry in wanted if entry)
    return next((str(folder) for folder in expanded if folder.is_dir()), "")
def _get_sync(api_base: str, path: str, params: dict | None = None) -> object:
    """Blocking GET of ``{api_base}{path}`` that returns the decoded JSON body.

    Args:
        api_base: Scheme and host prefix, e.g. ``"http://127.0.0.1:8000"``.
        path: Path appended verbatim to ``api_base``.
        params: Optional query parameters. Entries whose value is ``None`` are
            dropped; the remaining keys and values are percent-encoded.

    Returns:
        The parsed JSON document, or ``None`` on any failure while opening,
        reading or decoding the response (connection error, HTTP error
        status, the 5 second timeout, invalid JSON).
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlencode

    url = f"{api_base}{path}"
    if params:
        # urlencode escapes spaces, '&', '#', '=' and similar characters,
        # which a raw "k=v" join would let corrupt the query string.
        query = urlencode({k: v for k, v in params.items() if v is not None})
        if query:
            url = f"{url}?{query}"
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=5) as resp:
            return json.loads(resp.read())
    except Exception:
        # Deliberately broad and best-effort: callers treat None as
        # "resource unavailable" and translate it into a criterion status.
        return None
async def _get(api_base: str, path: str, params: dict | None = None) -> object:
    """Awaitable counterpart of ``_get_sync``.

    The blocking urllib request is handed to a worker thread through
    ``asyncio.to_thread`` so the event loop is not stalled while it waits.
    Arguments and return value are those of ``_get_sync``.
    """
    pending = asyncio.to_thread(_get_sync, api_base, path, params)
    return await pending
def _first_int(segment: str) -> int | None:
    """Return the first whitespace-separated token of *segment* that parses as an int."""
    for token in segment.split():
        try:
            return int(token)
        except ValueError:
            continue
    return None


def _parse_consistency_summary(text: str) -> tuple[int, int, int] | None:
    """Extract ``(fail, warn, info)`` counts from consistency_check.py output.

    Every line containing ``"Summary:"`` is split on ``"|"``. For each segment
    mentioning ``fail``, ``warn`` or ``info`` (checked in that order), the
    first integer token becomes that count. The integer need not be the
    segment's first token, so ``"Summary: 2 fail | 3 warn"`` yields a fail
    count of 2 even though the segment starts with the ``Summary:`` label.
    Counts that cannot be parsed stay 0.

    Returns ``None`` when the text has no ``"Summary:"`` line at all.
    """
    counts = {"fail": 0, "warn": 0, "info": 0}
    found_summary = False
    for line in text.splitlines():
        if "Summary:" not in line:
            continue
        found_summary = True
        for segment in line.split("|"):
            # First matching keyword wins for a segment.
            key = next((k for k in ("fail", "warn", "info") if k in segment), None)
            if key is None:
                continue
            value = _first_int(segment)
            if value is not None:
                counts[key] = value
    if not found_summary:
        return None
    return counts["fail"], counts["warn"], counts["info"]


async def _run_consistency(
    repo_slug: str,
    api_base: str,
    timeout: float = 30.0,
) -> tuple[int, int, int]:
    """Run consistency_check.py for one repo and return (fail, warn, info) counts.

    The script is started as ``uv run python scripts/consistency_check.py
    --repo <slug> --api-base <url>`` with the directory two levels above this
    file as working directory.

    Args:
        repo_slug: Repository slug passed to ``--repo``.
        api_base: API base URL passed to ``--api-base``.
        timeout: Seconds to wait for the script to finish (default 30).

    Raises:
        asyncio.TimeoutError: The script did not finish within ``timeout``.
            The child process is killed and reaped before the error propagates.
        RuntimeError: The script produced no ``Summary:`` line, so no counts
            are available. Returning zeros here would make a crashed run look
            like a clean one.
            NOTE(review): assumes a successful run always prints a Summary
            line — confirm against consistency_check.py.
        OSError: The ``uv`` executable could not be started.
    """
    root = Path(__file__).parent.parent
    script = root / "scripts" / "consistency_check.py"
    proc = await asyncio.create_subprocess_exec(
        "uv", "run", "python", str(script),
        "--repo", repo_slug,
        "--api-base", api_base,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=str(root),
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    finally:
        # Still running means we timed out or were cancelled; do not leave
        # the child behind.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill
            await proc.wait()

    counts = _parse_consistency_summary(stdout.decode(errors="replace"))
    if counts is None:
        err_lines = stderr.decode(errors="replace").strip().splitlines()
        hint = f": {err_lines[-1]}" if err_lines else ""
        raise RuntimeError(
            f"consistency_check.py printed no Summary line (exit code {proc.returncode}){hint}"
        )
    return counts
async def evaluate(
    repo: dict,
    api_base: str = "http://127.0.0.1:8000",
    skip_consistency: bool = False,
) -> DoIReport:
    """Evaluate criteria C1-C14 for one repository and derive its tier.

    Args:
        repo: Repository record. The keys read here are ``slug``,
            ``domain_slug``, ``host_paths``, ``local_path``, ``remote_url``
            and ``last_sbom_at``.
        api_base: Base URL used for the domain, TPSC snapshot and repo-goal
            lookups, and passed on to the consistency check.
        skip_consistency: When true, C7 and C13 are recorded as "skip" and the
            consistency_check.py subprocess is not started.

    Returns:
        A ``DoIReport`` whose ``criteria`` list holds the results in check
        order (C1 .. C14).

    A tier passes when each of its criteria is "pass" or "warn", and all lower
    tiers pass as well.

    NOTE(review): "skip" does not count as passing, so with
    ``skip_consistency=True`` the skipped C7 keeps ``standard_pass`` false and
    the tier can be at most "core" — confirm this is the intended summary
    behaviour.
    """
    slug = repo.get("slug", "unknown")
    results: list[CriterionResult] = []

    def _r(cid: str, label: str, tier: str, status: CriterionStatus, detail: str = "") -> CriterionResult:
        """Append one criterion outcome to ``results`` and return it."""
        r = CriterionResult(id=cid, label=label, tier=tier, status=status, detail=detail)
        results.append(r)
        return r

    def _read(path: Path) -> str | None:
        """Return the file's text, or None if it cannot be read."""
        try:
            # Only ASCII markers are searched for, so undecodable bytes are
            # replaced rather than allowed to abort the whole report.
            return path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            return None

    # Plain string, so "{slug}" is emitted literally (reads as a route template).
    # NOTE(review): confirm the literal placeholder is intended.
    summary_mode_detail = "Not checked in summary mode — use /repos/{slug}/doi for full check"

    # ── Tier 1: Core ─────────────────────────────────────────────────────────
    # C1: registered — a repo dict was supplied to this function.
    _r("C1", "Registered in state-hub", "core", "pass", "Repo record exists")

    # C2: domain assigned and active
    domain_slug = repo.get("domain_slug") or ""
    if not domain_slug:
        _r("C2", "Domain assigned", "core", "fail", "No domain_slug on repo record")
    else:
        domain = await _get(api_base, f"/domains/{domain_slug}/")
        if not isinstance(domain, dict):
            # None (request failed) or an unexpected JSON shape: treat as missing.
            domain = None
        if domain and domain.get("status") == "active":
            _r("C2", "Domain assigned", "core", "pass", f"domain: {domain_slug}")
        elif domain:
            _r("C2", "Domain assigned", "core", "warn", f"Domain '{domain_slug}' status: {domain.get('status')}")
        else:
            _r("C2", "Domain assigned", "core", "fail", f"Domain '{domain_slug}' not found")

    # C3: local path resolves
    repo_path = _resolve_path(repo)
    if repo_path:
        _r("C3", "Local path resolves", "core", "pass", repo_path)
    else:
        raw = repo.get("local_path") or "(none)"
        _r("C3", "Local path resolves", "core", "fail", f"Path not accessible: {raw}")

    # C4: remote URL set
    remote = repo.get("remote_url") or ""
    if remote.strip():
        _r("C4", "Remote URL set", "core", "pass", remote)
    else:
        _r("C4", "Remote URL set", "core", "fail", "remote_url is empty")

    # ── Tier 2: Standard ─────────────────────────────────────────────────────
    # C5: SCOPE.md
    if not repo_path:
        _r("C5", "SCOPE.md present", "standard", "skip", "Local path unavailable")
    elif (Path(repo_path) / "SCOPE.md").exists():
        _r("C5", "SCOPE.md present", "standard", "pass")
    else:
        _r("C5", "SCOPE.md present", "standard", "fail", "SCOPE.md not found at repo root")

    # C6: CLAUDE.md
    if not repo_path:
        _r("C6", "CLAUDE.md present", "standard", "skip", "Local path unavailable")
    elif (Path(repo_path) / "CLAUDE.md").exists():
        _r("C6", "CLAUDE.md present", "standard", "pass")
    else:
        _r("C6", "CLAUDE.md present", "standard", "fail", "CLAUDE.md not found at repo root")

    # C7 and C13 read the same consistency_check.py result, so the subprocess
    # is started at most once per evaluation and its outcome is shared.
    consistency_counts = None  # (fail, warn) once the check has run
    consistency_error = None   # exception raised by the check, if any
    if not skip_consistency:
        try:
            fail, warn, _ = await _run_consistency(slug, api_base)
            consistency_counts = (fail, warn)
        except Exception as exc:
            # Best-effort: a check that cannot run is reported as "skip".
            consistency_error = exc

    # C7: workplan convention — consistency check 0 FAIL
    if skip_consistency:
        _r("C7", "Workplan convention (0 FAIL)", "standard", "skip", summary_mode_detail)
    elif consistency_counts is None:
        _r("C7", "Workplan convention (0 FAIL)", "standard", "skip",
           f"Could not run consistency check: {consistency_error}")
    else:
        fail, warn = consistency_counts
        _r("C7", "Workplan convention (0 FAIL)", "standard",
           "pass" if fail == 0 else "fail",
           f"consistency: {fail} fail / {warn} warn")

    # C8: SBOM ingested
    last_sbom = repo.get("last_sbom_at")
    if last_sbom:
        # str() guards the slice against a non-string timestamp value.
        _r("C8", "SBOM ingested", "standard", "pass", f"last ingested: {str(last_sbom)[:10]}")
    else:
        _r("C8", "SBOM ingested", "standard", "fail", "last_sbom_at not set — run make ingest-sbom")

    # C9: TPSC declared (tpsc.yaml present + snapshot exists)
    # The snapshot lookup only matters once tpsc.yaml is known to exist, so the
    # HTTP request is skipped in the other two outcomes.
    if not repo_path:
        _r("C9", "TPSC declared", "standard", "skip", "Local path unavailable")
    elif not (Path(repo_path) / "tpsc.yaml").exists():
        _r("C9", "TPSC declared", "standard", "fail", "tpsc.yaml missing at repo root")
    else:
        tpsc_snaps = await _get(api_base, "/tpsc/snapshots/", {"repo_slug": slug}) or []
        if len(tpsc_snaps) > 0:
            _r("C9", "TPSC declared", "standard", "pass", f"{len(tpsc_snaps)} snapshot(s)")
        else:
            _r("C9", "TPSC declared", "standard", "warn",
               "tpsc.yaml exists but not yet ingested — run make ingest-tpsc")

    # ── Tier 3: Full ─────────────────────────────────────────────────────────
    # C10: active repo goal
    goals = await _get(api_base, "/repo-goals/", {"repo_slug": slug})
    if not isinstance(goals, list):
        # None (request failed) or an unexpected JSON shape: no goals known.
        goals = []
    active_goals = [g for g in goals if isinstance(g, dict) and g.get("status") == "active"]
    if active_goals:
        _r("C10", "Active repo goal", "full", "pass", f"{len(active_goals)} active goal(s)")
    else:
        _r("C10", "Active repo goal", "full", "fail", "No active repo goal — create one with create_repo_goal()")

    # C11: Provided Capabilities declared in SCOPE.md
    if not repo_path:
        _r("C11", "Provided Capabilities declared", "full", "skip", "Local path unavailable")
    else:
        scope = Path(repo_path) / "SCOPE.md"
        if not scope.exists():
            _r("C11", "Provided Capabilities declared", "full", "skip", "SCOPE.md absent")
        else:
            text = _read(scope)
            if text is None:
                _r("C11", "Provided Capabilities declared", "full", "skip", "SCOPE.md unreadable")
            else:
                has_section = "## Provided Capabilities" in text
                # First 200 characters after the last (case-insensitive)
                # occurrence of the section heading.
                tail = text.lower().split("## provided capabilities")[-1][:200]
                has_none_explicit = has_section and ("none" in tail or "no capabilities" in tail)
                if "```capability" in text:
                    _r("C11", "Provided Capabilities declared", "full", "pass", "capability block(s) found in SCOPE.md")
                elif has_none_explicit:
                    _r("C11", "Provided Capabilities declared", "full", "pass", "Explicitly declared none in SCOPE.md")
                elif has_section:
                    _r("C11", "Provided Capabilities declared", "full", "warn",
                       "Section present but no capability block or explicit none — add blocks or state 'none'")
                else:
                    _r("C11", "Provided Capabilities declared", "full", "fail",
                       "No '## Provided Capabilities' section in SCOPE.md")

    # C12: agents template applied (CLAUDE.md mentions kaizen)
    if not repo_path:
        _r("C12", "Agents template applied", "full", "skip", "Local path unavailable")
    else:
        claude_md = Path(repo_path) / "CLAUDE.md"
        if not claude_md.exists():
            _r("C12", "Agents template applied", "full", "skip", "CLAUDE.md absent")
        else:
            text = _read(claude_md)
            if text is None:
                _r("C12", "Agents template applied", "full", "skip", "CLAUDE.md unreadable")
            elif "get_kaizen_agent" in text or "kaizen" in text.lower():
                _r("C12", "Agents template applied", "full", "pass")
            else:
                _r("C12", "Agents template applied", "full", "fail",
                   "CLAUDE.md has no kaizen agent reference")

    # C13: consistency check clean (0 FAIL, 0 WARN — C-12 exempt)
    if skip_consistency:
        _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "skip", summary_mode_detail)
    elif consistency_counts is None:
        _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "skip", f"Could not run: {consistency_error}")
    else:
        fail, warn = consistency_counts
        if fail == 0 and warn == 0:
            _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "pass")
        elif fail == 0 and warn > 0:
            _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "warn",
               f"{warn} warn(s) — C-12 legacy tasks may be exempt")
        else:
            _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "fail",
               f"{fail} fail(s), {warn} warn(s)")

    # C14: host paths registered
    host_paths = repo.get("host_paths") or {}
    if host_paths:
        _r("C14", "Host paths registered", "full", "pass",
           f"{len(host_paths)} host(s): {', '.join(host_paths)}")
    else:
        _r("C14", "Host paths registered", "full", "fail",
           "host_paths empty — run update_repo_path() for each active machine")

    # ── Compute tier ─────────────────────────────────────────────────────────
    by_id = {r.id: r for r in results}

    def _tier_pass(ids: set[str]) -> bool:
        """True when every recorded criterion in *ids* is "pass" or "warn"."""
        return all(by_id[i].status in ("pass", "warn") for i in ids if i in by_id)

    # Tiers are cumulative: a higher tier requires every lower one.
    core_pass = _tier_pass(CORE_IDS)
    standard_pass = core_pass and _tier_pass(STANDARD_IDS)
    full_pass = standard_pass and _tier_pass(FULL_IDS)

    if full_pass:
        tier: Tier = "full"
    elif standard_pass:
        tier = "standard"
    elif core_pass:
        tier = "core"
    else:
        tier = "none"

    return DoIReport(
        repo_slug=slug,
        tier=tier,
        core_pass=core_pass,
        standard_pass=standard_pass,
        full_pass=full_pass,
        criteria=results,
    )