# File listing metadata: 537 lines, 21 KiB, Python
"""DoI engine — evaluates all 14 Repository Definition of Integrated criteria.
|
|
|
|
Shared by the API endpoint (async) and the CLI check script (asyncio.run).
|
|
All checks use only the repo dict from /repos/{slug} + HTTP calls to the API
|
|
+ local filesystem reads. No direct DB access.
|
|
"""
|
|
from __future__ import annotations

import asyncio
import json
import re
import socket
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Literal

import yaml

# Allowed outcome of a single criterion, and the integration tier a repo can reach.
CriterionStatus = Literal["pass", "fail", "warn", "skip"]
Tier = Literal["none", "core", "standard", "full"]

# Criterion IDs grouped by the tier they gate (sets: membership only, no ordering).
CORE_IDS = {"C1", "C2", "C3", "C4"}
STANDARD_IDS = {"C5a", "C5b", "C5c", "C6", "C7", "C8", "C9"}
FULL_IDS = {"C10", "C11", "C12", "C13", "C14"}

# H2 headings a SCOPE.md is expected to contain, in the order they are reported.
STANDARD_SCOPE_SECTIONS = [
    "One-liner", "Core Idea",
    "In Scope", "Out of Scope",
    "Relevant When", "Not Relevant When",
    "Current State", "How It Fits",
    "Terminology", "Related / Overlapping",
    "Provided Capabilities",
]

# Body of a fenced ```capability block (fence keyword matched case-insensitively).
_CAPABILITY_BLOCK_RE = re.compile(
    r"```capability\s*\n(.*?)```",
    flags=re.DOTALL | re.IGNORECASE,
)
# Text of a markdown H2 heading, without the leading "##" and trailing whitespace.
_H2_RE = re.compile(r"^##\s+(.+?)\s*$", flags=re.MULTILINE)


@dataclass
class CriterionResult:
    """Outcome of one DoI criterion check.

    Field order is part of the interface: instances are built positionally
    as (id, label, tier, status[, detail]).
    """

    # Criterion identifier such as "C1" or "C5a".
    id: str
    # Short human-readable name of the criterion.
    label: str
    # Tier the criterion belongs to ("core", "standard" or "full").
    tier: str
    # One of the CriterionStatus literals.
    status: CriterionStatus
    # Optional free-text explanation; empty when there is nothing to add.
    detail: str = field(default="")


@dataclass
class DoIReport:
    """Aggregate result of a DoI evaluation for one repository."""

    # Slug of the evaluated repo.
    repo_slug: str
    # Highest tier reached (one of the Tier literals).
    tier: Tier
    # Per-tier verdicts.
    core_pass: bool
    standard_pass: bool
    full_pass: bool
    # Individual criterion outcomes; each report gets its own list.
    criteria: list[CriterionResult] = field(default_factory=list)
    # ISO-8601 timestamp in UTC, taken when the report object is created.
    checked_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


def evaluate_scope_health(repo: dict) -> list[dict[str, Any]]:
    """Return machine-readable SCOPE.md health issues for C5a/C5b/C5c.

    The returned records intentionally mirror DoI criterion IDs while carrying
    section-level hints that downstream repo-scoping can use to refresh only
    the affected parts of SCOPE.md.

    Args:
        repo: Repo dict; only the keys read by ``_resolve_path`` are used.

    Returns:
        Exactly three records, in the order C5a, C5b, C5c. Each record has the
        keys ``id``, ``label``, ``status``, ``detail``, ``missing_sections``,
        ``invalid_capability_blocks`` and ``needs_refresh_sections``.
    """
    repo_path = _resolve_path(repo)
    if not repo_path:
        # Nothing can be inspected without a local checkout.
        return [
            _scope_issue(criterion_id, "skip", "Local path unavailable")
            for criterion_id in ("C5a", "C5b", "C5c")
        ]

    scope_path = Path(repo_path) / "SCOPE.md"
    # is_file() rather than exists(): a directory named SCOPE.md cannot be read
    # and would otherwise crash the read below.
    if not scope_path.is_file():
        return [
            _scope_issue(
                "C5a", "fail", "SCOPE.md not found at repo root",
                missing_sections=STANDARD_SCOPE_SECTIONS,
                needs_refresh_sections=STANDARD_SCOPE_SECTIONS,
            ),
            _scope_issue(
                "C5b", "skip", "SCOPE.md absent",
                missing_sections=STANDARD_SCOPE_SECTIONS,
                needs_refresh_sections=STANDARD_SCOPE_SECTIONS,
            ),
            _scope_issue(
                "C5c", "skip", "SCOPE.md absent",
                needs_refresh_sections=["Provided Capabilities"],
            ),
        ]

    # Decode explicitly as UTF-8 so the result does not depend on the host
    # locale; undecodable bytes are replaced instead of aborting the check.
    text = scope_path.read_text(encoding="utf-8", errors="replace")
    issues: list[dict[str, Any]] = [_scope_issue("C5a", "pass")]

    # C5b: every standard section must appear as an H2 heading.
    headings = {heading.strip() for heading in _H2_RE.findall(text)}
    missing = [name for name in STANDARD_SCOPE_SECTIONS if name not in headings]
    if missing:
        issues.append(_scope_issue(
            "C5b", "warn", f"Missing H2 section(s): {', '.join(missing)}",
            missing_sections=missing,
            needs_refresh_sections=missing,
        ))
    else:
        issues.append(_scope_issue(
            "C5b", "pass",
            f"All {len(STANDARD_SCOPE_SECTIONS)} standard sections present",
        ))

    # C5c: a block is valid when it parses as a YAML mapping with a truthy
    # "type" and "title". Blocks are numbered from 1 in document order.
    valid_blocks = 0
    invalid_blocks: list[dict[str, Any]] = []
    for index, block in enumerate(_CAPABILITY_BLOCK_RE.findall(text), start=1):
        try:
            parsed = yaml.safe_load(block) or {}
        except yaml.YAMLError as exc:
            invalid_blocks.append({"index": index, "reason": str(exc)})
            continue
        if isinstance(parsed, dict) and parsed.get("type") and parsed.get("title"):
            valid_blocks += 1
        else:
            invalid_blocks.append({
                "index": index,
                "reason": "Capability block must be YAML with type and title",
            })

    if valid_blocks > 0:
        # One valid block is enough to pass; invalid ones are still reported.
        issues.append(_scope_issue(
            "C5c", "pass", f"{valid_blocks} valid capability block(s)",
            invalid_capability_blocks=invalid_blocks,
        ))
    else:
        detail = (
            "No valid capability block found"
            if invalid_blocks
            else "No fenced capability block found"
        )
        issues.append(_scope_issue(
            "C5c", "warn", detail,
            invalid_capability_blocks=invalid_blocks,
            needs_refresh_sections=["Provided Capabilities"],
        ))

    return issues


# Labels for the three SCOPE.md criteria, keyed by criterion ID.
_SCOPE_CRITERION_LABELS = {
    "C5a": "SCOPE.md present",
    "C5b": "SCOPE.md standard sections",
    "C5c": "SCOPE.md capability blocks",
}


def _scope_issue(
    criterion_id: str,
    status: CriterionStatus,
    detail: str = "",
    *,
    missing_sections: list[str] | None = None,
    invalid_capability_blocks: list[dict[str, Any]] | None = None,
    needs_refresh_sections: list[str] | None = None,
) -> dict[str, Any]:
    """Build one SCOPE.md health record with the full, fixed key set.

    List arguments are copied so that no two fields (and no two records)
    share a list object with each other or with a module-level constant.
    """
    return {
        "id": criterion_id,
        "label": _SCOPE_CRITERION_LABELS[criterion_id],
        "status": status,
        "detail": detail,
        "missing_sections": list(missing_sections or []),
        "invalid_capability_blocks": list(invalid_capability_blocks or []),
        "needs_refresh_sections": list(needs_refresh_sections or []),
    }


def compute_fingerprint(
    repo: dict,
    latest_tpsc_snap_at: str | None,
    latest_goal_updated_at: str | None,
) -> str:
    """Build a pipe-joined fingerprint of the inputs that feed the DoI checks.

    A change in any component changes the fingerprint, which callers can use
    to invalidate a cached report. Components, in order:

    - ``repo["updated_at"]``
    - ``latest_tpsc_snap_at``
    - ``latest_goal_updated_at``
    - when a local path resolves: one ``<name>:<mtime>`` entry (or
      ``<name>:absent``) for each of SCOPE.md, CLAUDE.md and tpsc.yaml

    Missing or falsy timestamp values contribute an empty string.
    """
    components = [
        str(value or "")
        for value in (
            repo.get("updated_at"),
            latest_tpsc_snap_at,
            latest_goal_updated_at,
        )
    ]

    root = _resolve_path(repo)
    if not root:
        # No checkout on this host: the fingerprint has only the three timestamps.
        return "|".join(components)

    base = Path(root)
    for tracked in ("SCOPE.md", "CLAUDE.md", "tpsc.yaml"):
        try:
            mtime = (base / tracked).stat().st_mtime
        except FileNotFoundError:
            components.append(f"{tracked}:absent")
        else:
            # Millisecond precision, matching the cached fingerprint format.
            components.append(f"{tracked}:{mtime:.3f}")
    return "|".join(components)


def _resolve_path(repo: dict) -> str:
    """Return the first existing directory for this repo on the current host.

    The entry in ``repo["host_paths"]`` keyed by this machine's hostname is
    tried first, then ``repo["local_path"]``. ``~`` is expanded. Returns an
    empty string when neither candidate is an existing directory.
    """
    this_host = socket.gethostname()
    per_host = repo.get("host_paths") or {}

    # Host-specific path takes precedence over the generic one.
    for raw in (per_host.get(this_host), repo.get("local_path")):
        if not raw:
            continue
        candidate = Path(raw).expanduser()
        if candidate.is_dir():
            return str(candidate)
    return ""


def resolve_repo_path(repo: dict) -> str:
    """Public wrapper around ``_resolve_path``.

    Returns whatever ``_resolve_path`` returns for ``repo``, so callers
    outside this module resolve paths the same way the DoI checks do.
    """
    resolved = _resolve_path(repo)
    return resolved


def _get_sync(api_base: str, path: str, params: dict | None = None) -> object:
    """Blocking best-effort GET that returns decoded JSON, or ``None`` on failure.

    Args:
        api_base: Base URL, concatenated with ``path`` as-is.
        path: Request path appended to ``api_base``.
        params: Optional query parameters; entries whose value is ``None``
            are dropped. Keys and values are URL-encoded.

    Returns:
        The parsed JSON body, or ``None`` if the URL is malformed, the request
        fails or times out (5 s), or the body is not valid JSON.
    """
    url = f"{api_base}{path}"
    if params:
        # urlencode escapes reserved characters ("&", "=", spaces, ...) that
        # would otherwise corrupt the query string.
        query = urllib.parse.urlencode(
            {key: value for key, value in params.items() if value is not None}
        )
        if query:
            url = f"{url}?{query}"
    try:
        # Request() itself raises ValueError for a malformed URL, so it sits
        # inside the try as well.
        req = urllib.request.Request(url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=5) as response:
            return json.loads(response.read())
    except Exception:
        # Deliberately broad: callers treat None as "no data" and must never
        # crash because the API is unreachable or returned garbage.
        return None


async def _get(api_base: str, path: str, params: dict | None = None) -> object:
    """Awaitable form of ``_get_sync``.

    Runs the blocking request through ``asyncio.to_thread`` and returns
    whatever ``_get_sync`` returns for the same arguments.
    """
    payload = await asyncio.to_thread(_get_sync, api_base, path, params)
    return payload


async def _run_consistency(repo_slug: str, api_base: str) -> tuple[int, int, int]:
    """Run consistency_check.py and return (fail, warn, info) counts.

    The script is started via ``uv run python`` with the project root (the
    parent of this file's directory) as working directory, and is given
    30 seconds to finish.

    Raises:
        asyncio.TimeoutError: the script did not finish in time. The child
            process is killed before the exception propagates.
        OSError: the subprocess could not be started.
    """
    project_root = Path(__file__).parent.parent
    script = project_root / "scripts" / "consistency_check.py"
    proc = await asyncio.create_subprocess_exec(
        "uv", "run", "python", str(script),
        "--repo", repo_slug,
        "--api-base", api_base,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=str(project_root),
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
    except asyncio.TimeoutError:
        # wait_for only cancels communicate(); without an explicit kill the
        # child would keep running after we give up on it.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # already exited between the timeout and the kill
        await proc.wait()
        raise
    # errors="replace": a stray non-UTF-8 byte must not hide the summary line.
    return _parse_consistency_summary(stdout.decode(errors="replace"))


def _parse_consistency_summary(text: str) -> tuple[int, int, int]:
    """Extract (fail, warn, info) counts from the script's ``Summary:`` line.

    Every line containing ``Summary:`` is split on ``|``. Each part is
    attributed to the first of "fail", "warn", "info" it contains. The count
    is the part's first token when that is an integer; otherwise the integer
    directly before the keyword is used, so a first part such as
    ``Summary: 2 fail`` is read as 2. Counts not found stay 0; if several
    summary lines exist, later ones override earlier ones.
    """
    counts = {"fail": 0, "warn": 0, "info": 0}
    for line in text.splitlines():
        if "Summary:" not in line:
            continue
        for part in line.split("|"):
            part = part.strip()
            key = next((k for k in ("fail", "warn", "info") if k in part), None)
            if key is None:
                continue
            try:
                counts[key] = int(part.split()[0])
                continue
            except ValueError:
                pass
            # First token was not a number (e.g. the "Summary:" prefix).
            match = re.search(rf"(\d+)\s*{key}", part)
            if match:
                counts[key] = int(match.group(1))
    return counts["fail"], counts["warn"], counts["info"]


async def evaluate(
    repo: dict,
    api_base: str = "http://127.0.0.1:8000",
    skip_consistency: bool = False,
    prefetch: dict | None = None,
) -> DoIReport:
    """Evaluate all 14 DoI criteria for a repo.

    C5 is reported as three records (C5a/C5b/C5c), so the report holds 16
    results, appended in check order C1 … C14.

    Args:
        repo: Repo dict (slug, domain_slug, local_path, remote_url, host_paths, last_sbom_at).
        api_base: API base URL — only used when prefetch is absent.
        skip_consistency: Skip C7/C13 subprocess calls (used in summary mode).
        prefetch: Optional pre-fetched bulk data to avoid HTTP self-calls:
            {
                "domain_status": {"custodian": "active", ...},   # slug → status
                "tpsc_snap_counts": {"llm-connect": 1, ...},     # repo_slug → count
                "active_goal_counts": {"llm-connect": 0, ...},   # repo_slug → count
            }

    Returns:
        A DoIReport with every criterion result and the highest tier whose
        criteria (and those of all lower tiers) are all "pass" or "warn".
    """
    slug = repo.get("slug", "unknown")
    results: list[CriterionResult] = []

    def _r(
        criterion_id: str,
        label: str,
        tier: str,
        status: CriterionStatus,
        detail: str = "",
    ) -> CriterionResult:
        """Record one criterion outcome and return it."""
        result = CriterionResult(
            id=criterion_id, label=label, tier=tier, status=status, detail=detail
        )
        results.append(result)
        return result

    def _read(path: Path) -> str:
        """Read a markdown file as UTF-8 regardless of the host locale."""
        return path.read_text(encoding="utf-8", errors="replace")

    # ── Tier 1: Core ─────────────────────────────────────────────────────────

    # C1: registered — having a repo dict at all means the record exists.
    _r("C1", "Registered in state-hub", "core", "pass", "Repo record exists")

    # C2: domain assigned and active
    domain_slug = repo.get("domain_slug") or ""
    if not domain_slug:
        _r("C2", "Domain assigned", "core", "fail", "No domain_slug on repo record")
    else:
        if prefetch and "domain_status" in prefetch:
            dom_status = prefetch["domain_status"].get(domain_slug)
        else:
            domain = await _get(api_base, f"/domains/{domain_slug}/")
            # A failed call yields None; anything that is not a JSON object
            # is treated the same way instead of crashing on .get().
            dom_status = domain.get("status") if isinstance(domain, dict) else None
        if dom_status == "active":
            _r("C2", "Domain assigned", "core", "pass", f"domain: {domain_slug}")
        elif dom_status:
            _r("C2", "Domain assigned", "core", "warn", f"Domain '{domain_slug}' status: {dom_status}")
        else:
            _r("C2", "Domain assigned", "core", "fail", f"Domain '{domain_slug}' not found")

    # C3: local path resolves
    repo_path = _resolve_path(repo)
    if repo_path:
        _r("C3", "Local path resolves", "core", "pass", repo_path)
    else:
        raw = repo.get("local_path") or "(none)"
        _r("C3", "Local path resolves", "core", "fail", f"Path not accessible: {raw}")

    # C4: remote URL set
    remote = repo.get("remote_url") or ""
    if remote.strip():
        _r("C4", "Remote URL set", "core", "pass", remote)
    else:
        _r("C4", "Remote URL set", "core", "fail", "remote_url is empty")

    # ── Tier 2: Standard ─────────────────────────────────────────────────────

    # C5a/C5b/C5c: SCOPE.md structure and capability declarations
    for issue in evaluate_scope_health(repo):
        _r(issue["id"], issue["label"], "standard", issue["status"], issue["detail"])

    # C6: CLAUDE.md
    claude_md = Path(repo_path) / "CLAUDE.md" if repo_path else None
    if claude_md is None:
        _r("C6", "CLAUDE.md present", "standard", "skip", "Local path unavailable")
    elif claude_md.is_file():
        _r("C6", "CLAUDE.md present", "standard", "pass")
    else:
        _r("C6", "CLAUDE.md present", "standard", "fail", "CLAUDE.md not found at repo root")

    # C7 and C13 both judge the same consistency run, so the subprocess is
    # started once here and its outcome (counts or error) reused for C13.
    consistency: tuple[int, int, int] | None = None
    consistency_error: Exception | None = None
    if not skip_consistency:
        try:
            consistency = await _run_consistency(slug, api_base)
        except Exception as exc:
            consistency_error = exc

    # Plain string on purpose: "{slug}" is a literal placeholder in the hint.
    summary_mode_hint = "Not checked in summary mode — use /repos/{slug}/doi for full check"

    # C7: workplan convention — consistency check 0 FAIL
    if skip_consistency:
        _r("C7", "Workplan convention (0 FAIL)", "standard", "skip", summary_mode_hint)
    elif consistency is None:
        _r("C7", "Workplan convention (0 FAIL)", "standard", "skip",
           f"Could not run consistency check: {consistency_error}")
    else:
        fail, warn, _ = consistency
        _r("C7", "Workplan convention (0 FAIL)", "standard",
           "pass" if fail == 0 else "fail",
           f"consistency: {fail} fail / {warn} warn")

    # C8: SBOM ingested
    last_sbom = repo.get("last_sbom_at")
    if last_sbom:
        # str() guards against a non-string timestamp; [:10] keeps the date part.
        _r("C8", "SBOM ingested", "standard", "pass", f"last ingested: {str(last_sbom)[:10]}")
    else:
        _r("C8", "SBOM ingested", "standard", "fail", "last_sbom_at not set — run make ingest-sbom")

    # C9: TPSC declared (tpsc.yaml present + snapshot exists)
    if not repo_path:
        # Skipped regardless of snapshots, so no API call is made.
        _r("C9", "TPSC declared", "standard", "skip", "Local path unavailable")
    elif not (Path(repo_path) / "tpsc.yaml").is_file():
        _r("C9", "TPSC declared", "standard", "fail", "tpsc.yaml missing at repo root")
    else:
        if prefetch and "tpsc_snap_counts" in prefetch:
            snap_count = prefetch["tpsc_snap_counts"].get(slug, 0)
        else:
            tpsc_snaps = await _get(api_base, "/tpsc/snapshots/", {"repo_slug": slug}) or []
            snap_count = len(tpsc_snaps)
        if snap_count > 0:
            _r("C9", "TPSC declared", "standard", "pass", f"{snap_count} snapshot(s)")
        else:
            _r("C9", "TPSC declared", "standard", "warn",
               "tpsc.yaml exists but not yet ingested — run make ingest-tpsc")

    # ── Tier 3: Full ─────────────────────────────────────────────────────────

    # C10: active repo goal
    if prefetch and "active_goal_counts" in prefetch:
        active_goal_count = prefetch["active_goal_counts"].get(slug, 0)
    else:
        goals = await _get(api_base, "/repo-goals/", {"repo_slug": slug}) or []
        active_goal_count = sum(
            1 for goal in goals
            if isinstance(goal, dict) and goal.get("status") == "active"
        )
    if active_goal_count > 0:
        _r("C10", "Active repo goal", "full", "pass", f"{active_goal_count} active goal(s)")
    else:
        _r("C10", "Active repo goal", "full", "fail", "No active repo goal — create one with create_repo_goal()")

    # C11: Provided Capabilities declared in SCOPE.md
    if not repo_path:
        _r("C11", "Provided Capabilities declared", "full", "skip", "Local path unavailable")
    else:
        scope = Path(repo_path) / "SCOPE.md"
        if not scope.is_file():
            _r("C11", "Provided Capabilities declared", "full", "skip", "SCOPE.md absent")
        else:
            text = _read(scope)
            has_section = "## Provided Capabilities" in text
            # Heuristic: look for an explicit "none" in the first 200
            # characters after the last occurrence of the heading.
            tail = text.lower().split("## provided capabilities")[-1][:200]
            has_none_explicit = has_section and ("none" in tail or "no capabilities" in tail)
            if "```capability" in text:
                _r("C11", "Provided Capabilities declared", "full", "pass", "capability block(s) found in SCOPE.md")
            elif has_none_explicit:
                _r("C11", "Provided Capabilities declared", "full", "pass", "Explicitly declared none in SCOPE.md")
            elif has_section:
                _r("C11", "Provided Capabilities declared", "full", "warn",
                   "Section present but no capability block or explicit none — add blocks or state 'none'")
            else:
                _r("C11", "Provided Capabilities declared", "full", "fail",
                   "No '## Provided Capabilities' section in SCOPE.md")

    # C12: agents template applied (CLAUDE.md mentions kaizen)
    if claude_md is None:
        _r("C12", "Agents template applied", "full", "skip", "Local path unavailable")
    elif not claude_md.is_file():
        _r("C12", "Agents template applied", "full", "skip", "CLAUDE.md absent")
    elif "kaizen" in _read(claude_md).lower():
        # Case-insensitive match; also covers "get_kaizen_agent".
        _r("C12", "Agents template applied", "full", "pass")
    else:
        _r("C12", "Agents template applied", "full", "fail",
           "CLAUDE.md has no kaizen agent reference")

    # C13: consistency check clean (0 FAIL, 0 WARN — C-12 exempt)
    if skip_consistency:
        _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "skip", summary_mode_hint)
    elif consistency is None:
        _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "skip",
           f"Could not run: {consistency_error}")
    else:
        fail, warn, _ = consistency
        if fail == 0 and warn == 0:
            _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "pass")
        elif fail == 0 and warn > 0:
            _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "warn",
               f"{warn} warn(s) — C-12 legacy tasks may be exempt")
        else:
            _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "fail",
               f"{fail} fail(s), {warn} warn(s)")

    # C14: host paths registered
    host_paths = repo.get("host_paths") or {}
    if host_paths:
        _r("C14", "Host paths registered", "full", "pass",
           f"{len(host_paths)} host(s): {', '.join(host_paths.keys())}")
    else:
        _r("C14", "Host paths registered", "full", "fail",
           "host_paths empty — run update_repo_path() for each active machine")

    # ── Compute tier ─────────────────────────────────────────────────────────
    by_id = {result.id: result for result in results}

    def _tier_pass(ids: set[str]) -> bool:
        """True when every recorded criterion in ``ids`` is "pass" or "warn"."""
        # NOTE(review): "skip" counts as not passing, so with
        # skip_consistency=True (C7 skipped) the tier can never exceed
        # "core" — confirm this is the intended summary-mode behaviour.
        return all(by_id[i].status in ("pass", "warn") for i in ids if i in by_id)

    # Tiers are cumulative: a higher tier requires all lower tiers to pass.
    core_pass = _tier_pass(CORE_IDS)
    standard_pass = core_pass and _tier_pass(STANDARD_IDS)
    full_pass = standard_pass and _tier_pass(FULL_IDS)

    tier: Tier
    if full_pass:
        tier = "full"
    elif standard_pass:
        tier = "standard"
    elif core_pass:
        tier = "core"
    else:
        tier = "none"

    return DoIReport(
        repo_slug=slug,
        tier=tier,
        core_pass=core_pass,
        standard_pass=standard_pass,
        full_pass=full_pass,
        criteria=results,
    )