generated from coulomb/repo-seed
Some checks failed
ci / validate-registry (push) Has been cancelled
Link local-repo-roster in RegistryFederation; rollout milestone history; update IntentScopeGapAnalysis (60 hub members). Add stats --roster --federation-ready for workstation federation readiness.
350 lines
12 KiB
Python
350 lines
12 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import urllib.error
|
||
import urllib.request
|
||
from collections import Counter
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import yaml
|
||
|
||
from reuse_surface import hub_client
|
||
from reuse_surface.registry import (
|
||
LEVEL_ORDERS,
|
||
entry_vector,
|
||
load_index_at,
|
||
parse_front_matter,
|
||
parse_vector,
|
||
registry_paths,
|
||
vectors_match,
|
||
)
|
||
|
||
|
||
def _histogram(values: list[str], order: list[str]) -> dict[str, int]:
    """Count ``values`` and report the counts in the sequence given by ``order``.

    Levels that never occur are left out, and values that are not listed in
    ``order`` are ignored.
    """
    tally = Counter(values)
    histogram: dict[str, int] = {}
    for level in order:
        seen = tally.get(level, 0)
        if seen:
            histogram[level] = seen
    return histogram
|
||
|
||
|
||
def _probe_url(url: str) -> dict[str, Any]:
    """Send a ``HEAD`` request to ``url`` and describe the outcome.

    The probe is best-effort: it never raises for a bad or unreachable URL.

    Returns:
        A dictionary that always contains ``url``, ``status`` and ``ok``.
        ``ok`` is true only for an HTTP 200 answer. When the server answered,
        ``content_type`` carries the ``Content-Type`` header (empty string if
        absent). When no HTTP answer was obtained, ``status`` is ``None`` and
        ``error`` holds a short description of the failure.
    """
    try:
        # Building the request is inside the ``try`` because ``Request``
        # itself raises ValueError for a URL without a usable scheme.
        request = urllib.request.Request(
            url,
            method="HEAD",
            headers={"User-Agent": "reuse-surface/0.1"},
        )
        with urllib.request.urlopen(request, timeout=30) as response:
            return {
                "url": url,
                "status": response.status,
                "content_type": response.headers.get("Content-Type", ""),
                "ok": response.status == 200,
            }
    except urllib.error.HTTPError as exc:
        # The server answered, just not with a success status.
        headers = exc.headers
        return {
            "url": url,
            "status": exc.code,
            "content_type": headers.get("Content-Type", "") if headers else "",
            "ok": False,
        }
    except urllib.error.URLError as exc:
        return {"url": url, "status": None, "error": str(exc.reason), "ok": False}
    except (ValueError, OSError) as exc:
        # Malformed URLs (ValueError) and socket-level failures that are not
        # wrapped in URLError, such as a timeout while reading the response.
        return {"url": url, "status": None, "error": str(exc), "ok": False}
|
||
|
||
|
||
def collect_stats(
    repo_root: Path,
    *,
    federation_ready: bool = False,
    raw_url: str | None = None,
    hub_url: str | None = None,
) -> dict[str, Any]:
    """Gather registry statistics for the repository at ``repo_root``.

    Args:
        repo_root: Repository whose registry layout is inspected.
        federation_ready: When true, also probe the raw index URL and record
            whether the index file loads as a mapping with ``capabilities``.
        raw_url: URL to probe; when omitted the value of
            ``_default_raw_url`` is used if it returns one.
        hub_url: Hub endpoint forwarded to ``_hub_summary``.

    Returns:
        A dictionary with layout flags, the capability count, per-dimension
        level histograms, reliability bands, consumption-mode counts, vector
        drift records, and ``federation`` / ``hub`` sections.
    """
    paths = registry_paths(repo_root)
    stats: dict[str, Any] = {
        "repo_root": str(repo_root),
        "registry_present": paths["registry"].exists(),
        "index_present": paths["index"].exists(),
        "sources_present": paths["sources"].exists(),
        "capability_count": 0,
        "histograms": {},
        "reliability": {"r0_r2": 0, "r3_plus": 0},
        "consumption_modes": {},
        "vector_drift": [],
        "federation": {},
        "hub": {},
    }
    index_path = paths["index"]

    if not index_path.exists():
        # Without an index only the optional network checks can be reported.
        if federation_ready and raw_url:
            stats["federation"]["raw_url_probe"] = _probe_url(raw_url)
        if hub_url or _hub_configured():
            stats["hub"] = _hub_summary(hub_url)
        return stats

    index = load_index_at(index_path)
    rows = index.get("capabilities", [])
    stats["capability_count"] = len(rows)
    stats["domain"] = index.get("domain")

    dimensions = ("discovery", "availability", "completeness", "reliability")
    levels: dict[str, list[str]] = {name: [] for name in dimensions}
    mode_tally: Counter[str] = Counter()
    drift = stats["vector_drift"]

    for row in rows:
        parsed = parse_vector(row["vector"])
        for name in dimensions:
            levels[name].append(parsed[name])
        for mode in row.get("consumption_modes", []):
            mode_tally[mode] += 1

        # Compare the index vector with the entry file's own front matter.
        entry_path = repo_root / row["path"]
        if not entry_path.exists():
            continue
        try:
            front_matter = parse_front_matter(entry_path)
            if not vectors_match(row["vector"], front_matter):
                drift.append(
                    {
                        "id": row["id"],
                        "index_vector": row["vector"],
                        "entry_vector": entry_vector(front_matter),
                    }
                )
        except ValueError:
            drift.append({"id": row["id"], "error": "invalid entry front matter"})

    stats["histograms"] = {
        name: _histogram(levels[name], LEVEL_ORDERS[name]) for name in dimensions
    }
    reliability = levels["reliability"]
    low_band = {"R0", "R1", "R2"}
    stats["reliability"] = {
        "r0_r2": len([lvl for lvl in reliability if lvl in low_band]),
        "r3_plus": len(
            [lvl for lvl in reliability if level_at_least_reliability(lvl, "R3")]
        ),
    }
    stats["consumption_modes"] = dict(sorted(mode_tally.items()))

    if federation_ready:
        federation = stats["federation"]
        target = raw_url
        if not target and index_path.exists():
            target = _default_raw_url(repo_root)
        if target:
            federation["raw_url_probe"] = _probe_url(target)
        federation["index_valid_yaml"] = _index_yaml_valid(index_path)

    stats["hub"] = _hub_summary(hub_url)
    return stats
|
||
|
||
|
||
def level_at_least_reliability(current: str, minimum: str) -> bool:
    """Tell whether ``current`` sits at or after ``minimum`` in the reliability order.

    Both levels are looked up in ``LEVEL_ORDERS["reliability"]``; a level that
    is not listed there makes the lookup raise ``ValueError``.
    """
    ranking = LEVEL_ORDERS["reliability"]
    current_rank = ranking.index(current)
    minimum_rank = ranking.index(minimum)
    return current_rank >= minimum_rank
|
||
|
||
|
||
def _hub_configured() -> bool:
    """Report whether the ``REUSE_SURFACE_URL`` environment variable is set and non-empty."""
    import os

    configured_url = os.environ.get("REUSE_SURFACE_URL")
    if configured_url:
        return True
    return False
|
||
|
||
|
||
def _hub_summary(hub_url: str | None) -> dict[str, Any]:
    """Ask the hub for its repository list and condense the answer.

    Returns:
        ``{"configured": False}`` when the hub call raises ``ValueError``,
        ``URLError`` or ``OSError``. For a non-200 answer, the status and the
        payload under ``error``. For a 200 answer, the registration count
        (the payload's ``count``, or the number of listed repos) and how many
        repos are enabled; a repo without an ``enabled`` key counts as enabled.
    """
    try:
        code, body = hub_client.hub_list(hub_url)
    except (ValueError, urllib.error.URLError, OSError):
        return {"configured": False}

    summary: dict[str, Any] = {"configured": True}
    if code != 200:
        summary["status"] = code
        summary["error"] = body
        return summary

    registered = body.get("repos", [])
    summary["registration_count"] = body.get("count", len(registered))
    summary["enabled_count"] = len(
        [entry for entry in registered if entry.get("enabled", True)]
    )
    return summary
|
||
|
||
|
||
def _default_raw_url(repo_root: Path) -> str | None:
|
||
return None
|
||
|
||
|
||
def _index_yaml_valid(index_path: Path) -> bool:
    """Check that the index at ``index_path`` loads as a mapping with ``capabilities``.

    A file that cannot be read (``OSError``) or parsed (``yaml.YAMLError``)
    counts as invalid rather than raising.
    """
    try:
        loaded = load_index_at(index_path)
    except (OSError, yaml.YAMLError):
        return False
    if not isinstance(loaded, dict):
        return False
    return "capabilities" in loaded
|
||
|
||
|
||
def format_stats_markdown(stats: dict[str, Any]) -> str:
    """Render a registry statistics dictionary as a Markdown report.

    The report always has the header, layout, reliability and vector-drift
    sections. Histogram, consumption-mode, federation and hub sections appear
    only when the matching data is present. At most ten drift records are
    listed; the remainder is summarised in one line. The text ends with a
    newline.
    """
    out: list[str] = [
        "# Registry stats",
        "",
        f"**Repo:** `{stats['repo_root']}`",
        f"**Capabilities:** {stats['capability_count']}",
    ]
    if stats.get("domain"):
        out.append(f"**Domain:** `{stats['domain']}`")
    out.append("")

    out += [
        "## Layout",
        f"- registry present: `{stats['registry_present']}`",
        f"- index present: `{stats['index_present']}`",
        f"- federation sources present: `{stats['sources_present']}`",
        "",
    ]

    bands = stats["reliability"]
    out += [
        "## Reliability bands (index vectors)",
        f"- R0–R2: **{bands['r0_r2']}**",
        f"- R3+: **{bands['r3_plus']}**",
        "",
    ]

    for dimension, histogram in stats.get("histograms", {}).items():
        if histogram:
            out.append(f"## {dimension.title()} histogram")
            out.extend(f"- `{level}`: {count}" for level, count in histogram.items())
            out.append("")

    if stats.get("consumption_modes"):
        out.append("## Consumption modes")
        out.extend(
            f"- `{mode}`: {count}"
            for mode, count in stats["consumption_modes"].items()
        )
        out.append("")

    drift = stats.get("vector_drift", [])
    out.append(f"## Vector drift: **{len(drift)}**")
    for record in drift[:10]:
        if "error" in record:
            bullet = f"- `{record['id']}`: {record['error']}"
        else:
            bullet = (
                f"- `{record['id']}`: index `{record['index_vector']}` "
                f"≠ entry `{record['entry_vector']}`"
            )
        out.append(bullet)
    hidden = len(drift) - 10
    if hidden > 0:
        out.append(f"- … and {hidden} more")
    out.append("")

    federation = stats.get("federation", {})
    if federation:
        out.append("## Federation readiness")
        if "index_valid_yaml" in federation:
            out.append(f"- index valid YAML: `{federation['index_valid_yaml']}`")
        probe = federation.get("raw_url_probe")
        if probe:
            verdict = "ok" if probe.get("ok") else "fail"
            out.append(
                f"- raw URL probe: status **{probe.get('status')}** ({verdict})"
            )
            out.append(f" `{probe.get('url', '')}`")
        out.append("")

    hub = stats.get("hub", {})
    if hub.get("configured"):
        out.append("## Hub")
        if "registration_count" in hub:
            out.append(
                f"- registrations: **{hub['registration_count']}** "
                f"({hub.get('enabled_count', 0)} enabled)"
            )
        elif "error" in hub:
            out.append(f"- hub error: {hub['error']}")
        out.append("")

    return "\n".join(out) + "\n"
|
||
|
||
|
||
def format_stats_json(stats: dict[str, Any]) -> str:
    """Serialise ``stats`` as JSON with two-space indentation and sorted keys."""
    dump_options: dict[str, Any] = {"indent": 2, "sort_keys": True}
    return json.dumps(stats, **dump_options)
|
||
|
||
|
||
def collect_roster_stats(
    roster_path: Path,
    *,
    federation_ready: bool = False,
) -> dict[str, Any]:
    """Summarise a workstation roster YAML file.

    The roster is read as a mapping with optional ``repos`` (a list of
    per-repo mappings, each with a ``slug``) and ``summary`` (pre-computed
    counts). Counts come from ``summary`` when it provides them. Counts the
    summary omits are derived from the repo rows where the rows determine
    them; ``publish_pass`` and ``with_reuse_surface_seed`` default to 0.

    Args:
        roster_path: Path to the roster YAML file.
        federation_ready: When true, add a ``federation_readiness`` section.

    Returns:
        A dictionary with the roster metadata, ``counts``, the slugs of repos
        that fail publishing, are not hub-registered or are not established,
        and optionally ``federation_readiness``.

    Raises:
        OSError: If the roster file cannot be read.
        yaml.YAMLError: If the roster is not valid YAML.
    """
    # safe_load returns None for an empty document, and a key written with no
    # value loads as None too; treat both as "nothing recorded" rather than
    # failing on ``.get`` or iteration.
    data = yaml.safe_load(roster_path.read_text(encoding="utf-8")) or {}
    repos = data.get("repos") or []
    summary = data.get("summary") or {}

    publish_fail = [r["slug"] for r in repos if r.get("publish_check") == "fail"]
    hub_missing = [r["slug"] for r in repos if not r.get("hub_registered")]
    pending = [r["slug"] for r in repos if r.get("status") != "established"]

    counts = {
        "total": summary.get("total", len(repos)),
        # Fall back to the repo rows so the counts agree with the slug lists
        # below when the roster carries no summary.
        "established": summary.get("established", len(repos) - len(pending)),
        "pending": summary.get("pending", len(pending)),
        "hub_registered": summary.get(
            "hub_registered", len(repos) - len(hub_missing)
        ),
        "publish_pass": summary.get("publish_pass", 0),
        "publish_fail": summary.get("publish_fail", len(publish_fail)),
        "with_reuse_surface_seed": summary.get("with_reuse_surface_seed", 0),
    }

    stats: dict[str, Any] = {
        "roster_path": str(roster_path),
        "workstation_root": data.get("workstation_root"),
        "definition": data.get("definition"),
        "summary": summary,
        "counts": counts,
        "publish_fail_slugs": publish_fail,
        "hub_unregistered_slugs": hub_missing,
        "pending_slugs": pending,
        "federation_ready": federation_ready,
    }

    if federation_ready:
        total = counts["total"] or len(repos)
        publish_pass = counts["publish_pass"] or 0
        stats["federation_readiness"] = {
            # Require both the summary count and the repo rows to show nothing
            # pending, matching how the two flags below read the rows.
            "all_established": counts["pending"] == 0 and not pending,
            "all_hub_registered": len(hub_missing) == 0,
            "all_publish_pass": len(publish_fail) == 0,
            "publish_pass_ratio": f"{publish_pass}/{total}",
            "publish_sweep": summary.get("publish_sweep"),
        }
    return stats
|
||
|
||
|
||
def format_roster_stats_markdown(stats: dict[str, Any]) -> str:
    """Render roster statistics as a Markdown report.

    The header and summary counts are always written. The federation
    readiness, publish-fail and hub-not-registered sections appear only when
    the matching data is present and non-empty. The text ends with a newline.
    """
    out: list[str] = [
        "# Workstation roster federation stats",
        "",
        f"**Roster:** `{stats['roster_path']}`",
    ]
    if stats.get("workstation_root"):
        out.append(f"**Workstation root:** `{stats['workstation_root']}`")
    out.append("")

    counts = stats["counts"]
    out += [
        "## Summary",
        f"- total repos: **{counts['total']}**",
        f"- established: **{counts['established']}**",
        f"- pending: **{counts['pending']}**",
        f"- hub registered: **{counts['hub_registered']}**",
        f"- publish pass: **{counts['publish_pass']}**",
        f"- publish fail: **{counts['publish_fail']}**",
        "",
    ]

    if stats.get("federation_readiness"):
        readiness = stats["federation_readiness"]
        out += [
            "## Federation readiness",
            f"- all established: `{readiness['all_established']}`",
            f"- all hub registered: `{readiness['all_hub_registered']}`",
            f"- all publish pass: `{readiness['all_publish_pass']}`",
            f"- publish pass ratio: **{readiness['publish_pass_ratio']}**",
        ]
        if readiness.get("publish_sweep"):
            out.append(f"- last sweep: `{readiness['publish_sweep']}`")
        out.append("")

    if stats.get("publish_fail_slugs"):
        out.append("## Publish fail")
        out.extend(f"- `{slug}`" for slug in stats["publish_fail_slugs"])
        out.append("")

    if stats.get("hub_unregistered_slugs"):
        out.append("## Hub not registered")
        out.extend(f"- `{slug}`" for slug in stats["hub_unregistered_slugs"])
        out.append("")

    return "\n".join(out) + "\n"
|
||
|
||
|
||
def format_roster_stats_json(stats: dict[str, Any]) -> str:
    """Serialise roster ``stats`` as JSON with two-space indentation and sorted keys."""
    dump_options: dict[str, Any] = {"indent": 2, "sort_keys": True}
    return json.dumps(stats, **dump_options)