Files
state-hub/api/services/fabric_graph.py

401 lines
13 KiB
Python

from __future__ import annotations
import hashlib
import json
from datetime import datetime, timezone
from typing import Any
from pydantic import ValidationError
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from api.models.fabric_graph import FabricGraphEdge, FabricGraphImport, FabricGraphNode
from api.models.progress_event import ProgressEvent
from api.schemas.fabric_graph import FabricGraphExportPayload
DISPLAY_ONLY_EDGE_TYPES = {
"collapsed_into",
"declares",
"grouped_with",
"highlight_path",
"near",
"owns_deployment",
"same_color_group",
}
class FabricGraphValidationError(Exception):
def __init__(self, detail: dict[str, Any]) -> None:
super().__init__(str(detail))
self.detail = detail
def split_graph_ingest_body(body: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
"""Support both direct FabricGraphExport payloads and the documented wrapper."""
if isinstance(body.get("graph"), dict):
metadata = {key: value for key, value in body.items() if key != "graph"}
return body["graph"], metadata
return body, {}
async def ingest_fabric_graph_export(
session: AsyncSession,
payload: dict[str, Any],
*,
source_repo_slug: str,
source_url: str | None,
requested_by: str,
) -> tuple[FabricGraphImport, bool, bool]:
now = datetime.now(timezone.utc)
content_hash = graph_content_hash(payload)
try:
export = validate_fabric_graph_export(payload)
except ValueError as exc:
import_run = await _record_invalid_import(
session,
payload,
source_repo_slug=source_repo_slug,
source_url=source_url,
content_hash=content_hash,
error=str(exc),
now=now,
requested_by=requested_by,
)
raise FabricGraphValidationError(
{
"message": str(exc),
"import_id": str(import_run.id),
"validation_status": import_run.validation_status,
}
) from exc
existing = await _find_import(session, source_repo_slug, content_hash)
if existing and existing.validation_status == "valid":
await _mark_latest(session, existing)
existing.last_seen_at = now
await _record_progress(
session,
"Fabric graph export already ingested; refreshed latest marker.",
{
"source_repo_slug": source_repo_slug,
"source_url": source_url,
"content_hash": content_hash,
"node_count": existing.node_count,
"edge_count": existing.edge_count,
"requested_by": requested_by,
"idempotent": True,
},
)
await session.commit()
await session.refresh(existing)
return existing, False, True
source = export.source
import_run = FabricGraphImport(
source_repo_slug=source_repo_slug,
source_url=source_url,
source_commit=source.commit if source else None,
source_path=source.path if source else None,
api_version=export.api_version,
export_kind=export.kind,
exported_at=export.generated_at,
content_hash=content_hash,
node_count=len(export.nodes),
edge_count=len(export.edges),
validation_status="valid",
error_details=None,
graph_json=export.model_dump(mode="json", by_alias=True),
is_latest=True,
last_seen_at=now,
)
await _mark_previous_not_latest(session, source_repo_slug)
session.add(import_run)
await session.flush()
for node in export.nodes:
raw = node.model_dump(mode="json")
session.add(
FabricGraphNode(
import_id=import_run.id,
source_repo_slug=source_repo_slug,
graph_id=node.id,
kind=node.kind,
name=node.name,
repo_slug=node.repo,
domain_slug=node.domain,
lifecycle=node.lifecycle,
canonical_type=raw.get("canonical_type"),
canon_category=node.canon_category,
canon_anchor=node.canon_anchor,
mapping_fit=node.mapping_fit,
evidence_state=node.evidence_state,
display_only=bool(raw.get("display_only", False)),
attributes=node.attributes,
raw_json=raw,
)
)
for edge in export.edges:
raw = edge.model_dump(mode="json", by_alias=True)
session.add(
FabricGraphEdge(
import_id=import_run.id,
source_repo_slug=source_repo_slug,
edge_key=edge_key(raw),
from_graph_id=edge.from_graph_id,
to_graph_id=edge.to_graph_id,
edge_type=edge.edge_type,
canonical_type=edge.canonical_type,
canon_anchor=edge.canon_anchor,
mapping_fit=edge.mapping_fit,
evidence_state=edge.evidence_state,
display_only=bool(edge.display_only),
attributes=edge.attributes,
raw_json=raw,
)
)
await _record_progress(
session,
"Fabric graph export ingested as State Hub read model.",
{
"source_repo_slug": source_repo_slug,
"source_url": source_url,
"content_hash": content_hash,
"node_count": len(export.nodes),
"edge_count": len(export.edges),
"requested_by": requested_by,
},
)
await session.commit()
await session.refresh(import_run)
return import_run, True, False
def validate_fabric_graph_export(payload: dict[str, Any]) -> FabricGraphExportPayload:
try:
export = FabricGraphExportPayload.model_validate(payload)
except ValidationError as exc:
first = exc.errors()[0]
location = ".".join(str(part) for part in first.get("loc", [])) or "<root>"
message = first.get("msg", "invalid payload")
raise ValueError(f"invalid FabricGraphExport at {location}: {message}") from exc
canon_errors = _canon_metadata_errors(export)
if canon_errors:
raise ValueError(f"invalid FabricGraphExport canon metadata: {canon_errors[0]}")
return export
def graph_content_hash(payload: dict[str, Any]) -> str:
canonical = _canonical_payload(payload)
raw = json.dumps(canonical, sort_keys=True, separators=(",", ":"), ensure_ascii=True)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def edge_key(edge: dict[str, Any]) -> str:
raw = json.dumps(edge, sort_keys=True, separators=(",", ":"), ensure_ascii=True)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
async def record_fabric_graph_error(
session: AsyncSession,
summary: str,
*,
source_repo_slug: str,
source_url: str | None,
error: str,
requested_by: str,
) -> None:
await _record_progress(
session,
summary,
{
"source_repo_slug": source_repo_slug,
"source_url": source_url,
"error": error,
"requested_by": requested_by,
},
)
await session.commit()
async def _find_import(
session: AsyncSession, source_repo_slug: str, content_hash: str
) -> FabricGraphImport | None:
result = await session.execute(
select(FabricGraphImport).where(
FabricGraphImport.source_repo_slug == source_repo_slug,
FabricGraphImport.content_hash == content_hash,
)
)
return result.scalar_one_or_none()
async def _mark_latest(session: AsyncSession, import_run: FabricGraphImport) -> None:
await _mark_previous_not_latest(session, import_run.source_repo_slug, exclude_id=import_run.id)
import_run.is_latest = True
async def _mark_previous_not_latest(
session: AsyncSession,
source_repo_slug: str,
*,
exclude_id: Any | None = None,
) -> None:
stmt = update(FabricGraphImport).where(FabricGraphImport.source_repo_slug == source_repo_slug)
if exclude_id is not None:
stmt = stmt.where(FabricGraphImport.id != exclude_id)
await session.execute(stmt.values(is_latest=False))
async def _record_invalid_import(
session: AsyncSession,
payload: dict[str, Any],
*,
source_repo_slug: str,
source_url: str | None,
content_hash: str,
error: str,
now: datetime,
requested_by: str,
) -> FabricGraphImport:
existing = await _find_import(session, source_repo_slug, content_hash)
if existing and existing.validation_status == "invalid":
existing.last_seen_at = now
import_run = existing
else:
import_run = FabricGraphImport(
source_repo_slug=source_repo_slug,
source_url=source_url,
source_commit=_source_value(payload, "commit"),
source_path=_source_value(payload, "path"),
api_version=str(payload.get("apiVersion")) if payload.get("apiVersion") else None,
export_kind=str(payload.get("kind")) if payload.get("kind") else None,
exported_at=_parse_datetime(payload.get("generated_at")),
content_hash=content_hash,
node_count=0,
edge_count=0,
validation_status="invalid",
error_details={"error": error},
graph_json=payload,
is_latest=False,
last_seen_at=now,
)
session.add(import_run)
await session.flush()
await _record_progress(
session,
"Fabric graph export rejected during validation.",
{
"source_repo_slug": source_repo_slug,
"source_url": source_url,
"content_hash": content_hash,
"error": error,
"requested_by": requested_by,
},
)
await session.commit()
await session.refresh(import_run)
return import_run
async def _record_progress(session: AsyncSession, summary: str, detail: dict[str, Any]) -> None:
session.add(
ProgressEvent(
event_type="fabric_graph_import",
summary=summary,
detail=detail,
author="state-hub",
)
)
def _canonical_payload(payload: dict[str, Any]) -> dict[str, Any]:
canonical = json.loads(json.dumps(payload, sort_keys=True, default=str))
canonical.pop("generated_at", None)
return canonical
def _canon_metadata_errors(export: FabricGraphExportPayload) -> list[str]:
errors: list[str] = []
for index, node in enumerate(export.nodes):
if any(
value is not None
for value in (
node.canon_category,
node.canon_anchor,
node.mapping_fit,
node.evidence_state,
)
):
_require_fields(
errors,
f"nodes[{index}]",
{
"canon_category": node.canon_category,
"mapping_fit": node.mapping_fit,
"evidence_state": node.evidence_state,
},
("canon_category", "mapping_fit", "evidence_state"),
)
for index, edge in enumerate(export.edges):
has_canon_fields = any(
value is not None
for value in (
edge.canonical_type,
edge.canon_anchor,
edge.mapping_fit,
edge.display_only,
edge.evidence_state,
)
)
if has_canon_fields:
_require_fields(
errors,
f"edges[{index}]",
{
"mapping_fit": edge.mapping_fit,
"display_only": edge.display_only,
"evidence_state": edge.evidence_state,
},
("mapping_fit", "display_only", "evidence_state"),
)
if edge.edge_type in DISPLAY_ONLY_EDGE_TYPES and edge.display_only is not True:
errors.append(
f"edges[{index}] uses display-only edge type {edge.edge_type!r} without display_only=true"
)
if edge.display_only is True and edge.edge_type and not has_canon_fields:
errors.append(f"edges[{index}] is display-only but lacks canon metadata")
return errors
def _require_fields(
errors: list[str],
path: str,
item: dict[str, Any],
fields: tuple[str, ...],
) -> None:
for field in fields:
if item.get(field) in (None, ""):
errors.append(f"{path} missing required canon metadata field {field!r}")
def _source_value(payload: dict[str, Any], field: str) -> str | None:
source = payload.get("source")
if not isinstance(source, dict):
return None
value = source.get(field)
return str(value) if value else None
def _parse_datetime(value: Any) -> datetime | None:
if not isinstance(value, str) or not value:
return None
try:
normalized = value.replace("Z", "+00:00")
return datetime.fromisoformat(normalized)
except ValueError:
return None