diff --git a/api/main.py b/api/main.py index 92fbc79..7174f46 100644 --- a/api/main.py +++ b/api/main.py @@ -18,6 +18,7 @@ from api.routers import flows from api.routers import recently_on_scope from api.routers import reconciliation from api.routers import execution +from api.routers import fabric class ETagMiddleware(BaseHTTPMiddleware): @@ -104,6 +105,7 @@ app.include_router(interface_changes.router) app.include_router(flows.router) app.include_router(reconciliation.router) app.include_router(execution.router) +app.include_router(fabric.router) app.include_router(state.router) app.include_router(policy.router) diff --git a/api/models/__init__.py b/api/models/__init__.py index 5bf0fb7..84a103a 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -22,6 +22,7 @@ from api.models.doi_cache import DOICache from api.models.token_event import TokenEvent from api.models.interface_change import InterfaceChange from api.models.workplan_launch_request import WorkplanLaunchRequest +from api.models.fabric_graph import FabricGraphImport, FabricGraphNode, FabricGraphEdge __all__ = [ "Base", @@ -48,4 +49,5 @@ __all__ = [ "TokenEvent", "InterfaceChange", "WorkplanLaunchRequest", + "FabricGraphImport", "FabricGraphNode", "FabricGraphEdge", ] diff --git a/api/models/fabric_graph.py b/api/models/fabric_graph.py new file mode 100644 index 0000000..384389f --- /dev/null +++ b/api/models/fabric_graph.py @@ -0,0 +1,110 @@ +import uuid +from datetime import datetime + +from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from api.models.base import Base, TimestampMixin, new_uuid + + +class FabricGraphImport(Base, TimestampMixin): + __tablename__ = "fabric_graph_imports" + __table_args__ = ( + UniqueConstraint( + "source_repo_slug", + "content_hash", + name="uq_fabric_graph_imports_source_hash", + ), + ) + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=new_uuid + ) + source_repo_slug: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + source_url: Mapped[str | None] = mapped_column(Text, nullable=True) + source_commit: Mapped[str | None] = mapped_column(String(80), nullable=True, index=True) + source_path: Mapped[str | None] = mapped_column(Text, nullable=True) + api_version: Mapped[str | None] = mapped_column(String(100), nullable=True) + export_kind: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True) + exported_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True) + content_hash: Mapped[str] = mapped_column(String(64), nullable=False, index=True) + node_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0, server_default="0") + edge_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0, server_default="0") + validation_status: Mapped[str] = mapped_column(String(20), nullable=False, default="valid", server_default="valid", index=True) + error_details: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + graph_json: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict, server_default="{}") + is_latest: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default="false", index=True) + last_seen_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + + nodes: Mapped[list["FabricGraphNode"]] = relationship( + "FabricGraphNode", back_populates="import_run", cascade="all, delete-orphan", lazy="select" + ) + edges: Mapped[list["FabricGraphEdge"]] = relationship( + "FabricGraphEdge", back_populates="import_run", cascade="all, delete-orphan", lazy="select" + ) + + +class FabricGraphNode(Base): + __tablename__ = "fabric_graph_nodes" + __table_args__ = ( + UniqueConstraint("import_id", "graph_id", name="uq_fabric_graph_nodes_import_graph_id"), + ) + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=new_uuid + ) + import_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("fabric_graph_imports.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + source_repo_slug: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + graph_id: Mapped[str] = mapped_column(Text, nullable=False) + kind: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + name: Mapped[str] = mapped_column(Text, nullable=False) + repo_slug: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + domain_slug: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + lifecycle: Mapped[str] = mapped_column(String(50), nullable=False, index=True) + canonical_type: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True) + canon_category: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True) + canon_anchor: Mapped[str | None] = mapped_column(Text, nullable=True) + mapping_fit: Mapped[str | None] = mapped_column(String(20), nullable=True, index=True) + evidence_state: Mapped[str | None] = mapped_column(String(20), nullable=True, index=True) + display_only: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default="false", index=True) + attributes: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict, server_default="{}") + raw_json: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict, server_default="{}") + + import_run: Mapped["FabricGraphImport"] = relationship("FabricGraphImport", back_populates="nodes") + + +class FabricGraphEdge(Base): + __tablename__ = "fabric_graph_edges" + __table_args__ = ( + UniqueConstraint("import_id", "edge_key", name="uq_fabric_graph_edges_import_edge_key"), + ) + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=new_uuid + ) + import_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("fabric_graph_imports.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + source_repo_slug: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + edge_key: Mapped[str] = mapped_column(String(64), nullable=False, index=True) + from_graph_id: Mapped[str] = mapped_column(Text, nullable=False) + to_graph_id: Mapped[str] = mapped_column(Text, nullable=False) + edge_type: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + canonical_type: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True) + canon_anchor: Mapped[str | None] = mapped_column(Text, nullable=True) + mapping_fit: Mapped[str | None] = mapped_column(String(20), nullable=True, index=True) + evidence_state: Mapped[str | None] = mapped_column(String(20), nullable=True, index=True) + display_only: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default="false", index=True) + attributes: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict, server_default="{}") + raw_json: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict, server_default="{}") + + import_run: Mapped["FabricGraphImport"] = relationship("FabricGraphImport", back_populates="edges") diff --git a/api/routers/fabric.py b/api/routers/fabric.py new file mode 100644 index 0000000..5e44e65 --- /dev/null +++ b/api/routers/fabric.py @@ -0,0 +1,290 @@ +from __future__ import annotations + +from typing import Any + +import httpx +from fastapi import APIRouter, Body, Depends, HTTPException, Query +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from api.database import get_session +from api.models.fabric_graph import FabricGraphEdge, FabricGraphImport, FabricGraphNode +from api.schemas.fabric_graph import ( + FabricGraphEdgeRead, + FabricGraphImportRead, + FabricGraphIngestResult, + FabricGraphNodeRead, + FabricGraphPullRequest, + FabricGraphSummary, +) +from api.services.fabric_graph import ( + FabricGraphValidationError, + ingest_fabric_graph_export, + record_fabric_graph_error, + split_graph_ingest_body, +) + +router = APIRouter(prefix="/fabric", tags=["fabric"]) + + +@router.post("/graph-exports", response_model=FabricGraphIngestResult) +async def ingest_graph_export( + body: dict[str, Any] = Body(...), + source_repo_slug: str | None = Query(None), + source_url: str | None = Query(None), + session: AsyncSession = Depends(get_session), +) -> FabricGraphIngestResult: + graph, metadata = split_graph_ingest_body(body) + effective_source_repo_slug = ( + source_repo_slug + or metadata.get("source_repo_slug") + or metadata.get("repo_slug") + or "railiance-fabric" + ) + effective_source_url = source_url or metadata.get("source_url") + requested_by = str(metadata.get("requested_by") or "api") + try: + import_run, created, idempotent = await ingest_fabric_graph_export( + session, + graph, + source_repo_slug=str(effective_source_repo_slug), + source_url=str(effective_source_url) if effective_source_url else None, + requested_by=requested_by, + ) + except FabricGraphValidationError as exc: + raise HTTPException(status_code=422, detail=exc.detail) from exc + return FabricGraphIngestResult( + import_run=FabricGraphImportRead.model_validate(import_run), + created=created, + idempotent=idempotent, + node_count=import_run.node_count, + edge_count=import_run.edge_count, + ) + + +@router.post("/graph-exports/pull", response_model=FabricGraphIngestResult) +async def pull_graph_export( + body: FabricGraphPullRequest | None = None, + session: AsyncSession = Depends(get_session), +) -> FabricGraphIngestResult: + request = body or FabricGraphPullRequest() + try: + async with httpx.AsyncClient(timeout=15.0) as client: + response = await client.get(request.source_url) + response.raise_for_status() + payload = response.json() + except Exception as exc: + await record_fabric_graph_error( + session, + "Fabric graph export pull failed.", + source_repo_slug=request.source_repo_slug, + source_url=request.source_url, + error=str(exc), + requested_by=request.requested_by, + ) + raise HTTPException(status_code=502, detail=f"Fabric graph export pull failed: {exc}") from exc + + try: + import_run, created, idempotent = await ingest_fabric_graph_export( + session, + payload, + source_repo_slug=request.source_repo_slug, + source_url=request.source_url, + requested_by=request.requested_by, + ) + except FabricGraphValidationError as exc: + raise HTTPException(status_code=422, detail=exc.detail) from exc + return FabricGraphIngestResult( + import_run=FabricGraphImportRead.model_validate(import_run), + created=created, + idempotent=idempotent, + node_count=import_run.node_count, + edge_count=import_run.edge_count, + ) + + +@router.get("/graph-exports", response_model=list[FabricGraphImportRead]) +async def list_graph_imports( + source_repo_slug: str | None = None, + validation_status: str | None = None, + limit: int = Query(50, ge=1, le=500), + session: AsyncSession = Depends(get_session), +) -> list[FabricGraphImportRead]: + query = select(FabricGraphImport) + if source_repo_slug: + query = query.where(FabricGraphImport.source_repo_slug == source_repo_slug) + if validation_status: + query = query.where(FabricGraphImport.validation_status == validation_status) + query = query.order_by(FabricGraphImport.created_at.desc()).limit(limit) + result = await session.execute(query) + return [FabricGraphImportRead.model_validate(row) for row in result.scalars().all()] + + +@router.get("/graph-exports/latest", response_model=FabricGraphImportRead) +async def latest_graph_import( + source_repo_slug: str = "railiance-fabric", + session: AsyncSession = Depends(get_session), +) -> FabricGraphImportRead: + import_run = await _latest_valid_import(session, source_repo_slug) + if import_run is None: + raise HTTPException(status_code=404, detail=f"No valid Fabric graph import for '{source_repo_slug}'") + return FabricGraphImportRead.model_validate(import_run) + + +@router.get("/graph/nodes", response_model=list[FabricGraphNodeRead]) +async def list_graph_nodes( + source_repo_slug: str = "railiance-fabric", + domain: str | None = None, + repo: str | None = None, + canonical_category: str | None = None, + evidence_state: str | None = None, + mapping_fit: str | None = None, + kind: str | None = None, + limit: int = Query(100, ge=1, le=1000), + session: AsyncSession = Depends(get_session), +) -> list[FabricGraphNodeRead]: + import_run = await _latest_valid_import_or_404(session, source_repo_slug) + query = select(FabricGraphNode).where(FabricGraphNode.import_id == import_run.id) + if domain: + query = query.where(FabricGraphNode.domain_slug == domain) + if repo: + query = query.where(FabricGraphNode.repo_slug == repo) + if canonical_category: + query = query.where(FabricGraphNode.canon_category == canonical_category) + if evidence_state: + query = query.where(FabricGraphNode.evidence_state == evidence_state) + if mapping_fit: + query = query.where(FabricGraphNode.mapping_fit == mapping_fit) + if kind: + query = query.where(FabricGraphNode.kind == kind) + query = query.order_by(FabricGraphNode.graph_id).limit(limit) + result = await session.execute(query) + return [FabricGraphNodeRead.model_validate(row) for row in result.scalars().all()] + + +@router.get("/graph/edges", response_model=list[FabricGraphEdgeRead]) +async def list_graph_edges( + source_repo_slug: str = "railiance-fabric", + canonical_relationship: str | None = None, + edge_type: str | None = None, + evidence_state: str | None = None, + mapping_fit: str | None = None, + display_only: bool | None = None, + from_graph_id: str | None = None, + to_graph_id: str | None = None, + limit: int = Query(100, ge=1, le=1000), + session: AsyncSession = Depends(get_session), +) -> list[FabricGraphEdgeRead]: + import_run = await _latest_valid_import_or_404(session, source_repo_slug) + query = select(FabricGraphEdge).where(FabricGraphEdge.import_id == import_run.id) + if canonical_relationship: + query = query.where(FabricGraphEdge.canonical_type == canonical_relationship) + if edge_type: + query = query.where(FabricGraphEdge.edge_type == edge_type) + if evidence_state: + query = query.where(FabricGraphEdge.evidence_state == evidence_state) + if mapping_fit: + query = query.where(FabricGraphEdge.mapping_fit == mapping_fit) + if display_only is not None: + query = query.where(FabricGraphEdge.display_only == display_only) + if from_graph_id: + query = query.where(FabricGraphEdge.from_graph_id == from_graph_id) + if to_graph_id: + query = query.where(FabricGraphEdge.to_graph_id == to_graph_id) + query = query.order_by(FabricGraphEdge.from_graph_id, FabricGraphEdge.to_graph_id, FabricGraphEdge.edge_type).limit(limit) + result = await session.execute(query) + return [FabricGraphEdgeRead.model_validate(row) for row in result.scalars().all()] + + +@router.get("/graph/summary", response_model=FabricGraphSummary) +async def graph_summary( + source_repo_slug: str = "railiance-fabric", + session: AsyncSession = Depends(get_session), +) -> FabricGraphSummary: + import_run = await _latest_valid_import(session, source_repo_slug) + if import_run is None: + return FabricGraphSummary( + source_repo_slug=source_repo_slug, + latest_import=None, + node_count=0, + edge_count=0, + nodes_by_domain={}, + nodes_by_repo={}, + nodes_by_canon_category={}, + edges_by_canonical_type={}, + nodes_by_evidence_state={}, + edges_by_evidence_state={}, + nodes_by_mapping_fit={}, + edges_by_mapping_fit={}, + example_nodes=[], + example_edges=[], + ) + + return FabricGraphSummary( + source_repo_slug=source_repo_slug, + latest_import=FabricGraphImportRead.model_validate(import_run), + node_count=import_run.node_count, + edge_count=import_run.edge_count, + nodes_by_domain=await _counts(session, FabricGraphNode.domain_slug, import_run.id), + nodes_by_repo=await _counts(session, FabricGraphNode.repo_slug, import_run.id), + nodes_by_canon_category=await _counts(session, FabricGraphNode.canon_category, import_run.id), + edges_by_canonical_type=await _counts(session, FabricGraphEdge.canonical_type, import_run.id), + nodes_by_evidence_state=await _counts(session, FabricGraphNode.evidence_state, import_run.id), + edges_by_evidence_state=await _counts(session, FabricGraphEdge.evidence_state, import_run.id), + nodes_by_mapping_fit=await _counts(session, FabricGraphNode.mapping_fit, import_run.id), + edges_by_mapping_fit=await _counts(session, FabricGraphEdge.mapping_fit, import_run.id), + example_nodes=await _example_nodes(session, import_run.id), + example_edges=await _example_edges(session, import_run.id), + ) + + +async def _latest_valid_import(session: AsyncSession, source_repo_slug: str) -> FabricGraphImport | None: + result = await session.execute( + select(FabricGraphImport) + .where( + FabricGraphImport.source_repo_slug == source_repo_slug, + FabricGraphImport.validation_status == "valid", + FabricGraphImport.is_latest.is_(True), + ) + .order_by(FabricGraphImport.created_at.desc()) + .limit(1) + ) + return result.scalar_one_or_none() + + +async def _latest_valid_import_or_404(session: AsyncSession, source_repo_slug: str) -> FabricGraphImport: + import_run = await _latest_valid_import(session, source_repo_slug) + if import_run is None: + raise HTTPException(status_code=404, detail=f"No valid Fabric graph import for '{source_repo_slug}'") + return import_run + + +async def _counts(session: AsyncSession, column: Any, import_id: Any) -> dict[str, int]: + table = column.class_ + result = await session.execute( + select(column, func.count()) + .select_from(table) + .where(table.import_id == import_id, column.is_not(None)) + .group_by(column) + ) + return {str(key): int(count) for key, count in result.all() if key is not None} + + +async def _example_nodes(session: AsyncSession, import_id: Any) -> list[FabricGraphNodeRead]: + result = await session.execute( + select(FabricGraphNode) + .where(FabricGraphNode.import_id == import_id) + .order_by(FabricGraphNode.graph_id) + .limit(5) + ) + return [FabricGraphNodeRead.model_validate(row) for row in result.scalars().all()] + + +async def _example_edges(session: AsyncSession, import_id: Any) -> list[FabricGraphEdgeRead]: + result = await session.execute( + select(FabricGraphEdge) + .where(FabricGraphEdge.import_id == import_id) + .order_by(FabricGraphEdge.from_graph_id, FabricGraphEdge.to_graph_id, FabricGraphEdge.edge_type) + .limit(5) + ) + return [FabricGraphEdgeRead.model_validate(row) for row in result.scalars().all()] diff --git a/api/schemas/fabric_graph.py b/api/schemas/fabric_graph.py new file mode 100644 index 0000000..494065a --- /dev/null +++ b/api/schemas/fabric_graph.py @@ -0,0 +1,152 @@ +import uuid +from datetime import datetime +from typing import Any, Literal + +from pydantic import BaseModel, ConfigDict, Field + + +MappingFit = Literal["direct", "partial", "conflict", "gap", "unknown"] +EvidenceState = Literal["observed", "declared", "inferred", "proposed", "gap"] + + +class FabricGraphSource(BaseModel): + model_config = ConfigDict(extra="forbid") + + repo: str | None = None + commit: str | None = None + path: str | None = None + + +class FabricGraphNodePayload(BaseModel): + model_config = ConfigDict(extra="forbid") + + id: str + kind: str + name: str + repo: str + domain: str + lifecycle: str + canon_category: str | None = None + canon_anchor: str | None = None + mapping_fit: MappingFit | None = None + evidence_state: EvidenceState | None = None + attributes: dict[str, Any] = Field(default_factory=dict) + + +class FabricGraphEdgePayload(BaseModel): + model_config = ConfigDict(extra="forbid", populate_by_name=True) + + from_graph_id: str = Field(alias="from") + to_graph_id: str = Field(alias="to") + edge_type: str = Field(alias="type") + canonical_type: str | None = None + canon_anchor: str | None = None + mapping_fit: MappingFit | None = None + display_only: bool | None = None + evidence_state: EvidenceState | None = None + attributes: dict[str, Any] = Field(default_factory=dict) + + +class FabricGraphExportPayload(BaseModel): + model_config = ConfigDict(extra="forbid", populate_by_name=True) + + api_version: Literal["railiance.fabric/v1alpha1"] = Field(alias="apiVersion") + kind: Literal["FabricGraphExport"] + generated_at: datetime | None = None + source: FabricGraphSource | None = None + nodes: list[FabricGraphNodePayload] + edges: list[FabricGraphEdgePayload] + + +class FabricGraphPullRequest(BaseModel): + source_repo_slug: str = "railiance-fabric" + source_url: str = "http://127.0.0.1:8765/exports/state-hub" + requested_by: str = "operator" + + +class FabricGraphImportRead(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: uuid.UUID + source_repo_slug: str + source_url: str | None = None + source_commit: str | None = None + source_path: str | None = None + api_version: str | None = None + export_kind: str | None = None + exported_at: datetime | None = None + content_hash: str + node_count: int + edge_count: int + validation_status: str + error_details: dict | None = None + is_latest: bool + last_seen_at: datetime | None = None + created_at: datetime + updated_at: datetime + + +class FabricGraphIngestResult(BaseModel): + import_run: FabricGraphImportRead + created: bool + idempotent: bool + node_count: int + edge_count: int + + +class FabricGraphNodeRead(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: uuid.UUID + import_id: uuid.UUID + source_repo_slug: str + graph_id: str + kind: str + name: str + repo_slug: str + domain_slug: str + lifecycle: str + canonical_type: str | None = None + canon_category: str | None = None + canon_anchor: str | None = None + mapping_fit: str | None = None + evidence_state: str | None = None + display_only: bool + attributes: dict + raw_json: dict + + +class FabricGraphEdgeRead(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: uuid.UUID + import_id: uuid.UUID + source_repo_slug: str + edge_key: str + from_graph_id: str + to_graph_id: str + edge_type: str + canonical_type: str | None = None + canon_anchor: str | None = None + mapping_fit: str | None = None + evidence_state: str | None = None + display_only: bool + attributes: dict + raw_json: dict + + +class FabricGraphSummary(BaseModel): + source_repo_slug: str + latest_import: FabricGraphImportRead | None = None + node_count: int + edge_count: int + nodes_by_domain: dict[str, int] + nodes_by_repo: dict[str, int] + nodes_by_canon_category: dict[str, int] + edges_by_canonical_type: dict[str, int] + nodes_by_evidence_state: dict[str, int] + edges_by_evidence_state: dict[str, int] + nodes_by_mapping_fit: dict[str, int] + edges_by_mapping_fit: dict[str, int] + example_nodes: list[FabricGraphNodeRead] + example_edges: list[FabricGraphEdgeRead] diff --git a/api/services/fabric_graph.py b/api/services/fabric_graph.py new file mode 100644 index 0000000..3bdc617 --- /dev/null +++ b/api/services/fabric_graph.py @@ -0,0 +1,400 @@ +from __future__ import annotations + +import hashlib +import json +from datetime import datetime, timezone +from typing import Any + +from pydantic import ValidationError +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from api.models.fabric_graph import FabricGraphEdge, FabricGraphImport, FabricGraphNode +from api.models.progress_event import ProgressEvent +from api.schemas.fabric_graph import FabricGraphExportPayload + +DISPLAY_ONLY_EDGE_TYPES = { + "collapsed_into", + "declares", + "grouped_with", + "highlight_path", + "near", + "owns_deployment", + "same_color_group", +} + + +class FabricGraphValidationError(Exception): + def __init__(self, detail: dict[str, Any]) -> None: + super().__init__(str(detail)) + self.detail = detail + + +def split_graph_ingest_body(body: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]: + """Support both direct FabricGraphExport payloads and the documented wrapper.""" + if isinstance(body.get("graph"), dict): + metadata = {key: value for key, value in body.items() if key != "graph"} + return body["graph"], metadata + return body, {} + + +async def ingest_fabric_graph_export( + session: AsyncSession, + payload: dict[str, Any], + *, + source_repo_slug: str, + source_url: str | None, + requested_by: str, +) -> tuple[FabricGraphImport, bool, bool]: + now = datetime.now(timezone.utc) + content_hash = graph_content_hash(payload) + try: + export = validate_fabric_graph_export(payload) + except ValueError as exc: + import_run = await _record_invalid_import( + session, + payload, + source_repo_slug=source_repo_slug, + source_url=source_url, + content_hash=content_hash, + error=str(exc), + now=now, + requested_by=requested_by, + ) + raise FabricGraphValidationError( + { + "message": str(exc), + "import_id": str(import_run.id), + "validation_status": import_run.validation_status, + } + ) from exc + + existing = await _find_import(session, source_repo_slug, content_hash) + if existing and existing.validation_status == "valid": + await _mark_latest(session, existing) + existing.last_seen_at = now + await _record_progress( + session, + "Fabric graph export already ingested; refreshed latest marker.", + { + "source_repo_slug": source_repo_slug, + "source_url": source_url, + "content_hash": content_hash, + "node_count": existing.node_count, + "edge_count": existing.edge_count, + "requested_by": requested_by, + "idempotent": True, + }, + ) + await session.commit() + await session.refresh(existing) + return existing, False, True + + source = export.source + import_run = FabricGraphImport( + source_repo_slug=source_repo_slug, + source_url=source_url, + source_commit=source.commit if source else None, + source_path=source.path if source else None, + api_version=export.api_version, + export_kind=export.kind, + exported_at=export.generated_at, + content_hash=content_hash, + node_count=len(export.nodes), + edge_count=len(export.edges), + validation_status="valid", + error_details=None, + graph_json=export.model_dump(mode="json", by_alias=True), + is_latest=True, + last_seen_at=now, + ) + await _mark_previous_not_latest(session, source_repo_slug) + session.add(import_run) + await session.flush() + + for node in export.nodes: + raw = node.model_dump(mode="json") + session.add( + FabricGraphNode( + import_id=import_run.id, + source_repo_slug=source_repo_slug, + graph_id=node.id, + kind=node.kind, + name=node.name, + repo_slug=node.repo, + domain_slug=node.domain, + lifecycle=node.lifecycle, + canonical_type=raw.get("canonical_type"), + canon_category=node.canon_category, + canon_anchor=node.canon_anchor, + mapping_fit=node.mapping_fit, + evidence_state=node.evidence_state, + display_only=bool(raw.get("display_only", False)), + attributes=node.attributes, + raw_json=raw, + ) + ) + + for edge in export.edges: + raw = edge.model_dump(mode="json", by_alias=True) + session.add( + FabricGraphEdge( + import_id=import_run.id, + source_repo_slug=source_repo_slug, + edge_key=edge_key(raw), + from_graph_id=edge.from_graph_id, + to_graph_id=edge.to_graph_id, + edge_type=edge.edge_type, + canonical_type=edge.canonical_type, + canon_anchor=edge.canon_anchor, + mapping_fit=edge.mapping_fit, + evidence_state=edge.evidence_state, + display_only=bool(edge.display_only), + attributes=edge.attributes, + raw_json=raw, + ) + ) + + await _record_progress( + session, + "Fabric graph export ingested as State Hub read model.", + { + "source_repo_slug": source_repo_slug, + "source_url": source_url, + "content_hash": content_hash, + "node_count": len(export.nodes), + "edge_count": len(export.edges), + "requested_by": requested_by, + }, + ) + await session.commit() + await session.refresh(import_run) + return import_run, True, False + + +def validate_fabric_graph_export(payload: dict[str, Any]) -> FabricGraphExportPayload: + try: + export = FabricGraphExportPayload.model_validate(payload) + except ValidationError as exc: + first = exc.errors()[0] + location = ".".join(str(part) for part in first.get("loc", [])) or "" + message = first.get("msg", "invalid payload") + raise ValueError(f"invalid FabricGraphExport at {location}: {message}") from exc + + canon_errors = _canon_metadata_errors(export) + if canon_errors: + raise ValueError(f"invalid FabricGraphExport canon metadata: {canon_errors[0]}") + return export + + +def graph_content_hash(payload: dict[str, Any]) -> str: + canonical = _canonical_payload(payload) + raw = json.dumps(canonical, sort_keys=True, separators=(",", ":"), ensure_ascii=True) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +def edge_key(edge: dict[str, Any]) -> str: + raw = json.dumps(edge, sort_keys=True, separators=(",", ":"), ensure_ascii=True) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +async def record_fabric_graph_error( + session: AsyncSession, + summary: str, + *, + source_repo_slug: str, + source_url: str | None, + error: str, + requested_by: str, +) -> None: + await _record_progress( + session, + summary, + { + "source_repo_slug": source_repo_slug, + "source_url": source_url, + "error": error, + "requested_by": requested_by, + }, + ) + await session.commit() + + +async def _find_import( + session: AsyncSession, source_repo_slug: str, content_hash: str +) -> FabricGraphImport | None: + result = await session.execute( + select(FabricGraphImport).where( + FabricGraphImport.source_repo_slug == source_repo_slug, + FabricGraphImport.content_hash == content_hash, + ) + ) + return result.scalar_one_or_none() + + +async def _mark_latest(session: AsyncSession, import_run: FabricGraphImport) -> None: + await _mark_previous_not_latest(session, import_run.source_repo_slug, exclude_id=import_run.id) + import_run.is_latest = True + + +async def _mark_previous_not_latest( + session: AsyncSession, + source_repo_slug: str, + *, + exclude_id: Any | None = None, +) -> None: + stmt = update(FabricGraphImport).where(FabricGraphImport.source_repo_slug == source_repo_slug) + if exclude_id is not None: + stmt = stmt.where(FabricGraphImport.id != exclude_id) + await session.execute(stmt.values(is_latest=False)) + + +async def _record_invalid_import( + session: AsyncSession, + payload: dict[str, Any], + *, + source_repo_slug: str, + source_url: str | None, + content_hash: str, + error: str, + now: datetime, + requested_by: str, +) -> FabricGraphImport: + existing = await _find_import(session, source_repo_slug, content_hash) + if existing and existing.validation_status == "invalid": + existing.last_seen_at = now + import_run = existing + else: + import_run = FabricGraphImport( + source_repo_slug=source_repo_slug, + source_url=source_url, + source_commit=_source_value(payload, "commit"), + source_path=_source_value(payload, "path"), + api_version=str(payload.get("apiVersion")) if payload.get("apiVersion") else None, + export_kind=str(payload.get("kind")) if payload.get("kind") else None, + exported_at=_parse_datetime(payload.get("generated_at")), + content_hash=content_hash, + node_count=0, + edge_count=0, + validation_status="invalid", + error_details={"error": error}, + graph_json=payload, + is_latest=False, + last_seen_at=now, + ) + session.add(import_run) + await session.flush() + + await _record_progress( + session, + "Fabric graph export rejected during validation.", + { + "source_repo_slug": source_repo_slug, + "source_url": source_url, + "content_hash": content_hash, + "error": error, + "requested_by": requested_by, + }, + ) + await session.commit() + await session.refresh(import_run) + return import_run + + +async def _record_progress(session: AsyncSession, summary: str, detail: dict[str, Any]) -> None: + session.add( + ProgressEvent( + event_type="fabric_graph_import", + summary=summary, + detail=detail, + author="state-hub", + ) + ) + + +def _canonical_payload(payload: dict[str, Any]) -> dict[str, Any]: + canonical = json.loads(json.dumps(payload, sort_keys=True, default=str)) + canonical.pop("generated_at", None) + return canonical + + +def _canon_metadata_errors(export: FabricGraphExportPayload) -> list[str]: + errors: list[str] = [] + for index, node in enumerate(export.nodes): + if any( + value is not None + for value in ( + node.canon_category, + node.canon_anchor, + node.mapping_fit, + node.evidence_state, + ) + ): + _require_fields( + errors, + f"nodes[{index}]", + { + "canon_category": node.canon_category, + "mapping_fit": node.mapping_fit, + "evidence_state": node.evidence_state, + }, + ("canon_category", "mapping_fit", "evidence_state"), + ) + for index, edge in enumerate(export.edges): + has_canon_fields = any( + value is not None + for value in ( + edge.canonical_type, + edge.canon_anchor, + edge.mapping_fit, + edge.display_only, + edge.evidence_state, + ) + ) + if has_canon_fields: + _require_fields( + errors, + f"edges[{index}]", + { + "mapping_fit": edge.mapping_fit, + "display_only": edge.display_only, + "evidence_state": edge.evidence_state, + }, + ("mapping_fit", "display_only", "evidence_state"), + ) + if edge.edge_type in DISPLAY_ONLY_EDGE_TYPES and edge.display_only is not True: + errors.append( + f"edges[{index}] uses display-only edge type {edge.edge_type!r} without display_only=true" + ) + if edge.display_only is True and edge.edge_type and not has_canon_fields: + errors.append(f"edges[{index}] is display-only but lacks canon metadata") + return errors + + +def _require_fields( + errors: list[str], + path: str, + item: dict[str, Any], + fields: tuple[str, ...], +) -> None: + for field in fields: + if item.get(field) in (None, ""): + errors.append(f"{path} missing required canon metadata field {field!r}") + + +def _source_value(payload: dict[str, Any], field: str) -> str | None: + source = payload.get("source") + if not isinstance(source, dict): + return None + value = source.get(field) + return str(value) if value else None + + +def _parse_datetime(value: Any) -> datetime | None: + if not isinstance(value, str) or not value: + return None + try: + normalized = value.replace("Z", "+00:00") + return datetime.fromisoformat(normalized) + except ValueError: + return None diff --git a/docs/fabric-graph-read-model.md b/docs/fabric-graph-read-model.md new file mode 100644 index 0000000..79ab8a9 --- /dev/null +++ b/docs/fabric-graph-read-model.md @@ -0,0 +1,67 @@ +# Fabric Graph Read Model + +State Hub stores Railiance Fabric graph exports as a read model. Fabric remains +the source of truth; the State Hub tables are cache/index/query state for +dashboard and operator use. + +## Refresh After Fabric Reset/Reingest + +Start or confirm the Railiance Fabric registry is serving its export: + +```bash +curl -s http://127.0.0.1:8765/exports/state-hub +``` + +Pull the current export into State Hub: + +```bash +curl -s -X POST http://127.0.0.1:8000/fabric/graph-exports/pull \ + -H "Content-Type: application/json" \ + -d '{ + "source_repo_slug": "railiance-fabric", + "source_url": "http://127.0.0.1:8765/exports/state-hub", + "requested_by": "operator" + }' +``` + +For a saved export file, post the FabricGraphExport JSON directly: + +```bash +curl -s -X POST "http://127.0.0.1:8000/fabric/graph-exports?source_repo_slug=railiance-fabric" \ + -H "Content-Type: application/json" \ + --data-binary @fabric-state-hub-export.json +``` + +## Query Checks + +Latest import and counts: + +```bash +curl -s http://127.0.0.1:8000/fabric/graph-exports/latest +curl -s http://127.0.0.1:8000/fabric/graph/summary +``` + +Representative relationship queries: + +```bash +curl -s "http://127.0.0.1:8000/fabric/graph/edges?canonical_relationship=exposes" +curl -s "http://127.0.0.1:8000/fabric/graph/edges?canonical_relationship=depends_on" +curl -s "http://127.0.0.1:8000/fabric/graph/edges?canonical_relationship=implements" +``` + +Node filters: + +```bash +curl -s "http://127.0.0.1:8000/fabric/graph/nodes?repo=state-hub" +curl -s "http://127.0.0.1:8000/fabric/graph/nodes?domain=custodian" +curl -s "http://127.0.0.1:8000/fabric/graph/nodes?canonical_category=service" +``` + +## Guarantees + +- malformed exports create a failed import/progress record but do not write + graph node or edge rows +- repeated imports of the same graph content are idempotent +- latest selection is per `source_repo_slug` +- read endpoints do not mutate workstreams, tasks, messages, decisions, or + progress rows diff --git a/migrations/versions/y2t3u4v5w6x7_fabric_graph_read_model.py b/migrations/versions/y2t3u4v5w6x7_fabric_graph_read_model.py new file mode 100644 index 0000000..8b7d305 --- /dev/null +++ b/migrations/versions/y2t3u4v5w6x7_fabric_graph_read_model.py @@ -0,0 +1,143 @@ +"""add Fabric graph read model + +Revision ID: y2t3u4v5w6x7 +Revises: x1s2t3u4v5w6 +Create Date: 2026-05-23 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB, UUID + +revision = "y2t3u4v5w6x7" +down_revision = "x1s2t3u4v5w6" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "fabric_graph_imports", + sa.Column("id", UUID(as_uuid=True), primary_key=True, nullable=False), + sa.Column("source_repo_slug", sa.String(length=100), nullable=False), + sa.Column("source_url", sa.Text(), nullable=True), + sa.Column("source_commit", sa.String(length=80), nullable=True), + sa.Column("source_path", sa.Text(), nullable=True), + sa.Column("api_version", sa.String(length=100), nullable=True), + sa.Column("export_kind", sa.String(length=100), nullable=True), + sa.Column("exported_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("content_hash", sa.String(length=64), nullable=False), + sa.Column("node_count", sa.Integer(), nullable=False, server_default="0"), + sa.Column("edge_count", sa.Integer(), nullable=False, server_default="0"), + sa.Column("validation_status", sa.String(length=20), nullable=False, server_default="valid"), + sa.Column("error_details", JSONB(), nullable=True), + sa.Column("graph_json", JSONB(), nullable=False, server_default="{}"), + sa.Column("is_latest", sa.Boolean(), nullable=False, server_default="false"), + sa.Column("last_seen_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.UniqueConstraint("source_repo_slug", "content_hash", name="uq_fabric_graph_imports_source_hash"), + ) + op.create_index("ix_fabric_graph_imports_source_repo_slug", "fabric_graph_imports", ["source_repo_slug"]) + op.create_index("ix_fabric_graph_imports_source_commit", "fabric_graph_imports", ["source_commit"]) + op.create_index("ix_fabric_graph_imports_export_kind", "fabric_graph_imports", ["export_kind"]) + op.create_index("ix_fabric_graph_imports_exported_at", "fabric_graph_imports", ["exported_at"]) + op.create_index("ix_fabric_graph_imports_content_hash", "fabric_graph_imports", ["content_hash"]) + op.create_index("ix_fabric_graph_imports_validation_status", "fabric_graph_imports", ["validation_status"]) + op.create_index("ix_fabric_graph_imports_is_latest", "fabric_graph_imports", ["is_latest"]) + + op.create_table( + "fabric_graph_nodes", + sa.Column("id", UUID(as_uuid=True), primary_key=True, nullable=False), + sa.Column("import_id", UUID(as_uuid=True), nullable=False), + sa.Column("source_repo_slug", sa.String(length=100), nullable=False), + sa.Column("graph_id", sa.Text(), nullable=False), + sa.Column("kind", sa.String(length=100), nullable=False), + sa.Column("name", sa.Text(), nullable=False), + sa.Column("repo_slug", sa.String(length=100), nullable=False), + sa.Column("domain_slug", sa.String(length=100), nullable=False), + sa.Column("lifecycle", sa.String(length=50), nullable=False), + sa.Column("canonical_type", sa.String(length=100), nullable=True), + sa.Column("canon_category", sa.String(length=100), nullable=True), + sa.Column("canon_anchor", sa.Text(), nullable=True), + sa.Column("mapping_fit", sa.String(length=20), nullable=True), + sa.Column("evidence_state", sa.String(length=20), nullable=True), + sa.Column("display_only", sa.Boolean(), nullable=False, server_default="false"), + sa.Column("attributes", JSONB(), nullable=False, server_default="{}"), + sa.Column("raw_json", JSONB(), nullable=False, server_default="{}"), + sa.ForeignKeyConstraint(["import_id"], ["fabric_graph_imports.id"], ondelete="CASCADE"), + sa.UniqueConstraint("import_id", "graph_id", name="uq_fabric_graph_nodes_import_graph_id"), + ) + op.create_index("ix_fabric_graph_nodes_import_id", "fabric_graph_nodes", ["import_id"]) + op.create_index("ix_fabric_graph_nodes_source_repo_slug", "fabric_graph_nodes", ["source_repo_slug"]) + op.create_index("ix_fabric_graph_nodes_kind", "fabric_graph_nodes", ["kind"]) + op.create_index("ix_fabric_graph_nodes_repo_slug", "fabric_graph_nodes", ["repo_slug"]) + op.create_index("ix_fabric_graph_nodes_domain_slug", "fabric_graph_nodes", ["domain_slug"]) + op.create_index("ix_fabric_graph_nodes_lifecycle", "fabric_graph_nodes", ["lifecycle"]) + op.create_index("ix_fabric_graph_nodes_canonical_type", "fabric_graph_nodes", ["canonical_type"]) + op.create_index("ix_fabric_graph_nodes_canon_category", "fabric_graph_nodes", ["canon_category"]) + op.create_index("ix_fabric_graph_nodes_mapping_fit", "fabric_graph_nodes", ["mapping_fit"]) + op.create_index("ix_fabric_graph_nodes_evidence_state", "fabric_graph_nodes", ["evidence_state"]) + op.create_index("ix_fabric_graph_nodes_display_only", "fabric_graph_nodes", ["display_only"]) + + op.create_table( + "fabric_graph_edges", + sa.Column("id", UUID(as_uuid=True), primary_key=True, nullable=False), + sa.Column("import_id", UUID(as_uuid=True), nullable=False), + sa.Column("source_repo_slug", sa.String(length=100), nullable=False), + sa.Column("edge_key", sa.String(length=64), nullable=False), + sa.Column("from_graph_id", sa.Text(), nullable=False), + sa.Column("to_graph_id", sa.Text(), nullable=False), + sa.Column("edge_type", sa.String(length=100), nullable=False), + sa.Column("canonical_type", sa.String(length=100), nullable=True), + sa.Column("canon_anchor", sa.Text(), nullable=True), + sa.Column("mapping_fit", sa.String(length=20), nullable=True), + sa.Column("evidence_state", sa.String(length=20), nullable=True), + sa.Column("display_only", sa.Boolean(), nullable=False, server_default="false"), + sa.Column("attributes", JSONB(), nullable=False, server_default="{}"), + sa.Column("raw_json", JSONB(), nullable=False, server_default="{}"), + sa.ForeignKeyConstraint(["import_id"], ["fabric_graph_imports.id"], ondelete="CASCADE"), + sa.UniqueConstraint("import_id", "edge_key", name="uq_fabric_graph_edges_import_edge_key"), + ) + op.create_index("ix_fabric_graph_edges_import_id", "fabric_graph_edges", ["import_id"]) + op.create_index("ix_fabric_graph_edges_source_repo_slug", "fabric_graph_edges", ["source_repo_slug"]) + op.create_index("ix_fabric_graph_edges_edge_key", "fabric_graph_edges", ["edge_key"]) + op.create_index("ix_fabric_graph_edges_edge_type", "fabric_graph_edges", ["edge_type"]) + op.create_index("ix_fabric_graph_edges_canonical_type", "fabric_graph_edges", ["canonical_type"]) + op.create_index("ix_fabric_graph_edges_mapping_fit", "fabric_graph_edges", ["mapping_fit"]) + op.create_index("ix_fabric_graph_edges_evidence_state", "fabric_graph_edges", ["evidence_state"]) + op.create_index("ix_fabric_graph_edges_display_only", "fabric_graph_edges", ["display_only"]) + + +def downgrade() -> None: + op.drop_index("ix_fabric_graph_edges_display_only", table_name="fabric_graph_edges") + op.drop_index("ix_fabric_graph_edges_evidence_state", table_name="fabric_graph_edges") + op.drop_index("ix_fabric_graph_edges_mapping_fit", table_name="fabric_graph_edges") + op.drop_index("ix_fabric_graph_edges_canonical_type", table_name="fabric_graph_edges") + op.drop_index("ix_fabric_graph_edges_edge_type", table_name="fabric_graph_edges") + op.drop_index("ix_fabric_graph_edges_edge_key", table_name="fabric_graph_edges") + op.drop_index("ix_fabric_graph_edges_source_repo_slug", table_name="fabric_graph_edges") + op.drop_index("ix_fabric_graph_edges_import_id", table_name="fabric_graph_edges") + op.drop_table("fabric_graph_edges") + + op.drop_index("ix_fabric_graph_nodes_display_only", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_evidence_state", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_mapping_fit", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_canon_category", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_canonical_type", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_lifecycle", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_domain_slug", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_repo_slug", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_kind", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_source_repo_slug", table_name="fabric_graph_nodes") + op.drop_index("ix_fabric_graph_nodes_import_id", table_name="fabric_graph_nodes") + op.drop_table("fabric_graph_nodes") + + op.drop_index("ix_fabric_graph_imports_is_latest", table_name="fabric_graph_imports") + op.drop_index("ix_fabric_graph_imports_validation_status", table_name="fabric_graph_imports") + op.drop_index("ix_fabric_graph_imports_content_hash", table_name="fabric_graph_imports") + op.drop_index("ix_fabric_graph_imports_exported_at", table_name="fabric_graph_imports") + op.drop_index("ix_fabric_graph_imports_export_kind", table_name="fabric_graph_imports") + op.drop_index("ix_fabric_graph_imports_source_commit", table_name="fabric_graph_imports") + op.drop_index("ix_fabric_graph_imports_source_repo_slug", table_name="fabric_graph_imports") + op.drop_table("fabric_graph_imports") diff --git a/tests/test_routers_core.py b/tests/test_routers_core.py index 494cb7b..d6b4bbc 100644 --- a/tests/test_routers_core.py +++ b/tests/test_routers_core.py @@ -942,3 +942,224 @@ class TestExecutionQueueEndpoints: r = await client.get(f"/execution/launch-requests?workstream_id={ws['id']}") assert len(r.json()) == 1 + + +def _fabric_graph_export(generated_at="2026-05-23T12:00:00Z", extra_node=False): + nodes = [ + { + "id": "the-custodian.state-hub", + "kind": "ServiceDeclaration", + "name": "State Hub", + "repo": "state-hub", + "domain": "custodian", + "lifecycle": "active", + "canon_category": "service", + "canon_anchor": "state-hub", + "mapping_fit": "direct", + "evidence_state": "declared", + "attributes": {"state_hub_repo_id": "state-hub"}, + }, + { + "id": "the-custodian.state-hub.http-api", + "kind": "InterfaceDeclaration", + "name": "State Hub HTTP API", + "repo": "state-hub", + "domain": "custodian", + "lifecycle": "active", + "canon_category": "interface", + "mapping_fit": "direct", + "evidence_state": "observed", + "attributes": {}, + }, + { + "id": "the-custodian.state-hub.coordination", + "kind": "CapabilityDeclaration", + "name": "Coordination", + "repo": "state-hub", + "domain": "custodian", + "lifecycle": "active", + "canon_category": "capability", + "mapping_fit": "direct", + "evidence_state": "declared", + "attributes": {}, + }, + ] + edges = [ + { + "from": "the-custodian.state-hub", + "to": "the-custodian.state-hub.http-api", + "type": "exposes", + "canonical_type": "exposes", + "canon_anchor": "state-hub-http", + "mapping_fit": "direct", + "display_only": False, + "evidence_state": "observed", + "attributes": {}, + }, + { + "from": "the-custodian.state-hub", + "to": "the-custodian.state-hub.coordination", + "type": "provides", + "canonical_type": "implements", + "mapping_fit": "direct", + "display_only": False, + "evidence_state": "declared", + "attributes": {}, + }, + { + "from": "the-custodian.state-hub.coordination", + "to": "the-custodian.state-hub.http-api", + "type": "depends_on", + "canonical_type": "depends_on", + "mapping_fit": "direct", + "display_only": False, + "evidence_state": "declared", + "attributes": {}, + }, + ] + if extra_node: + nodes.append( + { + "id": "railiance.fabric.registry", + "kind": "ServiceDeclaration", + "name": "Railiance Fabric Registry", + "repo": "railiance-fabric", + "domain": "custodian", + "lifecycle": "active", + "canon_category": "service", + "mapping_fit": "direct", + "evidence_state": "observed", + "attributes": {}, + } + ) + edges.append( + { + "from": "railiance.fabric.registry", + "to": "the-custodian.state-hub", + "type": "exposes", + "canonical_type": "exposes", + "mapping_fit": "direct", + "display_only": False, + "evidence_state": "observed", + "attributes": {}, + } + ) + return { + "apiVersion": "railiance.fabric/v1alpha1", + "kind": "FabricGraphExport", + "generated_at": generated_at, + "source": { + "repo": "registry", + "commit": "abc123", + "path": ".railiance-fabric/registry.sqlite3", + }, + "nodes": nodes, + "edges": edges, + } + + +class TestFabricGraphReadModel: + async def test_validation_failure_records_failed_import_without_read_model_rows(self, client): + payload = _fabric_graph_export() + payload.pop("kind") + + r = await client.post("/fabric/graph-exports", json=payload) + + assert r.status_code == 422 + body = r.json()["detail"] + assert body["validation_status"] == "invalid" + assert body["import_id"] + + r = await client.get("/fabric/graph-exports?validation_status=invalid") + assert r.status_code == 200 + imports = r.json() + assert len(imports) == 1 + assert imports[0]["node_count"] == 0 + assert imports[0]["edge_count"] == 0 + assert imports[0]["error_details"]["error"].startswith("invalid FabricGraphExport") + + r = await client.get("/fabric/graph/nodes") + assert r.status_code == 404 + + r = await client.get("/progress/?event_type=fabric_graph_import") + assert r.status_code == 200 + assert "rejected" in r.json()[0]["summary"] + + async def test_idempotent_reingest_uses_canonical_graph_content_hash(self, client): + first = _fabric_graph_export(generated_at="2026-05-23T12:00:00Z") + second = _fabric_graph_export(generated_at="2026-05-23T12:05:00Z") + + r = await client.post("/fabric/graph-exports", json=first) + assert r.status_code == 200, r.text + first_body = r.json() + assert first_body["created"] is True + assert first_body["idempotent"] is False + + r = await client.post("/fabric/graph-exports", json=second) + assert r.status_code == 200, r.text + second_body = r.json() + assert second_body["created"] is False + assert second_body["idempotent"] is True + assert second_body["import_run"]["id"] == first_body["import_run"]["id"] + + r = await client.get("/fabric/graph-exports") + assert len(r.json()) == 1 + r = await client.get("/fabric/graph/nodes") + assert len(r.json()) == 3 + + async def test_latest_import_selection_tracks_new_graph_content(self, client): + r = await client.post("/fabric/graph-exports", json=_fabric_graph_export()) + assert r.status_code == 200, r.text + first_id = r.json()["import_run"]["id"] + + r = await client.post("/fabric/graph-exports", json=_fabric_graph_export(extra_node=True)) + assert r.status_code == 200, r.text + second_id = r.json()["import_run"]["id"] + assert second_id != first_id + + r = await client.get("/fabric/graph-exports/latest") + latest = r.json() + assert latest["id"] == second_id + assert latest["node_count"] == 4 + assert latest["edge_count"] == 4 + + r = await client.get("/fabric/graph-exports") + latest_flags = {row["id"]: row["is_latest"] for row in r.json()} + assert latest_flags[first_id] is False + assert latest_flags[second_id] is True + + async def test_read_only_queries_filter_graph_without_mutating_state_hub_entities(self, client): + await _create_domain(client) + topic = await _create_topic(client) + ws = await _create_workstream(client, topic["id"], status="ready") + task = await _create_task(client, ws["id"]) + r = await client.post("/fabric/graph-exports", json=_fabric_graph_export(extra_node=True)) + assert r.status_code == 200, r.text + + before_progress = await client.get("/progress/") + before_progress_count = len(before_progress.json()) + + r = await client.get("/fabric/graph/summary") + assert r.status_code == 200 + summary = r.json() + assert summary["node_count"] == 4 + assert summary["edge_count"] == 4 + assert summary["nodes_by_repo"]["state-hub"] == 3 + assert summary["edges_by_canonical_type"]["exposes"] == 2 + + r = await client.get("/fabric/graph/nodes?repo=state-hub&canonical_category=service") + assert r.status_code == 200 + assert [node["graph_id"] for node in r.json()] == ["the-custodian.state-hub"] + + for relationship in ("exposes", "depends_on", "implements"): + r = await client.get(f"/fabric/graph/edges?canonical_relationship={relationship}") + assert r.status_code == 200 + assert len(r.json()) >= 1 + + after_progress = await client.get("/progress/") + assert len(after_progress.json()) == before_progress_count + + r = await client.get(f"/workstreams/{ws['id']}") + assert r.json()["status"] == "ready" + r = await client.get(f"/tasks/{task['id']}") + assert r.json()["status"] == "todo" diff --git a/workplans/STATE-WP-0050-railiance-fabric-graph-read-model-ingest.md b/workplans/STATE-WP-0050-railiance-fabric-graph-read-model-ingest.md index f756dda..3f738f0 100644 --- a/workplans/STATE-WP-0050-railiance-fabric-graph-read-model-ingest.md +++ b/workplans/STATE-WP-0050-railiance-fabric-graph-read-model-ingest.md @@ -4,7 +4,7 @@ type: workplan title: "Railiance Fabric Graph Read Model Ingest" domain: custodian repo: state-hub -status: ready +status: finished owner: codex topic_slug: custodian created: "2026-05-23" @@ -46,7 +46,7 @@ Current verified Fabric export baseline from 2026-05-23: ```task id: STATE-WP-0050-T01 -status: todo +status: done priority: high state_hub_task_id: "d80b74bd-57ee-4e7a-81d7-406c02da52bc" ``` @@ -65,11 +65,23 @@ Requirements: - Make repeated ingest of the same export idempotent. - Include migrations and repository/service helpers. +Result: + +- Added `fabric_graph_imports`, `fabric_graph_nodes`, and + `fabric_graph_edges` read-model tables with import metadata, canonical + content hash identity, source metadata, validation state, raw payload + retention, and materialized node/edge query columns. +- Preserved canon metadata fields on materialized nodes and edges, including + canonical type/category/anchor, mapping fit, evidence state, and + display-only flags. +- Added service helpers that validate, hash, materialize, and idempotently + refresh latest markers for repeated graph imports. + ## T02 - Ingest API and pull job ```task id: STATE-WP-0050-T02 -status: todo +status: done priority: high state_hub_task_id: "3a94655d-703a-4aec-b724-46cafce14fdb" ``` @@ -88,11 +100,21 @@ Requirements: - Keep the integration explicit to `railiance-fabric` at first, while leaving source metadata general enough for future graph producers. +Result: + +- Added `POST /fabric/graph-exports` for direct `FabricGraphExport` payloads + and the documented wrapper shape. +- Added `POST /fabric/graph-exports/pull` with a default pull source of + `http://127.0.0.1:8765/exports/state-hub` and source repo + `railiance-fabric`. +- Invalid exports record failed import metadata and a `fabric_graph_import` + progress row without writing graph nodes or edges. + ## T03 - Query surfaces and dashboard readiness ```task id: STATE-WP-0050-T03 -status: todo +status: done priority: medium state_hub_task_id: "6b4ed6fe-bc84-43c8-a4b0-55ee93918bac" ``` @@ -109,11 +131,22 @@ Requirements: - Do not allow graph read-model queries to mutate State Hub workstreams, tasks, messages, decisions, or progress rows. +Result: + +- Added read-only query endpoints for import listing/latest status, graph + summary counts, graph nodes, and graph edges. +- Node queries filter by source repo, domain, repo, canonical category, + evidence state, mapping fit, and kind. +- Edge queries filter by source repo, canonical relationship, edge type, + evidence state, mapping fit, display-only state, and endpoints. +- Summary output includes dashboard-ready counts plus representative node and + edge examples. + ## T04 - Verification with RAIL-FAB-WP-0016 export ```task id: STATE-WP-0050-T04 -status: todo +status: done priority: high state_hub_task_id: "fdf5275a-f04d-43a3-b18a-12cfe0dcc2f7" ``` @@ -131,6 +164,20 @@ Requirements: - Document the operator command for refreshing the State Hub graph read model after a Fabric reset/reingest. +Result: + +- Verified against the live Railiance Fabric registry export on + `http://127.0.0.1:8765/exports/state-hub` via a temporary State Hub server: + latest import `130ffb56-7e30-4963-a3b1-b7527f685b45` stored 49 nodes and + 58 edges. +- Verified idempotent live reingest returned the same import id. +- Confirmed relationship queries returned `exposes` 31, `depends_on` 15, and + `implements` 12. +- Added regression tests covering validation failure, idempotent reingest, + latest import selection, and read-only query behavior. +- Documented the operator refresh command in + `docs/fabric-graph-read-model.md`. + ## Acceptance - State Hub has a documented endpoint or job for importing the