generated from coulomb/repo-seed
514 lines
18 KiB
Python
514 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Protocol
|
|
|
|
from .canon import edge_canon_mapping, evidence_state_for, node_canon_mapping
|
|
from .discovery import (
|
|
attribute_stable_key,
|
|
discovery_stable_key,
|
|
relationship_stable_key,
|
|
replacement_scope_id,
|
|
short_fingerprint,
|
|
source_fingerprint,
|
|
)
|
|
from .loader import load_yaml
|
|
|
|
|
|
# Known connector categories, mapped to a human-readable description of the
# metadata each category is expected to provide.
CONNECTOR_TYPES = dict(
    package_registry="Package registry metadata such as package descriptions, versions, and ownership.",
    container_registry="Container image metadata such as image names, tags, digests, and labels.",
    api_catalog="API catalog metadata such as OpenAPI/AsyncAPI records and ownership.",
    service_catalog="Service catalog metadata such as service ownership, lifecycle, and dependencies.",
    deployment_inventory="Deployment inventory metadata such as environments, workloads, and runtime placement.",
    fabric_registry="Existing Railiance Fabric registry or onboarding manifest data.",
)
|
|
|
|
|
|
@dataclass(frozen=True)
class ConnectorConfig:
    """Immutable settings describing one connector to run.

    Fields are positional in declaration order; ``source_path`` and
    ``enabled`` are optional.
    """

    # Identifier that ``create_connector`` uses to choose an implementation.
    connector_id: str
    # Connector category label; not validated against CONNECTOR_TYPES here.
    connector_type: str
    # Optional input location; ``None``/empty lets the factory pick a default.
    source_path: str | None = None
    # When False the connector is not run and is recorded as "skipped".
    enabled: bool = True
|
|
|
|
|
|
@dataclass(frozen=True)
class ConnectorContext:
    """Read-only bundle of inputs handed to ``DiscoveryConnector.collect``.

    The dataclass itself is frozen, but ``snapshot`` is a plain dict and is
    therefore still mutable by reference.
    """

    # Filesystem location of the repository being discovered.
    repo_path: Path
    # Repository identifier used to match registry/manifest entries.
    repo_slug: str
    # Current discovery snapshot that connectors may read from.
    snapshot: dict[str, Any]
|
|
|
|
|
|
@dataclass
class ConnectorResult:
    """Everything a single connector run produced (mutable container)."""

    # Run record: id, type, status, source, message, counts and timestamps.
    connector_run: dict[str, object]
    # Replacement scopes that the emitted candidates belong to.
    replacement_scopes: list[dict[str, object]]
    # Candidate collections keyed by "nodes", "edges" and "attributes".
    candidates: dict[str, list[dict[str, object]]]
    # Review artifacts (e.g. unavailable/failed notices) raised by the run.
    review_artifacts: list[dict[str, object]]
|
|
|
|
|
|
class DiscoveryConnector(Protocol):
    """Structural interface shared by all discovery connectors.

    An implementation carries an identifier and a category label and turns a
    ``ConnectorContext`` into a ``ConnectorResult``.
    """

    # Identifier of the connector implementation.
    connector_id: str
    # Category label of the connector.
    connector_type: str

    def collect(self, context: ConnectorContext) -> ConnectorResult:
        """Gather evidence for ``context``; ``context.snapshot`` must be treated as read-only."""
|
|
|
|
|
|
class PackageRegistryConnector(DiscoveryConnector, Protocol):
    """Typing marker for package-registry connectors; adds no members beyond ``DiscoveryConnector``."""
|
|
|
|
|
|
class ContainerRegistryConnector(DiscoveryConnector, Protocol):
    """Typing marker for container-registry connectors; adds no members beyond ``DiscoveryConnector``."""
|
|
|
|
|
|
class ApiCatalogConnector(DiscoveryConnector, Protocol):
    """Typing marker for API-catalog connectors; adds no members beyond ``DiscoveryConnector``."""
|
|
|
|
|
|
class ServiceCatalogConnector(DiscoveryConnector, Protocol):
    """Typing marker for service-catalog connectors; adds no members beyond ``DiscoveryConnector``."""
|
|
|
|
|
|
class DeploymentInventoryConnector(DiscoveryConnector, Protocol):
    """Typing marker for deployment-inventory connectors; adds no members beyond ``DiscoveryConnector``."""
|
|
|
|
|
|
class FabricRegistryConnector(DiscoveryConnector, Protocol):
    """Typing marker for Fabric-registry connectors; adds no members beyond ``DiscoveryConnector``."""
|
|
|
|
|
|
class LocalFabricRegistryConnector:
    """Connector that reads a local Fabric registry manifest file.

    The manifest (loaded with ``load_yaml``) must be a mapping with a
    ``repositories`` list. The entry whose ``slug`` equals the repository
    being discovered becomes a ``FabricRegistryEntry`` node, a
    ``cataloged_as`` edge from the repository node to that entry, and one
    attribute candidate (attached to the repository node key) for each
    non-empty registry field.
    """

    connector_id = "local-fabric-registry"
    connector_type = "fabric_registry"

    def __init__(self, manifest_path: Path) -> None:
        # Location of the registry manifest; checked with ``is_file`` on collect.
        self.manifest_path = manifest_path

    def collect(self, context: ConnectorContext) -> ConnectorResult:
        """Produce registry candidates for ``context.repo_slug``.

        The run status is ``"unavailable"`` when the manifest file is missing,
        ``"failed"`` when it cannot be loaded or lacks a ``repositories`` list,
        ``"skipped"`` when the repository is not listed, and ``"success"``
        otherwise. Problems are reported in the result rather than raised.
        """
        started_at = _utc_now()
        source = str(self.manifest_path)

        if not self.manifest_path.is_file():
            return self._problem_result(
                status="unavailable",
                source=source,
                message=f"manifest not found: {self.manifest_path}",
                started_at=started_at,
                artifact_type="connector_unavailable",
                artifact_message=f"Local Fabric registry manifest not found: {self.manifest_path}",
            )

        try:
            manifest = load_yaml(self.manifest_path)
        except Exception as exc:
            # Any loader error is converted into a "failed" run.
            return self._problem_result(
                status="failed",
                source=source,
                message=str(exc),
                started_at=started_at,
                artifact_type="connector_failed",
                artifact_message=f"Cannot load Local Fabric registry manifest: {exc}",
            )

        entries = manifest.get("repositories") if isinstance(manifest, dict) else None
        if not isinstance(entries, list):
            return self._problem_result(
                status="failed",
                source=source,
                message="manifest requires a repositories list",
                started_at=started_at,
                artifact_type="connector_failed",
                artifact_message="Local Fabric registry manifest requires a repositories list.",
            )

        slug = context.repo_slug
        position, entry = _manifest_repo(entries, slug)
        if entry is None:
            # The repository is not listed: record a skip without a review artifact.
            return _connector_result(
                connector_id=self.connector_id,
                connector_type=self.connector_type,
                status="skipped",
                source=source,
                message=f"repo {slug!r} not present in manifest",
                started_at=started_at,
            )

        relative_source = _display_path(self.manifest_path, context.repo_path)
        scope_id = replacement_scope_id(
            slug,
            self.connector_id,
            "fabric_registry",
            source_path=relative_source,
        )
        scope = {
            "id": scope_id,
            "extractor_id": self.connector_id,
            "source_kind": "fabric_registry",
            "source_path": relative_source,
            "mode": "replacement",
            "description": "Local Fabric registry onboarding manifest evidence.",
        }
        # Anchor points at the matched entry's position inside the manifest.
        anchor = _source_anchor("fabric_registry", relative_source, json_pointer=f"/repositories/{position}")
        provenance = {
            "extractor_id": self.connector_id,
            "extractor_version": "0.1.0",
            "method": "connector",
            "origin": "registry",
        }
        repo_key = _repository_key(context.snapshot, slug)
        entry_key = discovery_stable_key(slug, "FabricRegistryEntry", slug)

        declared_paths = entry.get("declaration_paths")
        registry_attributes = {
            "registry_slug": slug,
            "registry_name": entry.get("name") or "",
            "registry_domain": entry.get("domain") or "",
            "registry_path": entry.get("path") or "",
            "registry_remote_url": entry.get("remote_url") or "",
            "registry_default_branch": entry.get("default_branch") or "",
            "state_hub_repo_id": entry.get("state_hub_repo_id") or "",
            "declaration_paths": declared_paths if isinstance(declared_paths, list) else [],
        }

        node_mapping = node_canon_mapping("FabricRegistryEntry")
        node = {
            "stable_key": entry_key,
            "kind": "FabricRegistryEntry",
            "label": str(entry.get("name") or slug),
            "repo": slug,
            "domain": str(entry.get("domain") or ""),
            "canon_category": node_mapping.category,
            "canon_anchor": node_mapping.canon_anchor,
            "mapping_fit": node_mapping.fit,
            "evidence_state": evidence_state_for(
                origin="registry",
                source_kind="fabric_registry",
                review_state="candidate",
                confidence=0.9,
            ),
            "aliases": _unique_strings([slug, entry.get("name")]),
            "attributes": registry_attributes,
            "origin": "registry",
            "review_state": "candidate",
            "status": "active",
            "confidence": 0.9,
            "replacement_scope": scope_id,
            "provenance": [provenance],
            "source_anchors": [anchor],
        }

        edge_mapping = edge_canon_mapping("cataloged_as")
        edge = {
            "stable_key": relationship_stable_key(repo_key, "cataloged_as", entry_key, evidence_scope=scope_id),
            "edge_type": "cataloged_as",
            "canonical_type": edge_mapping.canonical_type,
            "canon_anchor": edge_mapping.canon_anchor,
            "mapping_fit": edge_mapping.fit,
            "display_only": edge_mapping.display_only,
            "evidence_state": evidence_state_for(
                origin="registry",
                source_kind="fabric_registry",
                review_state="candidate",
                confidence=0.9,
            ),
            "source_key": repo_key,
            "target_key": entry_key,
            "origin": "registry",
            "review_state": "candidate",
            "status": "active",
            "confidence": 0.9,
            "replacement_scope": scope_id,
            "provenance": [provenance],
            "source_anchors": [anchor],
        }

        # Empty registry fields ("", [] or None) do not become attribute candidates.
        attribute_candidates = []
        for attr_name, attr_value in registry_attributes.items():
            if attr_value in ("", [], None):
                continue
            attribute_candidates.append(
                _attribute(repo_key, attr_name, attr_value, scope_id, provenance, anchor)
            )

        return _connector_result(
            connector_id=self.connector_id,
            connector_type=self.connector_type,
            status="success",
            source=source,
            message=f"matched repo {slug}",
            started_at=started_at,
            replacement_scopes=[scope],
            candidates={
                "nodes": [node],
                "edges": [edge],
                "attributes": attribute_candidates,
            },
        )

    def _problem_result(
        self,
        *,
        status: str,
        source: str,
        message: str,
        started_at: str,
        artifact_type: str,
        artifact_message: str,
    ) -> ConnectorResult:
        """Build a non-success result carrying one registry review artifact."""
        return _connector_result(
            connector_id=self.connector_id,
            connector_type=self.connector_type,
            status=status,
            source=source,
            message=message,
            started_at=started_at,
            review_artifacts=[
                _review_artifact(
                    artifact_type=artifact_type,
                    origin="registry",
                    message=artifact_message,
                    payload={"source": source},
                )
            ],
        )
|
|
|
|
|
|
def apply_connectors(
    snapshot: dict[str, Any],
    *,
    repo_path: Path,
    configs: list[ConnectorConfig],
) -> dict[str, Any]:
    """Run each configured connector and merge its evidence into a copy of ``snapshot``.

    The input ``snapshot`` is not modified: it is copied through a JSON
    round-trip (``_copy_json``) and every connector result is merged into
    that copy in ``configs`` order. Disabled connectors are recorded as
    ``"skipped"`` runs. Any exception raised while creating or running a
    connector is recorded as a ``"failed"`` run with a ``connector_failed``
    review artifact instead of propagating.

    Args:
        snapshot: Discovery snapshot. ``snapshot["source"]["repo_slug"]``
            names the repository when present and ``source`` is a mapping.
        repo_path: Repository location; its final path component is the
            fallback repository slug.
        configs: Connector configurations to run, in order.

    Returns:
        The merged copy of ``snapshot``.
    """
    merged = _copy_json(snapshot)
    # Tolerate a missing, null or non-mapping "source" entry instead of
    # crashing on ``.get``; fall back to the directory name.
    source_info = snapshot.get("source")
    declared_slug = source_info.get("repo_slug") if isinstance(source_info, dict) else None
    context = ConnectorContext(
        repo_path=repo_path,
        repo_slug=str(declared_slug or repo_path.name),
        # Connectors read the working copy, so later connectors see the
        # evidence merged from earlier ones.
        snapshot=merged,
    )

    for config in configs:
        # Capture the start time before anything runs so that a failed run
        # reports when it actually began, not when the failure was recorded.
        started_at = _utc_now()
        if not config.enabled:
            outcome = _connector_result(
                connector_id=config.connector_id,
                connector_type=config.connector_type,
                status="skipped",
                source=config.source_path or "",
                message="connector disabled",
                started_at=started_at,
            )
        else:
            try:
                connector = create_connector(config)
                outcome = connector.collect(context)
            except Exception as exc:
                # Boundary: one broken connector must not abort the others.
                outcome = _connector_result(
                    connector_id=config.connector_id,
                    connector_type=config.connector_type,
                    status="failed",
                    source=config.source_path or "",
                    message=str(exc),
                    started_at=started_at,
                    review_artifacts=[
                        _review_artifact(
                            artifact_type="connector_failed",
                            origin="registry",
                            message=f"Connector {config.connector_id} failed: {exc}",
                            payload={"connector_type": config.connector_type, "source": config.source_path or ""},
                        )
                    ],
                )
        _merge_connector_result(merged, outcome)

    return merged
|
|
|
|
|
|
def create_connector(config: ConnectorConfig) -> DiscoveryConnector:
    """Instantiate the connector implementation selected by ``config.connector_id``.

    Only the local Fabric registry connector is known. Its manifest path is
    ``config.source_path``, or the relative path ``registry/local-repos.yaml``
    when that is empty. ``config.connector_type`` is not consulted.

    Raises:
        ValueError: if ``config.connector_id`` is not recognised.
    """
    if config.connector_id != LocalFabricRegistryConnector.connector_id:
        raise ValueError(f"unknown connector: {config.connector_id}")
    manifest = Path(config.source_path) if config.source_path else Path("registry/local-repos.yaml")
    return LocalFabricRegistryConnector(manifest)
|
|
|
|
|
|
def _connector_result(
    *,
    connector_id: str,
    connector_type: str,
    status: str,
    source: str,
    message: str,
    started_at: str,
    replacement_scopes: list[dict[str, object]] | None = None,
    candidates: dict[str, list[dict[str, object]]] | None = None,
    review_artifacts: list[dict[str, object]] | None = None,
) -> ConnectorResult:
    """Package a connector run record plus its outputs into a ``ConnectorResult``.

    Missing or empty ``candidates`` become empty node/edge/attribute
    collections. The run record includes per-collection candidate counts and
    a ``completed_at`` timestamp taken when this function is called.
    """
    if not candidates:
        candidates = {"nodes": [], "edges": [], "attributes": []}
    counts = {
        collection: len(candidates.get(collection, []))
        for collection in ("nodes", "edges", "attributes")
    }
    run_record = {
        "connector_id": connector_id,
        "connector_type": connector_type,
        "status": status,
        "source": source,
        "message": message,
        "candidate_counts": counts,
        "started_at": started_at,
        "completed_at": _utc_now(),
    }
    return ConnectorResult(
        connector_run=run_record,
        replacement_scopes=replacement_scopes or [],
        candidates=candidates,
        review_artifacts=review_artifacts or [],
    )
|
|
|
|
|
|
def _merge_connector_result(snapshot: dict[str, Any], result: ConnectorResult) -> None:
    """Fold one connector result into ``snapshot`` in place.

    Replacement scopes are keyed by ``id``, with incoming scopes replacing
    existing ones. Candidates are keyed by ``stable_key`` per collection;
    collisions go through ``_merge_candidate``, and existing entries that are
    not dicts or lack a ``stable_key`` are dropped. Both lists are re-sorted
    by key for deterministic output. Review artifacts are appended when
    present, and the run record is always appended to ``connector_runs``.
    """
    scopes_by_id = {}
    for scope in snapshot.setdefault("replacement_scopes", []):
        if isinstance(scope, dict):
            scopes_by_id[str(scope.get("id"))] = scope
    for scope in result.replacement_scopes:
        scopes_by_id[str(scope["id"])] = scope
    snapshot["replacement_scopes"] = [scope for _, scope in sorted(scopes_by_id.items())]

    pool = snapshot.setdefault("candidates", {"nodes": [], "edges": [], "attributes": []})
    for collection in ("nodes", "edges", "attributes"):
        keyed = {}
        for item in pool.setdefault(collection, []):
            if isinstance(item, dict) and item.get("stable_key"):
                keyed[str(item.get("stable_key"))] = item
        for incoming in result.candidates.get(collection, []):
            stable_key = str(incoming.get("stable_key"))
            keyed[stable_key] = _merge_candidate(keyed.get(stable_key), incoming)
        pool[collection] = [item for _, item in sorted(keyed.items())]

    if result.review_artifacts:
        snapshot.setdefault("review_artifacts", []).extend(result.review_artifacts)
    snapshot.setdefault("connector_runs", []).append(result.connector_run)
|
|
|
|
|
|
def _merge_candidate(existing: dict[str, object] | None, incoming: dict[str, object]) -> dict[str, object]:
    """Combine two candidates that share a ``stable_key``.

    - ``aliases``, ``provenance`` and ``source_anchors`` are concatenated
      (existing first) and de-duplicated. Aliases are de-duplicated as
      stripped strings; the others by their JSON form. A missing or ``None``
      field counts as an empty list.
    - ``attributes`` dicts are merged, with ``incoming`` values overriding.
    - ``confidence`` becomes the maximum of the two when both are numeric.
    - Every other field is taken from ``existing`` only.

    Returns ``incoming`` unchanged when there is no ``existing`` candidate.
    """
    if existing is None:
        return incoming
    merged = {**existing}

    for field in ("aliases", "provenance", "source_anchors"):
        # ``or []`` tolerates fields that are present but null (e.g. from
        # JSON/YAML input); ``list(None)`` would raise TypeError.
        values = [*list(existing.get(field) or []), *list(incoming.get(field) or [])]
        if values:
            merged[field] = _unique_strings(values) if field == "aliases" else _unique_json(values)

    old_attrs = existing.get("attributes")
    new_attrs = incoming.get("attributes")
    if isinstance(old_attrs, dict) or isinstance(new_attrs, dict):
        merged["attributes"] = {
            **(old_attrs if isinstance(old_attrs, dict) else {}),
            **(new_attrs if isinstance(new_attrs, dict) else {}),
        }

    old_conf = existing.get("confidence")
    new_conf = incoming.get("confidence")
    if isinstance(old_conf, (int, float)) and isinstance(new_conf, (int, float)):
        merged["confidence"] = max(float(old_conf), float(new_conf))
    return merged
|
|
|
|
|
|
def _manifest_repo(repositories: list[object], repo_slug: str) -> tuple[int, dict[str, object] | None]:
    """Locate the first manifest entry whose ``slug`` equals ``repo_slug``.

    Non-dict entries are ignored. Returns ``(index, entry)`` for the first
    match, or ``(-1, None)`` when nothing matches.
    """
    matches = (
        (position, entry)
        for position, entry in enumerate(repositories)
        if isinstance(entry, dict) and entry.get("slug") == repo_slug
    )
    return next(matches, (-1, None))
|
|
|
|
|
|
def _repository_key(snapshot: dict[str, Any], repo_slug: str) -> str:
    """Return the stable key of the snapshot's Repository node.

    Uses the first candidate node with ``kind == "Repository"`` that carries
    a non-empty ``stable_key``. Nodes without a key are skipped, because
    ``str(None)`` would otherwise yield the bogus key ``"None"``. When no
    such node exists (or the candidates/nodes structure is missing), the
    key is derived with ``discovery_stable_key``.
    """
    candidates = snapshot.get("candidates")
    nodes = candidates.get("nodes") if isinstance(candidates, dict) else None
    for node in nodes or []:
        if isinstance(node, dict) and node.get("kind") == "Repository" and node.get("stable_key"):
            return str(node["stable_key"])
    return discovery_stable_key(repo_slug, "Repository", repo_slug)
|
|
|
|
|
|
def _attribute(
    entity_key: str,
    name: str,
    value: object,
    replacement_scope: str,
    provenance: dict[str, object],
    source_anchor: dict[str, object],
) -> dict[str, object]:
    """Build a registry-origin attribute candidate attached to ``entity_key``.

    The candidate is keyed by ``attribute_stable_key(entity_key, name)``. Its
    value is normalised with ``_json_value``, and it is marked as a
    ``"candidate"`` with a fixed confidence of 0.9.
    """
    stable_key = attribute_stable_key(entity_key, name)
    return dict(
        stable_key=stable_key,
        entity_key=entity_key,
        name=name,
        value=_json_value(value),
        origin="registry",
        review_state="candidate",
        confidence=0.9,
        replacement_scope=replacement_scope,
        provenance=[provenance],
        source_anchors=[source_anchor],
    )
|
|
|
|
|
|
def _source_anchor(source_kind: str, path: str, *, json_pointer: str | None = None) -> dict[str, object]:
    """Describe where a piece of evidence came from.

    ``json_pointer`` is included only when non-empty. The ``fingerprint`` is
    computed over the anchor's other fields before it is added.
    """
    optional = {"json_pointer": json_pointer} if json_pointer else {}
    anchor: dict[str, object] = {"source_kind": source_kind, "path": path, **optional}
    fingerprint = source_fingerprint(anchor)
    anchor["fingerprint"] = fingerprint
    return anchor
|
|
|
|
|
|
def _review_artifact(
    *,
    artifact_type: str,
    origin: str,
    message: str,
    payload: dict[str, object],
) -> dict[str, object]:
    """Create a review artifact whose ``id`` is a fingerprint of its content.

    The fingerprint covers ``created_at`` too, so the id changes with the
    creation time (second resolution).
    """
    body = dict(
        artifact_type=artifact_type,
        origin=origin,
        message=message,
        payload=payload,
        created_at=_utc_now(),
    )
    artifact_id = f"review:{short_fingerprint(body, length=20)}"
    return {"id": artifact_id, **body}
|
|
|
|
|
|
def _display_path(path: Path, repo_path: Path) -> str:
    """Return ``path`` as a forward-slash string, relative to ``repo_path`` when inside it.

    Paths inside the repository are resolved and made repo-relative. Anything
    else is returned as given (not resolved). Both branches use POSIX
    separators, because the result feeds stable identifiers (replacement
    scope ids and source anchors) that should not vary by platform.
    """
    try:
        return path.resolve().relative_to(repo_path.resolve()).as_posix()
    except ValueError:
        # Outside the repository: keep the caller's spelling, but normalise
        # separators like the repo-relative branch does.
        return path.as_posix()
|
|
|
|
|
|
def _json_value(value: object) -> object:
    """Recursively coerce ``value`` into JSON-compatible data.

    Dict keys become strings, lists are converted element by element, and
    ``None``/str/int/float/bool pass through. Anything else (including
    tuples) is replaced by its ``str()``.
    """
    if isinstance(value, dict):
        return {str(key): _json_value(item) for key, item in value.items()}
    if isinstance(value, list):
        return list(map(_json_value, value))
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    return str(value)
|
|
|
|
|
|
def _unique_strings(values: list[object]) -> list[str]:
    """Stringify, strip and de-duplicate ``values``, preserving first-seen order.

    Falsy inputs and strings that are empty after stripping are dropped.
    """
    ordered: dict[str, None] = {}
    for raw in values:
        cleaned = str(raw or "").strip()
        if cleaned:
            ordered.setdefault(cleaned, None)
    return list(ordered)
|
|
|
|
|
|
def _unique_json(values: list[object]) -> list[object]:
    """De-duplicate ``values`` by their key-sorted JSON encoding, keeping first occurrences in order."""
    kept: dict[str, object] = {}
    for value in values:
        encoded = json.dumps(value, sort_keys=True, default=str)
        if encoded not in kept:
            kept[encoded] = value
    return list(kept.values())
|
|
|
|
|
|
def _copy_json(value: Any) -> Any:
    """Deep-copy ``value`` through a JSON round-trip; non-JSON objects are stringified."""
    serialized = json.dumps(value, default=str)
    return json.loads(serialized)
|
|
|
|
|
|
def _utc_now() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    stamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
    return stamp.replace("+00:00", "Z")
|