Implements the 14-criterion Definition of Integrated (DoI) checklist as a
runnable gate with API, MCP tools, CLI script, and dashboard integration.
Core components:
- api/doi_engine.py — async engine evaluating all 14 criteria (asyncio.to_thread
for non-blocking HTTP self-calls), shared by API and CLI
- api/schemas/doi.py — DoICriterion, DoIReport, DoISummaryEntry schemas
- api/routers/repos.py — GET /repos/{slug}/doi + GET /repos/doi/summary
- scripts/check_doi.py — CLI: make check-doi REPO=<slug> / check-doi-all
- mcp_server/server.py — check_repo_doi(), get_doi_summary() tools
Dashboard (repos.md):
- DoI tier badge per repo (None/Core/Standard/Full) colour-coded red→green
- Domain block shows lowest DoI tier across its repos
- DoI KPI card in summary row
- DoI filter in All Repos Table
- Link to Repository DoI policy page
Also fixes: TPSC snapshots 500 error (missing nested selectinload for
catalog_entry relationship in list_snapshots endpoint).
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
133 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Check Repository Definition of Integrated (DoI) criteria.
|
|
|
|
Usage:
|
|
uv run python scripts/check_doi.py --repo <slug> [--json]
|
|
uv run python scripts/check_doi.py --all [--json]
|
|
"""
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
# Allow importing from the api package
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
from api.doi_engine import evaluate
|
|
|
|
# Base URL that every API path requested by this script is appended to.
API_BASE = "http://127.0.0.1:8000"

# ANSI SGR escape sequences used to decorate terminal output.
_GREEN = "\033[32m"
_RED = "\033[31m"
_YELLOW = "\033[33m"
_GREY = "\033[90m"
RESET = "\033[0m"
BOLD = "\033[1m"

# Glyph printed in front of each criterion, keyed by criterion status.
STATUS_ICON = {
    "pass": "✓",
    "fail": "✗",
    "warn": "⚠",
    "skip": "—",
}

# Colour of the status glyph, keyed by criterion status.
STATUS_COLOR = {
    "pass": _GREEN,
    "fail": _RED,
    "warn": _YELLOW,
    "skip": _GREY,
}

# Colour of the overall tier name, keyed by tier.
TIER_COLOR = {
    "full": _GREEN,
    "standard": _YELLOW,
    "core": _YELLOW,
    "none": _RED,
}
|
|
|
|
|
|
def _get(path: str) -> object:
    """Fetch ``path`` from the API and return the decoded JSON body.

    The request URL is ``API_BASE`` followed by ``path`` normalised to end in
    exactly one ``/``. The callers in this script already pass paths that end
    in a slash, so unconditionally appending another one produced URLs such
    as ``.../repos/<slug>//``.

    Args:
        path: API path, e.g. ``/repos/`` or ``/repos/<slug>/``. A trailing
            slash is optional.

    Returns:
        Whatever ``json.loads`` yields for the response body.

    Raises:
        urllib.error.URLError: The request could not be completed (this
            includes ``HTTPError`` for error status codes).
        ValueError: The response body is not valid JSON.
    """
    # Strip any trailing slashes first so exactly one is present afterwards.
    normalised = path.rstrip("/")
    request = urllib.request.Request(
        f"{API_BASE}{normalised}/",
        headers={"Accept": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read())
|
|
|
|
|
|
def _print_report(report, use_color: bool = True) -> None:
    """Print a human-readable DoI report for one repository to stdout.

    Writes a header with the repo slug, a line with the overall tier and the
    per-tier pass marks, then one line per criterion. A tier heading is
    printed whenever a criterion's tier differs from the previous one's.

    Args:
        report: Object exposing ``repo_slug``, ``tier``, ``core_pass``,
            ``standard_pass``, ``full_pass`` and an iterable ``criteria``
            whose items expose ``id``, ``label``, ``tier``, ``status`` and
            ``detail``.
        use_color: When false, no ANSI escape sequences are written.
    """
    if use_color:
        tier_paint = TIER_COLOR.get(report.tier, "")
        off = RESET
        strong = BOLD
    else:
        tier_paint = off = strong = ""

    def mark(passed) -> str:
        # Tick for a satisfied tier, cross otherwise.
        return "✓" if passed else "✗"

    print(f"\n{strong}Repo: {report.repo_slug}{off}")

    tier_name = f"{tier_paint}{strong}{report.tier.upper()}{off}"
    passes = (
        f"core={mark(report.core_pass)} "
        f"standard={mark(report.standard_pass)} "
        f"full={mark(report.full_pass)}"
    )
    print(f" Tier: {tier_name} ({passes})")

    last_tier = None
    for criterion in report.criteria:
        tier = criterion.tier
        if tier != last_tier:
            # Entering a new run of criteria: emit its tier heading once.
            last_tier = tier
            print(f" ── {tier.upper()} ──")

        paint = ""
        if use_color:
            paint = STATUS_COLOR.get(criterion.status, "")
        icon = STATUS_ICON.get(criterion.status, "?")
        suffix = f" {criterion.detail}" if criterion.detail else ""
        print(f" {paint}{icon}{off} {criterion.id}: {criterion.label}{suffix}")
|
|
|
|
|
|
async def check_repo(slug: str, as_json: bool) -> bool:
    """Evaluate one repository against the DoI criteria and print the result.

    Fetches the repo record from the API, passes it to ``evaluate`` and
    prints the resulting report either as JSON or as a formatted text report.

    Args:
        slug: Repository slug used to build the ``/repos/<slug>/`` API path.
        as_json: Print the report as an indented JSON object instead of the
            coloured text report.

    Returns:
        ``False`` if the repo record could not be fetched or the report's
        tier is ``"none"``; ``True`` otherwise.
    """
    try:
        # _get() performs blocking urllib I/O. Running it in a worker thread
        # keeps the event loop free for the other checks that main_async()
        # gathers concurrently with this one.
        repo = await asyncio.to_thread(_get, f"/repos/{slug}/")
    except Exception as e:  # CLI boundary: report the failure, keep going
        print(f"✗ Could not fetch repo '{slug}': {e}", file=sys.stderr)
        return False

    report = await evaluate(repo, API_BASE)

    if as_json:
        payload = {
            "repo_slug": report.repo_slug,
            "tier": report.tier,
            "core_pass": report.core_pass,
            "standard_pass": report.standard_pass,
            "full_pass": report.full_pass,
            "checked_at": report.checked_at,
            "criteria": [
                {
                    "id": c.id,
                    "label": c.label,
                    "tier": c.tier,
                    "status": c.status,
                    "detail": c.detail,
                }
                for c in report.criteria
            ],
        }
        print(json.dumps(payload, indent=2))
    else:
        _print_report(report)

    return report.tier != "none"
|
|
|
|
|
|
async def main_async() -> None:
    """Parse the command line, check the selected repos and exit.

    Exactly one of ``--repo SLUG`` or ``--all`` is required. With ``--all``
    the repo list is fetched from the API and only entries whose ``status``
    is ``"active"`` are checked. All checks are run concurrently via
    ``asyncio.gather``.

    The process exits with status 0 when every ``check_repo`` call returned
    a truthy result, and 1 otherwise (including when the repo list itself
    cannot be fetched).
    """
    parser = argparse.ArgumentParser(description="Check Repository DoI criteria")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG")
    group.add_argument("--all", action="store_true")
    parser.add_argument("--json", action="store_true", dest="as_json")
    args = parser.parse_args()

    if args.all:
        try:
            repos = _get("/repos/")
        except Exception as e:  # CLI boundary: mirror check_repo's reporting
            print(f"✗ Could not fetch repo list: {e}", file=sys.stderr)
            sys.exit(1)
        slugs = [r["slug"] for r in repos if r.get("status") == "active"]
    else:
        slugs = [args.repo]

    # Each check prints its own report, so only the pass/fail flags are kept.
    results = await asyncio.gather(*(check_repo(s, args.as_json) for s in slugs))

    sys.exit(0 if all(results) else 1)
|
|
|
|
|
|
def main() -> None:
    """Synchronous entry point: run ``main_async`` on a new event loop."""
    entry = main_async()
    asyncio.run(entry)


# Only start the CLI when this file is executed as a script.
if __name__ == "__main__":
    main()
|