generated from coulomb/repo-seed
Replace the ad-hoc coordination-domain spine with the Repo Classification Standard: 14 market domains, classification columns on managed_repos, and workplans anchored by repo_id (topic_id optional). - Add Alembic migration d8e9f0a1b2c3 with data backfill and workstream→workplan rename - Add api/classification.py validation and register-from-classification tooling - Expose workplan-first REST/MCP surface with legacy workstream aliases - Add C-24 consistency rule and legacy domain frontmatter mapping - Update dashboard repos page with category/capability/stake filters - Update orientation docs; mark STATE-WP-0065 finished
290 lines
9.7 KiB
Python
290 lines
9.7 KiB
Python
"""Repo classification validation for State Hub registration (STATE-WP-0065 P1).
|
|
|
|
Loads allowed values from the custodian canon standard and validates classification
|
|
blocks against controlled vocabularies.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
# Candidate locations of the allowed-values YAML, in lookup order: the primary
# path (sibling checkout) first, then a fallback derived from this file's
# location (three directory levels up, then into ``the-custodian``).
_PRIMARY_ALLOWED = Path(
    "/home/worsch/the-custodian/canon/standards/repo-classification.allowed.yaml"
)
_FALLBACK_ALLOWED = Path(__file__).resolve().parent.parent.parent.joinpath(
    "the-custodian",
    "canon",
    "standards",
    "repo-classification.allowed.yaml",
)
|
|
|
|
|
|
@dataclass
class ClassificationData:
    """Normalized classification fields stored on ``managed_repos``.

    ``category`` and ``domain`` are mandatory; every list-valued field defaults
    to an empty list and every provenance field defaults to ``None``.
    """

    category: str
    domain: str
    secondary_domains: list[str] = field(default_factory=list)
    capability_tags: list[str] = field(default_factory=list)
    business_stake: list[str] = field(default_factory=list)
    business_mechanics: list[str] = field(default_factory=list)
    classified_at: str | None = None
    classified_by: str | None = None
    standard_version: str | None = None

    def to_dict(self) -> dict:
        """Return the fields as a plain dict, copying each list-valued field."""
        payload: dict = {"category": self.category, "domain": self.domain}
        for name in (
            "secondary_domains",
            "capability_tags",
            "business_stake",
            "business_mechanics",
        ):
            # Copy so callers mutating the dict cannot alter this instance.
            payload[name] = list(getattr(self, name))
        for name in ("classified_at", "classified_by", "standard_version"):
            payload[name] = getattr(self, name)
        return payload

    @classmethod
    def from_block(cls, block: dict) -> ClassificationData:
        """Build an instance from an inner ``repo_classification`` mapping.

        ``category`` and ``domain`` are read with ``block[...]`` and therefore
        raise ``KeyError`` when absent. Missing or empty list fields become
        ``[]``. ``standard_version`` is taken from ``version`` and falls back
        to ``standard_version`` when ``version`` is missing or falsy.
        """

        def as_list(key: str) -> list[str]:
            return list(block.get(key) or [])

        return cls(
            category=block["category"],
            domain=block["domain"],
            secondary_domains=as_list("secondary_domains"),
            capability_tags=as_list("capability_tags"),
            business_stake=as_list("business_stake"),
            business_mechanics=as_list("business_mechanics"),
            classified_at=block.get("classified_at"),
            classified_by=block.get("classified_by"),
            standard_version=block.get("version") or block.get("standard_version"),
        )
|
|
|
|
|
|
def _allowed_path() -> Path:
    """Return the first allowed-values file that exists on disk.

    The primary location is tried before the fallback.

    Raises:
        FileNotFoundError: If neither candidate is an existing regular file.
    """
    for candidate in (_PRIMARY_ALLOWED, _FALLBACK_ALLOWED):
        if candidate.is_file():
            return candidate
    message = (
        "repo-classification.allowed.yaml not found at "
        f"{_PRIMARY_ALLOWED} or {_FALLBACK_ALLOWED}"
    )
    raise FileNotFoundError(message)
|
|
|
|
|
|
def load_allowed_values(path: Path | None = None) -> dict:
    """Load the machine-readable allowed-values YAML.

    Args:
        path: Explicit location of the allowed-values file (a ``Path`` or a
            path string). When omitted or empty, the location is resolved by
            ``_allowed_path()``.

    Returns:
        The top-level mapping parsed from the file.

    Raises:
        FileNotFoundError: If no *path* is given and ``_allowed_path()`` finds
            no file, or if the given file does not exist.
        ValueError: If the parsed document is not a mapping (for example an
            empty file, for which ``yaml.safe_load`` returns ``None``).
    """
    target = Path(path) if path else _allowed_path()
    with target.open(encoding="utf-8") as fh:
        data = yaml.safe_load(fh)
    # Fail here with a clear message rather than letting callers hit an opaque
    # TypeError when they subscript a None / list result.
    if not isinstance(data, dict):
        raise ValueError(
            f"{target} must contain a YAML mapping, got {type(data).__name__}"
        )
    return data
|
|
|
|
|
|
def _known_capability_tags(allowed: dict) -> set[str]:
    """Collect every tag listed under any ``capability_families`` entry.

    A missing or empty ``capability_families`` key, and families whose value is
    empty or ``None``, contribute nothing.
    """
    families = allowed.get("capability_families") or {}
    return {tag for members in families.values() for tag in (members or [])}
|
|
|
|
|
|
def _is_member(value: object, vocabulary: set) -> bool:
    """Return whether *value* is in *vocabulary*; unhashable values count as absent."""
    try:
        return value in vocabulary
    except TypeError:
        # e.g. a YAML list or mapping supplied where a scalar slug is expected.
        return False


def _has_duplicates(items: list) -> bool:
    """Return whether *items* repeats an entry, tolerating unhashable entries."""
    try:
        return len(set(items)) != len(items)
    except TypeError:
        # Unhashable entries cannot go into a set; compare pairwise instead.
        return any(item in items[:idx] for idx, item in enumerate(items))


def _list_field(block: dict, key: str, errors: list[str]) -> list:
    """Return ``block[key]`` as a list, recording an error for any other type.

    A missing or falsy value yields an empty list without an error.
    """
    value = block.get(key) or []
    if isinstance(value, list):
        return value
    errors.append(f"`{key}` must be a list")
    return []


def validate_classification(
    block: dict,
    *,
    allowed: dict | None = None,
) -> tuple[list[str], list[str]]:
    """Validate a ``repo_classification`` block.

    *block* should be the inner mapping (not the full YAML document with the
    ``repo_classification`` wrapper).

    Args:
        block: The classification mapping to check.
        allowed: Pre-loaded allowed-values mapping. When omitted, it is read
            with ``load_allowed_values()``.

    Returns:
        ``(errors, warnings)``. Errors cover a missing or unknown ``category``
        or ``domain``, list fields of the wrong type, values outside the
        controlled vocabularies, repeated secondary domains, and malformed
        capability tags. Warnings cover recommended-range violations and
        capability tags that are well-formed but not listed in any family.

    Raises:
        KeyError: If *allowed* lacks one of the required vocabulary keys
            (``categories``, ``domains``, ``business_stake``,
            ``business_mechanics``).
    """
    # Reject non-mappings before touching the allowed-values file.
    if not isinstance(block, dict):
        return (["classification block must be a mapping"], [])

    if allowed is None:
        allowed = load_allowed_values()

    errors: list[str] = []
    warnings: list[str] = []

    categories = set(allowed["categories"])
    domains = set(allowed["domains"])
    stakes = set(allowed["business_stake"])
    mechanics = set(allowed["business_mechanics"])
    # ``guidance:`` may be present but empty (None) in the YAML.
    guidance = allowed.get("guidance") or {}
    pattern = re.compile(
        guidance.get("capability_tag_pattern", r"^[a-z0-9]+(-[a-z0-9]+)*$")
    )

    category = block.get("category")
    if category is None:
        errors.append("`category` is required")
    elif not _is_member(category, categories):
        errors.append(f"`category` '{category}' not in {sorted(categories)}")

    domain = block.get("domain")
    if domain is None:
        errors.append("`domain` is required")
    elif not _is_member(domain, domains):
        errors.append(f"`domain` '{domain}' not in allowed domains")

    secondary = _list_field(block, "secondary_domains", errors)
    for d in secondary:
        if not _is_member(d, domains):
            errors.append(f"secondary domain '{d}' not in allowed domains")
        if d == domain:
            errors.append(f"secondary domain '{d}' repeats the primary domain")
    if _has_duplicates(secondary):
        errors.append("`secondary_domains` contains duplicates")
    smax = guidance.get("secondary_domains_max", 3)
    if len(secondary) > smax:
        warnings.append(
            f"{len(secondary)} secondary_domains exceeds recommended max {smax}"
        )

    tags = _list_field(block, "capability_tags", errors)
    # The family tag set is only needed when there are tags to check.
    known = _known_capability_tags(allowed) if tags else set()
    for t in tags:
        if not isinstance(t, str) or not pattern.match(t):
            errors.append(f"capability_tag '{t}' is not lowercase kebab-case")
        elif t not in known:
            warnings.append(
                f"capability_tag '{t}' is not a recommended family tag "
                "(allowed, check for synonym)"
            )

    stake = _list_field(block, "business_stake", errors)
    for s in stake:
        if not _is_member(s, stakes):
            errors.append(f"business_stake '{s}' not in {sorted(stakes)}")
    if stake:
        lo = guidance.get("business_stake_recommended_min", 2)
        hi = guidance.get("business_stake_recommended_max", 6)
        if not (lo <= len(stake) <= hi):
            warnings.append(
                f"{len(stake)} business_stake values; {lo}-{hi} recommended"
            )

    mech = _list_field(block, "business_mechanics", errors)
    for m in mech:
        if not _is_member(m, mechanics):
            errors.append(f"business_mechanics '{m}' not in {sorted(mechanics)}")

    return errors, warnings
|
|
|
|
|
|
# Name of the per-repo classification file.
CLASSIFICATION_FILENAME = ".repo-classification.yaml"

# Market-domain slugs (Repo Classification Standard v1.0 §6).
MARKET_DOMAIN_SLUGS: frozenset[str] = frozenset(
    (
        "infotech",
        "financials",
        "communication",
        "consumer",
        "health",
        "industrials",
        "energy",
        "utilities",
        "materials",
        "realestate",
        "crypto",
        "agents",
        "space",
        "government",
    )
)

# Legacy coordination-domain slugs still found in workplan frontmatter
# ``domain:``, each mapped to the market-domain slug used by the Hub
# ``domains`` table post-migration.
LEGACY_COORDINATION_TO_MARKET: dict[str, str] = dict(
    [
        ("custodian", "infotech"),
        ("railiance", "financials"),
        ("markitect", "communication"),
        ("coulomb_social", "communication"),
        ("personhood", "government"),
        ("foerster_capabilities", "agents"),
        ("capabilities", "agents"),
        ("canon", "infotech"),
        ("citation_evidence", "infotech"),
        ("helix_forge", "infotech"),
        ("inter_hub", "infotech"),
        ("netkingdom", "communication"),
        ("stack", "infotech"),
        ("vergabe_teilnahme", "government"),
        ("whynot", "consumer"),
        ("test_domain_v2", "infotech"),
    ]
)
|
|
|
|
|
|
def resolve_topic_domain_slug(
    workplan_domain: str,
    *,
    repo_market_domain: str | None = None,
) -> str:
    """Translate a workplan frontmatter ``domain`` value into a market-domain slug.

    Frontmatter may still hold a legacy coordination slug (e.g. ``custodian``)
    after the spine migration, whereas topic lookup must use the market domain
    stored on ``domains.slug``.

    Resolution order, applied to the whitespace-stripped value:

    1. A value that already is a market-domain slug is returned unchanged.
    2. A legacy coordination slug is returned as its mapped market slug.
    3. Anything else (including a blank or missing value) yields
       *repo_market_domain* when that is set, otherwise the stripped value
       itself (``""`` for blank input).
    """
    slug = workplan_domain.strip() if workplan_domain else ""
    if slug in MARKET_DOMAIN_SLUGS:
        return slug
    # ``or`` chaining: the legacy mapping wins, then the repo's own domain,
    # and the raw slug is the last resort.
    return LEGACY_COORDINATION_TO_MARKET.get(slug) or repo_market_domain or slug
|
|
|
|
|
|
def load_classification_document(path: Path) -> dict | None:
    """Load and return the YAML document, or ``None`` if missing/unreadable.

    Args:
        path: Location of the classification YAML file.

    Returns:
        The parsed top-level mapping. ``None`` is returned when *path* is not a
        regular file, when reading fails (I/O error or bytes that are not valid
        UTF-8), when the YAML cannot be parsed, or when the document's top
        level is not a mapping.
    """
    if not path.is_file():
        return None
    try:
        with path.open(encoding="utf-8") as fh:
            doc = yaml.safe_load(fh)
    # UnicodeDecodeError is a ValueError, not an OSError, so it must be listed
    # explicitly for undecodable files to count as "unreadable".
    except (OSError, UnicodeDecodeError, yaml.YAMLError):
        return None
    return doc if isinstance(doc, dict) else None
|
|
|
|
|
|
def extract_classification_block(doc: dict | None) -> dict | None:
    """Return the inner ``repo_classification`` mapping from a loaded document.

    Yields ``None`` when *doc* is ``None`` or empty, when the key is absent, or
    when its value is not a mapping.
    """
    candidate = doc.get("repo_classification") if doc else None
    if isinstance(candidate, dict):
        return candidate
    return None
|
|
|
|
|
|
def load_classification_file(
    repo_path: Path | str,
    *,
    filename: str = CLASSIFICATION_FILENAME,
) -> tuple[ClassificationData | None, list[str], list[str]]:
    """Read ``.repo-classification.yaml`` from a repo root and validate it.

    Args:
        repo_path: Root directory of the repository.
        filename: Name of the classification file inside *repo_path*.

    Returns:
        ``(data, errors, warnings)``. *data* is ``None`` when the file is
        missing or unreadable, lacks a ``repo_classification`` block, or has
        blocking validation errors; otherwise *errors* is empty.
    """
    document = load_classification_document(Path(repo_path) / filename)
    if document is None:
        return (None, [f"{filename} missing or unreadable"], [])

    inner = extract_classification_block(document)
    if inner is None:
        return (None, [f"{filename} has no repo_classification block"], [])

    problems, notes = validate_classification(inner)
    if problems:
        # Validation errors are blocking: report them without building data.
        return (None, problems, notes)
    return (ClassificationData.from_block(inner), [], notes)