Files
artifact-store/src/artifactstore/registry/__init__.py

1117 lines
39 KiB
Python

"""Registry orchestrator (ADR-0002 + ADR-0003 + ADR-0004).
The :class:`Registry` is the library-shaped entry point. It combines
``identity`` + ``manifest`` + ``events`` + ``dataplane`` into the six
operations the HTTP API and CLI both consume:
* ``create_package``
* ``ingest_file``
* ``finalize_package``
* ``get_manifest_bytes`` (canonical CBOR or JCS projection)
* ``get_file``
* ``tail_events``
Every mutating operation writes one event and updates the materialised
views inside the same database transaction, so the event log remains the
source of truth (ADR-0002).
"""
from __future__ import annotations
import uuid
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any, cast
from uuid import UUID
import cbor2
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncEngine
from artifactstore.dataplane.spi import DataPlane, IngestHints
from artifactstore.db.schema import (
artifact_files,
artifact_packages,
metadata_schemas,
retention_classes,
retention_state,
storage_locations,
)
from artifactstore.db.schema import (
events as events_t,
)
from artifactstore.events import RegistryViewWriter, fetch_since, make_event, tail, write
from artifactstore.events.model import Event
from artifactstore.identity import ContentAddress
from artifactstore.manifest import (
MANIFEST_VERSION,
FileEntry,
Manifest,
Package,
Provenance,
RetentionSummary,
)
from artifactstore.manifest import (
StorageReceipt as ManifestStorageReceipt,
)
from artifactstore.manifest import (
encode as manifest_encode,
)
from artifactstore.manifest import (
manifest_digest as compute_manifest_digest,
)
from artifactstore.manifest.codec import decode as manifest_decode
from artifactstore.manifest.projection import jcs_projection
from artifactstore.retention import RetentionPolicy
from artifactstore.storage.spi import BackendStatus
__all__ = [
"DuplicateRelativePathError",
"FileNotFoundError",
"FileRecord",
"IllegalPackageStateError",
"MetadataSchemaRecord",
"PackageNotFoundError",
"PackageRecord",
"Registry",
"RetentionClassRecord",
"RetentionStateError",
"RetentionStateRecord",
"StorageVerificationRecord",
]
class PackageNotFoundError(KeyError):
"""Raised when a package id has no row in artifact_packages."""
class FileNotFoundError(KeyError):
"""Raised when a file id has no row in artifact_files / storage_locations."""
class IllegalPackageStateError(ValueError):
"""Raised when an operation is rejected because of package status."""
class DuplicateRelativePathError(ValueError):
"""Raised when ingest_file would create a duplicate ``relative_path`` in a package."""
class RetentionStateError(ValueError):
"""Raised when a retention lifecycle operation is invalid."""
@dataclass(frozen=True, slots=True)
class MetadataSchemaRecord:
"""Registered package metadata schema."""
id: UUID
slug: str
json_schema: dict[str, Any]
created_at: datetime | None
@dataclass(frozen=True, slots=True)
class PackageRecord:
"""Materialised package row projected into the registry API."""
id: UUID
name: str
producer: str
subject: str
retention_class: str
metadata: dict[str, Any]
status: str
manifest_digest_hex: str | None
created_at: datetime | None
finalized_at: datetime | None
expires_at: datetime | None
last_event_sequence: int
metadata_schema_id: UUID | None = None
@dataclass(frozen=True, slots=True)
class FileRecord:
"""Materialised file row plus its primary storage location."""
id: UUID
package_id: UUID
relative_path: str
media_type: str
size_bytes: int
digest_algorithm: str
digest_primary_hex: str
digest_sha256_hex: str
created_at: datetime | None
backend_id: str
content_address: str
object_key: str
retrieval_tier: str
storage_status: str
@dataclass(frozen=True, slots=True)
class RetentionClassRecord:
"""Configured retention-class row."""
class_id: str
default_duration_seconds: int | None
deletion_strategy: str
@dataclass(frozen=True, slots=True)
class RetentionStateRecord:
"""Materialised retention state for one package."""
package_id: UUID
current_expires_at: datetime | None
effective_class: str
active_hold_id: UUID | None
eligible_for_deletion: bool
@dataclass(frozen=True, slots=True)
class StorageVerificationRecord:
"""Result of verifying one storage location."""
storage_location_id: UUID
file_id: UUID
backend_id: str
content_address: str
verified: bool
mismatch: str | None
_RETENTION_EVENT_TYPES = (
"v1.retention.default_applied",
"v1.retention.extended",
"v1.retention.hold_applied",
"v1.retention.hold_released",
"v1.retention.deletion_eligible",
)
class Registry:
"""Library-shaped orchestrator over events, dataplane, and views."""
def __init__(
self,
engine: AsyncEngine,
dataplane: DataPlane,
view_writer: RegistryViewWriter | None = None,
retention_policy: RetentionPolicy | None = None,
backend_selector: Callable[[str, str], str | None] | None = None,
) -> None:
self._engine = engine
self._dataplane = dataplane
self._view_writer = view_writer or RegistryViewWriter()
self._retention_policy = retention_policy or RetentionPolicy()
self._backend_selector = backend_selector
# ---- mutating operations ------------------------------------------------
async def create_package(
self,
*,
name: str,
producer: str,
subject: str,
retention_class: str,
actor: str,
metadata: dict[str, Any] | None = None,
metadata_schema_slug: str | None = None,
) -> UUID:
"""Create a new package; returns its ``UUID``."""
retention_class_row = await self._get_retention_class(retention_class)
package_metadata = metadata or {}
metadata_schema_id = await self._validate_metadata_schema(
metadata_schema_slug,
package_metadata,
)
package_id = uuid.uuid4()
payload = cbor2.dumps(
{
"name": name,
"producer": producer,
"subject": subject,
"retention_class": retention_class,
"metadata": package_metadata,
"metadata_schema_id": str(metadata_schema_id) if metadata_schema_id else None,
},
canonical=True,
)
event = make_event(
event_type="v1.package.created",
subject_kind="package",
subject_id=package_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
if written.created_at is None:
raise RuntimeError("created package event missing created_at")
decision = self._retention_policy.default_for(
retention_class=retention_class,
class_default_seconds=retention_class_row.default_duration_seconds,
base_time=_ensure_aware(written.created_at),
)
default_payload = cbor2.dumps(
{
"retention_class": decision.retention_class,
"default_duration_seconds": decision.default_duration_seconds,
"expires_at": _iso(decision.expires_at),
"eligible_for_deletion": decision.eligible_for_deletion,
},
canonical=True,
)
default_event = make_event(
event_type="v1.retention.default_applied",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=default_payload,
)
written_default = await write(conn, default_event)
await self._view_writer.apply(conn, written_default)
return package_id
async def ingest_file(
self,
package_id: UUID,
*,
relative_path: str,
media_type: str,
stream: AsyncIterator[bytes],
actor: str,
) -> UUID:
"""Stream a file into the package; returns the new file id."""
async with self._engine.connect() as conn:
pkg_row = (
await conn.execute(
select(artifact_packages).where(artifact_packages.c.id == package_id)
)
).first()
if pkg_row is None:
raise PackageNotFoundError(f"package not found: {package_id}")
if pkg_row.status != "created":
raise IllegalPackageStateError(
f"package {package_id} cannot accept files (status={pkg_row.status})"
)
existing = (
await conn.execute(
select(artifact_files).where(
artifact_files.c.package_id == package_id,
artifact_files.c.relative_path == relative_path,
)
)
).first()
if existing is not None:
raise DuplicateRelativePathError(
f"relative_path {relative_path!r} already exists in package {package_id}"
)
selected_backend = (
self._backend_selector(pkg_row.producer, pkg_row.retention_class)
if self._backend_selector is not None
else None
)
ingest = await self._dataplane.ingest_stream(
stream,
hints=IngestHints(backend_id=selected_backend),
)
file_id = uuid.uuid4()
storage_location_id = uuid.uuid4()
payload = cbor2.dumps(
{
"file_id": str(file_id),
"relative_path": relative_path,
"media_type": media_type,
"size_bytes": ingest.size_bytes,
"digest_algorithm": ingest.primary_digest.algorithm,
"digest_primary_hex": ingest.primary_digest.hex,
"digest_sha256_hex": ingest.sha256_digest.hex,
"backend_id": ingest.receipt.backend_id,
"object_key": ingest.receipt.object_key,
"storage_location_id": str(storage_location_id),
"retrieval_tier": "hot",
"storage_status": "recorded",
},
canonical=True,
)
event = make_event(
event_type="v1.file.ingested",
subject_kind="package",
subject_id=package_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
return file_id
async def finalize_package(self, package_id: UUID, *, actor: str) -> ContentAddress:
"""Finalise a package: build manifest, write it as the
v1.package.finalized event payload, and update the package's
terminal state. Returns the manifest's content address."""
async with self._engine.connect() as conn:
pkg_row = (
await conn.execute(
select(artifact_packages).where(artifact_packages.c.id == package_id)
)
).first()
if pkg_row is None:
raise PackageNotFoundError(f"package not found: {package_id}")
if pkg_row.status != "created":
raise IllegalPackageStateError(
f"package {package_id} cannot be finalized (status={pkg_row.status})"
)
file_rows = (
await conn.execute(
select(artifact_files)
.where(artifact_files.c.package_id == package_id)
.order_by(artifact_files.c.relative_path)
)
).all()
file_ids = [r.id for r in file_rows]
location_rows = (
(
await conn.execute(
select(storage_locations)
.where(storage_locations.c.artifact_file_id.in_(file_ids))
.order_by(storage_locations.c.artifact_file_id)
)
).all()
if file_ids
else []
)
retention_row = (
await conn.execute(
select(retention_state).where(retention_state.c.package_id == package_id)
)
).first()
manifest = _build_manifest(
pkg_row=pkg_row,
file_rows=file_rows,
location_rows=location_rows,
retention_row=retention_row,
actor=actor,
)
manifest_bytes = manifest_encode(manifest)
manifest_addr = compute_manifest_digest(manifest)
event = make_event(
event_type="v1.package.finalized",
subject_kind="package",
subject_id=package_id,
actor=actor,
payload=manifest_bytes,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
return manifest_addr
# ---- read-only operations ----------------------------------------------
async def list_packages(
self,
*,
producer: str | None = None,
subject: str | None = None,
retention_class: str | None = None,
metadata_key: str | None = None,
metadata_value: str | None = None,
) -> list[PackageRecord]:
"""List package materialised views, optionally filtered for producer
workflows. Metadata key filtering is performed after the portable SQL
filters so the same behavior works on SQLite and PostgreSQL."""
stmt = select(artifact_packages).order_by(
artifact_packages.c.created_at,
artifact_packages.c.id,
)
if producer is not None:
stmt = stmt.where(artifact_packages.c.producer == producer)
if subject is not None:
stmt = stmt.where(artifact_packages.c.subject == subject)
if retention_class is not None:
stmt = stmt.where(artifact_packages.c.retention_class == retention_class)
async with self._engine.connect() as conn:
rows = (await conn.execute(stmt)).all()
records = [_package_record_from_row(r) for r in rows]
if metadata_key is None:
return records
filtered: list[PackageRecord] = []
for record in records:
if metadata_key not in record.metadata:
continue
if metadata_value is not None and str(record.metadata[metadata_key]) != metadata_value:
continue
filtered.append(record)
return filtered
async def get_package(self, package_id: UUID) -> PackageRecord:
"""Return one package materialised view."""
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(artifact_packages).where(artifact_packages.c.id == package_id)
)
).first()
if row is None:
raise PackageNotFoundError(f"package not found: {package_id}")
return _package_record_from_row(row)
async def get_file_metadata(self, file_id: UUID) -> FileRecord:
"""Return file metadata plus the primary storage location."""
stmt = (
select(
artifact_files.c.id.label("file_id"),
artifact_files.c.package_id,
artifact_files.c.relative_path,
artifact_files.c.media_type,
artifact_files.c.size_bytes,
artifact_files.c.digest_algorithm,
artifact_files.c.digest_primary,
artifact_files.c.digest_sha256,
artifact_files.c.created_at,
storage_locations.c.backend_id,
storage_locations.c.content_address,
storage_locations.c.object_key,
storage_locations.c.retrieval_tier,
storage_locations.c.status.label("storage_status"),
)
.join(
storage_locations,
storage_locations.c.artifact_file_id == artifact_files.c.id,
)
.where(artifact_files.c.id == file_id)
.limit(1)
)
async with self._engine.connect() as conn:
row = (await conn.execute(stmt)).first()
if row is None:
raise FileNotFoundError(f"file not found: {file_id}")
return FileRecord(
id=row.file_id,
package_id=row.package_id,
relative_path=row.relative_path,
media_type=row.media_type,
size_bytes=row.size_bytes,
digest_algorithm=row.digest_algorithm,
digest_primary_hex=row.digest_primary.hex(),
digest_sha256_hex=row.digest_sha256.hex(),
created_at=row.created_at,
backend_id=row.backend_id,
content_address=row.content_address,
object_key=row.object_key,
retrieval_tier=row.retrieval_tier,
storage_status=row.storage_status,
)
async def list_retention_classes(self) -> list[RetentionClassRecord]:
"""Return configured retention classes."""
async with self._engine.connect() as conn:
rows = (
await conn.execute(
select(retention_classes).order_by(retention_classes.c.class_id)
)
).all()
return [
RetentionClassRecord(
class_id=r.class_id,
default_duration_seconds=r.default_duration_seconds,
deletion_strategy=r.deletion_strategy,
)
for r in rows
]
async def register_metadata_schema(
self,
*,
slug: str,
json_schema: dict[str, Any],
) -> UUID:
"""Register a package metadata JSON Schema, idempotent by slug."""
schema_id = uuid.uuid4()
async with self._engine.begin() as conn:
existing = (
await conn.execute(
select(metadata_schemas.c.id).where(metadata_schemas.c.slug == slug)
)
).first()
if existing is not None:
return UUID(str(existing.id))
await conn.execute(
metadata_schemas.insert().values(
id=schema_id,
slug=slug,
json_schema=json_schema,
)
)
return schema_id
async def get_metadata_schema(self, slug: str) -> MetadataSchemaRecord:
"""Return one registered metadata schema by slug."""
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(metadata_schemas).where(metadata_schemas.c.slug == slug)
)
).first()
if row is None:
raise KeyError(f"metadata schema not found: {slug}")
return MetadataSchemaRecord(
id=row.id,
slug=row.slug,
json_schema=dict(row.json_schema),
created_at=row.created_at,
)
async def get_retention_state(self, package_id: UUID) -> RetentionStateRecord:
"""Return the retention materialised view for one package."""
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(retention_state).where(retention_state.c.package_id == package_id)
)
).first()
if row is None:
raise PackageNotFoundError(f"package not found: {package_id}")
return _retention_state_record_from_row(row)
async def extend_retention(
self,
package_id: UUID,
*,
new_expires_at: datetime,
reason: str,
actor: str,
) -> RetentionStateRecord:
"""Extend package retention to a strictly later expiry."""
clean_reason = _require_reason(reason)
target_expires_at = _ensure_aware(new_expires_at)
current = await self.get_retention_state(package_id)
if current.current_expires_at is None:
raise RetentionStateError(
f"package {package_id} has no current expiry to extend"
)
current_expires_at = _ensure_aware(current.current_expires_at)
if target_expires_at <= current_expires_at:
raise RetentionStateError("new_expires_at must be strictly later than current expiry")
payload = cbor2.dumps(
{
"previous_expires_at": _iso(current_expires_at),
"new_expires_at": _iso(target_expires_at),
"reason": clean_reason,
},
canonical=True,
)
event = make_event(
event_type="v1.retention.extended",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
return await self.get_retention_state(package_id)
async def apply_retention_hold(
self,
package_id: UUID,
*,
reason: str,
actor: str,
) -> UUID:
"""Apply one active hold to a package and return the hold id."""
clean_reason = _require_reason(reason)
current = await self.get_retention_state(package_id)
if current.active_hold_id is not None:
raise RetentionStateError(
f"package {package_id} already has active hold {current.active_hold_id}"
)
hold_id = uuid.uuid4()
payload = cbor2.dumps(
{
"hold_id": str(hold_id),
"reason": clean_reason,
},
canonical=True,
)
event = make_event(
event_type="v1.retention.hold_applied",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
return hold_id
async def release_retention_hold(
self,
package_id: UUID,
hold_id: UUID,
*,
reason: str,
actor: str,
now: datetime | None = None,
) -> RetentionStateRecord:
"""Release the active hold, emitting eligibility if the package is expired."""
clean_reason = _require_reason(reason)
current = await self.get_retention_state(package_id)
if current.active_hold_id != hold_id:
raise RetentionStateError(f"hold {hold_id} is not active on package {package_id}")
payload = cbor2.dumps(
{
"hold_id": str(hold_id),
"reason": clean_reason,
},
canonical=True,
)
release_event = make_event(
event_type="v1.retention.hold_released",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=payload,
)
effective_now = _ensure_aware(now or datetime.now(UTC))
expired_after_release = (
current.current_expires_at is not None
and _ensure_aware(current.current_expires_at) <= effective_now
and not current.eligible_for_deletion
)
async with self._engine.begin() as conn:
written_release = await write(conn, release_event)
await self._view_writer.apply(conn, written_release)
if expired_after_release:
eligible_event = make_event(
event_type="v1.retention.deletion_eligible",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=_deletion_eligible_payload(
expires_at=current.current_expires_at,
reason="hold released after expiry",
),
)
written_eligible = await write(conn, eligible_event)
await self._view_writer.apply(conn, written_eligible)
return await self.get_retention_state(package_id)
async def sweep_deletion_eligibility(
self,
*,
now: datetime | None = None,
actor: str = "retention-sweeper",
) -> list[UUID]:
"""Mark expired, unheld packages as eligible for deletion."""
effective_now = _ensure_aware(now or datetime.now(UTC))
marked: list[UUID] = []
async with self._engine.begin() as conn:
rows = (await conn.execute(select(retention_state))).all()
for row in rows:
record = _retention_state_record_from_row(row)
if record.eligible_for_deletion or record.active_hold_id is not None:
continue
if record.current_expires_at is None:
continue
if _ensure_aware(record.current_expires_at) > effective_now:
continue
event = make_event(
event_type="v1.retention.deletion_eligible",
subject_kind="retention",
subject_id=record.package_id,
actor=actor,
payload=_deletion_eligible_payload(
expires_at=record.current_expires_at,
reason="retention expiry reached",
),
)
written = await write(conn, event)
await self._view_writer.apply(conn, written)
marked.append(record.package_id)
return marked
async def retention_history(self, package_id: UUID) -> list[Event]:
"""Return all retention events for a package, ordered by sequence."""
await self.get_package(package_id)
async with self._engine.connect() as conn:
rows = (
await conn.execute(
select(events_t)
.where(
events_t.c.subject_id == package_id,
events_t.c.event_type.in_(_RETENTION_EVENT_TYPES),
)
.order_by(events_t.c.sequence)
)
).all()
return [
Event(
event_type=r.event_type,
subject_kind=r.subject_kind,
subject_id=r.subject_id,
actor=r.actor,
payload=r.payload,
payload_digest=r.payload_digest,
sequence=r.sequence,
created_at=r.created_at,
)
for r in rows
]
async def verify_storage_locations(
self,
*,
backend_id: str | None = None,
actor: str = "storage-verifier",
) -> list[StorageVerificationRecord]:
"""Re-read storage locations and emit verification events."""
stmt = select(storage_locations)
if backend_id is not None:
stmt = stmt.where(storage_locations.c.backend_id == backend_id)
async with self._engine.connect() as conn:
rows = (await conn.execute(stmt.order_by(storage_locations.c.id))).all()
results: list[StorageVerificationRecord] = []
for row in rows:
ca = ContentAddress(row.content_address)
verified = False
mismatch: str | None = None
actual_size_bytes: int | None = None
actual_primary_hex: str | None = None
actual_sha256_hex: str | None = None
try:
result = await self._dataplane.verify_object(ca, backend_id=row.backend_id)
verified = result.verified
mismatch = result.mismatch
actual_size_bytes = result.actual_size_bytes
actual_primary_hex = result.actual_primary_digest.hex
actual_sha256_hex = result.actual_sha256_digest.hex
except Exception as exc:
mismatch = f"{type(exc).__name__}: {exc}"
payload = cbor2.dumps(
{
"storage_location_id": str(row.id),
"file_id": str(row.artifact_file_id),
"backend_id": row.backend_id,
"content_address": row.content_address,
"verified": verified,
"mismatch": mismatch,
"actual_size_bytes": actual_size_bytes,
"actual_primary_hex": actual_primary_hex,
"actual_sha256_hex": actual_sha256_hex,
},
canonical=True,
)
event = make_event(
event_type="v1.storage.location_verified",
subject_kind="storage",
subject_id=row.artifact_file_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
results.append(
StorageVerificationRecord(
storage_location_id=row.id,
file_id=row.artifact_file_id,
backend_id=row.backend_id,
content_address=row.content_address,
verified=verified,
mismatch=mismatch,
)
)
return results
async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes:
"""Return the finalised manifest. ``format`` is ``cbor`` (canonical
CBOR, the wire form) or ``json`` (the JCS projection)."""
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(events_t.c.payload)
.where(
events_t.c.event_type == "v1.package.finalized",
events_t.c.subject_id == package_id,
)
.order_by(events_t.c.sequence.desc())
.limit(1)
)
).first()
if row is None:
raise PackageNotFoundError(f"no finalized manifest for package {package_id}")
payload_bytes: bytes = row.payload
if format == "cbor":
return payload_bytes
if format == "json":
return jcs_projection(manifest_decode(payload_bytes))
raise ValueError(f"unknown manifest format: {format!r} (expected 'cbor' or 'json')")
async def get_file(
self,
file_id: UUID,
*,
byte_range: tuple[int, int] | None = None,
) -> AsyncIterator[bytes]:
"""Return an async byte iterator for the bytes of a stored file."""
record = await self.get_file_metadata(file_id)
ca = ContentAddress(record.content_address)
return await self._dataplane.serve_object(
ca,
byte_range=byte_range,
backend_id=record.backend_id,
)
async def fetch_events(
self,
*,
since_sequence: int = 0,
limit: int = 100,
) -> list[Event]:
"""Fetch one ordered batch of events with sequence greater than
``since_sequence``."""
async with self._engine.connect() as conn:
return await fetch_since(conn, since_sequence=since_sequence, limit=limit)
def tail_events(
self,
*,
since_sequence: int = 0,
poll_interval_seconds: float = 0.5,
) -> AsyncIterator[Event]:
"""Long-poll the event log; pass-through to :func:`events.tail`."""
return tail(
self._engine,
since_sequence=since_sequence,
poll_interval_seconds=poll_interval_seconds,
)
# ---- health / lifecycle -------------------------------------------------
async def db_health(self) -> tuple[bool, str]:
"""Probe the database with ``SELECT 1``.
Returns ``(healthy, detail)``; ``detail`` is "ok" on success and the
exception message on failure.
"""
try:
async with self._engine.connect() as conn:
await conn.execute(text("SELECT 1"))
except Exception as exc:
return False, f"{type(exc).__name__}: {exc}"
return True, "ok"
async def backend_health(self) -> BackendStatus:
"""Probe the configured storage backend through the data plane."""
return await self._dataplane.backend_health()
async def backend_health_all(self) -> list[BackendStatus]:
"""Probe every configured storage backend when the data plane supports it."""
probe_all = getattr(self._dataplane, "backend_health_all", None)
if probe_all is None:
return [await self.backend_health()]
typed_probe = cast(Callable[[], Awaitable[list[BackendStatus]]], probe_all)
return await typed_probe()
async def failed_storage_locations_count(self) -> int:
"""Count storage locations currently marked failed."""
async with self._engine.connect() as conn:
rows = (
await conn.execute(
select(storage_locations.c.id).where(storage_locations.c.status == "failed")
)
).all()
return len(rows)
async def dispose(self) -> None:
"""Release the engine's connection pool. Idempotent."""
await self._engine.dispose()
# ---- internals ----------------------------------------------------------
async def _get_retention_class(self, retention_class: str) -> RetentionClassRecord:
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(retention_classes).where(retention_classes.c.class_id == retention_class)
)
).first()
if row is None:
raise ValueError(f"unknown retention class: {retention_class!r}")
return RetentionClassRecord(
class_id=row.class_id,
default_duration_seconds=row.default_duration_seconds,
deletion_strategy=row.deletion_strategy,
)
async def _validate_metadata_schema(
self,
slug: str | None,
metadata: dict[str, Any],
) -> UUID | None:
if slug is None:
return None
try:
schema = await self.get_metadata_schema(slug)
except KeyError as exc:
raise ValueError(str(exc)) from exc
required = schema.json_schema.get("required", [])
if not isinstance(required, list):
raise ValueError(f"metadata schema {slug!r} has invalid required list")
missing = [key for key in required if isinstance(key, str) and key not in metadata]
if missing:
raise ValueError(f"metadata missing required schema keys: {', '.join(missing)}")
return schema.id
def _iso(value: datetime | None) -> str | None:
if value is None:
return None
return _ensure_aware(value).isoformat()
def _ensure_aware(value: datetime) -> datetime:
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
def _parse_iso_datetime(value: str | None) -> datetime | None:
if value is None:
return None
return _ensure_aware(datetime.fromisoformat(value))
def _require_reason(reason: str) -> str:
clean = reason.strip()
if not clean:
raise RetentionStateError("reason is required")
return clean
def _deletion_eligible_payload(*, expires_at: datetime | None, reason: str) -> bytes:
return cbor2.dumps(
{
"expires_at": _iso(expires_at),
"reason": reason,
},
canonical=True,
)
def _package_record_from_row(row: Any) -> PackageRecord:
return PackageRecord(
id=row.id,
name=row.name,
producer=row.producer,
subject=row.subject,
retention_class=row.retention_class,
metadata=dict(row.metadata) if row.metadata else {},
status=row.status,
manifest_digest_hex=row.manifest_digest.hex() if row.manifest_digest else None,
created_at=row.created_at,
finalized_at=row.finalized_at,
expires_at=row.expires_at,
last_event_sequence=row.last_event_sequence,
metadata_schema_id=row.metadata_schema_id,
)
def _retention_state_record_from_row(row: Any) -> RetentionStateRecord:
return RetentionStateRecord(
package_id=row.package_id,
current_expires_at=row.current_expires_at,
effective_class=row.effective_class,
active_hold_id=row.active_hold_id,
eligible_for_deletion=bool(row.eligible_for_deletion),
)
def _build_manifest(
*,
pkg_row: Any,
file_rows: Sequence[Any],
location_rows: Sequence[Any],
retention_row: Any | None,
actor: str,
) -> Manifest:
files_entries = [
FileEntry(
id=str(r.id),
relative_path=r.relative_path,
media_type=r.media_type,
size_bytes=r.size_bytes,
digest_algorithm=r.digest_algorithm,
digest_primary_hex=r.digest_primary.hex(),
digest_sha256_hex=r.digest_sha256.hex(),
)
for r in file_rows
]
receipts = [
ManifestStorageReceipt(
file_id=str(r.artifact_file_id),
backend_id=r.backend_id,
content_address=r.content_address,
retrieval_tier=r.retrieval_tier,
status=r.status,
)
for r in location_rows
]
retention_class = (
retention_row.effective_class if retention_row is not None else pkg_row.retention_class
)
retention_summary = RetentionSummary(
retention_class=retention_class,
expires_at=_iso(retention_row.current_expires_at) if retention_row else None,
active_holds=[],
last_retention_event_sequence=None,
)
finalized_at_iso = datetime.now(UTC).isoformat()
return Manifest(
manifest_version=MANIFEST_VERSION,
package=Package(
id=str(pkg_row.id),
name=pkg_row.name,
producer=pkg_row.producer,
subject=pkg_row.subject,
retention_class=pkg_row.retention_class,
status="finalized",
created_at=_iso(pkg_row.created_at) or "",
finalized_at=finalized_at_iso,
expires_at=_iso(pkg_row.expires_at),
metadata=dict(pkg_row.metadata) if pkg_row.metadata else {},
metadata_schema_id=str(pkg_row.metadata_schema_id)
if pkg_row.metadata_schema_id
else None,
),
files=files_entries,
storage_receipts=receipts,
retention_summary=retention_summary,
provenance=Provenance(
source_commits={},
tool_versions={},
environment={},
ingest_actor=actor,
ingest_timestamps={"finalized_at": finalized_at_iso},
),
)