Files
state-hub/api/doi_engine.py
2026-05-01 01:47:14 +02:00

537 lines
21 KiB
Python

"""DoI engine — evaluates all 14 Repository Definition of Integrated criteria.
Shared by the API endpoint (async) and the CLI check script (asyncio.run).
All checks use only the repo dict from /repos/{slug} + HTTP calls to the API
+ local filesystem reads. No direct DB access.
"""
from __future__ import annotations
import asyncio
import json
import re
import socket
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Literal
import yaml
CriterionStatus = Literal["pass", "fail", "warn", "skip"]
Tier = Literal["none", "core", "standard", "full"]
# Criteria that belong to each tier (in check order)
CORE_IDS = {"C1", "C2", "C3", "C4"}
STANDARD_IDS = {"C5a", "C5b", "C5c", "C6", "C7", "C8", "C9"}
FULL_IDS = {"C10", "C11", "C12", "C13", "C14"}
STANDARD_SCOPE_SECTIONS = [
"One-liner",
"Core Idea",
"In Scope",
"Out of Scope",
"Relevant When",
"Not Relevant When",
"Current State",
"How It Fits",
"Terminology",
"Related / Overlapping",
"Provided Capabilities",
]
_CAPABILITY_BLOCK_RE = re.compile(r"```capability\s*\n(.*?)```", re.DOTALL | re.IGNORECASE)
_H2_RE = re.compile(r"^##\s+(.+?)\s*$", re.MULTILINE)
@dataclass
class CriterionResult:
id: str
label: str
tier: str
status: CriterionStatus
detail: str = ""
@dataclass
class DoIReport:
repo_slug: str
tier: Tier
core_pass: bool
standard_pass: bool
full_pass: bool
criteria: list[CriterionResult] = field(default_factory=list)
checked_at: str = field(default_factory=lambda: datetime.now(tz=timezone.utc).isoformat())
def evaluate_scope_health(repo: dict) -> list[dict[str, Any]]:
"""Return machine-readable SCOPE.md health issues for C5a/C5b/C5c.
The returned records intentionally mirror DoI criterion IDs while carrying
section-level hints that downstream repo-scoping can use to refresh only
the affected parts of SCOPE.md.
"""
repo_path = _resolve_path(repo)
if not repo_path:
return [
{
"id": "C5a",
"label": "SCOPE.md present",
"status": "skip",
"detail": "Local path unavailable",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
},
{
"id": "C5b",
"label": "SCOPE.md standard sections",
"status": "skip",
"detail": "Local path unavailable",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
},
{
"id": "C5c",
"label": "SCOPE.md capability blocks",
"status": "skip",
"detail": "Local path unavailable",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
},
]
scope_path = Path(repo_path) / "SCOPE.md"
if not scope_path.exists():
return [
{
"id": "C5a",
"label": "SCOPE.md present",
"status": "fail",
"detail": "SCOPE.md not found at repo root",
"missing_sections": STANDARD_SCOPE_SECTIONS.copy(),
"invalid_capability_blocks": [],
"needs_refresh_sections": STANDARD_SCOPE_SECTIONS.copy(),
},
{
"id": "C5b",
"label": "SCOPE.md standard sections",
"status": "skip",
"detail": "SCOPE.md absent",
"missing_sections": STANDARD_SCOPE_SECTIONS.copy(),
"invalid_capability_blocks": [],
"needs_refresh_sections": STANDARD_SCOPE_SECTIONS.copy(),
},
{
"id": "C5c",
"label": "SCOPE.md capability blocks",
"status": "skip",
"detail": "SCOPE.md absent",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": ["Provided Capabilities"],
},
]
text = scope_path.read_text()
issues: list[dict[str, Any]] = [{
"id": "C5a",
"label": "SCOPE.md present",
"status": "pass",
"detail": "",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
}]
headings = {h.strip() for h in _H2_RE.findall(text)}
missing_sections = [section for section in STANDARD_SCOPE_SECTIONS if section not in headings]
if missing_sections:
issues.append({
"id": "C5b",
"label": "SCOPE.md standard sections",
"status": "warn",
"detail": f"Missing H2 section(s): {', '.join(missing_sections)}",
"missing_sections": missing_sections,
"invalid_capability_blocks": [],
"needs_refresh_sections": missing_sections,
})
else:
issues.append({
"id": "C5b",
"label": "SCOPE.md standard sections",
"status": "pass",
"detail": f"All {len(STANDARD_SCOPE_SECTIONS)} standard sections present",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
})
capability_blocks = _CAPABILITY_BLOCK_RE.findall(text)
valid_blocks = 0
invalid_blocks: list[dict[str, Any]] = []
for index, block in enumerate(capability_blocks, start=1):
try:
parsed = yaml.safe_load(block) or {}
if isinstance(parsed, dict) and parsed.get("type") and parsed.get("title"):
valid_blocks += 1
else:
invalid_blocks.append({
"index": index,
"reason": "Capability block must be YAML with type and title",
})
except yaml.YAMLError as exc:
invalid_blocks.append({"index": index, "reason": str(exc)})
if valid_blocks > 0:
issues.append({
"id": "C5c",
"label": "SCOPE.md capability blocks",
"status": "pass",
"detail": f"{valid_blocks} valid capability block(s)",
"missing_sections": [],
"invalid_capability_blocks": invalid_blocks,
"needs_refresh_sections": [],
})
else:
detail = "No fenced capability block found"
if invalid_blocks:
detail = "No valid capability block found"
issues.append({
"id": "C5c",
"label": "SCOPE.md capability blocks",
"status": "warn",
"detail": detail,
"missing_sections": [],
"invalid_capability_blocks": invalid_blocks,
"needs_refresh_sections": ["Provided Capabilities"],
})
return issues
def compute_fingerprint(
repo: dict,
latest_tpsc_snap_at: str | None,
latest_goal_updated_at: str | None,
) -> str:
"""Compute a pipe-joined fingerprint of all inputs that affect DoI criteria.
If any component changes, the fingerprint changes and the cache is invalidated:
- repo.updated_at → covers last_sbom_at, remote_url, host_paths, domain changes
- latest_tpsc_snap_at → C9 (TPSC snapshot exists)
- latest_goal_updated_at → C10 (active repo goal)
- mtime of SCOPE.md, CLAUDE.md, tpsc.yaml → C5, C6, C9, C11, C12
"""
parts = [
str(repo.get("updated_at") or ""),
str(latest_tpsc_snap_at or ""),
str(latest_goal_updated_at or ""),
]
repo_path = _resolve_path(repo)
if repo_path:
for fname in ("SCOPE.md", "CLAUDE.md", "tpsc.yaml"):
f = Path(repo_path) / fname
try:
parts.append(f"{fname}:{f.stat().st_mtime:.3f}")
except FileNotFoundError:
parts.append(f"{fname}:absent")
return "|".join(parts)
def _resolve_path(repo: dict) -> str:
hostname = socket.gethostname()
host_paths = repo.get("host_paths") or {}
candidates = []
if host_paths.get(hostname):
candidates.append(host_paths[hostname])
if repo.get("local_path"):
candidates.append(repo["local_path"])
for raw in candidates:
p = Path(raw).expanduser()
if p.is_dir():
return str(p)
return ""
def resolve_repo_path(repo: dict) -> str:
"""Resolve the repo path using the same host-aware rules as DoI checks."""
return _resolve_path(repo)
def _get_sync(api_base: str, path: str, params: dict | None = None) -> object:
url = f"{api_base}{path}"
if params:
q = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
if q:
url = f"{url}?{q}"
req = urllib.request.Request(url, headers={"Accept": "application/json"})
try:
with urllib.request.urlopen(req, timeout=5) as r:
return json.loads(r.read())
except Exception:
return None
async def _get(api_base: str, path: str, params: dict | None = None) -> object:
"""Async wrapper — runs blocking urllib in a thread so the event loop stays free."""
return await asyncio.to_thread(_get_sync, api_base, path, params)
async def _run_consistency(repo_slug: str, api_base: str) -> tuple[int, int, int]:
"""Run consistency_check.py and return (fail, warn, info) counts."""
script = Path(__file__).parent.parent / "scripts" / "consistency_check.py"
proc = await asyncio.create_subprocess_exec(
"uv", "run", "python", str(script),
"--repo", repo_slug,
"--api-base", api_base,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(Path(__file__).parent.parent),
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
text = stdout.decode()
fail = warn = info = 0
for line in text.splitlines():
if "Summary:" in line:
parts = line.split("|")
for p in parts:
p = p.strip()
if "fail" in p:
try: fail = int(p.split()[0])
except ValueError: pass
elif "warn" in p:
try: warn = int(p.split()[0])
except ValueError: pass
elif "info" in p:
try: info = int(p.split()[0])
except ValueError: pass
return fail, warn, info
async def evaluate(
repo: dict,
api_base: str = "http://127.0.0.1:8000",
skip_consistency: bool = False,
prefetch: dict | None = None,
) -> DoIReport:
"""Evaluate all 14 DoI criteria for a repo.
Args:
repo: Repo dict (slug, domain_slug, local_path, remote_url, host_paths, last_sbom_at).
api_base: API base URL — only used when prefetch is absent.
skip_consistency: Skip C7/C13 subprocess calls (used in summary mode).
prefetch: Optional pre-fetched bulk data to avoid HTTP self-calls:
{
"domain_status": {"custodian": "active", ...}, # slug → status
"tpsc_snap_counts": {"llm-connect": 1, ...}, # repo_slug → count
"active_goal_counts": {"llm-connect": 0, ...}, # repo_slug → count
}
"""
slug = repo.get("slug", "unknown")
results: list[CriterionResult] = []
def _r(id: str, label: str, tier: str, status: CriterionStatus, detail: str = "") -> CriterionResult:
r = CriterionResult(id=id, label=label, tier=tier, status=status, detail=detail)
results.append(r)
return r
# ── Tier 1: Core ─────────────────────────────────────────────────────────
# C1: registered
_r("C1", "Registered in state-hub", "core", "pass", "Repo record exists")
# C2: domain assigned and active
domain_slug = repo.get("domain_slug") or ""
if not domain_slug:
_r("C2", "Domain assigned", "core", "fail", "No domain_slug on repo record")
else:
if prefetch and "domain_status" in prefetch:
dom_status = prefetch["domain_status"].get(domain_slug)
else:
d = await _get(api_base, f"/domains/{domain_slug}/")
dom_status = d.get("status") if d else None
if dom_status == "active":
_r("C2", "Domain assigned", "core", "pass", f"domain: {domain_slug}")
elif dom_status:
_r("C2", "Domain assigned", "core", "warn", f"Domain '{domain_slug}' status: {dom_status}")
else:
_r("C2", "Domain assigned", "core", "fail", f"Domain '{domain_slug}' not found")
# C3: local path resolves
repo_path = _resolve_path(repo)
if repo_path:
_r("C3", "Local path resolves", "core", "pass", repo_path)
else:
raw = repo.get("local_path") or "(none)"
_r("C3", "Local path resolves", "core", "fail", f"Path not accessible: {raw}")
# C4: remote URL set
remote = repo.get("remote_url") or ""
if remote.strip():
_r("C4", "Remote URL set", "core", "pass", remote)
else:
_r("C4", "Remote URL set", "core", "fail", "remote_url is empty")
# ── Tier 2: Standard ─────────────────────────────────────────────────────
# C5a/C5b/C5c: SCOPE.md structure and capability declarations
for issue in evaluate_scope_health(repo):
_r(issue["id"], issue["label"], "standard", issue["status"], issue["detail"])
# C6: CLAUDE.md
if not repo_path:
_r("C6", "CLAUDE.md present", "standard", "skip", "Local path unavailable")
elif (Path(repo_path) / "CLAUDE.md").exists():
_r("C6", "CLAUDE.md present", "standard", "pass")
else:
_r("C6", "CLAUDE.md present", "standard", "fail", "CLAUDE.md not found at repo root")
# C7: workplan convention — consistency check 0 FAIL
if skip_consistency:
_r("C7", "Workplan convention (0 FAIL)", "standard", "skip", "Not checked in summary mode — use /repos/{slug}/doi for full check")
else:
try:
fail, warn, _ = await _run_consistency(slug, api_base)
if fail == 0:
_r("C7", "Workplan convention (0 FAIL)", "standard", "pass", f"consistency: {fail} fail / {warn} warn")
else:
_r("C7", "Workplan convention (0 FAIL)", "standard", "fail", f"consistency: {fail} fail / {warn} warn")
except Exception as e:
_r("C7", "Workplan convention (0 FAIL)", "standard", "skip", f"Could not run consistency check: {e}")
# C8: SBOM ingested
last_sbom = repo.get("last_sbom_at")
if last_sbom:
_r("C8", "SBOM ingested", "standard", "pass", f"last ingested: {last_sbom[:10]}")
else:
_r("C8", "SBOM ingested", "standard", "fail", "last_sbom_at not set — run make ingest-sbom")
# C9: TPSC declared (tpsc.yaml present + snapshot exists)
tpsc_file_ok = repo_path and (Path(repo_path) / "tpsc.yaml").exists()
if prefetch and "tpsc_snap_counts" in prefetch:
has_snap = (prefetch["tpsc_snap_counts"].get(slug, 0) > 0)
snap_count = prefetch["tpsc_snap_counts"].get(slug, 0)
else:
tpsc_snaps = await _get(api_base, "/tpsc/snapshots/", {"repo_slug": slug}) or []
has_snap = len(tpsc_snaps) > 0
snap_count = len(tpsc_snaps)
if not repo_path:
_r("C9", "TPSC declared", "standard", "skip", "Local path unavailable")
elif tpsc_file_ok and has_snap:
_r("C9", "TPSC declared", "standard", "pass", f"{snap_count} snapshot(s)")
elif tpsc_file_ok and not has_snap:
_r("C9", "TPSC declared", "standard", "warn", "tpsc.yaml exists but not yet ingested — run make ingest-tpsc")
elif not tpsc_file_ok:
_r("C9", "TPSC declared", "standard", "fail", "tpsc.yaml missing at repo root")
# ── Tier 3: Full ─────────────────────────────────────────────────────────
# C10: active repo goal
if prefetch and "active_goal_counts" in prefetch:
active_goal_count = prefetch["active_goal_counts"].get(slug, 0)
else:
goals = await _get(api_base, "/repo-goals/", {"repo_slug": slug}) or []
active_goal_count = sum(1 for g in goals if g.get("status") == "active")
if active_goal_count > 0:
_r("C10", "Active repo goal", "full", "pass", f"{active_goal_count} active goal(s)")
else:
_r("C10", "Active repo goal", "full", "fail", "No active repo goal — create one with create_repo_goal()")
# C11: Provided Capabilities declared in SCOPE.md
if not repo_path:
_r("C11", "Provided Capabilities declared", "full", "skip", "Local path unavailable")
else:
scope = Path(repo_path) / "SCOPE.md"
if not scope.exists():
_r("C11", "Provided Capabilities declared", "full", "skip", "SCOPE.md absent")
else:
text = scope.read_text()
has_cap_block = "```capability" in text
has_none_explicit = "## Provided Capabilities" in text and (
"none" in text.lower().split("## provided capabilities")[-1][:200]
or "no capabilities" in text.lower().split("## provided capabilities")[-1][:200]
)
if has_cap_block:
_r("C11", "Provided Capabilities declared", "full", "pass", "capability block(s) found in SCOPE.md")
elif has_none_explicit:
_r("C11", "Provided Capabilities declared", "full", "pass", "Explicitly declared none in SCOPE.md")
elif "## Provided Capabilities" in text:
_r("C11", "Provided Capabilities declared", "full", "warn",
"Section present but no capability block or explicit none — add blocks or state 'none'")
else:
_r("C11", "Provided Capabilities declared", "full", "fail",
"No '## Provided Capabilities' section in SCOPE.md")
# C12: agents template applied (CLAUDE.md mentions kaizen)
if not repo_path:
_r("C12", "Agents template applied", "full", "skip", "Local path unavailable")
else:
claude_md = Path(repo_path) / "CLAUDE.md"
if not claude_md.exists():
_r("C12", "Agents template applied", "full", "skip", "CLAUDE.md absent")
else:
text = claude_md.read_text()
if "get_kaizen_agent" in text or "kaizen" in text.lower():
_r("C12", "Agents template applied", "full", "pass")
else:
_r("C12", "Agents template applied", "full", "fail",
"CLAUDE.md has no kaizen agent reference")
# C13: consistency check clean (0 FAIL, 0 WARN — C-12 exempt)
if skip_consistency:
_r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "skip", "Not checked in summary mode — use /repos/{slug}/doi for full check")
else:
try:
fail, warn, _ = await _run_consistency(slug, api_base)
if fail == 0 and warn == 0:
_r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "pass")
elif fail == 0 and warn > 0:
_r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "warn",
f"{warn} warn(s) — C-12 legacy tasks may be exempt")
else:
_r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "fail",
f"{fail} fail(s), {warn} warn(s)")
except Exception as e:
_r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "skip", f"Could not run: {e}")
# C14: host paths registered
host_paths = repo.get("host_paths") or {}
if host_paths:
_r("C14", "Host paths registered", "full", "pass",
f"{len(host_paths)} host(s): {', '.join(host_paths.keys())}")
else:
_r("C14", "Host paths registered", "full", "fail",
"host_paths empty — run update_repo_path() for each active machine")
# ── Compute tier ─────────────────────────────────────────────────────────
by_id = {r.id: r for r in results}
def _tier_pass(ids: set[str]) -> bool:
return all(by_id[i].status in ("pass", "warn") for i in ids if i in by_id)
core_pass = _tier_pass(CORE_IDS)
standard_pass = core_pass and _tier_pass(STANDARD_IDS)
full_pass = standard_pass and _tier_pass(FULL_IDS)
if full_pass:
tier: Tier = "full"
elif standard_pass:
tier = "standard"
elif core_pass:
tier = "core"
else:
tier = "none"
return DoIReport(
repo_slug=slug,
tier=tier,
core_pass=core_pass,
standard_pass=standard_pass,
full_pass=full_pass,
criteria=results,
)