Files
reuse-surface/reuse_surface/stats.py
tegwick 7c048a9f09
Some checks failed
ci / validate-registry (push) Has been cancelled
REUSE-WP-0014: T11 docs, roster stats, workplan finished
Link local-repo-roster in RegistryFederation; rollout milestone history;
update IntentScopeGapAnalysis (60 hub members). Add stats --roster
--federation-ready for workstation federation readiness.
2026-06-16 02:09:57 +02:00

350 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import urllib.error
import urllib.request
from collections import Counter
from pathlib import Path
from typing import Any
import yaml
from reuse_surface import hub_client
from reuse_surface.registry import (
LEVEL_ORDERS,
entry_vector,
load_index_at,
parse_front_matter,
parse_vector,
registry_paths,
vectors_match,
)
def _histogram(values: list[str], order: list[str]) -> dict[str, int]:
counts = Counter(values)
return {level: counts.get(level, 0) for level in order if counts.get(level, 0)}
def _probe_url(url: str) -> dict[str, Any]:
request = urllib.request.Request(
url,
method="HEAD",
headers={"User-Agent": "reuse-surface/0.1"},
)
try:
with urllib.request.urlopen(request, timeout=30) as response:
return {
"url": url,
"status": response.status,
"content_type": response.headers.get("Content-Type", ""),
"ok": response.status == 200,
}
except urllib.error.HTTPError as exc:
return {
"url": url,
"status": exc.code,
"content_type": exc.headers.get("Content-Type", ""),
"ok": False,
}
except urllib.error.URLError as exc:
return {"url": url, "status": None, "error": str(exc.reason), "ok": False}
def collect_stats(
    repo_root: Path,
    *,
    federation_ready: bool = False,
    raw_url: str | None = None,
    hub_url: str | None = None,
) -> dict[str, Any]:
    """Collect registry statistics for the repository at *repo_root*.

    Args:
        repo_root: Repository root; registry locations are resolved from it
            via ``registry_paths``.
        federation_ready: When true, also fill ``stats["federation"]`` with
            a raw-URL probe (if a URL is available) and an index validity
            check.
        raw_url: URL to probe for federation readiness.
        hub_url: Hub URL passed through to ``_hub_summary``.

    Returns:
        A dict with layout flags (``registry_present``, ``index_present``,
        ``sources_present``), ``capability_count``, per-dimension
        ``histograms``, ``reliability`` band counts, ``consumption_modes``,
        ``vector_drift`` records, and ``federation`` / ``hub`` sub-dicts.
        ``domain`` is added only when the index file exists.

    Index rows are subscripted with ``"vector"``, ``"path"`` and ``"id"``,
    so a row missing one of those keys raises ``KeyError``.
    """
    paths = registry_paths(repo_root)
    # Defaults; these are what gets returned when there is no index file.
    stats: dict[str, Any] = {
        "repo_root": str(repo_root),
        "registry_present": paths["registry"].exists(),
        "index_present": paths["index"].exists(),
        "sources_present": paths["sources"].exists(),
        "capability_count": 0,
        "histograms": {},
        "reliability": {"r0_r2": 0, "r3_plus": 0},
        "consumption_modes": {},
        "vector_drift": [],
        "federation": {},
        "hub": {},
    }
    if not paths["index"].exists():
        # No index: skip all per-capability work.  Only an explicitly given
        # raw URL is probed, and the hub is queried only when a URL was
        # passed or REUSE_SURFACE_URL is set.
        if federation_ready and raw_url:
            stats["federation"]["raw_url_probe"] = _probe_url(raw_url)
        if hub_url or _hub_configured():
            stats["hub"] = _hub_summary(hub_url)
        return stats
    index = load_index_at(paths["index"])
    capabilities = index.get("capabilities", [])
    stats["capability_count"] = len(capabilities)
    stats["domain"] = index.get("domain")
    # One level per capability for each vector dimension.
    discovery: list[str] = []
    availability: list[str] = []
    completeness: list[str] = []
    reliability: list[str] = []
    mode_counts: Counter[str] = Counter()
    for row in capabilities:
        vector = parse_vector(row["vector"])
        discovery.append(vector["discovery"])
        availability.append(vector["availability"])
        completeness.append(vector["completeness"])
        reliability.append(vector["reliability"])
        for mode in row.get("consumption_modes", []):
            mode_counts[mode] += 1
        # Compare the index vector with the entry file's front matter; rows
        # whose entry file does not exist are skipped silently.
        entry_path = repo_root / row["path"]
        if entry_path.exists():
            try:
                front_matter = parse_front_matter(entry_path)
                if not vectors_match(row["vector"], front_matter):
                    stats["vector_drift"].append(
                        {
                            "id": row["id"],
                            "index_vector": row["vector"],
                            "entry_vector": entry_vector(front_matter),
                        }
                    )
            except ValueError:
                # A ValueError from any call in the try body is recorded as
                # a drift item instead of aborting the whole run.
                stats["vector_drift"].append(
                    {"id": row["id"], "error": "invalid entry front matter"}
                )
    stats["histograms"] = {
        "discovery": _histogram(discovery, LEVEL_ORDERS["discovery"]),
        "availability": _histogram(availability, LEVEL_ORDERS["availability"]),
        "completeness": _histogram(completeness, LEVEL_ORDERS["completeness"]),
        "reliability": _histogram(reliability, LEVEL_ORDERS["reliability"]),
    }
    stats["reliability"] = {
        "r0_r2": sum(1 for level in reliability if level in {"R0", "R1", "R2"}),
        "r3_plus": sum(1 for level in reliability if level_at_least_reliability(level, "R3")),
    }
    stats["consumption_modes"] = dict(sorted(mode_counts.items()))
    if federation_ready:
        # Prefer the caller's URL and fall back to _default_raw_url.
        probe_url = raw_url
        if not probe_url and paths["index"].exists():
            probe_url = _default_raw_url(repo_root)
        if probe_url:
            stats["federation"]["raw_url_probe"] = _probe_url(probe_url)
        stats["federation"]["index_valid_yaml"] = _index_yaml_valid(paths["index"])
    # NOTE(review): unlike the missing-index path above, this call is not
    # gated on hub_url / _hub_configured() -- confirm that is intended.
    stats["hub"] = _hub_summary(hub_url)
    return stats
def level_at_least_reliability(current: str, minimum: str) -> bool:
    """Return whether *current* ranks at or above *minimum*.

    Ranking is the position of each level in ``LEVEL_ORDERS["reliability"]``.
    Both levels are looked up with ``.index``, so a level that is not in
    that sequence is not handled here and the lookup error propagates.
    """
    ranking = LEVEL_ORDERS["reliability"]
    current_rank = ranking.index(current)
    minimum_rank = ranking.index(minimum)
    return minimum_rank <= current_rank
def _hub_configured() -> bool:
import os
return bool(os.environ.get("REUSE_SURFACE_URL"))
def _hub_summary(hub_url: str | None) -> dict[str, Any]:
    """Summarise the hub listing returned by ``hub_client.hub_list``.

    Returns ``{"configured": False}`` when the call raises ``ValueError``,
    ``URLError`` or ``OSError``.  For a non-200 status the result carries
    ``status`` and the raw payload under ``error``.  For a 200 status it
    carries ``registration_count`` and ``enabled_count``; a repo without an
    ``enabled`` key counts as enabled.
    """
    try:
        status, payload = hub_client.hub_list(hub_url)
    except (ValueError, urllib.error.URLError, OSError):
        return {"configured": False}

    if status == 200:
        repos = payload.get("repos", [])
        enabled = [repo for repo in repos if repo.get("enabled", True)]
        return {
            "configured": True,
            # Prefer the hub's own count; fall back to the listed repos.
            "registration_count": payload.get("count", len(repos)),
            "enabled_count": len(enabled),
        }
    return {"configured": True, "status": status, "error": payload}
def _default_raw_url(repo_root: Path) -> str | None:
return None
def _index_yaml_valid(index_path: Path) -> bool:
    """Check that the index at *index_path* loads and has the expected shape.

    True only when ``load_index_at`` returns a dict containing a
    ``"capabilities"`` key.  An ``OSError`` or ``yaml.YAMLError`` raised
    while loading yields False; other exceptions propagate.
    """
    try:
        loaded = load_index_at(index_path)
    except (OSError, yaml.YAMLError):
        return False
    else:
        if not isinstance(loaded, dict):
            return False
        return "capabilities" in loaded
def format_stats_markdown(stats: dict[str, Any]) -> str:
    """Render a registry stats dict as a Markdown report.

    Required keys: ``repo_root``, ``capability_count``, ``registry_present``,
    ``index_present``, ``sources_present`` and ``reliability`` (with
    ``r0_r2`` / ``r3_plus``).  ``domain``, ``histograms``,
    ``consumption_modes``, ``vector_drift``, ``federation`` and ``hub`` are
    optional, and their sections are omitted when empty (the drift heading
    is always written).  At most ten drift items are listed, followed by a
    count of the remainder.  The report ends with a newline.
    """
    out: list[str] = [
        "# Registry stats",
        "",
        f"**Repo:** `{stats['repo_root']}`",
        f"**Capabilities:** {stats['capability_count']}",
    ]
    if stats.get("domain"):
        out.append(f"**Domain:** `{stats['domain']}`")
    out += [
        "",
        "## Layout",
        f"- registry present: `{stats['registry_present']}`",
        f"- index present: `{stats['index_present']}`",
        f"- federation sources present: `{stats['sources_present']}`",
        "",
    ]

    bands = stats["reliability"]
    out += [
        "## Reliability bands (index vectors)",
        f"- R0R2: **{bands['r0_r2']}**",
        f"- R3+: **{bands['r3_plus']}**",
        "",
    ]

    # One section per dimension that has any counts.
    for dimension, histogram in stats.get("histograms", {}).items():
        if histogram:
            out.append(f"## {dimension.title()} histogram")
            out.extend(f"- `{level}`: {count}" for level, count in histogram.items())
            out.append("")

    modes = stats.get("consumption_modes")
    if modes:
        out.append("## Consumption modes")
        out.extend(f"- `{mode}`: {count}" for mode, count in modes.items())
        out.append("")

    drift = stats.get("vector_drift", [])
    out.append(f"## Vector drift: **{len(drift)}**")
    for item in drift[:10]:
        if "error" not in item:
            out.append(
                f"- `{item['id']}`: index `{item['index_vector']}` "
                f"≠ entry `{item['entry_vector']}`"
            )
        else:
            out.append(f"- `{item['id']}`: {item['error']}")
    hidden = len(drift) - 10
    if hidden > 0:
        out.append(f"- … and {hidden} more")
    out.append("")

    federation = stats.get("federation", {})
    if federation:
        out.append("## Federation readiness")
        if "index_valid_yaml" in federation:
            out.append(f"- index valid YAML: `{federation['index_valid_yaml']}`")
        probe = federation.get("raw_url_probe")
        if probe:
            verdict = "ok" if probe.get("ok") else "fail"
            out.append(f"- raw URL probe: status **{probe.get('status')}** ({verdict})")
            out.append(f" `{probe.get('url', '')}`")
        out.append("")

    hub = stats.get("hub", {})
    if hub.get("configured"):
        out.append("## Hub")
        if "registration_count" in hub:
            out.append(
                f"- registrations: **{hub['registration_count']}** "
                f"({hub.get('enabled_count', 0)} enabled)"
            )
        elif "error" in hub:
            out.append(f"- hub error: {hub['error']}")
        out.append("")

    return "\n".join(out) + "\n"
def format_stats_json(stats: dict[str, Any]) -> str:
    """Serialise *stats* as JSON with two-space indentation and sorted keys."""
    dump_options = {"indent": 2, "sort_keys": True}
    return json.dumps(stats, **dump_options)
def collect_roster_stats(
    roster_path: Path,
    *,
    federation_ready: bool = False,
) -> dict[str, Any]:
    """Summarise a workstation roster YAML file.

    Args:
        roster_path: Path to the roster file.  Its top-level mapping is read
            for ``repos`` (a list of mappings with at least ``slug``),
            ``summary``, ``workstation_root`` and ``definition``.
        federation_ready: When true, add a ``federation_readiness`` section
            with pass/fail flags and a publish-pass ratio.

    Returns:
        A dict with the roster metadata, a ``counts`` sub-dict, and the
        slug lists ``publish_fail_slugs``, ``hub_unregistered_slugs`` and
        ``pending_slugs``.

    Counts come from the roster's ``summary`` when it provides them.  When
    it does not, ``total``, ``established``, ``pending``, ``hub_registered``
    and ``publish_fail`` are derived from the ``repos`` list, so a roster
    without a summary is not reported as having nothing pending.

    Raises:
        OSError: If the roster file cannot be read.
        yaml.YAMLError: If the file is not valid YAML.
        KeyError: If a listed repo has no ``slug``.
    """
    # safe_load returns None for an empty document, and a key written with
    # no value also loads as None; treat both as "nothing there".
    data = yaml.safe_load(roster_path.read_text(encoding="utf-8")) or {}
    repos = data.get("repos") or []
    summary = data.get("summary") or {}
    publish_fail = [r["slug"] for r in repos if r.get("publish_check") == "fail"]
    hub_missing = [r["slug"] for r in repos if not r.get("hub_registered")]
    pending = [r["slug"] for r in repos if r.get("status") != "established"]
    repo_count = len(repos)
    stats: dict[str, Any] = {
        "roster_path": str(roster_path),
        "workstation_root": data.get("workstation_root"),
        "definition": data.get("definition"),
        "summary": summary,
        "counts": {
            "total": summary.get("total", repo_count),
            # Fall back to values derived from the repo list so the counts
            # agree with the slug lists when the summary omits them.
            "established": summary.get("established", repo_count - len(pending)),
            "pending": summary.get("pending", len(pending)),
            "hub_registered": summary.get(
                "hub_registered", repo_count - len(hub_missing)
            ),
            "publish_pass": summary.get("publish_pass", 0),
            "publish_fail": summary.get("publish_fail", len(publish_fail)),
            "with_reuse_surface_seed": summary.get("with_reuse_surface_seed", 0),
        },
        "publish_fail_slugs": publish_fail,
        "hub_unregistered_slugs": hub_missing,
        "pending_slugs": pending,
        "federation_ready": federation_ready,
    }
    if federation_ready:
        total = stats["counts"]["total"] or repo_count
        publish_pass = stats["counts"]["publish_pass"] or 0
        stats["federation_readiness"] = {
            "all_established": stats["counts"]["pending"] == 0,
            "all_hub_registered": len(hub_missing) == 0,
            "all_publish_pass": len(publish_fail) == 0,
            "publish_pass_ratio": f"{publish_pass}/{total}",
            "publish_sweep": summary.get("publish_sweep"),
        }
    return stats
def format_roster_stats_markdown(stats: dict[str, Any]) -> str:
    """Render a roster stats dict as a Markdown report.

    Required keys: ``roster_path`` and ``counts`` (with ``total``,
    ``established``, ``pending``, ``hub_registered``, ``publish_pass`` and
    ``publish_fail``).  The workstation root line and the federation
    readiness, publish-fail and hub-not-registered sections are written only
    when the matching key holds a truthy value.  The report ends with a
    newline.
    """
    report: list[str] = [
        "# Workstation roster federation stats",
        "",
        f"**Roster:** `{stats['roster_path']}`",
    ]
    root = stats.get("workstation_root")
    if root:
        report.append(f"**Workstation root:** `{root}`")
    report.append("")

    counts = stats["counts"]
    report += [
        "## Summary",
        f"- total repos: **{counts['total']}**",
        f"- established: **{counts['established']}**",
        f"- pending: **{counts['pending']}**",
        f"- hub registered: **{counts['hub_registered']}**",
        f"- publish pass: **{counts['publish_pass']}**",
        f"- publish fail: **{counts['publish_fail']}**",
        "",
    ]

    readiness = stats.get("federation_readiness")
    if readiness:
        report += [
            "## Federation readiness",
            f"- all established: `{readiness['all_established']}`",
            f"- all hub registered: `{readiness['all_hub_registered']}`",
            f"- all publish pass: `{readiness['all_publish_pass']}`",
            f"- publish pass ratio: **{readiness['publish_pass_ratio']}**",
        ]
        sweep = readiness.get("publish_sweep")
        if sweep:
            report.append(f"- last sweep: `{sweep}`")
        report.append("")

    # Both slug sections share one layout: heading, bullet per slug, blank.
    slug_sections = (
        ("## Publish fail", "publish_fail_slugs"),
        ("## Hub not registered", "hub_unregistered_slugs"),
    )
    for heading, key in slug_sections:
        slugs = stats.get(key)
        if slugs:
            report.append(heading)
            report.extend(f"- `{slug}`" for slug in slugs)
            report.append("")

    return "\n".join(report) + "\n"
def format_roster_stats_json(stats: dict[str, Any]) -> str:
    """Serialise roster *stats* as JSON, keys sorted, indented by two spaces."""
    encoded = json.dumps(stats, sort_keys=True, indent=2)
    return encoded