# Discovery connectors: collect evidence from external registries/catalogs and
# merge it into a discovery snapshot as candidate nodes, edges, and attributes.
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Protocol

from .canon import edge_canon_mapping, evidence_state_for, node_canon_mapping
from .discovery import (
    attribute_stable_key,
    discovery_stable_key,
    relationship_stable_key,
    replacement_scope_id,
    short_fingerprint,
    source_fingerprint,
)
from .loader import load_yaml

# Connector type identifier -> human-readable description of what it provides.
CONNECTOR_TYPES = {
    "package_registry": "Package registry metadata such as package descriptions, versions, and ownership.",
    "container_registry": "Container image metadata such as image names, tags, digests, and labels.",
    "api_catalog": "API catalog metadata such as OpenAPI/AsyncAPI records and ownership.",
    "service_catalog": "Service catalog metadata such as service ownership, lifecycle, and dependencies.",
    "deployment_inventory": "Deployment inventory metadata such as environments, workloads, and runtime placement.",
    "fabric_registry": "Existing Railiance Fabric registry or onboarding manifest data.",
}

# Confidence assigned to every candidate emitted from registry evidence.
_REGISTRY_CONFIDENCE = 0.9


@dataclass(frozen=True)
class ConnectorConfig:
    """Configuration for one connector run.

    ``connector_id`` selects the implementation in :func:`create_connector`;
    ``source_path`` is the connector's input location (a default is used when
    omitted); a config with ``enabled=False`` is recorded as a skipped run.
    """

    connector_id: str
    connector_type: str
    source_path: str | None = None
    enabled: bool = True


@dataclass(frozen=True)
class ConnectorContext:
    """Inputs shared by every connector: the repo location, slug, and snapshot."""

    repo_path: Path
    repo_slug: str
    snapshot: dict[str, Any]


@dataclass
class ConnectorResult:
    """Outcome of one connector run, ready to be merged into a snapshot."""

    connector_run: dict[str, object]
    replacement_scopes: list[dict[str, object]]
    candidates: dict[str, list[dict[str, object]]]
    review_artifacts: list[dict[str, object]]


class DiscoveryConnector(Protocol):
    """Structural interface every discovery connector must satisfy."""

    connector_id: str
    connector_type: str

    def collect(self, context: ConnectorContext) -> ConnectorResult:
        """Collect connector evidence without mutating ``context.snapshot``."""


class PackageRegistryConnector(DiscoveryConnector, Protocol):
    """Marker protocol for package-registry connectors."""


class ContainerRegistryConnector(DiscoveryConnector, Protocol):
    """Marker protocol for container-registry connectors."""


class ApiCatalogConnector(DiscoveryConnector, Protocol):
    """Marker protocol for API-catalog connectors."""


class ServiceCatalogConnector(DiscoveryConnector, Protocol):
    """Marker protocol for service-catalog connectors."""


class DeploymentInventoryConnector(DiscoveryConnector, Protocol):
    """Marker protocol for deployment-inventory connectors."""


class FabricRegistryConnector(DiscoveryConnector, Protocol):
    """Marker protocol for fabric-registry connectors."""


class LocalFabricRegistryConnector:
    """Connector that reads a local Fabric registry manifest (YAML).

    The manifest must be a mapping with a ``repositories`` list; the entry
    whose ``slug`` equals the context's repo slug becomes a
    ``FabricRegistryEntry`` candidate node linked to the repository node.
    """

    connector_id = "local-fabric-registry"
    connector_type = "fabric_registry"

    def __init__(self, manifest_path: Path) -> None:
        self.manifest_path = manifest_path

    def collect(self, context: ConnectorContext) -> ConnectorResult:
        """Build registry candidates for ``context.repo_slug``.

        Never raises for manifest problems; instead returns a result whose
        status is ``unavailable`` (file missing), ``failed`` (unloadable or
        malformed manifest), ``skipped`` (repo not listed), or ``success``.
        """
        started_at = _utc_now()
        source = str(self.manifest_path)

        if not self.manifest_path.is_file():
            return _connector_result(
                connector_id=self.connector_id,
                connector_type=self.connector_type,
                status="unavailable",
                source=source,
                message=f"manifest not found: {self.manifest_path}",
                started_at=started_at,
                review_artifacts=[
                    _review_artifact(
                        artifact_type="connector_unavailable",
                        origin="registry",
                        message=f"Local Fabric registry manifest not found: {self.manifest_path}",
                        payload={"source": source},
                    )
                ],
            )

        try:
            manifest = load_yaml(self.manifest_path)
        except Exception as exc:
            # Deliberately broad: any load error is reported as a failed run
            # with a review artifact rather than aborting the whole pipeline.
            return _connector_result(
                connector_id=self.connector_id,
                connector_type=self.connector_type,
                status="failed",
                source=source,
                message=str(exc),
                started_at=started_at,
                review_artifacts=[
                    _review_artifact(
                        artifact_type="connector_failed",
                        origin="registry",
                        message=f"Cannot load Local Fabric registry manifest: {exc}",
                        payload={"source": source},
                    )
                ],
            )

        repositories = manifest.get("repositories") if isinstance(manifest, dict) else None
        if not isinstance(repositories, list):
            return _connector_result(
                connector_id=self.connector_id,
                connector_type=self.connector_type,
                status="failed",
                source=source,
                message="manifest requires a repositories list",
                started_at=started_at,
                review_artifacts=[
                    _review_artifact(
                        artifact_type="connector_failed",
                        origin="registry",
                        message="Local Fabric registry manifest requires a repositories list.",
                        payload={"source": source},
                    )
                ],
            )

        match_index, match = _manifest_repo(repositories, context.repo_slug)
        if match is None:
            return _connector_result(
                connector_id=self.connector_id,
                connector_type=self.connector_type,
                status="skipped",
                source=source,
                message=f"repo {context.repo_slug!r} not present in manifest",
                started_at=started_at,
            )

        rel_source = _display_path(self.manifest_path, context.repo_path)
        scope = {
            "id": replacement_scope_id(
                context.repo_slug,
                self.connector_id,
                "fabric_registry",
                source_path=rel_source,
            ),
            "extractor_id": self.connector_id,
            "source_kind": "fabric_registry",
            "source_path": rel_source,
            "mode": "replacement",
            "description": "Local Fabric registry onboarding manifest evidence.",
        }
        # The anchor points at the matched list entry inside the manifest.
        anchor = _source_anchor(
            "fabric_registry",
            rel_source,
            json_pointer=f"/repositories/{match_index}",
        )
        provenance = {
            "extractor_id": self.connector_id,
            "extractor_version": "0.1.0",
            "method": "connector",
            "origin": "registry",
        }
        repo_key = _repository_key(context.snapshot, context.repo_slug)
        entry_key = discovery_stable_key(context.repo_slug, "FabricRegistryEntry", context.repo_slug)

        declaration_paths = match.get("declaration_paths")
        registry_attributes = {
            "registry_slug": context.repo_slug,
            "registry_name": match.get("name") or "",
            "registry_domain": match.get("domain") or "",
            "registry_path": match.get("path") or "",
            "registry_remote_url": match.get("remote_url") or "",
            "registry_default_branch": match.get("default_branch") or "",
            "state_hub_repo_id": match.get("state_hub_repo_id") or "",
            "declaration_paths": declaration_paths if isinstance(declaration_paths, list) else [],
        }

        node_mapping = node_canon_mapping("FabricRegistryEntry")
        node = {
            "stable_key": entry_key,
            "kind": "FabricRegistryEntry",
            "label": str(match.get("name") or context.repo_slug),
            "repo": context.repo_slug,
            "domain": str(match.get("domain") or ""),
            "canon_category": node_mapping.category,
            "canon_anchor": node_mapping.canon_anchor,
            "mapping_fit": node_mapping.fit,
            "evidence_state": evidence_state_for(
                origin="registry",
                source_kind="fabric_registry",
                review_state="candidate",
                confidence=_REGISTRY_CONFIDENCE,
            ),
            "aliases": _unique_strings([context.repo_slug, match.get("name")]),
            "attributes": registry_attributes,
            "origin": "registry",
            "review_state": "candidate",
            "status": "active",
            "confidence": _REGISTRY_CONFIDENCE,
            "replacement_scope": scope["id"],
            "provenance": [provenance],
            "source_anchors": [anchor],
        }

        edge_mapping = edge_canon_mapping("cataloged_as")
        edge = {
            "stable_key": relationship_stable_key(
                repo_key,
                "cataloged_as",
                entry_key,
                evidence_scope=scope["id"],
            ),
            "edge_type": "cataloged_as",
            "canonical_type": edge_mapping.canonical_type,
            "canon_anchor": edge_mapping.canon_anchor,
            "mapping_fit": edge_mapping.fit,
            "display_only": edge_mapping.display_only,
            "evidence_state": evidence_state_for(
                origin="registry",
                source_kind="fabric_registry",
                review_state="candidate",
                confidence=_REGISTRY_CONFIDENCE,
            ),
            "source_key": repo_key,
            "target_key": entry_key,
            "origin": "registry",
            "review_state": "candidate",
            "status": "active",
            "confidence": _REGISTRY_CONFIDENCE,
            "replacement_scope": scope["id"],
            "provenance": [provenance],
            "source_anchors": [anchor],
        }

        # Non-empty registry values are also emitted as attribute candidates
        # keyed on the repository entity.
        attributes = [
            _attribute(repo_key, name, value, scope["id"], provenance, anchor)
            for name, value in registry_attributes.items()
            if value not in ("", [], None)
        ]

        return _connector_result(
            connector_id=self.connector_id,
            connector_type=self.connector_type,
            status="success",
            source=source,
            message=f"matched repo {context.repo_slug}",
            started_at=started_at,
            replacement_scopes=[scope],
            candidates={
                "nodes": [node],
                "edges": [edge],
                "attributes": attributes,
            },
        )


def apply_connectors(
    snapshot: dict[str, Any],
    *,
    repo_path: Path,
    configs: list[ConnectorConfig],
) -> dict[str, Any]:
    """Run every configured connector and return an enriched snapshot copy.

    The input ``snapshot`` is not modified; results are merged into a JSON
    round-tripped copy. Disabled connectors are recorded as ``skipped`` runs
    and any exception from creating or running a connector is recorded as a
    ``failed`` run with a review artifact.
    """
    result = _copy_json(snapshot)
    context = ConnectorContext(
        repo_path=repo_path,
        repo_slug=str((snapshot.get("source") or {}).get("repo_slug") or repo_path.name),
        snapshot=result,
    )
    for config in configs:
        # Captured up front so failed/skipped runs report when the attempt
        # began rather than when the failure was handled.
        started_at = _utc_now()
        if not config.enabled:
            _merge_connector_result(
                result,
                _connector_result(
                    connector_id=config.connector_id,
                    connector_type=config.connector_type,
                    status="skipped",
                    source=config.source_path or "",
                    message="connector disabled",
                    started_at=started_at,
                ),
            )
            continue
        try:
            connector = create_connector(config)
            connector_result = connector.collect(context)
        except Exception as exc:
            # Boundary handler: one misbehaving connector must not prevent the
            # remaining connectors from running.
            connector_result = _connector_result(
                connector_id=config.connector_id,
                connector_type=config.connector_type,
                status="failed",
                source=config.source_path or "",
                message=str(exc),
                started_at=started_at,
                review_artifacts=[
                    _review_artifact(
                        artifact_type="connector_failed",
                        origin="registry",
                        message=f"Connector {config.connector_id} failed: {exc}",
                        payload={
                            "connector_type": config.connector_type,
                            "source": config.source_path or "",
                        },
                    )
                ],
            )
        _merge_connector_result(result, connector_result)
    return result


def create_connector(config: ConnectorConfig) -> DiscoveryConnector:
    """Instantiate the connector identified by ``config.connector_id``.

    Raises:
        ValueError: if no connector is registered under that id.
    """
    if config.connector_id == LocalFabricRegistryConnector.connector_id:
        source = Path(config.source_path or "registry/local-repos.yaml")
        return LocalFabricRegistryConnector(source)
    raise ValueError(f"unknown connector: {config.connector_id}")


def _connector_result(
    *,
    connector_id: str,
    connector_type: str,
    status: str,
    source: str,
    message: str,
    started_at: str,
    replacement_scopes: list[dict[str, object]] | None = None,
    candidates: dict[str, list[dict[str, object]]] | None = None,
    review_artifacts: list[dict[str, object]] | None = None,
) -> ConnectorResult:
    """Assemble a ``ConnectorResult`` with a run record stamped "completed now"."""
    candidates = candidates or {"nodes": [], "edges": [], "attributes": []}
    return ConnectorResult(
        connector_run={
            "connector_id": connector_id,
            "connector_type": connector_type,
            "status": status,
            "source": source,
            "message": message,
            "candidate_counts": {
                "nodes": len(candidates.get("nodes", [])),
                "edges": len(candidates.get("edges", [])),
                "attributes": len(candidates.get("attributes", [])),
            },
            "started_at": started_at,
            "completed_at": _utc_now(),
        },
        replacement_scopes=replacement_scopes or [],
        candidates=candidates,
        review_artifacts=review_artifacts or [],
    )


def _merge_connector_result(snapshot: dict[str, Any], result: ConnectorResult) -> None:
    """Merge ``result`` into ``snapshot`` in place.

    Replacement scopes are keyed by ``id`` (incoming wins) and candidates by
    ``stable_key`` (merged via ``_merge_candidate``); both are rewritten in
    sorted-key order. Review artifacts and the run record are appended.
    """
    scopes = {
        str(scope.get("id")): scope
        for scope in snapshot.setdefault("replacement_scopes", [])
        if isinstance(scope, dict)
    }
    for scope in result.replacement_scopes:
        scopes[str(scope["id"])] = scope
    snapshot["replacement_scopes"] = [scopes[key] for key in sorted(scopes)]

    candidates = snapshot.setdefault("candidates", {"nodes": [], "edges": [], "attributes": []})
    for collection in ("nodes", "edges", "attributes"):
        # Existing entries that are not dicts or lack a stable_key are not
        # carried over by this index.
        current = {
            str(item.get("stable_key")): item
            for item in candidates.setdefault(collection, [])
            if isinstance(item, dict) and item.get("stable_key")
        }
        for incoming in result.candidates.get(collection, []):
            key = str(incoming.get("stable_key"))
            current[key] = _merge_candidate(current.get(key), incoming)
        candidates[collection] = [current[key] for key in sorted(current)]

    if result.review_artifacts:
        snapshot.setdefault("review_artifacts", []).extend(result.review_artifacts)
    snapshot.setdefault("connector_runs", []).append(result.connector_run)


def _merge_candidate(
    existing: dict[str, object] | None,
    incoming: dict[str, object],
) -> dict[str, object]:
    """Combine two candidates sharing a stable key; ``existing`` fields win.

    List fields (aliases, provenance, source anchors) are unioned with
    duplicates removed, attribute dicts are overlaid (incoming overrides per
    key), and the higher numeric confidence is kept.
    """
    if existing is None:
        return incoming

    merged = {**existing}
    for field in ("aliases", "provenance", "source_anchors"):
        # ``or []`` tolerates fields that are present but null.
        values = [*(existing.get(field) or []), *(incoming.get(field) or [])]
        if values:
            merged[field] = _unique_strings(values) if field == "aliases" else _unique_json(values)

    existing_attrs = existing.get("attributes")
    incoming_attrs = incoming.get("attributes")
    if isinstance(existing_attrs, dict) or isinstance(incoming_attrs, dict):
        merged["attributes"] = {
            **(existing_attrs if isinstance(existing_attrs, dict) else {}),
            **(incoming_attrs if isinstance(incoming_attrs, dict) else {}),
        }

    existing_conf = existing.get("confidence")
    incoming_conf = incoming.get("confidence")
    if isinstance(existing_conf, (int, float)) and isinstance(incoming_conf, (int, float)):
        merged["confidence"] = max(float(existing_conf), float(incoming_conf))
    return merged


def _manifest_repo(
    repositories: list[object],
    repo_slug: str,
) -> tuple[int, dict[str, object] | None]:
    """Return ``(index, entry)`` of the first entry whose slug matches.

    Returns ``(-1, None)`` when no dict entry has ``slug == repo_slug``.
    """
    for index, item in enumerate(repositories):
        if isinstance(item, dict) and item.get("slug") == repo_slug:
            return index, item
    return -1, None


def _repository_key(snapshot: dict[str, Any], repo_slug: str) -> str:
    """Return the stable key of the snapshot's Repository node.

    Uses the first ``Repository`` candidate node that carries a stable key;
    otherwise derives the key from ``repo_slug``.
    """
    candidates = snapshot.get("candidates")
    if not isinstance(candidates, dict):
        candidates = {}
    for node in candidates.get("nodes") or []:
        if not isinstance(node, dict):
            continue
        # A Repository node without a stable key must not yield the literal
        # string "None"; fall through to the derived key instead.
        if node.get("kind") == "Repository" and node.get("stable_key"):
            return str(node["stable_key"])
    return discovery_stable_key(repo_slug, "Repository", repo_slug)


def _attribute(
    entity_key: str,
    name: str,
    value: object,
    replacement_scope: str,
    provenance: dict[str, object],
    source_anchor: dict[str, object],
) -> dict[str, object]:
    """Build a registry-origin attribute candidate for ``entity_key``."""
    return {
        "stable_key": attribute_stable_key(entity_key, name),
        "entity_key": entity_key,
        "name": name,
        "value": _json_value(value),
        "origin": "registry",
        "review_state": "candidate",
        "confidence": _REGISTRY_CONFIDENCE,
        "replacement_scope": replacement_scope,
        "provenance": [provenance],
        "source_anchors": [source_anchor],
    }


def _source_anchor(
    source_kind: str,
    path: str,
    *,
    json_pointer: str | None = None,
) -> dict[str, object]:
    """Build a source anchor; the fingerprint covers kind, path, and pointer."""
    anchor: dict[str, object] = {"source_kind": source_kind, "path": path}
    if json_pointer:
        anchor["json_pointer"] = json_pointer
    # Computed last so the fingerprint reflects every field set above.
    anchor["fingerprint"] = source_fingerprint(anchor)
    return anchor


def _review_artifact(
    *,
    artifact_type: str,
    origin: str,
    message: str,
    payload: dict[str, object],
) -> dict[str, object]:
    """Build a review artifact whose id is a fingerprint of its contents.

    The fingerprint input includes ``created_at``, so the id depends on the
    creation timestamp as well as the message and payload.
    """
    body = {
        "artifact_type": artifact_type,
        "origin": origin,
        "message": message,
        "payload": payload,
        "created_at": _utc_now(),
    }
    return {"id": f"review:{short_fingerprint(body, length=20)}", **body}


def _display_path(path: Path, repo_path: Path) -> str:
    """Return ``path`` relative to ``repo_path`` (POSIX form) when inside it.

    Falls back to ``str(path)`` when the resolved path is outside the repo.
    """
    try:
        return path.resolve().relative_to(repo_path.resolve()).as_posix()
    except ValueError:
        return str(path)


def _json_value(value: object) -> object:
    """Recursively coerce ``value`` into JSON-compatible types.

    Scalars pass through, lists and dicts are converted element-wise (dict
    keys become strings), and anything else is stringified.
    """
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, list):
        return [_json_value(item) for item in value]
    if isinstance(value, dict):
        return {str(key): _json_value(item) for key, item in value.items()}
    return str(value)


def _unique_strings(values: list[object]) -> list[str]:
    """Return stripped, non-empty string forms of ``values`` in first-seen order."""
    seen: set[str] = set()
    result: list[str] = []
    for value in values:
        text = str(value or "").strip()
        if not text or text in seen:
            continue
        seen.add(text)
        result.append(text)
    return result


def _unique_json(values: list[object]) -> list[object]:
    """Drop duplicates from ``values``, comparing by canonical JSON encoding."""
    seen: set[str] = set()
    result: list[object] = []
    for value in values:
        key = json.dumps(value, sort_keys=True, default=str)
        if key in seen:
            continue
        seen.add(key)
        result.append(value)
    return result


def _copy_json(value: Any) -> Any:
    """Deep-copy ``value`` via a JSON round trip (non-JSON values become strings)."""
    return json.loads(json.dumps(value, default=str))


def _utc_now() -> str:
    """Return the current UTC time as ISO-8601 with second precision and ``Z``."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")