feat(doi): Repository DoI automated gate and dashboard integration (CUST-WP-0024)

Implements the 14-criterion DoI checklist as a runnable gate with API,
MCP tools, CLI script, and dashboard integration.

Core components:
- api/doi_engine.py — async engine evaluating all 14 criteria (asyncio.to_thread
  for non-blocking HTTP self-calls), shared by API and CLI
- api/schemas/doi.py — DoICriterion, DoIReport, DoISummaryEntry schemas
- api/routers/repos.py — GET /repos/{slug}/doi + GET /repos/doi/summary
- scripts/check_doi.py — CLI: make check-doi REPO=<slug> / check-doi-all
- mcp_server/server.py — check_repo_doi(), get_doi_summary() tools

Dashboard (repos.md):
- DoI tier badge per repo (None/Core/Standard/Full) colour-coded red→green
- Domain block shows lowest DoI tier across its repos
- DoI KPI card in summary row
- DoI filter in All Repos Table
- Link to Repository DoI policy page

Also fixes: TPSC snapshots 500 error (missing nested selectinload for
catalog_entry relationship in list_snapshots endpoint).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 01:08:18 +01:00
parent a7b26ef6de
commit f94ee008b5
9 changed files with 794 additions and 13 deletions


@@ -183,6 +183,17 @@ ingest-capabilities-all:
	uv run python scripts/ingest_capabilities.py --all \
		$(if $(DRY_RUN),--dry-run)

## Check Repository Definition of Integrated (DoI) criteria for a repo.
## Usage: make check-doi REPO=llm-connect
## Or: make check-doi-all
## Add JSON=1 for machine-readable output.
check-doi:
	@test -n "$(REPO)" || (echo "ERROR: REPO is required."; exit 1)
	uv run python scripts/check_doi.py --repo "$(REPO)" $(if $(JSON),--json)

check-doi-all:
	uv run python scripts/check_doi.py --all $(if $(JSON),--json)
## Ingest tpsc.yaml service declarations from a repo into the TPSC catalog.
## Usage: make ingest-tpsc REPO=llm-connect
## Or: make ingest-tpsc-all

state-hub/api/doi_engine.py

@@ -0,0 +1,305 @@
"""DoI engine — evaluates all 14 Repository Definition of Integrated criteria.
Shared by the API endpoint (async) and the CLI check script (asyncio.run).
All checks use only the repo dict from /repos/{slug} + HTTP calls to the API
+ local filesystem reads. No direct DB access.
"""
from __future__ import annotations
import asyncio
import json
import socket
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
CriterionStatus = Literal["pass", "fail", "warn", "skip"]
Tier = Literal["none", "core", "standard", "full"]
# Criteria that belong to each tier (in check order)
CORE_IDS = {"C1", "C2", "C3", "C4"}
STANDARD_IDS = {"C5", "C6", "C7", "C8", "C9"}
FULL_IDS = {"C10", "C11", "C12", "C13", "C14"}
@dataclass
class CriterionResult:
    id: str
    label: str
    tier: str
    status: CriterionStatus
    detail: str = ""


@dataclass
class DoIReport:
    repo_slug: str
    tier: Tier
    core_pass: bool
    standard_pass: bool
    full_pass: bool
    criteria: list[CriterionResult] = field(default_factory=list)
    checked_at: str = field(default_factory=lambda: datetime.now(tz=timezone.utc).isoformat())
def _resolve_path(repo: dict) -> str:
    """Return the first existing directory for this host, or "" if none resolves."""
    hostname = socket.gethostname()
    host_paths = repo.get("host_paths") or {}
    candidates = []
    # Prefer the path registered for this machine, then fall back to local_path.
    if host_paths.get(hostname):
        candidates.append(host_paths[hostname])
    if repo.get("local_path"):
        candidates.append(repo["local_path"])
    for raw in candidates:
        p = Path(raw).expanduser()
        if p.is_dir():
            return str(p)
    return ""


def _get_sync(api_base: str, path: str, params: dict | None = None) -> object:
    """Blocking GET that returns parsed JSON, or None on any request or decode error."""
    from urllib.parse import urlencode  # local import keeps this helper self-contained

    url = f"{api_base}{path}"
    # Percent-encode the query string and drop unset (None) parameters.
    query = urlencode({k: v for k, v in (params or {}).items() if v is not None})
    if query:
        url = f"{url}?{query}"
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=5) as r:
            return json.loads(r.read())
    except Exception:
        return None


async def _get(api_base: str, path: str, params: dict | None = None) -> object:
    """Async wrapper: runs blocking urllib in a thread so the event loop stays free."""
    return await asyncio.to_thread(_get_sync, api_base, path, params)
async def _run_consistency(repo_slug: str, api_base: str) -> tuple[int, int, int]:
    """Run consistency_check.py and return (fail, warn, info) counts."""
    root = Path(__file__).parent.parent
    proc = await asyncio.create_subprocess_exec(
        "uv", "run", "python", str(root / "scripts" / "consistency_check.py"),
        "--repo", repo_slug,
        "--api-base", api_base,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=str(root),
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
    except asyncio.TimeoutError:
        # Do not leave the child process running once we stop waiting for it.
        proc.kill()
        await proc.wait()
        raise
    counts = {"fail": 0, "warn": 0, "info": 0}
    for line in stdout.decode().splitlines():
        if "Summary:" not in line:
            continue
        # Summary fields are "|"-separated; take the first integer token in each
        # field so a leading "Summary:" label does not hide the count.
        for part in line.split("|"):
            for key in counts:
                if key in part:
                    number = next((int(t) for t in part.split() if t.isdigit()), None)
                    if number is not None:
                        counts[key] = number
                    break
    return counts["fail"], counts["warn"], counts["info"]
async def evaluate(repo: dict, api_base: str = "http://127.0.0.1:8000") -> DoIReport:
slug = repo.get("slug", "unknown")
results: list[CriterionResult] = []
def _r(id: str, label: str, tier: str, status: CriterionStatus, detail: str = "") -> CriterionResult:
r = CriterionResult(id=id, label=label, tier=tier, status=status, detail=detail)
results.append(r)
return r
# ── Tier 1: Core ─────────────────────────────────────────────────────────
# C1: registered
_r("C1", "Registered in state-hub", "core", "pass", "Repo record exists")
# C2: domain assigned and active
domain_slug = repo.get("domain_slug") or ""
if not domain_slug:
_r("C2", "Domain assigned", "core", "fail", "No domain_slug on repo record")
else:
domain = await _get(api_base, f"/domains/{domain_slug}/")
if domain and domain.get("status") == "active":
_r("C2", "Domain assigned", "core", "pass", f"domain: {domain_slug}")
elif domain:
_r("C2", "Domain assigned", "core", "warn", f"Domain '{domain_slug}' status: {domain.get('status')}")
else:
_r("C2", "Domain assigned", "core", "fail", f"Domain '{domain_slug}' not found")
# C3: local path resolves
repo_path = _resolve_path(repo)
if repo_path:
_r("C3", "Local path resolves", "core", "pass", repo_path)
else:
raw = repo.get("local_path") or "(none)"
_r("C3", "Local path resolves", "core", "fail", f"Path not accessible: {raw}")
# C4: remote URL set
remote = repo.get("remote_url") or ""
if remote.strip():
_r("C4", "Remote URL set", "core", "pass", remote)
else:
_r("C4", "Remote URL set", "core", "fail", "remote_url is empty")
# ── Tier 2: Standard ─────────────────────────────────────────────────────
# C5: SCOPE.md
if not repo_path:
_r("C5", "SCOPE.md present", "standard", "skip", "Local path unavailable")
elif (Path(repo_path) / "SCOPE.md").exists():
_r("C5", "SCOPE.md present", "standard", "pass")
else:
_r("C5", "SCOPE.md present", "standard", "fail", "SCOPE.md not found at repo root")
# C6: CLAUDE.md
if not repo_path:
_r("C6", "CLAUDE.md present", "standard", "skip", "Local path unavailable")
elif (Path(repo_path) / "CLAUDE.md").exists():
_r("C6", "CLAUDE.md present", "standard", "pass")
else:
_r("C6", "CLAUDE.md present", "standard", "fail", "CLAUDE.md not found at repo root")
# C7: workplan convention — consistency check 0 FAIL
try:
fail, warn, _ = await _run_consistency(slug, api_base)
if fail == 0:
_r("C7", "Workplan convention (0 FAIL)", "standard", "pass", f"consistency: {fail} fail / {warn} warn")
else:
_r("C7", "Workplan convention (0 FAIL)", "standard", "fail", f"consistency: {fail} fail / {warn} warn")
except Exception as e:
_r("C7", "Workplan convention (0 FAIL)", "standard", "skip", f"Could not run consistency check: {e}")
# C8: SBOM ingested
last_sbom = repo.get("last_sbom_at")
if last_sbom:
_r("C8", "SBOM ingested", "standard", "pass", f"last ingested: {last_sbom[:10]}")
else:
_r("C8", "SBOM ingested", "standard", "fail", "last_sbom_at not set — run make ingest-sbom")
# C9: TPSC declared (tpsc.yaml present + snapshot exists)
tpsc_file_ok = repo_path and (Path(repo_path) / "tpsc.yaml").exists()
tpsc_snaps = await _get(api_base, "/tpsc/snapshots/", {"repo_slug": slug}) or []
has_snap = len(tpsc_snaps) > 0
if not repo_path:
_r("C9", "TPSC declared", "standard", "skip", "Local path unavailable")
elif tpsc_file_ok and has_snap:
_r("C9", "TPSC declared", "standard", "pass", f"{len(tpsc_snaps)} snapshot(s)")
elif tpsc_file_ok and not has_snap:
_r("C9", "TPSC declared", "standard", "warn", "tpsc.yaml exists but not yet ingested — run make ingest-tpsc")
elif not tpsc_file_ok:
_r("C9", "TPSC declared", "standard", "fail", "tpsc.yaml missing at repo root")
# ── Tier 3: Full ─────────────────────────────────────────────────────────
# C10: active repo goal
goals = await _get(api_base, "/repo-goals/", {"repo_slug": slug}) or []
active_goals = [g for g in goals if g.get("status") == "active"]
if active_goals:
_r("C10", "Active repo goal", "full", "pass", f"{len(active_goals)} active goal(s)")
else:
_r("C10", "Active repo goal", "full", "fail", "No active repo goal — create one with create_repo_goal()")
# C11: Provided Capabilities declared in SCOPE.md
if not repo_path:
_r("C11", "Provided Capabilities declared", "full", "skip", "Local path unavailable")
else:
scope = Path(repo_path) / "SCOPE.md"
if not scope.exists():
_r("C11", "Provided Capabilities declared", "full", "skip", "SCOPE.md absent")
else:
text = scope.read_text()
has_cap_block = "```capability" in text
has_none_explicit = "## Provided Capabilities" in text and (
"none" in text.lower().split("## provided capabilities")[-1][:200]
or "no capabilities" in text.lower().split("## provided capabilities")[-1][:200]
)
if has_cap_block:
_r("C11", "Provided Capabilities declared", "full", "pass", "capability block(s) found in SCOPE.md")
elif has_none_explicit:
_r("C11", "Provided Capabilities declared", "full", "pass", "Explicitly declared none in SCOPE.md")
elif "## Provided Capabilities" in text:
_r("C11", "Provided Capabilities declared", "full", "warn",
"Section present but no capability block or explicit none — add blocks or state 'none'")
else:
_r("C11", "Provided Capabilities declared", "full", "fail",
"No '## Provided Capabilities' section in SCOPE.md")
# C12: agents template applied (CLAUDE.md mentions kaizen)
if not repo_path:
_r("C12", "Agents template applied", "full", "skip", "Local path unavailable")
else:
claude_md = Path(repo_path) / "CLAUDE.md"
if not claude_md.exists():
_r("C12", "Agents template applied", "full", "skip", "CLAUDE.md absent")
else:
text = claude_md.read_text()
if "get_kaizen_agent" in text or "kaizen" in text.lower():
_r("C12", "Agents template applied", "full", "pass")
else:
_r("C12", "Agents template applied", "full", "fail",
"CLAUDE.md has no kaizen agent reference")
    # C13: consistency check clean (0 FAIL, 0 WARN)
    # C-12 warns (legacy DB-only tasks) are meant to be exempt, but they are not
    # deducted from the count yet, so any remaining warn is reported as "warn".
    try:
        fail, warn, _ = await _run_consistency(slug, api_base)
        if fail == 0 and warn == 0:
            _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "pass")
        elif fail == 0 and warn > 0:
            _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "warn",
               f"{warn} warn(s); C-12 legacy tasks may be exempt")
        else:
            _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "fail",
               f"{fail} fail(s), {warn} warn(s)")
    except Exception as e:
        _r("C13", "Consistency check clean (0 FAIL/WARN)", "full", "skip", f"Could not run: {e}")
# C14: host paths registered
host_paths = repo.get("host_paths") or {}
if host_paths:
_r("C14", "Host paths registered", "full", "pass",
f"{len(host_paths)} host(s): {', '.join(host_paths.keys())}")
else:
_r("C14", "Host paths registered", "full", "fail",
"host_paths empty — run update_repo_path() for each active machine")
# ── Compute tier ─────────────────────────────────────────────────────────
by_id = {r.id: r for r in results}
def _tier_pass(ids: set[str]) -> bool:
return all(by_id[i].status in ("pass", "warn") for i in ids if i in by_id)
core_pass = _tier_pass(CORE_IDS)
standard_pass = core_pass and _tier_pass(STANDARD_IDS)
full_pass = standard_pass and _tier_pass(FULL_IDS)
if full_pass:
tier: Tier = "full"
elif standard_pass:
tier = "standard"
elif core_pass:
tier = "core"
else:
tier = "none"
return DoIReport(
repo_slug=slug,
tier=tier,
core_pass=core_pass,
standard_pass=standard_pass,
full_pass=full_pass,
criteria=results,
)


@@ -5,11 +5,13 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session
from api.doi_engine import evaluate as _doi_evaluate
from api.models.domain import Domain
from api.models.managed_repo import ManagedRepo
from api.models.repo_goal import RepoGoal
from api.models.task import Task
from api.models.workstream import Workstream
from api.schemas.doi import DoICriterion, DoIReport, DoISummaryEntry
from api.schemas.managed_repo import (
DispatchTask,
DispatchWorkstream,
@@ -68,6 +70,72 @@ async def register_repo(
return repo
@router.get("/doi/summary", response_model=list[DoISummaryEntry])
async def doi_summary(session: AsyncSession = Depends(get_session)) -> list[DoISummaryEntry]:
"""Return DoI tier for all active repos, worst tier first."""
result = await session.execute(
select(ManagedRepo).where(ManagedRepo.status == "active").order_by(ManagedRepo.name)
)
repos = list(result.scalars().all())
domain_result = await session.execute(select(Domain))
domain_map = {d.id: d.slug for d in domain_result.scalars().all()}
entries: list[DoISummaryEntry] = []
for repo in repos:
repo_dict = {
"slug": repo.slug,
"domain_slug": domain_map.get(repo.domain_id),
"local_path": repo.local_path,
"remote_url": repo.remote_url,
"host_paths": repo.host_paths or {},
"last_sbom_at": str(repo.last_sbom_at) if repo.last_sbom_at else None,
}
report = await _doi_evaluate(repo_dict)
entries.append(DoISummaryEntry(
repo_slug=repo.slug,
domain_slug=domain_map.get(repo.domain_id),
tier=report.tier,
core_pass=report.core_pass,
standard_pass=report.standard_pass,
full_pass=report.full_pass,
checked_at=report.checked_at,
))
tier_order = {"none": 0, "core": 1, "standard": 2, "full": 3}
entries.sort(key=lambda e: tier_order.get(e.tier, 0))
return entries
@router.get("/{slug}/doi", response_model=DoIReport)
async def get_repo_doi(slug: str, session: AsyncSession = Depends(get_session)) -> DoIReport:
"""Evaluate the 14 DoI criteria for a single repo."""
repo = await _get_repo_by_slug(slug, session)
domain_result = await session.execute(select(Domain).where(Domain.id == repo.domain_id))
domain_obj = domain_result.scalar_one_or_none()
repo_dict = {
"slug": repo.slug,
"domain_slug": domain_obj.slug if domain_obj else None,
"local_path": repo.local_path,
"remote_url": repo.remote_url,
"host_paths": repo.host_paths or {},
"last_sbom_at": str(repo.last_sbom_at) if repo.last_sbom_at else None,
}
report = await _doi_evaluate(repo_dict)
return DoIReport(
repo_slug=report.repo_slug,
tier=report.tier,
core_pass=report.core_pass,
standard_pass=report.standard_pass,
full_pass=report.full_pass,
checked_at=report.checked_at,
criteria=[
DoICriterion(id=c.id, label=c.label, tier=c.tier, status=c.status, detail=c.detail)
for c in report.criteria
],
)
@router.get("/{slug}/", response_model=RepoRead)
async def get_repo(
slug: str,


@@ -144,7 +144,9 @@ async def list_snapshots(
repo_slug: str | None = None,
session: AsyncSession = Depends(get_session),
):
q = select(TPSCSnapshot).options(selectinload(TPSCSnapshot.entries))
q = select(TPSCSnapshot).options(
selectinload(TPSCSnapshot.entries).selectinload(TPSCEntry.catalog_entry)
)
if repo_slug:
repo = (await session.execute(select(ManagedRepo).where(ManagedRepo.slug == repo_slug))).scalar_one_or_none()
if not repo:


@@ -0,0 +1,29 @@
from pydantic import BaseModel


class DoICriterion(BaseModel):
    id: str
    label: str
    tier: str
    status: str  # pass | fail | warn | skip
    detail: str = ""


class DoIReport(BaseModel):
    repo_slug: str
    tier: str  # none | core | standard | full
    core_pass: bool
    standard_pass: bool
    full_pass: bool
    criteria: list[DoICriterion] = []
    checked_at: str


class DoISummaryEntry(BaseModel):
    repo_slug: str
    domain_slug: str | None
    tier: str
    core_pass: bool
    standard_pass: bool
    full_pass: bool
    checked_at: str


@@ -7,15 +7,16 @@ import {API} from "./components/config.js";
```
```js
let _repos = [], _domains = [], _sbom = [], _eps = [], _tds = [], _workstreams = [];
let _repos = [], _domains = [], _sbom = [], _eps = [], _tds = [], _workstreams = [], _doi = [];
try {
[_repos, _domains, _sbom, _eps, _tds, _workstreams] = await Promise.all([
[_repos, _domains, _sbom, _eps, _tds, _workstreams, _doi] = await Promise.all([
fetch(`${API}/repos/`).then(r => r.ok ? r.json() : []),
fetch(`${API}/domains/`).then(r => r.ok ? r.json() : []),
fetch(`${API}/sbom/`).then(r => r.ok ? r.json() : []),
fetch(`${API}/extension-points/`).then(r => r.ok ? r.json() : []),
fetch(`${API}/technical-debt/`).then(r => r.ok ? r.json() : []),
fetch(`${API}/workstreams/`).then(r => r.ok ? r.json() : []),
fetch(`${API}/repos/doi/summary`).then(r => r.ok ? r.json() : []),
]);
} catch {}
```
@@ -27,6 +28,14 @@ const sbom = _sbom ?? [];
const eps = _eps ?? [];
const tds = _tds ?? [];
const workstreams = _workstreams ?? [];
const doi = _doi ?? [];
// DoI lookups
const doiBySlug = Object.fromEntries(doi.map(d => [d.repo_slug, d]));
const DOI_TIER_ORDER = {none: 0, core: 1, standard: 2, full: 3};
const DOI_TIER_COLOR = {none: "#ef4444", core: "#f97316", standard: "#eab308", full: "#22c55e"};
const DOI_TIER_BG = {none: "#fef2f2", core: "#fff7ed", standard: "#fefce8", full: "#f0fdf4"};
const DOI_TIER_LABEL = {none: "None", core: "Core", standard: "Standard", full: "Full"};
// Lookups
const domainById = Object.fromEntries(domains.map(d => [d.id, d]));
@@ -79,11 +88,14 @@ const repoRows = repos
? new Date(r.last_sbom_at).toLocaleDateString()
: (sbomData?.snapshot_at ? new Date(sbomData.snapshot_at).toLocaleDateString() : null);
const integrating = !!integratingBySlug[r.slug];
const doiEntry = doiBySlug[r.slug] ?? null;
const doiTier = doiEntry?.tier ?? "none";
return {
_id: r.id,
_domSlug: domSlug,
_hasSbom: hasSbom,
_integrating: integrating,
_doiTier: doiTier,
repo: r.slug,
domain: domName,
status: integrating ? "⚙ integrating" : "ready",
@@ -96,9 +108,11 @@ const repoRows = repos
})
.sort((a, b) => a._domSlug.localeCompare(b._domSlug) || a.repo.localeCompare(b.repo));
const gapCount = repoRows.filter(r => !r._hasSbom).length;
const coveredCount = repoRows.filter(r => r._hasSbom).length;
const gapCount = repoRows.filter(r => !r._hasSbom).length;
const coveredCount = repoRows.filter(r => r._hasSbom).length;
const integratingCount = repoRows.filter(r => r._integrating).length;
const doiFullCount = repoRows.filter(r => r._doiTier === "full").length;
const doiNoneCount = repoRows.filter(r => r._doiTier === "none").length;
```
# Repos
@@ -107,6 +121,13 @@ const integratingCount = repoRows.filter(r => r._integrating).length;
import {withDocHelp} from "./components/doc-overlay.js";
const _h1 = document.querySelector("#observablehq-main h1");
if (_h1) { _h1.style.position = "relative"; withDocHelp(_h1, "/docs/repos"); }
display(html`<p style="font-size:0.85rem;color:#6b7280;margin-top:-0.5rem;">
DoI tiers: <strong style="color:#ef4444;">None</strong> →
<strong style="color:#f97316;">Core</strong> →
<strong style="color:#eab308;">Standard</strong> →
<strong style="color:#22c55e;">Full</strong> —
<a href="/policy/repo-doi" style="color:#1d4ed8;">Definition of Integrated policy ↗</a>
</p>`);
```
```js
@@ -134,6 +155,11 @@ display(html`<div class="kpi-row">
<p class="big-num">${gapCount}</p>
<small>${gapCount === 0 ? "✓ All repos covered" : `${gapCount} repo(s) not ingested`}</small>
</div>
<div class="card ${doiNoneCount > 0 ? 'card-warn' : 'card-ok'}">
<h3>DoI: Fully Integrated</h3>
<p class="big-num">${doiFullCount} / ${repoRows.length}</p>
<small>${doiNoneCount > 0 ? `${doiNoneCount} at tier None` : "✓ All pass Core tier"}</small>
</div>
</div>`);
```
@@ -147,6 +173,15 @@ function _sbomGap() {
return el;
}
function _doiBadge(tier) {
const color = DOI_TIER_COLOR[tier] || "#9ca3af";
const bg = DOI_TIER_BG[tier] || "#f9fafb";
const label = DOI_TIER_LABEL[tier] || tier;
return html`<span style="background:${bg}; color:${color}; border:1px solid ${color}60;
border-radius:4px; padding:1px 7px; font-size:0.72rem; font-weight:700; white-space:nowrap;">
DoI: ${label}</span>`;
}
// Group by domain
const byDomain = {};
for (const r of repoRows) {
@@ -162,6 +197,9 @@ if (domainBlocks.length === 0) {
${domainBlocks.map(([slug, rows]) => {
const dom = domainBySlug[slug];
const allCovered = rows.every(r => r._hasSbom);
const doiWorst = rows.map(r => DOI_TIER_ORDER[r._doiTier] ?? 0);
const doiMin = Math.min(...doiWorst);
const doiMinKey = Object.keys(DOI_TIER_ORDER).find(k => DOI_TIER_ORDER[k] === doiMin) ?? "none";
const hasEps = (epByDomain[slug] ?? 0) > 0;
const hasTds = (tdByDomain[slug] ?? 0) > 0;
return html`
@@ -178,11 +216,15 @@ if (domainBlocks.length === 0) {
${hasTds
? html`<span class="chip chip-ok">TDs: ${tdByDomain[slug]}</span>`
: html`<span class="chip chip-neutral">TDs: —</span>`}
<span style="font-size:0.72rem; color:${DOI_TIER_COLOR[doiMinKey]}; font-weight:600;">
DoI min: ${DOI_TIER_LABEL[doiMinKey]}
</span>
</span>
</div>
<table class="repo-table">
<thead><tr>
<th>Repo</th>
<th>DoI Tier</th>
<th>Status</th>
<th>SBOM</th>
<th>Packages</th>
@@ -191,6 +233,7 @@ if (domainBlocks.length === 0) {
<tbody>
${rows.map(r => html`<tr class="${r._integrating ? 'row-integrating' : r._hasSbom ? '' : 'row-gap'}">
<td class="repo-cell"><code>${r.repo}</code></td>
<td>${_doiBadge(r._doiTier)}</td>
<td>${r._integrating
? html`<span class="chip chip-integrating">⚙ integrating</span>`
: html`<span class="chip chip-ok">ready</span>`}</td>
@@ -211,26 +254,29 @@ if (domainBlocks.length === 0) {
```js
const domainFilter = Inputs.select(["all", ...new Set(repoRows.map(r => r._domSlug)).values()], {label: "Domain", value: "all"});
const doiFilter = Inputs.select(["all", "none", "core", "standard", "full"], {label: "DoI tier", value: "all"});
const gapFilter = Inputs.toggle({label: "Gaps only (no SBOM)", value: false});
display(html`<div style="display:flex;gap:1rem;flex-wrap:wrap;margin-bottom:1rem">${domainFilter}${gapFilter}</div>`);
display(html`<div style="display:flex;gap:1rem;flex-wrap:wrap;margin-bottom:1rem">${domainFilter}${doiFilter}${gapFilter}</div>`);
```
```js
const filteredRows = repoRows.filter(r =>
(domainFilter.value === "all" || r._domSlug === domainFilter.value) &&
(doiFilter.value === "all" || r._doiTier === doiFilter.value) &&
(!gapFilter.value || !r._hasSbom)
);
display(Inputs.table(filteredRows.map(r => ({
Repo: r.repo,
Domain: r.domain,
Status: r.status,
SBOM: r.sbom,
Pkgs: r.pkgs,
Repo: r.repo,
Domain: r.domain,
"DoI Tier": DOI_TIER_LABEL[r._doiTier] ?? r._doiTier,
Status: r.status,
SBOM: r.sbom,
Pkgs: r.pkgs,
"EPs (domain)": r.eps || "—",
"TDs (domain)": r.tds || "—",
Path: r.path,
})), {maxWidth: 1100}));
Path: r.path,
})), {maxWidth: 1200}));
```
## Onboard a New Repo


@@ -2045,6 +2045,40 @@ def get_gdpr_report() -> str:
return json.dumps(_get("/tpsc/report/gdpr"), indent=2)
# ---------------------------------------------------------------------------
# Repository Definition of Integrated (DoI)
# ---------------------------------------------------------------------------
@mcp.tool()
def check_repo_doi(repo_slug: str) -> str:
    """Evaluate the 14 DoI criteria for a repo and return a full report.

    Criteria are grouped into three tiers:
      Core (C1-C4): registered, domain, path, remote URL
      Standard (C5-C9): SCOPE.md, CLAUDE.md, workplan, SBOM, TPSC
      Full (C10-C14): repo goal, capabilities, agents, clean consistency, host paths

    Status values: pass | fail | warn | skip
    The 'tier' field shows the highest tier where ALL criteria pass or warn:
      none | core | standard | full

    Args:
        repo_slug: Registered repo slug (e.g. 'llm-connect', 'the-custodian')
    """
    return json.dumps(_get(f"/repos/{repo_slug}/doi"), indent=2)


@mcp.tool()
def get_doi_summary() -> str:
    """Return DoI tier for all active repos, sorted worst-first.

    Useful at session start to spot repos that need integration work.
    Tiers: none (red) → core → standard → full (green).
    """
    return json.dumps(_get("/repos/doi/summary"), indent=2)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------


@@ -0,0 +1,132 @@
#!/usr/bin/env python3
"""Check Repository Definition of Integrated (DoI) criteria.
Usage:
uv run python scripts/check_doi.py --repo <slug> [--json]
uv run python scripts/check_doi.py --all [--json]
"""
import argparse
import asyncio
import json
import sys
import urllib.request
from pathlib import Path
# Allow importing from the api package
sys.path.insert(0, str(Path(__file__).parent.parent))
from api.doi_engine import evaluate
API_BASE = "http://127.0.0.1:8000"
STATUS_ICON = {"pass": "✓", "fail": "✗", "warn": "!", "skip": "-"}
STATUS_COLOR = {
"pass": "\033[32m", # green
"fail": "\033[31m", # red
"warn": "\033[33m", # yellow
"skip": "\033[90m", # grey
}
RESET = "\033[0m"
BOLD = "\033[1m"
TIER_COLOR = {
"full": "\033[32m",
"standard": "\033[33m",
"core": "\033[33m",
"none": "\033[31m",
}
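def _get(path: str) -> object:
    """Blocking GET against the state-hub API; raises on HTTP or network errors."""
    # Callers already pass paths with a trailing slash ("/repos/", "/repos/<slug>/"),
    # so the path is used as-is; appending another "/" would yield "//".
    req = urllib.request.Request(
        f"{API_BASE}{path}",
        headers={"Accept": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as r:
        return json.loads(r.read())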
def _print_report(report, use_color: bool = True) -> None:
    """Print a human-readable report: one tier summary line, then criteria by tier."""
    tier_c = TIER_COLOR.get(report.tier, "") if use_color else ""
    reset = RESET if use_color else ""
    bold = BOLD if use_color else ""
    print(f"\n{bold}Repo: {report.repo_slug}{reset}")
    print(f" Tier: {tier_c}{bold}{report.tier.upper()}{reset} "
          f"(core={'✓' if report.core_pass else '✗'} "
          f"standard={'✓' if report.standard_pass else '✗'} "
          f"full={'✓' if report.full_pass else '✗'})")
    current_tier = None
    for c in report.criteria:
        # Emit a section header whenever the tier changes.
        if c.tier != current_tier:
            current_tier = c.tier
            print(f" ── {c.tier.upper()} ──")
        sc = STATUS_COLOR.get(c.status, "") if use_color else ""
        ico = STATUS_ICON.get(c.status, "?")
        detail = f" {c.detail}" if c.detail else ""
        print(f" {sc}{ico}{reset} {c.id}: {c.label}{detail}")
async def check_repo(slug: str, as_json: bool) -> bool:
    """Evaluate one repo; return True when it reaches at least the Core tier."""
    try:
        repo = _get(f"/repos/{slug}/")
    except Exception as e:
        print(f"✗ Could not fetch repo '{slug}': {e}", file=sys.stderr)
        return False
    report = await evaluate(repo, API_BASE)
    if as_json:
        print(json.dumps({
            "repo_slug": report.repo_slug,
            "tier": report.tier,
            "core_pass": report.core_pass,
            "standard_pass": report.standard_pass,
            "full_pass": report.full_pass,
            "checked_at": report.checked_at,
            "criteria": [
                {"id": c.id, "label": c.label, "tier": c.tier,
                 "status": c.status, "detail": c.detail}
                for c in report.criteria
            ],
        }, indent=2))
    else:
        _print_report(report)
    return report.tier != "none"


async def main_async() -> None:
    parser = argparse.ArgumentParser(description="Check Repository DoI criteria")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG")
    group.add_argument("--all", action="store_true")
    parser.add_argument("--json", action="store_true", dest="as_json")
    args = parser.parse_args()
    if args.all:
        repos = _get("/repos/")
        slugs = [r["slug"] for r in repos if r.get("status") == "active"]
    else:
        slugs = [args.repo]
    # Each repo prints its own report; exit non-zero if any repo is at tier "none".
    results = await asyncio.gather(*[check_repo(s, args.as_json) for s in slugs])
    sys.exit(0 if all(results) else 1)


def main() -> None:
    asyncio.run(main_async())


if __name__ == "__main__":
    main()


@@ -0,0 +1,154 @@
---
id: CUST-WP-0024
type: feature
title: Repository DoI automated gate and dashboard integration
domain: custodian
status: done
owner: custodian-agent
topic_slug: custodian
created: 2026-03-20
updated: 2026-03-20
state_hub_workstream_id: "f587f244-0dd5-4268-b955-0c4e4cf9aa69"
---
# Repository DoI Automated Gate & Dashboard
Automates the 14-criterion DoI checklist from `policies/repo-doi.md` and
surfaces per-repo DoI status on the Repositories dashboard page.
## Task: DoI check script — scripts/check_doi.py
```task
id: CUST-WP-0024-T01
status: done
priority: high
state_hub_task_id: "58e77114-bc54-4f86-9ddc-3834bc702b5c"
```
Create `state-hub/scripts/check_doi.py` implementing all 14 DoI criteria:
**Tier 1 — Core**
- C1: repo registered (`GET /repos/{slug}` returns 200)
- C2: domain assigned (`domain_slug` non-null and domain status active)
- C3: local path resolves (exists on disk after expanduser)
- C4: remote_url set (non-null, non-empty)
**Tier 2 — Standard**
- C5: SCOPE.md present at repo root
- C6: CLAUDE.md present at repo root
- C7: workplan convention followed — consistency check returns 0 FAIL
- C8: SBOM ingested — `last_sbom_at` non-null on repo record
- C9: TPSC declared — `tpsc.yaml` exists at repo root AND at least one
TPSC snapshot exists for the repo
**Tier 3 — Full**
- C10: active repo goal exists (`GET /repo-goals/?repo_slug=...` has ≥1 active)
- C11: Provided Capabilities declared — `SCOPE.md` contains a `capability`
fenced block OR `## Provided Capabilities` section says "none" explicitly
- C12: agents template applied — CLAUDE.md contains `get_kaizen_agent` or
`kaizen` reference
- C13: consistency check clean — 0 FAIL and 0 WARN (C-12 exempt)
- C14: host paths registered — `host_paths` non-empty
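One possible reading of C11, as a minimal sketch rather than the engine's actual code: `classify_c11` is a hypothetical helper name, and the word-boundary match for "none" and the "section ends at the next `## ` heading" rule are assumptions chosen to avoid false positives from unrelated text.

```python
import re

# Built at runtime so the fence marker does not clash with this Markdown file.
FENCE = "`" * 3
HEADING = re.compile(r"^##\s+Provided Capabilities[ \t]*$", re.IGNORECASE | re.MULTILINE)
NEXT_HEADING = re.compile(r"^##\s", re.MULTILINE)
EXPLICIT_NONE = re.compile(r"\b(none|no capabilities)\b", re.IGNORECASE)


def classify_c11(scope_text: str) -> str:
    """Return 'pass', 'warn', or 'fail' for C11 given the text of SCOPE.md."""
    # Any fenced block tagged `capability` counts as a declaration.
    if FENCE + "capability" in scope_text:
        return "pass"
    match = HEADING.search(scope_text)
    if match is None:
        return "fail"  # no "## Provided Capabilities" section at all
    # Look only at the section body, up to the next level-2 heading.
    rest = scope_text[match.end():]
    nxt = NEXT_HEADING.search(rest)
    section = rest[: nxt.start()] if nxt else rest
    return "pass" if EXPLICIT_NONE.search(section) else "warn"
```

Under these assumptions, a section that exists but neither lists a block nor says "none" yields `warn` rather than `fail`, which matches the PASS / FAIL / WARN split described for the gate.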
Output per criterion: PASS / FAIL / WARN / SKIP (with reason).
Overall tier: core_pass, standard_pass, full_pass (boolean each).
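The tier roll-up can be sketched as follows. This is an illustration under two assumptions: `warn` counts as passing (as the MCP tool description states), and a `skip` or missing criterion blocks its tier. `rollup` and `TIER_CRITERIA` are hypothetical names.

```python
from typing import Mapping

# Criterion IDs per tier, in the order the tiers are evaluated.
TIER_CRITERIA = {
    "core": ("C1", "C2", "C3", "C4"),
    "standard": ("C5", "C6", "C7", "C8", "C9"),
    "full": ("C10", "C11", "C12", "C13", "C14"),
}
PASSING = {"pass", "warn"}


def rollup(statuses: Mapping[str, str]) -> dict:
    """Map {criterion id: status} to core/standard/full booleans plus a tier."""
    result: dict = {}
    tier = "none"
    previous_ok = True
    for name, ids in TIER_CRITERIA.items():
        # A tier passes only if every lower tier passed as well.
        ok = previous_ok and all(statuses.get(i) in PASSING for i in ids)
        result[f"{name}_pass"] = ok
        if ok:
            tier = name
        previous_ok = ok
    result["tier"] = tier
    return result
```

Because each tier depends on the one below it, a single Core failure forces `tier` to `none` even when every Standard and Full criterion passes.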
CLI flags: `--repo SLUG`, `--all`, `--json`, `--fix` (no-op for now,
reserved for future auto-remediation).
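A minimal sketch of how these flags could be declared with `argparse`. `build_parser` is a hypothetical name, and treating `--repo` and `--all` as mutually exclusive is an assumption; `--fix` is accepted but deliberately does nothing.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Declare the CLI flags listed above; --fix is parsed but unused."""
    parser = argparse.ArgumentParser(description="Check Repository DoI criteria")
    # Exactly one target selector is required (assumption: they are exclusive).
    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--repo", metavar="SLUG", help="check a single repo")
    target.add_argument("--all", action="store_true", help="check every active repo")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="emit machine-readable output")
    parser.add_argument("--fix", action="store_true",
                        help="reserved for future auto-remediation (currently a no-op)")
    return parser
```

Storing `--json` under `as_json` avoids shadowing the `json` module in code that unpacks the parsed arguments.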
Makefile targets:
```
make check-doi REPO=<slug>
make check-doi-all
make check-doi REPO=<slug> JSON=1
```
---
## Task: API endpoint GET /repos/{slug}/doi
```task
id: CUST-WP-0024-T02
status: done
priority: high
state_hub_task_id: "97801608-2cad-40e6-ba0c-d209bcc3ec04"
```
Add to `api/routers/repos.py`:
`GET /repos/{slug}/doi` — runs the DoI check logic server-side and returns:
```json
{
"repo_slug": "llm-connect",
"tier": "standard", // "none" | "core" | "standard" | "full"
"core_pass": true,
"standard_pass": true,
"full_pass": false,
"criteria": [
{"id": "C1", "label": "Registered", "tier": "core", "status": "pass"},
...
],
"checked_at": "2026-03-20T..."
}
```
Also add `GET /repos/doi/summary` — returns all repos with their tier,
sorted by tier ascending (worst first).
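The "worst first" ordering could look like the sketch below. The slug tie-breaker is an assumption added to make the order deterministic, and `sort_worst_first` is a hypothetical name.

```python
# Lower rank means less integrated, so ascending order puts the worst tier first.
TIER_ORDER = {"none": 0, "core": 1, "standard": 2, "full": 3}


def sort_worst_first(entries: list[dict]) -> list[dict]:
    """Sort summary entries by tier ascending, then by slug for stable output."""
    return sorted(
        entries,
        # Unknown tier values are treated as "none" so they surface at the top.
        key=lambda e: (TIER_ORDER.get(e["tier"], 0), e["repo_slug"]),
    )
```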
Add Pydantic schemas `DoICriterion`, `DoIReport`, `DoISummaryEntry` to
`api/schemas/doi.py`.
---
## Task: MCP tool — check_repo_doi
```task
id: CUST-WP-0024-T03
status: done
priority: medium
state_hub_task_id: "3e18a74a-576a-4491-b7cf-3d6ba7746a0f"
```
Add to `mcp_server/server.py`:
`check_repo_doi(repo_slug)` — returns the DoI report for a single repo.
`get_doi_summary()` — returns all repos sorted by tier (worst first),
useful for session orientation.
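For session orientation, a caller may want only the criteria that block the next tier. The sketch below assumes the report has already been decoded from JSON into a dict with the `criteria` shape described for `GET /repos/{slug}/doi`; `blocking_criteria` is a hypothetical helper, not part of the MCP server.

```python
def blocking_criteria(report: dict) -> list[dict]:
    """Return the criteria in the lowest unmet tier that are neither pass nor warn."""
    for tier in ("core", "standard", "full"):
        blockers = [
            c for c in report["criteria"]
            if c["tier"] == tier and c["status"] not in ("pass", "warn")
        ]
        if blockers:
            # Higher tiers cannot pass until these are resolved, so stop here.
            return blockers
    return []
```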
---
## Task: Repositories dashboard page — DoI integration
```task
id: CUST-WP-0024-T04
status: done
priority: high
state_hub_task_id: "ab1a843c-60e3-4eba-ae6a-dec7d74c03f4"
```
Review and update `dashboard/src/repos.md` (or whichever page shows the
repo list) to include:
- DoI tier badge per repo (None / Core / Standard / Full) with colour coding
(red / amber / yellow / green)
- Expandable per-repo criterion detail (collapsible row or modal)
- Summary KPI: count of repos at each tier
- Link to the Repository DoI policy page
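
The per-tier KPI reduces to a count over the summary payload. The sketch below shows the calculation in Python for brevity (the dashboard page itself is JavaScript); `tier_counts` is a hypothetical name, and treating a missing `tier` as `none` is an assumption.

```python
from collections import Counter

TIERS = ("none", "core", "standard", "full")


def tier_counts(summary: list[dict]) -> dict[str, int]:
    """Count repos per DoI tier, always returning all four tiers."""
    counts = Counter(entry.get("tier", "none") for entry in summary)
    # Include zero counts so the KPI row keeps a fixed layout.
    return {tier: counts.get(tier, 0) for tier in TIERS}
```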
If `repos.md` does not exist, create it. If it already shows repo data,
enrich it in-place.
---
## Task: Consistency check — run fix-consistency
```task
id: CUST-WP-0024-T05
status: done
priority: low
state_hub_task_id: "5372bde5-b0e5-4993-94a0-ffa9b7bb26e9"
```
After all code is written and tested:
- Run `make fix-consistency REPO=the-custodian`
- Commit all new/modified files