Files
railiance-fabric/railiance_fabric/validation.py

323 lines
13 KiB
Python

from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from typing import Any
from .loader import load_declarations, load_yaml, repo_root
from .model import Declaration, ValidationReport
from .schema_validation import draft202012_validator
# Maps each supported declaration ``kind`` to the file name of the schema used
# to validate it. `_validate_schema` resolves these names inside the
# repository's ``schemas`` directory and reports any kind missing from this
# table as ``schema.kind``.
SCHEMA_BY_KIND = {
    "ServiceDeclaration": "service.schema.yaml",
    "CapabilityDeclaration": "capability.schema.yaml",
    "InterfaceDeclaration": "interface.schema.yaml",
    "DependencyDeclaration": "dependency.schema.yaml",
    "BindingAssertion": "binding.schema.yaml",
}
def validate_roots(paths: list[Path]) -> ValidationReport:
    """Validate the declarations loaded from ``paths`` and return the findings.

    Load failures are recorded first as ``load.yaml`` errors, followed by the
    schema findings and then the graph findings, all on a single report.

    Args:
        paths: Locations handed to ``load_declarations``.

    Returns:
        The report holding every finding produced by the run.
    """
    root = repo_root()
    report = ValidationReport()

    loaded, failures = load_declarations(paths)
    for failed_path, reason in failures:
        report.add("ERROR", "load.yaml", reason, failed_path)

    # Schema checks run before graph checks so the report keeps that order.
    for check in (_validate_schema, _validate_graph):
        check(root, loaded, report)
    return report
def _validate_schema(root: Path, declarations: list[Declaration], report: ValidationReport) -> None:
    """Validate each declaration's data against the schema registered for its kind.

    A declaration whose kind is not in ``SCHEMA_BY_KIND`` is reported as
    ``schema.kind`` and skipped. Every schema violation is reported as
    ``schema.invalid``, ordered by the path of the offending element.

    Args:
        root: Repository root; schemas are read from ``root / "schemas"``.
        declarations: Declarations to validate.
        report: Collector that receives the findings.
    """
    schemas_dir = root / "schemas"
    # The validator depends only on the schema file, so build it once per
    # schema instead of once per declaration.
    validators: dict[str, Any] = {}

    for declaration in declarations:
        schema_name = SCHEMA_BY_KIND.get(declaration.kind)
        if schema_name is None:
            report.add(
                "ERROR",
                "schema.kind",
                f"unknown declaration kind {declaration.kind!r}",
                declaration.path,
            )
            continue

        validator = validators.get(schema_name)
        if validator is None:
            validator = draft202012_validator(schemas_dir / schema_name)
            validators[schema_name] = validator

        # Sort by element path so findings for one file come out in a stable order.
        errors = sorted(validator.iter_errors(declaration.data), key=lambda e: list(e.path))
        for error in errors:
            location = ".".join(str(part) for part in error.path) or "<root>"
            report.add(
                "ERROR",
                "schema.invalid",
                f"{location}: {error.message}",
                declaration.path,
            )
def _validate_graph(root: Path, declarations: list[Declaration], report: ValidationReport) -> None:
    """Cross-check declarations against each other and against the type catalogs.

    Reports duplicate ids, capability/interface types that are missing from the
    catalogs, and id references that do not resolve to a declaration of the
    expected kind. It then runs the per-dependency checks, the per-binding
    checks and the service cycle detection. All findings are added to
    ``report``; nothing is returned.
    """
    by_id: dict[str, Declaration] = {}
    by_kind: dict[str, list[Declaration]] = defaultdict(list)
    for declaration in declarations:
        # Declarations without an id take no part in graph validation.
        if not declaration.id:
            continue
        if declaration.id in by_id:
            report.add(
                "ERROR",
                "graph.duplicate_id",
                f"duplicate declaration id {declaration.id!r}",
                declaration.path,
            )
        else:
            by_id[declaration.id] = declaration
        # NOTE(review): a duplicate is still grouped by kind here, so the
        # per-kind id maps built below keep the last declaration seen for a
        # repeated id -- confirm this is intended.
        by_kind[declaration.kind].append(declaration)
    # Known capability types, known interface types, and the interface types
    # expected for each capability type.
    cap_types, iface_types, expected_ifaces = _load_type_catalog(root, report)
    # Per-kind lookup tables used to resolve id references.
    services = {d.id: d for d in by_kind["ServiceDeclaration"]}
    capabilities = {d.id: d for d in by_kind["CapabilityDeclaration"]}
    interfaces = {d.id: d for d in by_kind["InterfaceDeclaration"]}
    dependencies = {d.id: d for d in by_kind["DependencyDeclaration"]}
    # Capabilities: catalogued type, owning service, referenced interfaces.
    for declaration in by_kind["CapabilityDeclaration"]:
        spec = declaration.spec
        capability_type = spec.get("capability_type")
        if capability_type not in cap_types:
            report.add("ERROR", "catalog.unknown_capability_type", f"unknown capability type {capability_type!r}", declaration.path)
        _require_ref(report, declaration, "service_id", services, spec.get("service_id"))
        for interface_id in spec.get("interface_ids", []):
            _require_ref(report, declaration, "interface_ids", interfaces, interface_id)
    # Interfaces: catalogued type, owning service, referenced capabilities.
    for declaration in by_kind["InterfaceDeclaration"]:
        spec = declaration.spec
        interface_type = spec.get("interface_type")
        if interface_type not in iface_types:
            report.add("ERROR", "catalog.unknown_interface_type", f"unknown interface type {interface_type!r}", declaration.path)
        _require_ref(report, declaration, "service_id", services, spec.get("service_id"))
        for capability_id in spec.get("capability_ids", []):
            _require_ref(report, declaration, "capability_ids", capabilities, capability_id)
    # Services: every listed capability and interface must be declared.
    for declaration in by_kind["ServiceDeclaration"]:
        spec = declaration.spec
        for capability_id in spec.get("provides_capabilities", []):
            _require_ref(report, declaration, "provides_capabilities", capabilities, capability_id)
        for interface_id in spec.get("exposes_interfaces", []):
            _require_ref(report, declaration, "exposes_interfaces", interfaces, interface_id)
    for declaration in by_kind["DependencyDeclaration"]:
        _validate_dependency(
            declaration,
            report,
            services,
            capabilities,
            interfaces,
            cap_types,
            iface_types,
            expected_ifaces,
        )
    for declaration in by_kind["BindingAssertion"]:
        _validate_binding(declaration, report, dependencies, capabilities, interfaces)
    # Cycle detection runs last, after all reference checks have been reported.
    _detect_cycles(by_kind["DependencyDeclaration"], by_kind["BindingAssertion"], services, capabilities, report)
def _load_type_catalog(root: Path, report: ValidationReport) -> tuple[set[str], set[str], dict[str, set[str]]]:
    """Read the capability and interface type catalogs under ``root``.

    Each catalog is loaded independently. Any failure while reading or walking
    a catalog is recorded as a ``catalog.load`` error, and whatever had been
    collected up to that point is still returned.

    Returns:
        ``(capability type ids, interface type ids, expected interface types
        keyed by capability type id)``.
    """
    cap_types: set[str] = set()
    iface_types: set[str] = set()
    expected_ifaces: dict[str, set[str]] = {}

    try:
        capability_doc = load_yaml(root / "catalog/capability-types.yaml")
        for entry in capability_doc["spec"]["types"]:
            type_id = entry["id"]
            cap_types.add(type_id)
            expected_ifaces[type_id] = set(entry.get("expected_interface_types", []))
    except Exception as exc:
        report.add("ERROR", "catalog.load", f"cannot load capability catalog: {exc}")

    try:
        interface_doc = load_yaml(root / "catalog/interface-types.yaml")
        iface_types.update(entry["id"] for entry in interface_doc["spec"]["types"])
    except Exception as exc:
        report.add("ERROR", "catalog.load", f"cannot load interface catalog: {exc}")

    return cap_types, iface_types, expected_ifaces
def _require_ref(
    report: ValidationReport,
    declaration: Declaration,
    field: str,
    collection: dict[str, Declaration],
    value: Any,
) -> None:
    """Report ``graph.missing_ref`` unless ``value`` is a string key of ``collection``.

    Args:
        report: Collector that receives the finding.
        declaration: Declaration holding the reference; its path is attached to the finding.
        field: Name of the referencing field, used in the message.
        collection: Known declarations keyed by id.
        value: The referenced id as read from the declaration.
    """
    # Non-string values are rejected before the membership test is attempted.
    unresolved = not isinstance(value, str) or value not in collection
    if unresolved:
        report.add("ERROR", "graph.missing_ref", f"{field} references unknown id {value!r}", declaration.path)
def _validate_dependency(
    declaration: Declaration,
    report: ValidationReport,
    services: dict[str, Declaration],
    capabilities: dict[str, Declaration],
    interfaces: dict[str, Declaration],
    cap_types: set[str],
    iface_types: set[str],
    expected_ifaces: dict[str, set[str]],
) -> None:
    """Validate one dependency declaration against the graph and the catalogs.

    Checks, in report order: the consumer service exists; the required
    capability type is catalogued; an explicitly required capability id exists;
    the dependency's interface type is catalogued and expected for the
    capability type (warning only); at least one provider capability matches.
    For active production dependencies it also checks that
    ``metadata.source_links`` is present and that a matching provider covers
    the dependency's environments.

    Args:
        declaration: The dependency declaration under validation.
        report: Collector that receives the findings.
        services: Service declarations keyed by id.
        capabilities: Capability declarations keyed by id.
        interfaces: Interface declarations keyed by id (not consulted here).
        cap_types: Capability type ids known to the catalog.
        iface_types: Interface type ids known to the catalog.
        expected_ifaces: Interface types expected for each capability type id.
    """
    spec = declaration.spec
    _require_ref(report, declaration, "consumer_service_id", services, spec.get("consumer_service_id"))

    # Graph validation also runs for declarations that failed schema
    # validation, so ``requires`` / ``interface`` can be null or not a mapping.
    # Treat those as empty rather than crashing with AttributeError.
    requires = spec.get("requires")
    if not isinstance(requires, dict):
        requires = {}
    interface = spec.get("interface")
    if not isinstance(interface, dict):
        interface = {}

    capability_type = requires.get("capability_type")
    capability_id = requires.get("capability_id")
    if capability_type not in cap_types:
        report.add("ERROR", "catalog.unknown_capability_type", f"unknown required capability type {capability_type!r}", declaration.path)
    if capability_id:
        _require_ref(report, declaration, "requires.capability_id", capabilities, capability_id)

    interface_type = interface.get("type")
    if interface_type:
        if interface_type not in iface_types:
            report.add("ERROR", "catalog.unknown_interface_type", f"unknown dependency interface type {interface_type!r}", declaration.path)
        expected = expected_ifaces.get(str(capability_type), set())
        # An empty expectation set means the catalog places no constraint.
        if expected and interface_type not in expected:
            report.add(
                "WARN",
                "graph.unexpected_interface_type",
                f"interface type {interface_type!r} is not expected for capability type {capability_type!r}",
                declaration.path,
            )

    providers = _matching_providers(requires, capabilities)
    if not providers:
        report.add(
            "ERROR",
            "graph.missing_provider",
            f"no provider capability found for {requires!r}",
            declaration.path,
        )

    # The remaining checks apply to active production dependencies only.
    if not _is_active_production(spec):
        return
    if not declaration.metadata.get("source_links"):
        report.add(
            "ERROR",
            "graph.missing_source_links",
            "active production dependency requires metadata.source_links",
            declaration.path,
        )
    viable = [provider for provider in providers if _provider_covers_dependency(provider.spec, spec)]
    # With no providers at all, graph.missing_provider was already reported above.
    if providers and not viable:
        report.add(
            "ERROR",
            "graph.incompatible_provider",
            "no matching provider is active in the dependency environment",
            declaration.path,
        )
def _validate_binding(
    declaration: Declaration,
    report: ValidationReport,
    dependencies: dict[str, Declaration],
    capabilities: dict[str, Declaration],
    interfaces: dict[str, Declaration],
) -> None:
    """Validate one binding assertion.

    The bound dependency and provider capability must exist, as must the
    provider interface when one is named. When both the dependency and the
    provider resolve, the provider's capability type must equal the type the
    dependency requires; otherwise ``graph.binding_type_mismatch`` is reported.
    """
    spec = declaration.spec
    dependency_id = spec.get("dependency_id")
    provider_capability_id = spec.get("provider_capability_id")

    _require_ref(report, declaration, "dependency_id", dependencies, dependency_id)
    _require_ref(report, declaration, "provider_capability_id", capabilities, provider_capability_id)
    provider_interface_id = spec.get("provider_interface_id")
    if provider_interface_id:
        _require_ref(report, declaration, "provider_interface_id", interfaces, provider_interface_id)

    dependency = dependencies.get(str(dependency_id))
    provider = capabilities.get(str(provider_capability_id))
    if not (dependency and provider):
        # The type comparison needs both ends of the binding.
        return

    required_type = dependency.spec.get("requires", {}).get("capability_type")
    provider_type = provider.spec.get("capability_type")
    if required_type != provider_type:
        report.add(
            "ERROR",
            "graph.binding_type_mismatch",
            f"binding provider type {provider_type!r} does not satisfy dependency type {required_type!r}",
            declaration.path,
        )
def _matching_providers(requires: dict[str, Any], capabilities: dict[str, Declaration]) -> list[Declaration]:
    """Return the capability declarations that can satisfy ``requires``.

    A truthy ``capability_id`` pins the lookup to that single capability,
    giving a one-element list or an empty list when it is unknown. Otherwise
    every capability whose ``capability_type`` equals the required type is
    returned, in the iteration order of ``capabilities``.
    """
    pinned_id = requires.get("capability_id")
    if not pinned_id:
        wanted_type = requires.get("capability_type")
        return [
            candidate
            for candidate in capabilities.values()
            if candidate.spec.get("capability_type") == wanted_type
        ]

    pinned = capabilities.get(str(pinned_id))
    return [] if pinned is None else [pinned]
def _is_active_production(spec: dict[str, Any]) -> bool:
    """Return True when ``spec`` has lifecycle ``active`` and lists ``prod`` or ``all`` as an environment."""
    declared_envs = set(spec.get("environments", []))
    if spec.get("lifecycle") != "active":
        return False
    return not declared_envs.isdisjoint({"prod", "all"})
def _provider_covers_dependency(provider_spec: dict[str, Any], dependency_spec: dict[str, Any]) -> bool:
    """Tell whether an active provider is available where the dependency runs.

    A provider that is not ``active`` never covers anything. A provider
    declared for ``all`` covers every dependency. A dependency declared for
    ``all`` needs a provider that lists ``dev``, ``staging`` and ``prod``.
    Otherwise sharing at least one environment is enough.
    """
    if provider_spec.get("lifecycle") != "active":
        return False

    offered = set(provider_spec.get("environments", []))
    wanted = set(dependency_spec.get("environments", []))

    if "all" in offered:
        return True
    if "all" in wanted:
        return offered >= {"dev", "staging", "prod"}
    return not offered.isdisjoint(wanted)
def _detect_cycles(
    dependencies: list[Declaration],
    bindings: list[Declaration],
    services: dict[str, Declaration],
    capabilities: dict[str, Declaration],
    report: ValidationReport,
) -> None:
    """Warn about cycles in the service graph implied by binding assertions.

    Each binding that resolves to a known dependency and a known provider
    capability contributes an edge from the dependency's consumer service to
    the provider's service. Self-edges and edges touching unknown services are
    ignored. Every cycle reached by a depth-first walk is reported as a
    ``graph.cycle`` warning of the form ``a -> b -> a``.

    Args:
        dependencies: Dependency declarations.
        bindings: Binding assertions linking dependencies to provider capabilities.
        services: Service declarations keyed by id.
        capabilities: Capability declarations keyed by id.
        report: Collector that receives the warnings.
    """
    dependency_by_id = {d.id: d for d in dependencies}

    provider_by_dependency: dict[str, Declaration] = {}
    for binding in bindings:
        dependency_id = binding.spec.get("dependency_id")
        provider = capabilities.get(str(binding.spec.get("provider_capability_id")))
        # Test the raw value: str(None) is the truthy string "None", which
        # would make this guard meaningless.
        if dependency_id and provider:
            provider_by_dependency[str(dependency_id)] = provider

    edges: dict[str, set[str]] = defaultdict(set)
    for dep_id, provider in provider_by_dependency.items():
        dependency = dependency_by_id.get(dep_id)
        if dependency is None:
            continue
        consumer_service = dependency.spec.get("consumer_service_id")
        provider_service = provider.spec.get("service_id")
        if consumer_service in services and provider_service in services and consumer_service != provider_service:
            edges[str(consumer_service)].add(str(provider_service))

    visiting: set[str] = set()  # nodes on the current DFS path
    visited: set[str] = set()  # nodes whose subtree is fully explored

    def visit(node: str, stack: list[str]) -> None:
        if node in visiting:
            # ``stack`` already ends with ``node`` (the caller appended it), so
            # slicing from its first occurrence gives the closed cycle without
            # repeating the final node.
            cycle = stack[stack.index(node):]
            report.add("WARN", "graph.cycle", "service dependency cycle: " + " -> ".join(cycle))
            return
        if node in visited:
            return
        visiting.add(node)
        # Sorted so the reported cycles do not depend on set iteration order.
        for target in sorted(edges.get(node, ())):
            visit(target, stack + [target])
        visiting.remove(node)
        visited.add(node)

    for node in sorted(edges):
        visit(node, [node])