Files
reuse-surface/reuse_surface/stats.py
tegwick 70a5003f6e
Some checks failed
ci / validate-registry (push) Has been cancelled
Implement REUSE-WP-0013 registry establish, update, and stats
Add stats, establish (scaffold, publish-check, discover), and update CLI
commands with optional llm-connect bridge, validate --root for sibling repos,
pytest coverage, and documentation for sibling registry onboarding.
2026-06-16 01:21:01 +02:00

259 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import urllib.error
import urllib.request
from collections import Counter
from pathlib import Path
from typing import Any
import yaml
from reuse_surface import hub_client
from reuse_surface.registry import (
LEVEL_ORDERS,
entry_vector,
load_index_at,
parse_front_matter,
parse_vector,
registry_paths,
vectors_match,
)
def _histogram(values: list[str], order: list[str]) -> dict[str, int]:
    """Tally ``values`` per level, reporting only levels that actually occur.

    Keys appear in the sequence given by ``order``; any value that is not
    listed in ``order`` is left out of the result.
    """
    tally = Counter(values)
    result: dict[str, int] = {}
    for level in order:
        seen = tally.get(level, 0)
        if seen:
            result[level] = seen
    return result
def _probe_url(url: str) -> dict[str, Any]:
    """Issue a HEAD request to ``url`` and summarise the outcome as a dict.

    The result always carries ``url``, ``status`` and ``ok``:

    * a response was received: ``status`` is the HTTP status code,
      ``content_type`` is the ``Content-Type`` header (empty string when
      absent) and ``ok`` is true only for status 200;
    * no response was received: ``status`` is ``None``, ``ok`` is false and
      ``error`` holds a short description of the failure.

    Malformed URLs and transport failures are reported through the returned
    dict rather than raised, so one bad probe cannot abort stats collection.
    """
    try:
        # Request() itself raises ValueError for a URL without a usable
        # scheme, so it has to live inside the try block as well.
        request = urllib.request.Request(
            url,
            method="HEAD",
            headers={"User-Agent": "reuse-surface/0.1"},
        )
        with urllib.request.urlopen(request, timeout=30) as response:
            return {
                "url": url,
                "status": response.status,
                "content_type": response.headers.get("Content-Type", ""),
                "ok": response.status == 200,
            }
    except urllib.error.HTTPError as exc:
        # HTTPError subclasses URLError, so it must be handled first.
        headers = exc.headers
        return {
            "url": url,
            "status": exc.code,
            "content_type": headers.get("Content-Type", "") if headers is not None else "",
            "ok": False,
        }
    except urllib.error.URLError as exc:
        return {"url": url, "status": None, "error": str(exc.reason), "ok": False}
    except (OSError, ValueError) as exc:
        # Failures that are not wrapped in URLError (e.g. a timeout while
        # reading the response) and invalid URLs.
        return {"url": url, "status": None, "error": str(exc), "ok": False}
def collect_stats(
    repo_root: Path,
    *,
    federation_ready: bool = False,
    raw_url: str | None = None,
    hub_url: str | None = None,
) -> dict[str, Any]:
    """Gather summary statistics for the registry located under ``repo_root``.

    The returned dict always contains ``repo_root``, the three layout flags
    (``registry_present``, ``index_present``, ``sources_present``),
    ``capability_count``, ``histograms``, ``reliability``,
    ``consumption_modes``, ``vector_drift``, ``federation`` and ``hub``.
    ``domain`` is added only when the index file exists.

    When the index file is missing, only the optional raw-URL probe (needs
    both ``federation_ready`` and ``raw_url``) and the optional hub summary
    (needs ``hub_url`` or a configured hub) are filled in.

    Otherwise every row of the index's ``capabilities`` list contributes its
    vector levels and consumption modes, and rows whose entry file exists are
    checked for drift between the index vector and the entry front matter.
    """
    paths = registry_paths(repo_root)
    stats: dict[str, Any] = {
        "repo_root": str(repo_root),
        "registry_present": paths["registry"].exists(),
        "index_present": paths["index"].exists(),
        "sources_present": paths["sources"].exists(),
        "capability_count": 0,
        "histograms": {},
        "reliability": {"r0_r2": 0, "r3_plus": 0},
        "consumption_modes": {},
        "vector_drift": [],
        "federation": {},
        "hub": {},
    }
    federation = stats["federation"]
    drift = stats["vector_drift"]

    # Without an index there is nothing to aggregate; only the remote checks run.
    if not paths["index"].exists():
        if federation_ready and raw_url:
            federation["raw_url_probe"] = _probe_url(raw_url)
        if hub_url or _hub_configured():
            stats["hub"] = _hub_summary(hub_url)
        return stats

    index = load_index_at(paths["index"])
    capabilities = index.get("capabilities", [])
    stats["capability_count"] = len(capabilities)
    stats["domain"] = index.get("domain")

    dimensions = ("discovery", "availability", "completeness", "reliability")
    levels: dict[str, list[str]] = {name: [] for name in dimensions}
    mode_counts: Counter[str] = Counter()

    for row in capabilities:
        vector = parse_vector(row["vector"])
        for name in dimensions:
            levels[name].append(vector[name])
        for mode in row.get("consumption_modes", []):
            mode_counts[mode] += 1

        # Drift can only be assessed when the entry file is actually there.
        entry_path = repo_root / row["path"]
        if not entry_path.exists():
            continue
        try:
            front_matter = parse_front_matter(entry_path)
            if vectors_match(row["vector"], front_matter):
                continue
            drift.append(
                {
                    "id": row["id"],
                    "index_vector": row["vector"],
                    "entry_vector": entry_vector(front_matter),
                }
            )
        except ValueError:
            drift.append({"id": row["id"], "error": "invalid entry front matter"})

    stats["histograms"] = {
        name: _histogram(levels[name], LEVEL_ORDERS[name]) for name in dimensions
    }

    low_band = {"R0", "R1", "R2"}
    reliability_levels = levels["reliability"]
    stats["reliability"] = {
        "r0_r2": len([lv for lv in reliability_levels if lv in low_band]),
        "r3_plus": len(
            [lv for lv in reliability_levels if level_at_least_reliability(lv, "R3")]
        ),
    }
    stats["consumption_modes"] = {mode: mode_counts[mode] for mode in sorted(mode_counts)}

    if federation_ready:
        probe_url = raw_url
        if not probe_url and paths["index"].exists():
            probe_url = _default_raw_url(repo_root)
        if probe_url:
            federation["raw_url_probe"] = _probe_url(probe_url)
        federation["index_valid_yaml"] = _index_yaml_valid(paths["index"])

    stats["hub"] = _hub_summary(hub_url)
    return stats
def level_at_least_reliability(current: str, minimum: str) -> bool:
    """Report whether ``current`` ranks at or above ``minimum`` in reliability.

    Rank is the position of a level inside ``LEVEL_ORDERS["reliability"]``;
    a level that is not part of that sequence makes the ``index`` lookup fail.
    """
    scale = LEVEL_ORDERS["reliability"]
    current_rank = scale.index(current)
    minimum_rank = scale.index(minimum)
    return current_rank >= minimum_rank
def _hub_configured() -> bool:
    """Return True when the ``REUSE_SURFACE_URL`` environment variable is set and non-empty."""
    import os

    configured_url = os.environ.get("REUSE_SURFACE_URL")
    if configured_url:
        return True
    return False
def _hub_summary(hub_url: str | None) -> dict[str, Any]:
    """Summarise the registrations reported by ``hub_client.hub_list``.

    Returns ``{"configured": False}`` when the call raises ``ValueError``,
    ``URLError`` or ``OSError``. For a non-200 status the status and the
    payload (under ``error``) are passed through. For a 200 response the
    result carries ``registration_count`` (the payload's ``count``, falling
    back to the number of ``repos``) and ``enabled_count`` (repos whose
    ``enabled`` flag is truthy or missing).
    """
    try:
        status, payload = hub_client.hub_list(hub_url)
    except (ValueError, urllib.error.URLError, OSError):
        return {"configured": False}

    if status == 200:
        repos = payload.get("repos", [])
        registration_count = payload.get("count", len(repos))
        enabled = [repo for repo in repos if repo.get("enabled", True)]
        return {
            "configured": True,
            "registration_count": registration_count,
            "enabled_count": len(enabled),
        }

    return {"configured": True, "status": status, "error": payload}
def _default_raw_url(repo_root: Path) -> str | None:
    """Return the default raw URL for ``repo_root``.

    Currently a stub: no default is derived, so the result is always ``None``.
    """
    return None
def _index_yaml_valid(index_path: Path) -> bool:
    """Check that the index at ``index_path`` loads and has a ``capabilities`` key.

    Returns False when ``load_index_at`` raises ``OSError`` or
    ``yaml.YAMLError``, or when the loaded value is not a dict containing
    ``"capabilities"``.
    """
    try:
        loaded = load_index_at(index_path)
    except (OSError, yaml.YAMLError):
        return False
    else:
        if not isinstance(loaded, dict):
            return False
        return "capabilities" in loaded
def format_stats_markdown(stats: dict[str, Any]) -> str:
    """Render a stats mapping as a Markdown report.

    ``stats`` must provide ``repo_root``, ``capability_count``, the three
    layout flags (``registry_present``, ``index_present``,
    ``sources_present``) and ``reliability`` (with ``r0_r2`` and
    ``r3_plus``). ``domain``, ``histograms``, ``consumption_modes``,
    ``vector_drift``, ``federation`` and ``hub`` are optional.

    Sections are emitted in a fixed order; empty histograms, empty
    consumption modes, an empty federation mapping and an unconfigured hub
    are skipped. At most ten drift items are listed, followed by a count of
    the remainder. The returned text ends with a newline.
    """
    lines = ["# Registry stats", ""]
    lines.append(f"**Repo:** `{stats['repo_root']}`")
    lines.append(f"**Capabilities:** {stats['capability_count']}")
    if stats.get("domain"):
        lines.append(f"**Domain:** `{stats['domain']}`")
    lines.append("")

    lines.append("## Layout")
    lines.append(f"- registry present: `{stats['registry_present']}`")
    lines.append(f"- index present: `{stats['index_present']}`")
    lines.append(f"- federation sources present: `{stats['sources_present']}`")
    lines.append("")

    rel = stats["reliability"]
    lines.append("## Reliability bands (index vectors)")
    # En dash written as an escape so the range separator cannot be lost or
    # mistaken for a look-alike character.
    lines.append(f"- R0\u2013R2: **{rel['r0_r2']}**")
    lines.append(f"- R3+: **{rel['r3_plus']}**")
    lines.append("")

    for dimension, histogram in stats.get("histograms", {}).items():
        if not histogram:
            continue
        lines.append(f"## {dimension.title()} histogram")
        for level, count in histogram.items():
            lines.append(f"- `{level}`: {count}")
        lines.append("")

    if stats.get("consumption_modes"):
        lines.append("## Consumption modes")
        for mode, count in stats["consumption_modes"].items():
            lines.append(f"- `{mode}`: {count}")
        lines.append("")

    drift = stats.get("vector_drift", [])
    lines.append(f"## Vector drift: **{len(drift)}**")
    for item in drift[:10]:
        if "error" in item:
            lines.append(f"- `{item['id']}`: {item['error']}")
        else:
            lines.append(
                f"- `{item['id']}`: index `{item['index_vector']}` "
                f"≠ entry `{item['entry_vector']}`"
            )
    if len(drift) > 10:
        lines.append(f"- … and {len(drift) - 10} more")
    lines.append("")

    federation = stats.get("federation", {})
    if federation:
        lines.append("## Federation readiness")
        if "index_valid_yaml" in federation:
            lines.append(f"- index valid YAML: `{federation['index_valid_yaml']}`")
        probe = federation.get("raw_url_probe")
        if probe:
            status = probe.get("status")
            ok = probe.get("ok")
            lines.append(f"- raw URL probe: status **{status}** ({'ok' if ok else 'fail'})")
            lines.append(f" `{probe.get('url', '')}`")
            # A probe that got no response has no status; show the reason so
            # the report does not just say "status None".
            if probe.get("error"):
                lines.append(f" error: {probe['error']}")
        lines.append("")

    hub = stats.get("hub", {})
    if hub.get("configured"):
        lines.append("## Hub")
        if "registration_count" in hub:
            lines.append(
                f"- registrations: **{hub['registration_count']}** "
                f"({hub.get('enabled_count', 0)} enabled)"
            )
        elif "error" in hub:
            lines.append(f"- hub error: {hub['error']}")
        lines.append("")

    return "\n".join(lines) + "\n"
def format_stats_json(stats: dict[str, Any]) -> str:
    """Serialise ``stats`` as JSON with sorted keys and two-space indentation."""
    rendered = json.dumps(stats, sort_keys=True, indent=2)
    return rendered