Files

352 lines
15 KiB
Python

from __future__ import annotations
import json
import re
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from .loader import load_declarations
from .canon import edge_canon_mapping, node_canon_mapping
from .model import Declaration
@dataclass(frozen=True)
class ConsumerMatch:
    """Immutable record describing one consumer found by a graph query.

    The first three fields come from the dependency declaration. The
    remaining fields are copied from a binding assertion and stay as empty
    strings when the dependency has no binding.
    """

    # Service that declared the dependency.
    consumer_service_id: str
    # Id of the dependency declaration that matched.
    dependency_id: str
    # Capability type named in the dependency's requirements.
    required_capability_type: str
    # Binding details; empty when the dependency is unbound.
    provider_capability_id: str = ""
    provider_interface_id: str = ""
    status: str = ""
@dataclass
class FabricGraph:
    """Queryable index over a set of loaded fabric declarations.

    The constructor receives the raw declarations (plus any loader errors) and
    ``__post_init__`` derives lookup tables keyed by id and by kind. The query
    methods only read those tables and the declarations' ``spec`` /
    ``metadata`` mappings; they never modify a declaration.

    Attributes:
        declarations: Every declaration handed to the graph.
        load_errors: ``(path, message)`` pairs carried alongside the graph;
            the graph itself never inspects them.
    """

    declarations: list[Declaration]
    load_errors: list[tuple[Path, str]] = field(default_factory=list)

    @staticmethod
    def _mapping(spec: dict[str, Any], key: str) -> dict[str, Any]:
        """Return ``spec[key]`` when it is a dict, otherwise an empty dict.

        ``spec.get(key, {})`` only falls back when the key is absent; a key
        that is present with a null (or otherwise non-mapping) value would
        make the following ``.get`` call raise ``AttributeError``.
        """
        value = spec.get(key)
        return value if isinstance(value, dict) else {}

    def __post_init__(self) -> None:
        """Build the id, kind and binding lookup tables."""
        self.by_id: dict[str, Declaration] = {}
        self.by_kind: dict[str, list[Declaration]] = defaultdict(list)
        for declaration in self.declarations:
            # First declaration with a given non-empty id wins in ``by_id``;
            # every declaration is still recorded under its kind.
            if declaration.id and declaration.id not in self.by_id:
                self.by_id[declaration.id] = declaration
            self.by_kind[declaration.kind].append(declaration)

        # Per-kind id maps. These are plain dict comprehensions, so for
        # duplicate ids within one kind the later declaration is kept.
        self.services = {d.id: d for d in self.by_kind["ServiceDeclaration"]}
        self.capabilities = {d.id: d for d in self.by_kind["CapabilityDeclaration"]}
        self.interfaces = {d.id: d for d in self.by_kind["InterfaceDeclaration"]}
        self.dependencies = {d.id: d for d in self.by_kind["DependencyDeclaration"]}
        self.bindings = {d.id: d for d in self.by_kind["BindingAssertion"]}

        # Bindings grouped by the dependency they refer to; bindings without
        # a dependency id are left out of this table.
        self.bindings_by_dependency: dict[str, list[Declaration]] = defaultdict(list)
        for binding in self.bindings.values():
            dependency_id = str(binding.spec.get("dependency_id", ""))
            if dependency_id:
                self.bindings_by_dependency[dependency_id].append(binding)

    def providers(self, capability: str) -> list[Declaration]:
        """Return capability declarations for a capability id or type.

        An exact capability id takes precedence; otherwise every capability
        whose ``capability_type`` equals ``capability`` is returned.
        """
        if capability in self.capabilities:
            return [self.capabilities[capability]]
        return [
            declaration
            for declaration in self.capabilities.values()
            if declaration.spec.get("capability_type") == capability
        ]

    def consumers(self, target: str) -> list[ConsumerMatch]:
        """Find the dependencies (and their bindings) that relate to ``target``.

        A dependency matches when ``target`` equals its id, its required
        capability id or type, or its interface type, or when ``target`` is
        the id of a known interface whose ``interface_type`` equals the
        dependency's interface type. A binding matches when ``target`` is its
        provider capability id or provider interface id.

        Returns:
            One ``ConsumerMatch`` per matching binding, or a single unbound
            match for a matching dependency without bindings, sorted by
            consumer service id and then dependency id.
        """
        matches: list[ConsumerMatch] = []
        # Look the interface up directly instead of building a placeholder
        # declaration just to read an absent ``interface_type``.
        target_interface = self.interfaces.get(target)
        target_interface_type = (
            target_interface.spec.get("interface_type") if target_interface is not None else None
        )
        for dependency in self.dependencies.values():
            spec = dependency.spec
            requires = self._mapping(spec, "requires")
            interface_type = self._mapping(spec, "interface").get("type")
            dependency_matches = (
                target == dependency.id
                or target == requires.get("capability_id")
                or target == requires.get("capability_type")
                or target == interface_type
                or (target_interface_type and target_interface_type == interface_type)
            )
            consumer_service_id = str(spec.get("consumer_service_id", ""))
            required_capability_type = str(requires.get("capability_type", ""))
            dependency_bindings = self.bindings_by_dependency.get(dependency.id, [])

            if dependency_matches and not dependency_bindings:
                matches.append(
                    ConsumerMatch(
                        consumer_service_id=consumer_service_id,
                        dependency_id=dependency.id,
                        required_capability_type=required_capability_type,
                    )
                )
            for binding in dependency_bindings:
                binding_matches = target in (
                    binding.spec.get("provider_capability_id"),
                    binding.spec.get("provider_interface_id"),
                )
                if dependency_matches or binding_matches:
                    matches.append(
                        ConsumerMatch(
                            consumer_service_id=consumer_service_id,
                            dependency_id=dependency.id,
                            required_capability_type=required_capability_type,
                            provider_capability_id=str(binding.spec.get("provider_capability_id", "")),
                            provider_interface_id=str(binding.spec.get("provider_interface_id", "")),
                            status=str(binding.spec.get("status", "")),
                        )
                    )
        return sorted(matches, key=lambda item: (item.consumer_service_id, item.dependency_id))

    def unresolved_dependencies(self) -> list[Declaration]:
        """Return dependencies with no provider or a missing/disputed binding.

        The result is sorted by dependency id.
        """
        unresolved: list[Declaration] = []
        for dependency in self.dependencies.values():
            bindings = self.bindings_by_dependency.get(dependency.id, [])
            has_missing_binding = any(
                binding.spec.get("status") in ("missing", "disputed") for binding in bindings
            )
            if has_missing_binding or not self.matching_providers(dependency):
                unresolved.append(dependency)
        return sorted(unresolved, key=lambda item: item.id)

    def matching_providers(self, dependency: Declaration) -> list[Declaration]:
        """Return the capabilities that could satisfy ``dependency``.

        A required ``capability_id`` selects exactly that capability (or
        nothing when it is unknown). Otherwise capabilities are matched on
        ``capability_type``. A dependency that names neither has no providers.
        """
        requires = self._mapping(dependency.spec, "requires")
        capability_id = requires.get("capability_id")
        if capability_id:
            provider = self.capabilities.get(str(capability_id))
            return [provider] if provider is not None else []
        capability_type = requires.get("capability_type")
        if not capability_type:
            # Nothing to match on; comparing ``None == None`` would otherwise
            # select every capability that lacks a ``capability_type``.
            return []
        return [
            provider
            for provider in self.capabilities.values()
            if provider.spec.get("capability_type") == capability_type
        ]

    def dependency_path_lines(self, service_id: str) -> list[str]:
        """Render the dependency tree below ``service_id`` as text lines.

        Each service lists its declared dependencies; bound dependencies are
        followed into the providing service, unbound ones list candidate
        providers or ``unresolved``. A service that is already on the current
        path is printed with a ``(cycle)`` marker and not expanded again.

        Returns:
            The rendered lines, or a single ``unknown service`` line when
            ``service_id`` is not a known service.
        """
        if service_id not in self.services:
            return [f"unknown service: {service_id}"]

        # Group dependencies by consumer once instead of rescanning every
        # dependency for each visited service. Only string consumer ids can
        # ever equal the (string) service id being looked up.
        deps_by_consumer: dict[str, list[Declaration]] = {}
        for dep in self.dependencies.values():
            consumer = dep.spec.get("consumer_service_id")
            if isinstance(consumer, str):
                deps_by_consumer.setdefault(consumer, []).append(dep)

        lines: list[str] = []

        def walk(current: str, indent: int, stack: list[str]) -> None:
            prefix = " " * indent
            if current in stack:
                lines.append(f"{prefix}{current} (cycle)")
                return
            lines.append(f"{prefix}{current}")
            deps = deps_by_consumer.get(current, [])
            if not deps:
                lines.append(f"{prefix} no declared dependencies")
                return
            for dep in sorted(deps, key=lambda item: item.id):
                required = self._mapping(dep.spec, "requires").get("capability_type", "")
                lines.append(f"{prefix} requires {required}: {dep.id}")
                bindings = self.bindings_by_dependency.get(dep.id, [])
                if not bindings:
                    providers = self.matching_providers(dep)
                    if providers:
                        for provider in providers:
                            lines.append(f"{prefix} candidate {provider.id}")
                    else:
                        lines.append(f"{prefix} unresolved")
                    continue
                for binding in sorted(bindings, key=lambda item: item.id):
                    provider_id = str(binding.spec.get("provider_capability_id", ""))
                    provider = self.capabilities.get(provider_id)
                    provider_service = provider.spec.get("service_id") if provider else ""
                    status = binding.spec.get("status", "")
                    lines.append(f"{prefix} {status} -> {provider_id}")
                    # Self-provided capabilities are not expanded again.
                    if provider_service and provider_service != current:
                        walk(str(provider_service), indent + 3, stack + [current])

        walk(service_id, 0, [])
        return lines

    def blast_radius(self, interface: str) -> list[ConsumerMatch]:
        """Return the consumer matches affected by ``interface``.

        When ``interface`` is a known interface id, only matches bound to that
        interface are kept. Otherwise it is treated as an interface type and
        only matches whose dependency declares that interface type are kept.
        """
        matches = self.consumers(interface)
        if interface in self.interfaces:
            return [match for match in matches if match.provider_interface_id == interface]
        return [
            match
            for match in matches
            if self._mapping(self.dependencies[match.dependency_id].spec, "interface").get("type")
            == interface
        ]

    def to_export(self) -> dict[str, Any]:
        """Build the ``FabricGraphExport`` document as plain dicts and lists.

        Nodes are emitted for every declaration, sorted by kind and id. Edges
        are derived from service, capability, dependency and binding specs.
        """
        nodes: list[dict[str, Any]] = []
        edges: list[dict[str, Any]] = []
        for declaration in sorted(self.declarations, key=lambda item: (item.kind, item.id)):
            canon_mapping = node_canon_mapping(declaration.kind)
            nodes.append(
                {
                    "id": declaration.id,
                    "kind": declaration.kind,
                    "name": declaration.metadata.get("name", declaration.id),
                    "repo": declaration.metadata.get("repo", ""),
                    "domain": declaration.metadata.get("domain", ""),
                    "lifecycle": declaration.spec.get("lifecycle", ""),
                    "canon_category": canon_mapping.category,
                    "canon_anchor": canon_mapping.canon_anchor,
                    "mapping_fit": canon_mapping.fit,
                    "evidence_state": "declared",
                    "attributes": _export_attributes(declaration),
                }
            )

        # ``or []`` keeps a list field that is present but null from breaking
        # the iteration.
        for service in self.services.values():
            for capability_id in service.spec.get("provides_capabilities") or []:
                edges.append(_export_edge(service.id, capability_id, "provides"))
            for interface_id in service.spec.get("exposes_interfaces") or []:
                edges.append(_export_edge(service.id, interface_id, "exposes"))
        for capability in self.capabilities.values():
            for interface_id in capability.spec.get("interface_ids") or []:
                edges.append(_export_edge(capability.id, interface_id, "available_via"))
        for dependency in self.dependencies.values():
            consumer = str(dependency.spec.get("consumer_service_id", ""))
            if consumer:
                edges.append(_export_edge(consumer, dependency.id, "consumes"))
            for binding in self.bindings_by_dependency.get(dependency.id, []):
                edges.append(
                    _export_edge(
                        dependency.id,
                        str(binding.spec.get("provider_capability_id", "")),
                        f"binds:{binding.spec.get('status', '')}",
                    )
                )
                interface_id = str(binding.spec.get("provider_interface_id", ""))
                if interface_id:
                    edges.append(_export_edge(dependency.id, interface_id, "uses_interface"))
        return {
            "apiVersion": "railiance.fabric/v1alpha1",
            "kind": "FabricGraphExport",
            "nodes": nodes,
            "edges": edges,
        }

    def to_json(self) -> str:
        """Serialize ``to_export()`` as indented JSON with sorted keys."""
        return json.dumps(self.to_export(), indent=2, sort_keys=True)

    def to_mermaid(self) -> str:
        """Render ``to_export()`` as a Mermaid ``graph TD`` definition.

        Node and edge endpoints are passed through ``_mermaid_id``; label
        text is passed through ``_escape_mermaid``.
        """
        export = self.to_export()
        lines = ["graph TD"]
        for node in export["nodes"]:
            node_id = _mermaid_id(node["id"])
            label = f"{node['name']}\\n{node['kind']}"
            lines.append(f' {node_id}["{_escape_mermaid(label)}"]')
        for edge in export["edges"]:
            source = _mermaid_id(edge["from"])
            target = _mermaid_id(edge["to"])
            label = _escape_mermaid(edge["type"])
            lines.append(f" {source} -- {label} --> {target}")
        return "\n".join(lines)
def build_graph(paths: list[Path]) -> FabricGraph:
    """Load declarations from ``paths`` and wrap them in a ``FabricGraph``.

    ``load_declarations`` is expected to return a ``(declarations,
    load_errors)`` pair; both halves are handed to the graph unchanged.
    """
    loaded_declarations, loader_errors = load_declarations(paths)
    graph = FabricGraph(declarations=loaded_declarations, load_errors=loader_errors)
    return graph
def _mermaid_id(value: str) -> str:
    """Turn an arbitrary id into an identifier made of ``[A-Za-z0-9_]``.

    Every other character becomes ``_``. A result that starts with a digit
    gets a leading ``_``, and an empty result becomes ``"unknown"``.
    """
    sanitized = re.sub(r"[^A-Za-z0-9_]", "_", value)
    if not sanitized:
        return "unknown"
    if sanitized[0].isdigit():
        return "_" + sanitized
    return sanitized
def _escape_mermaid(value: str) -> str:
    """Escape double quotes in ``value`` for use inside Mermaid label text.

    Mermaid does not understand backslash escapes: a ``\\"`` inside a quoted
    label still ends the string and breaks parsing. Its documented escape
    mechanism is entity codes, so each ``"`` is emitted as ``#quot;``.
    """
    return value.replace('"', "#quot;")
def _export_edge(source: str, target: str, edge_type: str) -> dict[str, Any]:
    """Build one export edge record, annotated with its canon mapping.

    The canon fields are read from whatever ``edge_canon_mapping`` returns
    for ``edge_type``; ``evidence_state`` is always ``"declared"``.
    """
    mapping = edge_canon_mapping(edge_type)
    edge: dict[str, Any] = {"from": source, "to": target, "type": edge_type}
    edge["canonical_type"] = mapping.canonical_type
    edge["canon_anchor"] = mapping.canon_anchor
    edge["mapping_fit"] = mapping.fit
    edge["display_only"] = mapping.display_only
    edge["evidence_state"] = "declared"
    return edge
def _export_attributes(declaration: Declaration) -> dict[str, Any]:
    """Return the export ``attributes`` mapping for one declaration.

    The result always contains the base attributes from
    ``_base_export_attributes``; declarations of a known kind add their
    kind-specific fields on top. Unknown kinds get the base attributes only.

    Spec sections that are present but null (or not of the expected shape)
    are exported as empty values instead of raising: ``dict.get`` only uses
    its default when the key is absent, so ``requires: null`` would otherwise
    fail on ``.get`` and ``environments: null`` on ``list(None)``.
    """
    spec = declaration.spec
    base = _base_export_attributes(declaration)
    kind = declaration.kind

    def as_list(key: str) -> list[Any]:
        # Copy the list so the export does not alias the declaration's spec.
        return list(spec.get(key) or [])

    def as_mapping(key: str) -> dict[str, Any]:
        value = spec.get(key)
        return value if isinstance(value, dict) else {}

    if kind == "ServiceDeclaration":
        return {
            **base,
            "service_type": spec.get("service_type", ""),
            "environments": as_list("environments"),
            "provides_capabilities": as_list("provides_capabilities"),
            "exposes_interfaces": as_list("exposes_interfaces"),
        }
    if kind == "CapabilityDeclaration":
        return {
            **base,
            "capability_type": spec.get("capability_type", ""),
            "service_id": spec.get("service_id", ""),
            "interface_ids": as_list("interface_ids"),
            "environments": as_list("environments"),
        }
    if kind == "InterfaceDeclaration":
        return {
            **base,
            "interface_type": spec.get("interface_type", ""),
            "service_id": spec.get("service_id", ""),
            "capability_ids": as_list("capability_ids"),
            "version": spec.get("version", ""),
            "auth": spec.get("auth", ""),
            "endpoint": spec.get("endpoint", {}),
            "environments": as_list("environments"),
        }
    if kind == "DependencyDeclaration":
        requires = as_mapping("requires")
        interface = as_mapping("interface")
        return {
            **base,
            "consumer_service_id": spec.get("consumer_service_id", ""),
            "requires_capability_id": requires.get("capability_id", ""),
            "requires_capability_type": requires.get("capability_type", ""),
            "interface_type": interface.get("type", ""),
            "environments": as_list("environments"),
            "criticality": spec.get("criticality", ""),
        }
    if kind == "BindingAssertion":
        return {
            **base,
            "dependency_id": spec.get("dependency_id", ""),
            "provider_capability_id": spec.get("provider_capability_id", ""),
            "provider_interface_id": spec.get("provider_interface_id", ""),
            "status": spec.get("status", ""),
        }
    return base
def _base_export_attributes(declaration: Declaration) -> dict[str, Any]:
    """Collect the export attributes shared by every declaration kind.

    ``source_links`` is exported only when the metadata value is a list
    (otherwise an empty list is used), and ``deployment_overlay`` is added
    only when the spec value is a dict.
    """
    metadata = declaration.metadata
    spec = declaration.spec

    links = metadata.get("source_links", [])
    if not isinstance(links, list):
        links = []

    attributes = {
        "owner": metadata.get("owner", ""),
        "description": spec.get("description", ""),
        "source_path": str(declaration.path),
        "source_links": links,
    }

    overlay = spec.get("deployment_overlay")
    if isinstance(overlay, dict):
        attributes["deployment_overlay"] = overlay
    return attributes