"""Ingest FabricGraphExport payloads into the State Hub read model.

A valid export is stored as one ``FabricGraphImport`` row plus one
``FabricGraphNode`` / ``FabricGraphEdge`` row per exported element, and becomes
the latest import for its source repo. An invalid export is recorded as an
import row with ``validation_status="invalid"`` and its error. Every outcome is
also logged as a ``fabric_graph_import`` ``ProgressEvent``.
"""

from __future__ import annotations

import hashlib
import json
from datetime import datetime, timezone
from typing import Any

from pydantic import ValidationError
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

from api.models.fabric_graph import FabricGraphEdge, FabricGraphImport, FabricGraphNode
from api.models.progress_event import ProgressEvent
from api.schemas.fabric_graph import FabricGraphExportPayload

# Edge types that may only appear on edges explicitly flagged
# ``display_only=true`` (enforced by ``_canon_metadata_errors``).
DISPLAY_ONLY_EDGE_TYPES = {
    "collapsed_into",
    "declares",
    "grouped_with",
    "highlight_path",
    "near",
    "owns_deployment",
    "same_color_group",
}


class FabricGraphValidationError(Exception):
    """Raised when an export fails validation.

    ``detail`` holds the validation message, the id of the recorded invalid
    import and that import's ``validation_status``.
    """

    def __init__(self, detail: dict[str, Any]) -> None:
        super().__init__(str(detail))
        self.detail = detail


def split_graph_ingest_body(body: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Support both direct FabricGraphExport payloads and the documented wrapper.

    If ``body["graph"]`` is a dict, ``body`` is treated as a wrapper: the graph
    is returned together with every other top-level key as metadata. Otherwise
    the whole body is the export and the metadata is empty.
    """
    if isinstance(body.get("graph"), dict):
        metadata = {key: value for key, value in body.items() if key != "graph"}
        return body["graph"], metadata
    return body, {}


async def ingest_fabric_graph_export(
    session: AsyncSession,
    payload: dict[str, Any],
    *,
    source_repo_slug: str,
    source_url: str | None,
    requested_by: str,
) -> tuple[FabricGraphImport, bool, bool]:
    """Validate ``payload`` and store it as the latest import for its repo.

    Returns a 3-tuple ``(import_run, <bool>, <bool>)``:

    * ``(new_import, True, False)`` when a new valid import (with its node and
      edge rows) was stored and marked latest;
    * ``(existing_import, False, True)`` when a valid import with the same
      content hash already existed for ``source_repo_slug``; it is re-marked as
      latest and its ``last_seen_at`` refreshed instead of storing a duplicate.

    Raises:
        FabricGraphValidationError: ``payload`` failed validation. The invalid
            import and a progress event are committed before raising.
    """
    now = datetime.now(timezone.utc)
    content_hash = graph_content_hash(payload)
    try:
        export = validate_fabric_graph_export(payload)
    except ValueError as exc:
        import_run = await _record_invalid_import(
            session,
            payload,
            source_repo_slug=source_repo_slug,
            source_url=source_url,
            content_hash=content_hash,
            error=str(exc),
            now=now,
            requested_by=requested_by,
        )
        raise FabricGraphValidationError(
            {
                "message": str(exc),
                "import_id": str(import_run.id),
                "validation_status": import_run.validation_status,
            }
        ) from exc

    # Only a previously *valid* import can be reused. An invalid row may share
    # this hash (the hash ignores top-level generated_at, validation does not).
    existing = await _find_import(
        session, source_repo_slug, content_hash, validation_status="valid"
    )
    if existing is not None:
        await _mark_latest(session, existing)
        existing.last_seen_at = now
        await _record_progress(
            session,
            "Fabric graph export already ingested; refreshed latest marker.",
            {
                "source_repo_slug": source_repo_slug,
                "source_url": source_url,
                "content_hash": content_hash,
                "node_count": existing.node_count,
                "edge_count": existing.edge_count,
                "requested_by": requested_by,
                "idempotent": True,
            },
        )
        await session.commit()
        await session.refresh(existing)
        return existing, False, True

    source = export.source
    import_run = FabricGraphImport(
        source_repo_slug=source_repo_slug,
        source_url=source_url,
        source_commit=source.commit if source else None,
        source_path=source.path if source else None,
        api_version=export.api_version,
        export_kind=export.kind,
        exported_at=export.generated_at,
        content_hash=content_hash,
        node_count=len(export.nodes),
        edge_count=len(export.edges),
        validation_status="valid",
        error_details=None,
        graph_json=export.model_dump(mode="json", by_alias=True),
        is_latest=True,
        last_seen_at=now,
    )
    await _mark_previous_not_latest(session, source_repo_slug)
    session.add(import_run)
    # Flush so the database assigns import_run.id before child rows use it.
    await session.flush()
    _add_graph_rows(session, import_run, export, source_repo_slug=source_repo_slug)
    await _record_progress(
        session,
        "Fabric graph export ingested as State Hub read model.",
        {
            "source_repo_slug": source_repo_slug,
            "source_url": source_url,
            "content_hash": content_hash,
            "node_count": len(export.nodes),
            "edge_count": len(export.edges),
            "requested_by": requested_by,
        },
    )
    await session.commit()
    await session.refresh(import_run)
    return import_run, True, False


def _add_graph_rows(
    session: AsyncSession,
    import_run: FabricGraphImport,
    export: FabricGraphExportPayload,
    *,
    source_repo_slug: str,
) -> None:
    """Stage one FabricGraphNode per node and one FabricGraphEdge per edge.

    Rows are only added to the session; the caller flushes/commits them.
    ``import_run.id`` must already be assigned.
    """
    for node in export.nodes:
        node_raw = node.model_dump(mode="json")
        session.add(
            FabricGraphNode(
                import_id=import_run.id,
                source_repo_slug=source_repo_slug,
                graph_id=node.id,
                kind=node.kind,
                name=node.name,
                repo_slug=node.repo,
                domain_slug=node.domain,
                lifecycle=node.lifecycle,
                canonical_type=node_raw.get("canonical_type"),
                canon_category=node.canon_category,
                canon_anchor=node.canon_anchor,
                mapping_fit=node.mapping_fit,
                evidence_state=node.evidence_state,
                display_only=bool(node_raw.get("display_only", False)),
                attributes=node.attributes,
                raw_json=node_raw,
            )
        )
    for edge in export.edges:
        edge_raw = edge.model_dump(mode="json", by_alias=True)
        session.add(
            FabricGraphEdge(
                import_id=import_run.id,
                source_repo_slug=source_repo_slug,
                edge_key=edge_key(edge_raw),
                from_graph_id=edge.from_graph_id,
                to_graph_id=edge.to_graph_id,
                edge_type=edge.edge_type,
                canonical_type=edge.canonical_type,
                canon_anchor=edge.canon_anchor,
                mapping_fit=edge.mapping_fit,
                evidence_state=edge.evidence_state,
                display_only=bool(edge.display_only),
                attributes=edge.attributes,
                raw_json=edge_raw,
            )
        )


def validate_fabric_graph_export(payload: dict[str, Any]) -> FabricGraphExportPayload:
    """Parse ``payload`` into the export schema and enforce canon-metadata rules.

    Raises:
        ValueError: describing the first schema error (with its dotted
            location) or, if the schema is satisfied, the first canon-metadata
            violation reported by ``_canon_metadata_errors``.
    """
    try:
        export = FabricGraphExportPayload.model_validate(payload)
    except ValidationError as exc:
        errors = exc.errors()
        first = errors[0] if errors else {}
        # An empty location means the error applies to the payload as a whole.
        location = ".".join(str(part) for part in first.get("loc", ())) or "<root>"
        message = first.get("msg", "invalid payload")
        raise ValueError(f"invalid FabricGraphExport at {location}: {message}") from exc
    canon_errors = _canon_metadata_errors(export)
    if canon_errors:
        raise ValueError(f"invalid FabricGraphExport canon metadata: {canon_errors[0]}")
    return export


def graph_content_hash(payload: dict[str, Any]) -> str:
    """Return a hex SHA-256 of ``payload`` that ignores top-level ``generated_at``."""
    return _sha256_of_json(_canonical_payload(payload))


def edge_key(edge: dict[str, Any]) -> str:
    """Return a hex SHA-256 identifying a serialized edge by its full content."""
    return _sha256_of_json(edge)


def _sha256_of_json(value: Any) -> str:
    """Hex SHA-256 of ``value`` serialized as compact, key-sorted ASCII JSON."""
    raw = json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


async def record_fabric_graph_error(
    session: AsyncSession,
    summary: str,
    *,
    source_repo_slug: str,
    source_url: str | None,
    error: str,
    requested_by: str,
) -> None:
    """Log an ingest-related error as a progress event and commit it."""
    await _record_progress(
        session,
        summary,
        {
            "source_repo_slug": source_repo_slug,
            "source_url": source_url,
            "error": error,
            "requested_by": requested_by,
        },
    )
    await session.commit()


async def _find_import(
    session: AsyncSession,
    source_repo_slug: str,
    content_hash: str,
    *,
    validation_status: str | None = None,
) -> FabricGraphImport | None:
    """Return an import of ``source_repo_slug`` with ``content_hash``, or None.

    When ``validation_status`` is given, only rows with that status match.
    Valid and invalid rows can share a content hash, so callers should filter.
    If several rows still match, one of them is returned rather than raising.
    """
    stmt = select(FabricGraphImport).where(
        FabricGraphImport.source_repo_slug == source_repo_slug,
        FabricGraphImport.content_hash == content_hash,
    )
    if validation_status is not None:
        stmt = stmt.where(FabricGraphImport.validation_status == validation_status)
    result = await session.execute(stmt.limit(1))
    return result.scalars().first()


async def _mark_latest(session: AsyncSession, import_run: FabricGraphImport) -> None:
    """Make ``import_run`` the only import flagged latest for its repo."""
    await _mark_previous_not_latest(session, import_run.source_repo_slug, exclude_id=import_run.id)
    import_run.is_latest = True


async def _mark_previous_not_latest(
    session: AsyncSession,
    source_repo_slug: str,
    *,
    exclude_id: Any | None = None,
) -> None:
    """Clear ``is_latest`` on every import of the repo except ``exclude_id``."""
    stmt = update(FabricGraphImport).where(FabricGraphImport.source_repo_slug == source_repo_slug)
    if exclude_id is not None:
        stmt = stmt.where(FabricGraphImport.id != exclude_id)
    await session.execute(stmt.values(is_latest=False))


async def _record_invalid_import(
    session: AsyncSession,
    payload: dict[str, Any],
    *,
    source_repo_slug: str,
    source_url: str | None,
    content_hash: str,
    error: str,
    now: datetime,
    requested_by: str,
) -> FabricGraphImport:
    """Persist (or refresh) an invalid import row plus a progress event; commit.

    Best-effort metadata (source commit/path, apiVersion, kind, generated_at)
    is read from the raw payload since it did not pass schema validation.
    """
    existing = await _find_import(
        session, source_repo_slug, content_hash, validation_status="invalid"
    )
    if existing is not None:
        existing.last_seen_at = now
        # Keep the stored error in step with the error being reported now.
        existing.error_details = {"error": error}
        import_run = existing
    else:
        import_run = FabricGraphImport(
            source_repo_slug=source_repo_slug,
            source_url=source_url,
            source_commit=_source_value(payload, "commit"),
            source_path=_source_value(payload, "path"),
            api_version=str(payload.get("apiVersion")) if payload.get("apiVersion") else None,
            export_kind=str(payload.get("kind")) if payload.get("kind") else None,
            exported_at=_parse_datetime(payload.get("generated_at")),
            content_hash=content_hash,
            node_count=0,
            edge_count=0,
            validation_status="invalid",
            error_details={"error": error},
            graph_json=payload,
            is_latest=False,
            last_seen_at=now,
        )
        session.add(import_run)
    await session.flush()
    await _record_progress(
        session,
        "Fabric graph export rejected during validation.",
        {
            "source_repo_slug": source_repo_slug,
            "source_url": source_url,
            "content_hash": content_hash,
            "error": error,
            "requested_by": requested_by,
        },
    )
    await session.commit()
    await session.refresh(import_run)
    return import_run


async def _record_progress(session: AsyncSession, summary: str, detail: dict[str, Any]) -> None:
    """Stage a ``fabric_graph_import`` ProgressEvent (caller commits)."""
    session.add(
        ProgressEvent(
            event_type="fabric_graph_import",
            summary=summary,
            detail=detail,
            author="state-hub",
        )
    )


def _canonical_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a JSON round-tripped copy of ``payload`` without ``generated_at``.

    Non-JSON values are stringified (``default=str``). Dropping the top-level
    timestamp lets re-exports of identical content hash identically.
    """
    canonical = json.loads(json.dumps(payload, sort_keys=True, default=str))
    canonical.pop("generated_at", None)
    return canonical


def _canon_metadata_errors(export: FabricGraphExportPayload) -> list[str]:
    """Return canon-metadata violations in document order (empty when valid).

    Rules:
    * a node with any canon field set must set canon_category, mapping_fit and
      evidence_state;
    * an edge with any canon field set (display_only included) must set
      mapping_fit, display_only and evidence_state;
    * an edge whose type is in DISPLAY_ONLY_EDGE_TYPES must have
      display_only=True.
    """
    errors: list[str] = []
    for index, node in enumerate(export.nodes):
        node_canon = (node.canon_category, node.canon_anchor, node.mapping_fit, node.evidence_state)
        if any(value is not None for value in node_canon):
            _require_fields(
                errors,
                f"nodes[{index}]",
                {
                    "canon_category": node.canon_category,
                    "mapping_fit": node.mapping_fit,
                    "evidence_state": node.evidence_state,
                },
                ("canon_category", "mapping_fit", "evidence_state"),
            )
    for index, edge in enumerate(export.edges):
        edge_canon = (
            edge.canonical_type,
            edge.canon_anchor,
            edge.mapping_fit,
            edge.display_only,
            edge.evidence_state,
        )
        # A display_only=True edge always counts as carrying canon fields, so a
        # display-only edge missing mapping_fit/evidence_state is reported here.
        if any(value is not None for value in edge_canon):
            _require_fields(
                errors,
                f"edges[{index}]",
                {
                    "mapping_fit": edge.mapping_fit,
                    "display_only": edge.display_only,
                    "evidence_state": edge.evidence_state,
                },
                ("mapping_fit", "display_only", "evidence_state"),
            )
        if edge.edge_type in DISPLAY_ONLY_EDGE_TYPES and edge.display_only is not True:
            errors.append(
                f"edges[{index}] uses display-only edge type {edge.edge_type!r} without display_only=true"
            )
    return errors


def _require_fields(
    errors: list[str],
    path: str,
    item: dict[str, Any],
    fields: tuple[str, ...],
) -> None:
    """Append an error to ``errors`` for each field in ``item`` that is None or ""."""
    for field in fields:
        if item.get(field) in (None, ""):
            errors.append(f"{path} missing required canon metadata field {field!r}")


def _source_value(payload: dict[str, Any], field: str) -> str | None:
    """Read ``payload["source"][field]`` as a string; None if absent or falsy."""
    source = payload.get("source")
    if not isinstance(source, dict):
        return None
    value = source.get(field)
    return str(value) if value else None


def _parse_datetime(value: Any) -> datetime | None:
    """Parse an ISO-8601 string (a "Z" suffix is read as UTC); None on failure."""
    if not isinstance(value, str) or not value:
        return None
    try:
        normalized = value.replace("Z", "+00:00")
        return datetime.fromisoformat(normalized)
    except ValueError:
        return None