"""Repo classification validation for State Hub registration (STATE-WP-0065 P1).

Loads allowed values from the custodian canon standard and validates
classification blocks against controlled vocabularies.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from pathlib import Path

import yaml

# Primary path (sibling checkout); fallback relative to state-hub repo root.
_PRIMARY_ALLOWED = Path(
    "/home/worsch/the-custodian/canon/standards/repo-classification.allowed.yaml"
)
_FALLBACK_ALLOWED = (
    Path(__file__).resolve().parent.parent.parent
    / "the-custodian"
    / "canon"
    / "standards"
    / "repo-classification.allowed.yaml"
)


@dataclass
class ClassificationData:
    """Normalized classification fields stored on ``managed_repos``."""

    category: str
    domain: str
    secondary_domains: list[str] = field(default_factory=list)
    capability_tags: list[str] = field(default_factory=list)
    business_stake: list[str] = field(default_factory=list)
    business_mechanics: list[str] = field(default_factory=list)
    classified_at: str | None = None
    classified_by: str | None = None
    standard_version: str | None = None

    def to_dict(self) -> dict:
        """Return the fields as a plain dict; list fields are shallow-copied."""
        return {
            "category": self.category,
            "domain": self.domain,
            "secondary_domains": list(self.secondary_domains),
            "capability_tags": list(self.capability_tags),
            "business_stake": list(self.business_stake),
            "business_mechanics": list(self.business_mechanics),
            "classified_at": self.classified_at,
            "classified_by": self.classified_by,
            "standard_version": self.standard_version,
        }

    @classmethod
    def from_block(cls, block: dict) -> ClassificationData:
        """Build an instance from a ``repo_classification`` mapping.

        ``category`` and ``domain`` are read with ``block[...]`` and therefore
        raise ``KeyError`` when absent. Missing or empty list fields become
        empty lists. ``standard_version`` is taken from ``version`` first and
        from ``standard_version`` otherwise.
        """
        return cls(
            category=block["category"],
            domain=block["domain"],
            secondary_domains=list(block.get("secondary_domains") or []),
            capability_tags=list(block.get("capability_tags") or []),
            business_stake=list(block.get("business_stake") or []),
            business_mechanics=list(block.get("business_mechanics") or []),
            classified_at=block.get("classified_at"),
            classified_by=block.get("classified_by"),
            standard_version=block.get("version") or block.get("standard_version"),
        )


def _allowed_path() -> Path:
    """Return the first allowed-values file that exists (primary, then fallback).

    Raises:
        FileNotFoundError: if neither candidate is a regular file.
    """
    for candidate in (_PRIMARY_ALLOWED, _FALLBACK_ALLOWED):
        if candidate.is_file():
            return candidate
    raise FileNotFoundError(
        "repo-classification.allowed.yaml not found at "
        f"{_PRIMARY_ALLOWED} or {_FALLBACK_ALLOWED}"
    )


def load_allowed_values(path: Path | None = None) -> dict:
    """Load the machine-readable allowed-values YAML.

    When *path* is omitted the file is located via ``_allowed_path()``, which
    raises ``FileNotFoundError`` if no candidate exists.
    """
    target = path or _allowed_path()
    with target.open(encoding="utf-8") as fh:
        return yaml.safe_load(fh)


def _known_capability_tags(allowed: dict) -> set[str]:
    """Return the union of all tags listed under ``capability_families``."""
    tags: set[str] = set()
    for fam in (allowed.get("capability_families") or {}).values():
        tags.update(fam or [])
    return tags


def _is_allowed(value: object, vocabulary: set) -> bool:
    """Return whether *value* is in *vocabulary*, treating unhashable values as absent.

    YAML can yield lists or mappings where a scalar is expected; a plain
    ``value in vocabulary`` would raise ``TypeError`` for those.
    """
    try:
        return value in vocabulary
    except TypeError:
        return False


def _has_duplicates(values: list) -> bool:
    """Return whether any hashable entry of *values* occurs more than once.

    Unhashable entries are skipped rather than raising ``TypeError``.
    """
    seen: set = set()
    for value in values:
        try:
            if value in seen:
                return True
            seen.add(value)
        except TypeError:
            continue
    return False


def _list_field(block: dict, key: str, errors: list[str]) -> list:
    """Return ``block[key]`` as a list, recording an error if it is not one.

    A missing or falsy value yields an empty list. A non-list value appends
    "`<key>` must be a list" to *errors* and also yields an empty list.
    """
    value = block.get(key) or []
    if not isinstance(value, list):
        errors.append(f"`{key}` must be a list")
        return []
    return value


def _check_vocabulary(
    key: str, values: list, vocabulary: set, errors: list[str]
) -> None:
    """Append one error per entry of *values* that is not in *vocabulary*."""
    for value in values:
        if not _is_allowed(value, vocabulary):
            errors.append(f"{key} '{value}' not in {sorted(vocabulary)}")


def validate_classification(block: dict) -> tuple[list[str], list[str]]:
    """Validate a ``repo_classification`` block.

    Returns ``(errors, warnings)``. *block* should be the inner mapping (not
    the full YAML document with the ``repo_classification`` wrapper).

    Values of an unexpected shape (for example a list where a scalar slug is
    expected) are reported as validation errors instead of raising
    ``TypeError``.

    Raises:
        FileNotFoundError: if the allowed-values file cannot be located.
        KeyError: if the allowed-values file lacks a required vocabulary key.
    """
    # Reject non-mappings before touching the filesystem.
    if not isinstance(block, dict):
        return (["classification block must be a mapping"], [])

    allowed = load_allowed_values()
    errors: list[str] = []
    warnings: list[str] = []

    categories = set(allowed["categories"])
    domains = set(allowed["domains"])
    stakes = set(allowed["business_stake"])
    mechanics = set(allowed["business_mechanics"])
    # ``or {}`` also covers an explicit ``guidance:`` null in the YAML.
    guidance = allowed.get("guidance") or {}
    pattern = re.compile(
        guidance.get("capability_tag_pattern", r"^[a-z0-9]+(-[a-z0-9]+)*$")
    )

    category = block.get("category")
    if category is None:
        errors.append("`category` is required")
    elif not _is_allowed(category, categories):
        errors.append(f"`category` '{category}' not in {sorted(categories)}")

    domain = block.get("domain")
    if domain is None:
        errors.append("`domain` is required")
    elif not _is_allowed(domain, domains):
        errors.append(f"`domain` '{domain}' not in allowed domains")

    secondary = _list_field(block, "secondary_domains", errors)
    for d in secondary:
        if not _is_allowed(d, domains):
            errors.append(f"secondary domain '{d}' not in allowed domains")
        if d == domain:
            errors.append(f"secondary domain '{d}' repeats the primary domain")
    if _has_duplicates(secondary):
        errors.append("`secondary_domains` contains duplicates")
    smax = guidance.get("secondary_domains_max", 3)
    if len(secondary) > smax:
        warnings.append(
            f"{len(secondary)} secondary_domains exceeds recommended max {smax}"
        )

    tags = _list_field(block, "capability_tags", errors)
    known = _known_capability_tags(allowed)
    for t in tags:
        if not isinstance(t, str) or not pattern.match(t):
            errors.append(f"capability_tag '{t}' is not lowercase kebab-case")
        elif t not in known:
            # Unknown tags are permitted; the warning nudges towards synonyms.
            warnings.append(
                f"capability_tag '{t}' is not a recommended family tag "
                "(allowed, check for synonym)"
            )

    stake = _list_field(block, "business_stake", errors)
    _check_vocabulary("business_stake", stake, stakes, errors)
    if stake:
        lo = guidance.get("business_stake_recommended_min", 2)
        hi = guidance.get("business_stake_recommended_max", 6)
        if not (lo <= len(stake) <= hi):
            warnings.append(
                f"{len(stake)} business_stake values; {lo}-{hi} recommended"
            )

    mech = _list_field(block, "business_mechanics", errors)
    _check_vocabulary("business_mechanics", mech, mechanics, errors)

    return errors, warnings


CLASSIFICATION_FILENAME = ".repo-classification.yaml"

# Market-domain slugs (Repo Classification Standard v1.0 §6).
MARKET_DOMAIN_SLUGS: frozenset[str] = frozenset({
    "infotech",
    "financials",
    "communication",
    "consumer",
    "health",
    "industrials",
    "energy",
    "utilities",
    "materials",
    "realestate",
    "crypto",
    "agents",
    "space",
    "government",
})

# Legacy coordination-domain slugs still found in workplan frontmatter ``domain:``.
# Maps to market-domain slugs used by the Hub ``domains`` table post-migration.
LEGACY_COORDINATION_TO_MARKET: dict[str, str] = {
    "custodian": "infotech",
    "railiance": "financials",
    "markitect": "communication",
    "coulomb_social": "communication",
    "personhood": "government",
    "foerster_capabilities": "agents",
    "capabilities": "agents",
    "canon": "infotech",
    "citation_evidence": "infotech",
    "helix_forge": "infotech",
    "inter_hub": "infotech",
    "netkingdom": "communication",
    "stack": "infotech",
    "vergabe_teilnahme": "government",
    "whynot": "consumer",
    "test_domain_v2": "infotech",
}


def resolve_topic_domain_slug(
    workplan_domain: str,
    *,
    repo_market_domain: str | None = None,
) -> str:
    """Map a workplan frontmatter ``domain`` value to a market-domain slug.

    Workplans may still carry legacy coordination slugs (e.g. ``custodian``)
    after the spine migration; topic lookup must use the market domain stored
    on ``domains.slug``.

    Resolution order:

    1. Blank *workplan_domain* -> *repo_market_domain* (or ``""``).
    2. A value already in ``MARKET_DOMAIN_SLUGS`` -> returned unchanged.
    3. A key of ``LEGACY_COORDINATION_TO_MARKET`` -> its mapped slug.
    4. Anything else -> *repo_market_domain* if given, otherwise the stripped
       input itself.
    """
    domain = (workplan_domain or "").strip()
    if not domain:
        return repo_market_domain or ""
    if domain in MARKET_DOMAIN_SLUGS:
        return domain
    mapped = LEGACY_COORDINATION_TO_MARKET.get(domain)
    if mapped:
        return mapped
    return repo_market_domain or domain


def load_classification_document(path: Path) -> dict | None:
    """Load and return the YAML document, or ``None`` if missing/unreadable.

    ``None`` is also returned when the file parses to something other than a
    mapping, and when it cannot be decoded as UTF-8.
    """
    if not path.is_file():
        return None
    try:
        with path.open(encoding="utf-8") as fh:
            doc = yaml.safe_load(fh)
    except (OSError, UnicodeDecodeError, yaml.YAMLError):
        # UnicodeDecodeError is a ValueError, so it is not covered by OSError
        # or YAMLError; a non-UTF-8 file counts as "unreadable" too.
        return None
    return doc if isinstance(doc, dict) else None


def extract_classification_block(doc: dict | None) -> dict | None:
    """Return the inner ``repo_classification`` mapping from a loaded document.

    Returns ``None`` when *doc* is not a mapping, or when the
    ``repo_classification`` key is absent or not itself a mapping.
    """
    if not isinstance(doc, dict):
        return None
    block = doc.get("repo_classification")
    return block if isinstance(block, dict) else None


def load_classification_file(
    repo_path: Path | str,
    *,
    filename: str = CLASSIFICATION_FILENAME,
) -> tuple[ClassificationData | None, list[str], list[str]]:
    """Load ``.repo-classification.yaml`` from a repo root and validate it.

    Returns ``(data, errors, warnings)``. *data* is ``None`` when the file is
    missing, unreadable, or has blocking validation errors.
    """
    root = Path(repo_path)
    doc = load_classification_document(root / filename)
    block = extract_classification_block(doc)
    if block is None:
        # Distinguish "no usable document" from "document without the block".
        if doc is None:
            return (None, [f"{filename} missing or unreadable"], [])
        return (None, [f"{filename} has no repo_classification block"], [])

    errors, warnings = validate_classification(block)
    if errors:
        return (None, errors, warnings)
    return (ClassificationData.from_block(block), [], warnings)