from __future__ import annotations import json import re from collections import defaultdict from dataclasses import dataclass, field from pathlib import Path from typing import Any from .loader import load_declarations from .canon import edge_canon_mapping, node_canon_mapping from .model import Declaration @dataclass(frozen=True) class ConsumerMatch: consumer_service_id: str dependency_id: str required_capability_type: str provider_capability_id: str = "" provider_interface_id: str = "" status: str = "" @dataclass class FabricGraph: declarations: list[Declaration] load_errors: list[tuple[Path, str]] = field(default_factory=list) def __post_init__(self) -> None: self.by_id: dict[str, Declaration] = {} self.by_kind: dict[str, list[Declaration]] = defaultdict(list) for declaration in self.declarations: if declaration.id and declaration.id not in self.by_id: self.by_id[declaration.id] = declaration self.by_kind[declaration.kind].append(declaration) self.services = {d.id: d for d in self.by_kind["ServiceDeclaration"]} self.capabilities = {d.id: d for d in self.by_kind["CapabilityDeclaration"]} self.interfaces = {d.id: d for d in self.by_kind["InterfaceDeclaration"]} self.dependencies = {d.id: d for d in self.by_kind["DependencyDeclaration"]} self.bindings = {d.id: d for d in self.by_kind["BindingAssertion"]} self.bindings_by_dependency: dict[str, list[Declaration]] = defaultdict(list) for binding in self.bindings.values(): dependency_id = str(binding.spec.get("dependency_id", "")) if dependency_id: self.bindings_by_dependency[dependency_id].append(binding) def providers(self, capability: str) -> list[Declaration]: if capability in self.capabilities: return [self.capabilities[capability]] return [ declaration for declaration in self.capabilities.values() if declaration.spec.get("capability_type") == capability ] def consumers(self, target: str) -> list[ConsumerMatch]: matches: list[ConsumerMatch] = [] target_interface_type = self.interfaces.get(target, Declaration(Path(), {})).spec.get("interface_type") for dependency in self.dependencies.values(): spec = dependency.spec requires = spec.get("requires", {}) dependency_matches = ( target == dependency.id or target == requires.get("capability_id") or target == requires.get("capability_type") or target == spec.get("interface", {}).get("type") or (target_interface_type and target_interface_type == spec.get("interface", {}).get("type")) ) dependency_bindings = self.bindings_by_dependency.get(dependency.id, []) binding_matches = [ binding for binding in dependency_bindings if target in { binding.spec.get("provider_capability_id"), binding.spec.get("provider_interface_id"), } ] if dependency_matches and not dependency_bindings: matches.append( ConsumerMatch( consumer_service_id=str(spec.get("consumer_service_id", "")), dependency_id=dependency.id, required_capability_type=str(requires.get("capability_type", "")), ) ) for binding in dependency_bindings: if dependency_matches or binding in binding_matches: matches.append( ConsumerMatch( consumer_service_id=str(spec.get("consumer_service_id", "")), dependency_id=dependency.id, required_capability_type=str(requires.get("capability_type", "")), provider_capability_id=str(binding.spec.get("provider_capability_id", "")), provider_interface_id=str(binding.spec.get("provider_interface_id", "")), status=str(binding.spec.get("status", "")), ) ) return sorted(matches, key=lambda item: (item.consumer_service_id, item.dependency_id)) def unresolved_dependencies(self) -> list[Declaration]: unresolved: list[Declaration] = [] for dependency in self.dependencies.values(): providers = self.matching_providers(dependency) bindings = self.bindings_by_dependency.get(dependency.id, []) has_missing_binding = any(binding.spec.get("status") in {"missing", "disputed"} for binding in bindings) if not providers or has_missing_binding: unresolved.append(dependency) return sorted(unresolved, key=lambda item: item.id) def matching_providers(self, dependency: Declaration) -> list[Declaration]: requires = dependency.spec.get("requires", {}) capability_id = requires.get("capability_id") if capability_id: provider = self.capabilities.get(str(capability_id)) return [provider] if provider is not None else [] capability_type = requires.get("capability_type") return [ provider for provider in self.capabilities.values() if provider.spec.get("capability_type") == capability_type ] def dependency_path_lines(self, service_id: str) -> list[str]: if service_id not in self.services: return [f"unknown service: {service_id}"] lines: list[str] = [] def walk(current: str, indent: int, stack: list[str]) -> None: prefix = " " * indent if current in stack: lines.append(f"{prefix}{current} (cycle)") return lines.append(f"{prefix}{current}") deps = [ dep for dep in self.dependencies.values() if dep.spec.get("consumer_service_id") == current ] if not deps: lines.append(f"{prefix} no declared dependencies") return for dep in sorted(deps, key=lambda item: item.id): required = dep.spec.get("requires", {}).get("capability_type", "") lines.append(f"{prefix} requires {required}: {dep.id}") bindings = self.bindings_by_dependency.get(dep.id, []) if not bindings: providers = self.matching_providers(dep) if providers: for provider in providers: lines.append(f"{prefix} candidate {provider.id}") else: lines.append(f"{prefix} unresolved") continue for binding in sorted(bindings, key=lambda item: item.id): provider_id = str(binding.spec.get("provider_capability_id", "")) provider = self.capabilities.get(provider_id) provider_service = provider.spec.get("service_id") if provider else "" status = binding.spec.get("status", "") lines.append(f"{prefix} {status} -> {provider_id}") if provider_service and provider_service != current: walk(str(provider_service), indent + 3, stack + [current]) walk(service_id, 0, []) return lines def blast_radius(self, interface: str) -> list[ConsumerMatch]: if interface in self.interfaces: return [ match for match in self.consumers(interface) if match.provider_interface_id == interface ] return [ match for match in self.consumers(interface) if self.dependencies[match.dependency_id].spec.get("interface", {}).get("type") == interface ] def to_export(self) -> dict[str, Any]: nodes: list[dict[str, Any]] = [] edges: list[dict[str, str]] = [] for declaration in sorted(self.declarations, key=lambda item: (item.kind, item.id)): canon_mapping = node_canon_mapping(declaration.kind) nodes.append( { "id": declaration.id, "kind": declaration.kind, "name": declaration.metadata.get("name", declaration.id), "repo": declaration.metadata.get("repo", ""), "domain": declaration.metadata.get("domain", ""), "lifecycle": declaration.spec.get("lifecycle", ""), "canon_category": canon_mapping.category, "canon_anchor": canon_mapping.canon_anchor, "mapping_fit": canon_mapping.fit, "evidence_state": "declared", "attributes": _export_attributes(declaration), } ) for service in self.services.values(): for capability_id in service.spec.get("provides_capabilities", []): edges.append(_export_edge(service.id, capability_id, "provides")) for interface_id in service.spec.get("exposes_interfaces", []): edges.append(_export_edge(service.id, interface_id, "exposes")) for capability in self.capabilities.values(): for interface_id in capability.spec.get("interface_ids", []): edges.append(_export_edge(capability.id, interface_id, "available_via")) for dependency in self.dependencies.values(): consumer = str(dependency.spec.get("consumer_service_id", "")) if consumer: edges.append(_export_edge(consumer, dependency.id, "consumes")) for binding in self.bindings_by_dependency.get(dependency.id, []): edges.append( _export_edge( dependency.id, str(binding.spec.get("provider_capability_id", "")), f"binds:{binding.spec.get('status', '')}", ) ) interface_id = str(binding.spec.get("provider_interface_id", "")) if interface_id: edges.append(_export_edge(dependency.id, interface_id, "uses_interface")) return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "FabricGraphExport", "nodes": nodes, "edges": edges, } def to_json(self) -> str: return json.dumps(self.to_export(), indent=2, sort_keys=True) def to_mermaid(self) -> str: export = self.to_export() lines = ["graph TD"] for node in export["nodes"]: node_id = _mermaid_id(node["id"]) label = f"{node['name']}\\n{node['kind']}" lines.append(f' {node_id}["{_escape_mermaid(label)}"]') for edge in export["edges"]: source = _mermaid_id(edge["from"]) target = _mermaid_id(edge["to"]) label = _escape_mermaid(edge["type"]) lines.append(f" {source} -- {label} --> {target}") return "\n".join(lines) def build_graph(paths: list[Path]) -> FabricGraph: declarations, load_errors = load_declarations(paths) return FabricGraph(declarations=declarations, load_errors=load_errors) def _mermaid_id(value: str) -> str: cleaned = re.sub(r"[^A-Za-z0-9_]", "_", value) if cleaned and cleaned[0].isdigit(): cleaned = "_" + cleaned return cleaned or "unknown" def _escape_mermaid(value: str) -> str: return value.replace('"', '\\"') def _export_edge(source: str, target: str, edge_type: str) -> dict[str, Any]: canon_mapping = edge_canon_mapping(edge_type) return { "from": source, "to": target, "type": edge_type, "canonical_type": canon_mapping.canonical_type, "canon_anchor": canon_mapping.canon_anchor, "mapping_fit": canon_mapping.fit, "display_only": canon_mapping.display_only, "evidence_state": "declared", } def _export_attributes(declaration: Declaration) -> dict[str, Any]: spec = declaration.spec base = _base_export_attributes(declaration) if declaration.kind == "ServiceDeclaration": return { **base, "service_type": spec.get("service_type", ""), "environments": list(spec.get("environments", [])), "provides_capabilities": list(spec.get("provides_capabilities", [])), "exposes_interfaces": list(spec.get("exposes_interfaces", [])), } if declaration.kind == "CapabilityDeclaration": return { **base, "capability_type": spec.get("capability_type", ""), "service_id": spec.get("service_id", ""), "interface_ids": list(spec.get("interface_ids", [])), "environments": list(spec.get("environments", [])), } if declaration.kind == "InterfaceDeclaration": return { **base, "interface_type": spec.get("interface_type", ""), "service_id": spec.get("service_id", ""), "capability_ids": list(spec.get("capability_ids", [])), "version": spec.get("version", ""), "auth": spec.get("auth", ""), "endpoint": spec.get("endpoint", {}), "environments": list(spec.get("environments", [])), } if declaration.kind == "DependencyDeclaration": requires = spec.get("requires", {}) interface = spec.get("interface", {}) return { **base, "consumer_service_id": spec.get("consumer_service_id", ""), "requires_capability_id": requires.get("capability_id", ""), "requires_capability_type": requires.get("capability_type", ""), "interface_type": interface.get("type", ""), "environments": list(spec.get("environments", [])), "criticality": spec.get("criticality", ""), } if declaration.kind == "BindingAssertion": return { **base, "dependency_id": spec.get("dependency_id", ""), "provider_capability_id": spec.get("provider_capability_id", ""), "provider_interface_id": spec.get("provider_interface_id", ""), "status": spec.get("status", ""), } return base def _base_export_attributes(declaration: Declaration) -> dict[str, Any]: source_links = declaration.metadata.get("source_links", []) attributes = { "owner": declaration.metadata.get("owner", ""), "description": declaration.spec.get("description", ""), "source_path": str(declaration.path), "source_links": source_links if isinstance(source_links, list) else [], } if isinstance(declaration.spec.get("deployment_overlay"), dict): attributes["deployment_overlay"] = declaration.spec["deployment_overlay"] return attributes