Implement REUSE-WP-0013 registry establish, update, and stats
Some checks failed
ci / validate-registry (push) Has been cancelled

Add stats, establish (scaffold, publish-check, discover), and update CLI
commands with optional llm-connect bridge, validate --root for sibling repos,
pytest coverage, and documentation for sibling registry onboarding.
This commit is contained in:
2026-06-16 01:21:01 +02:00
parent fb712b4b98
commit 70a5003f6e
19 changed files with 1740 additions and 30 deletions

259
reuse_surface/stats.py Normal file
View File

@@ -0,0 +1,259 @@
from __future__ import annotations
import json
import urllib.error
import urllib.request
from collections import Counter
from pathlib import Path
from typing import Any
import yaml
from reuse_surface import hub_client
from reuse_surface.registry import (
LEVEL_ORDERS,
entry_vector,
load_index_at,
parse_front_matter,
parse_vector,
registry_paths,
vectors_match,
)
def _histogram(values: list[str], order: list[str]) -> dict[str, int]:
    """Tally ``values`` per level, keyed in the sequence given by ``order``.

    Args:
        values: Observed level names; duplicates are counted.
        order: Level names in the order the result should list them.

    Returns:
        A dict mapping each level from ``order`` that occurs at least once in
        ``values`` to its count. Levels with a zero count are left out, and
        values that do not appear in ``order`` are ignored.
    """
    tally = Counter(values)
    histogram: dict[str, int] = {}
    for level in order:
        seen = tally.get(level, 0)
        if seen:
            histogram[level] = seen
    return histogram
def _probe_url(url: str) -> dict[str, Any]:
    """Send a HEAD request to ``url`` and summarise the outcome as a dict.

    Malformed URLs and socket-level failures are reported in the returned
    mapping instead of being raised, so a bad ``url`` cannot abort the caller.

    Args:
        url: Address to probe.

    Returns:
        A dict that always contains ``url``, ``status`` and ``ok``.
        When a response was received (including an HTTP error response) it
        also contains ``content_type``. When no response was received,
        ``status`` is ``None`` and ``error`` holds the failure text.
        ``ok`` is true only for a 200 status.
    """
    try:
        # Request() validates the URL eagerly and raises ValueError for an
        # unknown or missing scheme, so it has to live inside the try block.
        request = urllib.request.Request(
            url,
            method="HEAD",
            headers={"User-Agent": "reuse-surface/0.1"},
        )
        with urllib.request.urlopen(request, timeout=30) as response:
            return {
                "url": url,
                "status": response.status,
                "content_type": response.headers.get("Content-Type", ""),
                "ok": response.status == 200,
            }
    except urllib.error.HTTPError as exc:
        # HTTPError subclasses URLError, so it must be handled first.
        return {
            "url": url,
            "status": exc.code,
            "content_type": exc.headers.get("Content-Type", ""),
            "ok": False,
        }
    except urllib.error.URLError as exc:
        return {"url": url, "status": None, "error": str(exc.reason), "ok": False}
    except (OSError, ValueError) as exc:
        # Timeouts while reading the response surface as plain OSError
        # subclasses rather than URLError; malformed URLs raise ValueError.
        return {"url": url, "status": None, "error": str(exc), "ok": False}
def collect_stats(
    repo_root: Path,
    *,
    federation_ready: bool = False,
    raw_url: str | None = None,
    hub_url: str | None = None,
) -> dict[str, Any]:
    """Gather registry statistics for the repository at ``repo_root``.

    Args:
        repo_root: Repository root; passed to ``registry_paths`` and used as
            the base for each capability's ``path``.
        federation_ready: When true, add federation checks (raw URL probe and
            index validity) to the result.
        raw_url: URL to probe for the federation check. When empty, the value
            from ``_default_raw_url`` is used if there is one.
        hub_url: Hub address forwarded to ``_hub_summary``.

    Returns:
        A dict with layout flags (``registry_present``, ``index_present``,
        ``sources_present``), ``capability_count``, per-dimension
        ``histograms``, ``reliability`` band counts, ``consumption_modes``
        counts, a ``vector_drift`` list, and ``federation`` / ``hub``
        sub-dicts. ``domain`` is added only when an index file exists.
    """
    paths = registry_paths(repo_root)
    stats: dict[str, Any] = {
        "repo_root": str(repo_root),
        "registry_present": paths["registry"].exists(),
        "index_present": paths["index"].exists(),
        "sources_present": paths["sources"].exists(),
        "capability_count": 0,
        "histograms": {},
        "reliability": {"r0_r2": 0, "r3_plus": 0},
        "consumption_modes": {},
        "vector_drift": [],
        "federation": {},
        "hub": {},
    }
    index_path = paths["index"]

    if not index_path.exists():
        # No index means nothing to tally: report only the optional remote
        # checks and hand back the zeroed defaults.
        if federation_ready and raw_url:
            stats["federation"]["raw_url_probe"] = _probe_url(raw_url)
        if hub_url or _hub_configured():
            stats["hub"] = _hub_summary(hub_url)
        return stats

    index = load_index_at(index_path)
    capabilities = index.get("capabilities", [])
    stats["capability_count"] = len(capabilities)
    stats["domain"] = index.get("domain")

    dimensions = ("discovery", "availability", "completeness", "reliability")
    levels: dict[str, list[str]] = {dimension: [] for dimension in dimensions}
    mode_counts: Counter[str] = Counter()
    drift = stats["vector_drift"]

    for row in capabilities:
        parsed = parse_vector(row["vector"])
        for dimension in dimensions:
            levels[dimension].append(parsed[dimension])
        for mode in row.get("consumption_modes", []):
            mode_counts[mode] += 1

        # Drift is only checked for entries whose file is actually present.
        entry_path = repo_root / row["path"]
        if not entry_path.exists():
            continue
        try:
            front_matter = parse_front_matter(entry_path)
            if not vectors_match(row["vector"], front_matter):
                drift.append(
                    {
                        "id": row["id"],
                        "index_vector": row["vector"],
                        "entry_vector": entry_vector(front_matter),
                    }
                )
        except ValueError:
            drift.append({"id": row["id"], "error": "invalid entry front matter"})

    stats["histograms"] = {
        dimension: _histogram(levels[dimension], LEVEL_ORDERS[dimension])
        for dimension in dimensions
    }
    reliability = levels["reliability"]
    low_band = {"R0", "R1", "R2"}
    stats["reliability"] = {
        "r0_r2": sum(1 for level in reliability if level in low_band),
        "r3_plus": sum(
            1 for level in reliability if level_at_least_reliability(level, "R3")
        ),
    }
    stats["consumption_modes"] = dict(sorted(mode_counts.items()))

    if federation_ready:
        probe_url = raw_url
        if not probe_url and index_path.exists():
            probe_url = _default_raw_url(repo_root)
        if probe_url:
            stats["federation"]["raw_url_probe"] = _probe_url(probe_url)
        stats["federation"]["index_valid_yaml"] = _index_yaml_valid(index_path)

    # NOTE(review): the reviewed text had lost its indentation; the hub summary
    # is assumed to run unconditionally here (not only under
    # ``federation_ready``) -- confirm against the repository.
    stats["hub"] = _hub_summary(hub_url)
    return stats
def level_at_least_reliability(current: str, minimum: str) -> bool:
    """Tell whether ``current`` ranks at or above ``minimum`` for reliability.

    Rank is the position of a level inside ``LEVEL_ORDERS["reliability"]``.

    Args:
        current: Level being tested.
        minimum: Threshold level.

    Returns:
        True when the position of ``current`` is not before that of
        ``minimum``.

    Note:
        Both levels are looked up with ``.index()``; presumably the order is
        a list, in which case an unknown level raises ``ValueError``.
    """
    ranking = LEVEL_ORDERS["reliability"]
    current_rank = ranking.index(current)
    minimum_rank = ranking.index(minimum)
    return minimum_rank <= current_rank
def _hub_configured() -> bool:
    """Report whether ``REUSE_SURFACE_URL`` is set to a non-empty value.

    Returns:
        True when the environment variable exists and is not the empty
        string, False otherwise.
    """
    import os

    hub_env = os.environ.get("REUSE_SURFACE_URL")
    return hub_env not in (None, "")
def _hub_summary(hub_url: str | None) -> dict[str, Any]:
    """Summarise the hub's registration list via ``hub_client.hub_list``.

    Args:
        hub_url: Hub address forwarded unchanged to ``hub_client.hub_list``.

    Returns:
        ``{"configured": False}`` when the call raises ``ValueError``,
        ``URLError`` or ``OSError``. For a non-200 status, a dict with
        ``configured``, ``status`` and ``error`` (the raw payload). For a 200
        status, a dict with ``configured``, ``registration_count`` and
        ``enabled_count``.
    """
    try:
        # Unpacking stays inside the try so a malformed return value that
        # raises ValueError is treated like any other failed call.
        status, payload = hub_client.hub_list(hub_url)
    except (ValueError, urllib.error.URLError, OSError):
        return {"configured": False}

    if status == 200:
        repos = payload.get("repos", [])
        # Fall back to the length of the list when the hub sends no count.
        registration_count = payload.get("count", len(repos))
        # A repo without an explicit "enabled" flag counts as enabled.
        enabled = [repo for repo in repos if repo.get("enabled", True)]
        return {
            "configured": True,
            "registration_count": registration_count,
            "enabled_count": len(enabled),
        }
    return {"configured": True, "status": status, "error": payload}
def _default_raw_url(repo_root: Path) -> str | None:
return None
def _index_yaml_valid(index_path: Path) -> bool:
    """Check that the index at ``index_path`` loads and looks like an index.

    Args:
        index_path: Location of the index file, passed to ``load_index_at``.

    Returns:
        True when the loaded document is a dict containing a
        ``capabilities`` key. False when loading raises ``OSError`` or
        ``yaml.YAMLError``, or when the document has another shape.
    """
    try:
        loaded = load_index_at(index_path)
    except (OSError, yaml.YAMLError):
        return False
    return isinstance(loaded, dict) and "capabilities" in loaded
def format_stats_markdown(stats: dict[str, Any]) -> str:
    """Render a stats mapping as a Markdown report.

    Args:
        stats: Mapping with the keys this function reads. ``repo_root``,
            ``capability_count``, the three ``*_present`` flags and
            ``reliability`` are required. ``domain``, ``histograms``,
            ``consumption_modes``, ``vector_drift``, ``federation`` and
            ``hub`` are optional.

    Returns:
        The report as one string ending in a newline. Optional sections are
        emitted only when their data is non-empty, and at most ten drift
        entries are listed before a summary line.
    """
    out: list[str] = [
        "# Registry stats",
        "",
        f"**Repo:** `{stats['repo_root']}`",
        f"**Capabilities:** {stats['capability_count']}",
    ]
    domain = stats.get("domain")
    if domain:
        out.append(f"**Domain:** `{domain}`")

    out += [
        "",
        "## Layout",
        f"- registry present: `{stats['registry_present']}`",
        f"- index present: `{stats['index_present']}`",
        f"- federation sources present: `{stats['sources_present']}`",
        "",
    ]

    bands = stats["reliability"]
    out += [
        "## Reliability bands (index vectors)",
        f"- R0R2: **{bands['r0_r2']}**",
        f"- R3+: **{bands['r3_plus']}**",
        "",
    ]

    # One section per dimension that has at least one counted level.
    for dimension, histogram in stats.get("histograms", {}).items():
        if histogram:
            out.append(f"## {dimension.title()} histogram")
            out.extend(f"- `{level}`: {count}" for level, count in histogram.items())
            out.append("")

    modes = stats.get("consumption_modes")
    if modes:
        out.append("## Consumption modes")
        out.extend(f"- `{mode}`: {count}" for mode, count in modes.items())
        out.append("")

    drift = stats.get("vector_drift", [])
    out.append(f"## Vector drift: **{len(drift)}**")
    for item in drift[:10]:
        if "error" in item:
            out.append(f"- `{item['id']}`: {item['error']}")
        else:
            out.append(
                f"- `{item['id']}`: index `{item['index_vector']}` "
                f"≠ entry `{item['entry_vector']}`"
            )
    overflow = len(drift) - 10
    if overflow > 0:
        out.append(f"- … and {overflow} more")
    out.append("")

    federation = stats.get("federation", {})
    if federation:
        out.append("## Federation readiness")
        if "index_valid_yaml" in federation:
            out.append(f"- index valid YAML: `{federation['index_valid_yaml']}`")
        probe = federation.get("raw_url_probe")
        if probe:
            verdict = "ok" if probe.get("ok") else "fail"
            out.append(
                f"- raw URL probe: status **{probe.get('status')}** ({verdict})"
            )
            out.append(f" `{probe.get('url', '')}`")
        out.append("")

    hub = stats.get("hub", {})
    if hub.get("configured"):
        out.append("## Hub")
        if "registration_count" in hub:
            out.append(
                f"- registrations: **{hub['registration_count']}** "
                f"({hub.get('enabled_count', 0)} enabled)"
            )
        elif "error" in hub:
            out.append(f"- hub error: {hub['error']}")
        out.append("")

    return "\n".join(out) + "\n"
def format_stats_json(stats: dict[str, Any]) -> str:
    """Serialise ``stats`` as pretty-printed JSON.

    Args:
        stats: Mapping to serialise; it must be JSON-encodable.

    Returns:
        JSON text indented by two spaces with keys sorted, without a
        trailing newline.
    """
    rendered = json.dumps(stats, sort_keys=True, indent=2)
    return rendered