from __future__ import annotations from collections import defaultdict from pathlib import Path from typing import Any from .loader import load_declarations, load_yaml, repo_root from .model import Declaration, ValidationReport from .schema_validation import draft202012_validator SCHEMA_BY_KIND = { "ServiceDeclaration": "service.schema.yaml", "CapabilityDeclaration": "capability.schema.yaml", "InterfaceDeclaration": "interface.schema.yaml", "DependencyDeclaration": "dependency.schema.yaml", "BindingAssertion": "binding.schema.yaml", } def validate_roots(paths: list[Path]) -> ValidationReport: root = repo_root() report = ValidationReport() declarations, load_errors = load_declarations(paths) for path, message in load_errors: report.add("ERROR", "load.yaml", message, path) _validate_schema(root, declarations, report) _validate_graph(root, declarations, report) return report def _validate_schema(root: Path, declarations: list[Declaration], report: ValidationReport) -> None: schemas_dir = root / "schemas" for declaration in declarations: schema_name = SCHEMA_BY_KIND.get(declaration.kind) if schema_name is None: report.add( "ERROR", "schema.kind", f"unknown declaration kind {declaration.kind!r}", declaration.path, ) continue schema_path = schemas_dir / schema_name validator = draft202012_validator(schema_path) for error in sorted(validator.iter_errors(declaration.data), key=lambda e: list(e.path)): location = ".".join(str(part) for part in error.path) or "" report.add( "ERROR", "schema.invalid", f"{location}: {error.message}", declaration.path, ) def _validate_graph(root: Path, declarations: list[Declaration], report: ValidationReport) -> None: by_id: dict[str, Declaration] = {} by_kind: dict[str, list[Declaration]] = defaultdict(list) for declaration in declarations: if not declaration.id: continue if declaration.id in by_id: report.add( "ERROR", "graph.duplicate_id", f"duplicate declaration id {declaration.id!r}", declaration.path, ) else: by_id[declaration.id] = declaration by_kind[declaration.kind].append(declaration) cap_types, iface_types, expected_ifaces = _load_type_catalog(root, report) services = {d.id: d for d in by_kind["ServiceDeclaration"]} capabilities = {d.id: d for d in by_kind["CapabilityDeclaration"]} interfaces = {d.id: d for d in by_kind["InterfaceDeclaration"]} dependencies = {d.id: d for d in by_kind["DependencyDeclaration"]} for declaration in by_kind["CapabilityDeclaration"]: spec = declaration.spec capability_type = spec.get("capability_type") if capability_type not in cap_types: report.add("ERROR", "catalog.unknown_capability_type", f"unknown capability type {capability_type!r}", declaration.path) _require_ref(report, declaration, "service_id", services, spec.get("service_id")) for interface_id in spec.get("interface_ids", []): _require_ref(report, declaration, "interface_ids", interfaces, interface_id) for declaration in by_kind["InterfaceDeclaration"]: spec = declaration.spec interface_type = spec.get("interface_type") if interface_type not in iface_types: report.add("ERROR", "catalog.unknown_interface_type", f"unknown interface type {interface_type!r}", declaration.path) _require_ref(report, declaration, "service_id", services, spec.get("service_id")) for capability_id in spec.get("capability_ids", []): _require_ref(report, declaration, "capability_ids", capabilities, capability_id) for declaration in by_kind["ServiceDeclaration"]: spec = declaration.spec for capability_id in spec.get("provides_capabilities", []): _require_ref(report, declaration, "provides_capabilities", capabilities, capability_id) for interface_id in spec.get("exposes_interfaces", []): _require_ref(report, declaration, "exposes_interfaces", interfaces, interface_id) for declaration in by_kind["DependencyDeclaration"]: _validate_dependency( declaration, report, services, capabilities, interfaces, cap_types, iface_types, expected_ifaces, ) for declaration in by_kind["BindingAssertion"]: _validate_binding(declaration, report, dependencies, capabilities, interfaces) _detect_cycles(by_kind["DependencyDeclaration"], by_kind["BindingAssertion"], services, capabilities, report) def _load_type_catalog(root: Path, report: ValidationReport) -> tuple[set[str], set[str], dict[str, set[str]]]: cap_types: set[str] = set() iface_types: set[str] = set() expected_ifaces: dict[str, set[str]] = {} try: cap_catalog = load_yaml(root / "catalog/capability-types.yaml") for item in cap_catalog["spec"]["types"]: cap_types.add(item["id"]) expected_ifaces[item["id"]] = set(item.get("expected_interface_types", [])) except Exception as exc: report.add("ERROR", "catalog.load", f"cannot load capability catalog: {exc}") try: iface_catalog = load_yaml(root / "catalog/interface-types.yaml") for item in iface_catalog["spec"]["types"]: iface_types.add(item["id"]) except Exception as exc: report.add("ERROR", "catalog.load", f"cannot load interface catalog: {exc}") return cap_types, iface_types, expected_ifaces def _require_ref( report: ValidationReport, declaration: Declaration, field: str, collection: dict[str, Declaration], value: Any, ) -> None: if isinstance(value, str) and value in collection: return report.add("ERROR", "graph.missing_ref", f"{field} references unknown id {value!r}", declaration.path) def _validate_dependency( declaration: Declaration, report: ValidationReport, services: dict[str, Declaration], capabilities: dict[str, Declaration], interfaces: dict[str, Declaration], cap_types: set[str], iface_types: set[str], expected_ifaces: dict[str, set[str]], ) -> None: spec = declaration.spec _require_ref(report, declaration, "consumer_service_id", services, spec.get("consumer_service_id")) requires = spec.get("requires", {}) capability_type = requires.get("capability_type") capability_id = requires.get("capability_id") if capability_type not in cap_types: report.add("ERROR", "catalog.unknown_capability_type", f"unknown required capability type {capability_type!r}", declaration.path) if capability_id: _require_ref(report, declaration, "requires.capability_id", capabilities, capability_id) interface_type = spec.get("interface", {}).get("type") if interface_type: if interface_type not in iface_types: report.add("ERROR", "catalog.unknown_interface_type", f"unknown dependency interface type {interface_type!r}", declaration.path) expected = expected_ifaces.get(str(capability_type), set()) if expected and interface_type not in expected: report.add( "WARN", "graph.unexpected_interface_type", f"interface type {interface_type!r} is not expected for capability type {capability_type!r}", declaration.path, ) providers = _matching_providers(requires, capabilities) if not providers: report.add( "ERROR", "graph.missing_provider", f"no provider capability found for {requires!r}", declaration.path, ) if _is_active_production(spec) and not declaration.metadata.get("source_links"): report.add( "ERROR", "graph.missing_source_links", "active production dependency requires metadata.source_links", declaration.path, ) if _is_active_production(spec): viable = [provider for provider in providers if _provider_covers_dependency(provider.spec, spec)] if providers and not viable: report.add( "ERROR", "graph.incompatible_provider", "no matching provider is active in the dependency environment", declaration.path, ) def _validate_binding( declaration: Declaration, report: ValidationReport, dependencies: dict[str, Declaration], capabilities: dict[str, Declaration], interfaces: dict[str, Declaration], ) -> None: spec = declaration.spec dependency = dependencies.get(str(spec.get("dependency_id"))) provider = capabilities.get(str(spec.get("provider_capability_id"))) _require_ref(report, declaration, "dependency_id", dependencies, spec.get("dependency_id")) _require_ref(report, declaration, "provider_capability_id", capabilities, spec.get("provider_capability_id")) if spec.get("provider_interface_id"): _require_ref(report, declaration, "provider_interface_id", interfaces, spec.get("provider_interface_id")) if dependency and provider: required_type = dependency.spec.get("requires", {}).get("capability_type") provider_type = provider.spec.get("capability_type") if required_type != provider_type: report.add( "ERROR", "graph.binding_type_mismatch", f"binding provider type {provider_type!r} does not satisfy dependency type {required_type!r}", declaration.path, ) def _matching_providers(requires: dict[str, Any], capabilities: dict[str, Declaration]) -> list[Declaration]: capability_id = requires.get("capability_id") if capability_id: provider = capabilities.get(str(capability_id)) return [provider] if provider is not None else [] capability_type = requires.get("capability_type") return [ declaration for declaration in capabilities.values() if declaration.spec.get("capability_type") == capability_type ] def _is_active_production(spec: dict[str, Any]) -> bool: environments = set(spec.get("environments", [])) return spec.get("lifecycle") == "active" and bool(environments & {"prod", "all"}) def _provider_covers_dependency(provider_spec: dict[str, Any], dependency_spec: dict[str, Any]) -> bool: if provider_spec.get("lifecycle") != "active": return False provider_envs = set(provider_spec.get("environments", [])) dependency_envs = set(dependency_spec.get("environments", [])) if "all" in provider_envs: return True if "all" in dependency_envs: return {"dev", "staging", "prod"}.issubset(provider_envs) return bool(provider_envs & dependency_envs) def _detect_cycles( dependencies: list[Declaration], bindings: list[Declaration], services: dict[str, Declaration], capabilities: dict[str, Declaration], report: ValidationReport, ) -> None: dependency_by_id = {d.id: d for d in dependencies} provider_by_dependency: dict[str, Declaration] = {} for binding in bindings: dep = str(binding.spec.get("dependency_id")) provider_capability_id = str(binding.spec.get("provider_capability_id")) provider = capabilities.get(provider_capability_id) if dep and provider: provider_by_dependency[dep] = provider edges: dict[str, set[str]] = defaultdict(set) for dep_id, provider in provider_by_dependency.items(): dependency = dependency_by_id.get(dep_id) if dependency is None: continue consumer_service = dependency.spec.get("consumer_service_id") provider_service = provider.spec.get("service_id") if consumer_service in services and provider_service in services and consumer_service != provider_service: edges[str(consumer_service)].add(str(provider_service)) visiting: set[str] = set() visited: set[str] = set() def visit(node: str, stack: list[str]) -> None: if node in visiting: cycle = stack[stack.index(node):] + [node] report.add("WARN", "graph.cycle", "service dependency cycle: " + " -> ".join(cycle)) return if node in visited: return visiting.add(node) for target in edges.get(node, set()): visit(target, stack + [target]) visiting.remove(node) visited.add(node) for node in sorted(edges): visit(node, [node])