"""Registry orchestrator (ADR-0002 + ADR-0003 + ADR-0004). The :class:`Registry` is the library-shaped entry point. It combines ``identity`` + ``manifest`` + ``events`` + ``dataplane`` into the six operations the HTTP API and CLI both consume: * ``create_package`` * ``ingest_file`` * ``finalize_package`` * ``get_manifest_bytes`` (canonical CBOR or JCS projection) * ``get_file`` * ``tail_events`` Every mutating operation writes one event and updates the materialised views inside the same database transaction, so the event log remains the source of truth (ADR-0002). """ from __future__ import annotations import uuid from collections.abc import AsyncIterator, Sequence from datetime import UTC, datetime from typing import Any from uuid import UUID import cbor2 from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncEngine from artifactstore.dataplane.spi import DataPlane from artifactstore.db.schema import ( artifact_files, artifact_packages, retention_classes, retention_state, storage_locations, ) from artifactstore.db.schema import ( events as events_t, ) from artifactstore.events import RegistryViewWriter, make_event, tail, write from artifactstore.events.model import Event from artifactstore.identity import ContentAddress from artifactstore.manifest import ( MANIFEST_VERSION, FileEntry, Manifest, Package, Provenance, RetentionSummary, ) from artifactstore.manifest import ( StorageReceipt as ManifestStorageReceipt, ) from artifactstore.manifest import ( encode as manifest_encode, ) from artifactstore.manifest import ( manifest_digest as compute_manifest_digest, ) from artifactstore.manifest.codec import decode as manifest_decode from artifactstore.manifest.projection import jcs_projection from artifactstore.storage.spi import BackendStatus __all__ = ["Registry"] class PackageNotFoundError(KeyError): """Raised when a package id has no row in artifact_packages.""" class FileNotFoundError(KeyError): """Raised when a file id has no row in artifact_files / storage_locations.""" class IllegalPackageStateError(ValueError): """Raised when an operation is rejected because of package status.""" class DuplicateRelativePathError(ValueError): """Raised when ingest_file would create a duplicate ``relative_path`` in a package.""" class Registry: """Library-shaped orchestrator over events, dataplane, and views.""" def __init__( self, engine: AsyncEngine, dataplane: DataPlane, view_writer: RegistryViewWriter | None = None, ) -> None: self._engine = engine self._dataplane = dataplane self._view_writer = view_writer or RegistryViewWriter() # ---- mutating operations ------------------------------------------------ async def create_package( self, *, name: str, producer: str, subject: str, retention_class: str, actor: str, metadata: dict[str, Any] | None = None, ) -> UUID: """Create a new package; returns its ``UUID``.""" await self._validate_retention_class(retention_class) package_id = uuid.uuid4() payload = cbor2.dumps( { "name": name, "producer": producer, "subject": subject, "retention_class": retention_class, "metadata": metadata or {}, }, canonical=True, ) event = make_event( event_type="v1.package.created", subject_kind="package", subject_id=package_id, actor=actor, payload=payload, ) async with self._engine.begin() as conn: written = await write(conn, event) await self._view_writer.apply(conn, written) return package_id async def ingest_file( self, package_id: UUID, *, relative_path: str, media_type: str, stream: AsyncIterator[bytes], actor: str, ) -> UUID: """Stream a file into the package; returns the new file id.""" async with self._engine.connect() as conn: pkg_row = ( await conn.execute( select(artifact_packages).where(artifact_packages.c.id == package_id) ) ).first() if pkg_row is None: raise PackageNotFoundError(f"package not found: {package_id}") if pkg_row.status != "created": raise IllegalPackageStateError( f"package {package_id} cannot accept files (status={pkg_row.status})" ) existing = ( await conn.execute( select(artifact_files).where( artifact_files.c.package_id == package_id, artifact_files.c.relative_path == relative_path, ) ) ).first() if existing is not None: raise DuplicateRelativePathError( f"relative_path {relative_path!r} already exists in package {package_id}" ) ingest = await self._dataplane.ingest_stream(stream) file_id = uuid.uuid4() storage_location_id = uuid.uuid4() payload = cbor2.dumps( { "file_id": str(file_id), "relative_path": relative_path, "media_type": media_type, "size_bytes": ingest.size_bytes, "digest_algorithm": ingest.primary_digest.algorithm, "digest_primary_hex": ingest.primary_digest.hex, "digest_sha256_hex": ingest.sha256_digest.hex, "backend_id": ingest.receipt.backend_id, "object_key": ingest.receipt.object_key, "storage_location_id": str(storage_location_id), "retrieval_tier": "hot", "storage_status": "recorded", }, canonical=True, ) event = make_event( event_type="v1.file.ingested", subject_kind="package", subject_id=package_id, actor=actor, payload=payload, ) async with self._engine.begin() as conn: written = await write(conn, event) await self._view_writer.apply(conn, written) return file_id async def finalize_package(self, package_id: UUID, *, actor: str) -> ContentAddress: """Finalise a package: build manifest, write it as the v1.package.finalized event payload, and update the package's terminal state. Returns the manifest's content address.""" async with self._engine.connect() as conn: pkg_row = ( await conn.execute( select(artifact_packages).where(artifact_packages.c.id == package_id) ) ).first() if pkg_row is None: raise PackageNotFoundError(f"package not found: {package_id}") if pkg_row.status != "created": raise IllegalPackageStateError( f"package {package_id} cannot be finalized (status={pkg_row.status})" ) file_rows = ( await conn.execute( select(artifact_files) .where(artifact_files.c.package_id == package_id) .order_by(artifact_files.c.relative_path) ) ).all() file_ids = [r.id for r in file_rows] location_rows = ( ( await conn.execute( select(storage_locations) .where(storage_locations.c.artifact_file_id.in_(file_ids)) .order_by(storage_locations.c.artifact_file_id) ) ).all() if file_ids else [] ) retention_row = ( await conn.execute( select(retention_state).where(retention_state.c.package_id == package_id) ) ).first() manifest = _build_manifest( pkg_row=pkg_row, file_rows=file_rows, location_rows=location_rows, retention_row=retention_row, actor=actor, ) manifest_bytes = manifest_encode(manifest) manifest_addr = compute_manifest_digest(manifest) event = make_event( event_type="v1.package.finalized", subject_kind="package", subject_id=package_id, actor=actor, payload=manifest_bytes, ) async with self._engine.begin() as conn: written = await write(conn, event) await self._view_writer.apply(conn, written) return manifest_addr # ---- read-only operations ---------------------------------------------- async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes: """Return the finalised manifest. ``format`` is ``cbor`` (canonical CBOR, the wire form) or ``json`` (the JCS projection).""" async with self._engine.connect() as conn: row = ( await conn.execute( select(events_t.c.payload) .where( events_t.c.event_type == "v1.package.finalized", events_t.c.subject_id == package_id, ) .order_by(events_t.c.sequence.desc()) .limit(1) ) ).first() if row is None: raise PackageNotFoundError(f"no finalized manifest for package {package_id}") payload_bytes: bytes = row.payload if format == "cbor": return payload_bytes if format == "json": return jcs_projection(manifest_decode(payload_bytes)) raise ValueError(f"unknown manifest format: {format!r} (expected 'cbor' or 'json')") async def get_file(self, file_id: UUID) -> AsyncIterator[bytes]: """Return an async byte iterator for the bytes of a stored file.""" async with self._engine.connect() as conn: row = ( await conn.execute( select(storage_locations) .where(storage_locations.c.artifact_file_id == file_id) .limit(1) ) ).first() if row is None: raise FileNotFoundError(f"file not found: {file_id}") ca = ContentAddress(row.content_address) return await self._dataplane.serve_object(ca) def tail_events( self, *, since_sequence: int = 0, poll_interval_seconds: float = 0.5, ) -> AsyncIterator[Event]: """Long-poll the event log; pass-through to :func:`events.tail`.""" return tail( self._engine, since_sequence=since_sequence, poll_interval_seconds=poll_interval_seconds, ) # ---- health / lifecycle ------------------------------------------------- async def db_health(self) -> tuple[bool, str]: """Probe the database with ``SELECT 1``. Returns ``(healthy, detail)``; ``detail`` is "ok" on success and the exception message on failure. """ try: async with self._engine.connect() as conn: await conn.execute(text("SELECT 1")) except Exception as exc: return False, f"{type(exc).__name__}: {exc}" return True, "ok" async def backend_health(self) -> BackendStatus: """Probe the configured storage backend through the data plane.""" return await self._dataplane.backend_health() async def dispose(self) -> None: """Release the engine's connection pool. Idempotent.""" await self._engine.dispose() # ---- internals ---------------------------------------------------------- async def _validate_retention_class(self, retention_class: str) -> None: async with self._engine.connect() as conn: row = ( await conn.execute( select(retention_classes.c.class_id).where( retention_classes.c.class_id == retention_class ) ) ).first() if row is None: raise ValueError(f"unknown retention class: {retention_class!r}") def _iso(value: datetime | None) -> str | None: if value is None: return None if value.tzinfo is None: value = value.replace(tzinfo=UTC) return value.isoformat() def _build_manifest( *, pkg_row: Any, file_rows: Sequence[Any], location_rows: Sequence[Any], retention_row: Any | None, actor: str, ) -> Manifest: files_entries = [ FileEntry( id=str(r.id), relative_path=r.relative_path, media_type=r.media_type, size_bytes=r.size_bytes, digest_algorithm=r.digest_algorithm, digest_primary_hex=r.digest_primary.hex(), digest_sha256_hex=r.digest_sha256.hex(), ) for r in file_rows ] receipts = [ ManifestStorageReceipt( file_id=str(r.artifact_file_id), backend_id=r.backend_id, content_address=r.content_address, retrieval_tier=r.retrieval_tier, status=r.status, ) for r in location_rows ] retention_class = ( retention_row.effective_class if retention_row is not None else pkg_row.retention_class ) retention_summary = RetentionSummary( retention_class=retention_class, expires_at=_iso(retention_row.current_expires_at) if retention_row else None, active_holds=[], last_retention_event_sequence=None, ) finalized_at_iso = datetime.now(UTC).isoformat() return Manifest( manifest_version=MANIFEST_VERSION, package=Package( id=str(pkg_row.id), name=pkg_row.name, producer=pkg_row.producer, subject=pkg_row.subject, retention_class=pkg_row.retention_class, status="finalized", created_at=_iso(pkg_row.created_at) or "", finalized_at=finalized_at_iso, expires_at=_iso(pkg_row.expires_at), metadata=dict(pkg_row.metadata) if pkg_row.metadata else {}, metadata_schema_id=str(pkg_row.metadata_schema_id) if pkg_row.metadata_schema_id else None, ), files=files_entries, storage_receipts=receipts, retention_summary=retention_summary, provenance=Provenance( source_commits={}, tool_versions={}, environment={}, ingest_actor=actor, ingest_timestamps={"finalized_at": finalized_at_iso}, ), )