Implement HTTP ingestion and retention lifecycle

This commit is contained in:
2026-05-16 23:10:21 +02:00
parent 2173f702c1
commit c33baa3635
15 changed files with 2478 additions and 69 deletions

View File

@@ -20,6 +20,7 @@ from __future__ import annotations
import uuid
from collections.abc import AsyncIterator, Sequence
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
@@ -39,7 +40,7 @@ from artifactstore.db.schema import (
from artifactstore.db.schema import (
events as events_t,
)
from artifactstore.events import RegistryViewWriter, make_event, tail, write
from artifactstore.events import RegistryViewWriter, fetch_since, make_event, tail, write
from artifactstore.events.model import Event
from artifactstore.identity import ContentAddress
from artifactstore.manifest import (
@@ -61,9 +62,21 @@ from artifactstore.manifest import (
)
from artifactstore.manifest.codec import decode as manifest_decode
from artifactstore.manifest.projection import jcs_projection
from artifactstore.retention import RetentionPolicy
from artifactstore.storage.spi import BackendStatus
__all__ = ["Registry"]
__all__ = [
"DuplicateRelativePathError",
"FileNotFoundError",
"FileRecord",
"IllegalPackageStateError",
"PackageNotFoundError",
"PackageRecord",
"Registry",
"RetentionClassRecord",
"RetentionStateError",
"RetentionStateRecord",
]
class PackageNotFoundError(KeyError):
@@ -82,6 +95,78 @@ class DuplicateRelativePathError(ValueError):
"""Raised when ingest_file would create a duplicate ``relative_path`` in a package."""
class RetentionStateError(ValueError):
"""Raised when a retention lifecycle operation is invalid."""
@dataclass(frozen=True, slots=True)
class PackageRecord:
"""Materialised package row projected into the registry API."""
id: UUID
name: str
producer: str
subject: str
retention_class: str
metadata: dict[str, Any]
status: str
manifest_digest_hex: str | None
created_at: datetime | None
finalized_at: datetime | None
expires_at: datetime | None
last_event_sequence: int
metadata_schema_id: UUID | None = None
@dataclass(frozen=True, slots=True)
class FileRecord:
"""Materialised file row plus its primary storage location."""
id: UUID
package_id: UUID
relative_path: str
media_type: str
size_bytes: int
digest_algorithm: str
digest_primary_hex: str
digest_sha256_hex: str
created_at: datetime | None
backend_id: str
content_address: str
object_key: str
retrieval_tier: str
storage_status: str
@dataclass(frozen=True, slots=True)
class RetentionClassRecord:
"""Configured retention-class row."""
class_id: str
default_duration_seconds: int | None
deletion_strategy: str
@dataclass(frozen=True, slots=True)
class RetentionStateRecord:
"""Materialised retention state for one package."""
package_id: UUID
current_expires_at: datetime | None
effective_class: str
active_hold_id: UUID | None
eligible_for_deletion: bool
_RETENTION_EVENT_TYPES = (
"v1.retention.default_applied",
"v1.retention.extended",
"v1.retention.hold_applied",
"v1.retention.hold_released",
"v1.retention.deletion_eligible",
)
class Registry:
"""Library-shaped orchestrator over events, dataplane, and views."""
@@ -90,10 +175,12 @@ class Registry:
engine: AsyncEngine,
dataplane: DataPlane,
view_writer: RegistryViewWriter | None = None,
retention_policy: RetentionPolicy | None = None,
) -> None:
self._engine = engine
self._dataplane = dataplane
self._view_writer = view_writer or RegistryViewWriter()
self._retention_policy = retention_policy or RetentionPolicy()
# ---- mutating operations ------------------------------------------------
@@ -108,7 +195,7 @@ class Registry:
metadata: dict[str, Any] | None = None,
) -> UUID:
"""Create a new package; returns its ``UUID``."""
await self._validate_retention_class(retention_class)
retention_class_row = await self._get_retention_class(retention_class)
package_id = uuid.uuid4()
payload = cbor2.dumps(
{
@@ -130,6 +217,31 @@ class Registry:
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
if written.created_at is None:
raise RuntimeError("created package event missing created_at")
decision = self._retention_policy.default_for(
retention_class=retention_class,
class_default_seconds=retention_class_row.default_duration_seconds,
base_time=_ensure_aware(written.created_at),
)
default_payload = cbor2.dumps(
{
"retention_class": decision.retention_class,
"default_duration_seconds": decision.default_duration_seconds,
"expires_at": _iso(decision.expires_at),
"eligible_for_deletion": decision.eligible_for_deletion,
},
canonical=True,
)
default_event = make_event(
event_type="v1.retention.default_applied",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=default_payload,
)
written_default = await write(conn, default_event)
await self._view_writer.apply(conn, written_default)
return package_id
async def ingest_file(
@@ -265,6 +377,320 @@ class Registry:
# ---- read-only operations ----------------------------------------------
async def list_packages(
self,
*,
producer: str | None = None,
subject: str | None = None,
retention_class: str | None = None,
metadata_key: str | None = None,
metadata_value: str | None = None,
) -> list[PackageRecord]:
"""List package materialised views, optionally filtered for producer
workflows. Metadata key filtering is performed after the portable SQL
filters so the same behavior works on SQLite and PostgreSQL."""
stmt = select(artifact_packages).order_by(
artifact_packages.c.created_at,
artifact_packages.c.id,
)
if producer is not None:
stmt = stmt.where(artifact_packages.c.producer == producer)
if subject is not None:
stmt = stmt.where(artifact_packages.c.subject == subject)
if retention_class is not None:
stmt = stmt.where(artifact_packages.c.retention_class == retention_class)
async with self._engine.connect() as conn:
rows = (await conn.execute(stmt)).all()
records = [_package_record_from_row(r) for r in rows]
if metadata_key is None:
return records
filtered: list[PackageRecord] = []
for record in records:
if metadata_key not in record.metadata:
continue
if metadata_value is not None and str(record.metadata[metadata_key]) != metadata_value:
continue
filtered.append(record)
return filtered
async def get_package(self, package_id: UUID) -> PackageRecord:
"""Return one package materialised view."""
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(artifact_packages).where(artifact_packages.c.id == package_id)
)
).first()
if row is None:
raise PackageNotFoundError(f"package not found: {package_id}")
return _package_record_from_row(row)
async def get_file_metadata(self, file_id: UUID) -> FileRecord:
"""Return file metadata plus the primary storage location."""
stmt = (
select(
artifact_files.c.id.label("file_id"),
artifact_files.c.package_id,
artifact_files.c.relative_path,
artifact_files.c.media_type,
artifact_files.c.size_bytes,
artifact_files.c.digest_algorithm,
artifact_files.c.digest_primary,
artifact_files.c.digest_sha256,
artifact_files.c.created_at,
storage_locations.c.backend_id,
storage_locations.c.content_address,
storage_locations.c.object_key,
storage_locations.c.retrieval_tier,
storage_locations.c.status.label("storage_status"),
)
.join(
storage_locations,
storage_locations.c.artifact_file_id == artifact_files.c.id,
)
.where(artifact_files.c.id == file_id)
.limit(1)
)
async with self._engine.connect() as conn:
row = (await conn.execute(stmt)).first()
if row is None:
raise FileNotFoundError(f"file not found: {file_id}")
return FileRecord(
id=row.file_id,
package_id=row.package_id,
relative_path=row.relative_path,
media_type=row.media_type,
size_bytes=row.size_bytes,
digest_algorithm=row.digest_algorithm,
digest_primary_hex=row.digest_primary.hex(),
digest_sha256_hex=row.digest_sha256.hex(),
created_at=row.created_at,
backend_id=row.backend_id,
content_address=row.content_address,
object_key=row.object_key,
retrieval_tier=row.retrieval_tier,
storage_status=row.storage_status,
)
async def list_retention_classes(self) -> list[RetentionClassRecord]:
"""Return configured retention classes."""
async with self._engine.connect() as conn:
rows = (
await conn.execute(
select(retention_classes).order_by(retention_classes.c.class_id)
)
).all()
return [
RetentionClassRecord(
class_id=r.class_id,
default_duration_seconds=r.default_duration_seconds,
deletion_strategy=r.deletion_strategy,
)
for r in rows
]
async def get_retention_state(self, package_id: UUID) -> RetentionStateRecord:
"""Return the retention materialised view for one package."""
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(retention_state).where(retention_state.c.package_id == package_id)
)
).first()
if row is None:
raise PackageNotFoundError(f"package not found: {package_id}")
return _retention_state_record_from_row(row)
async def extend_retention(
self,
package_id: UUID,
*,
new_expires_at: datetime,
reason: str,
actor: str,
) -> RetentionStateRecord:
"""Extend package retention to a strictly later expiry."""
clean_reason = _require_reason(reason)
target_expires_at = _ensure_aware(new_expires_at)
current = await self.get_retention_state(package_id)
if current.current_expires_at is None:
raise RetentionStateError(
f"package {package_id} has no current expiry to extend"
)
current_expires_at = _ensure_aware(current.current_expires_at)
if target_expires_at <= current_expires_at:
raise RetentionStateError("new_expires_at must be strictly later than current expiry")
payload = cbor2.dumps(
{
"previous_expires_at": _iso(current_expires_at),
"new_expires_at": _iso(target_expires_at),
"reason": clean_reason,
},
canonical=True,
)
event = make_event(
event_type="v1.retention.extended",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
return await self.get_retention_state(package_id)
async def apply_retention_hold(
self,
package_id: UUID,
*,
reason: str,
actor: str,
) -> UUID:
"""Apply one active hold to a package and return the hold id."""
clean_reason = _require_reason(reason)
current = await self.get_retention_state(package_id)
if current.active_hold_id is not None:
raise RetentionStateError(
f"package {package_id} already has active hold {current.active_hold_id}"
)
hold_id = uuid.uuid4()
payload = cbor2.dumps(
{
"hold_id": str(hold_id),
"reason": clean_reason,
},
canonical=True,
)
event = make_event(
event_type="v1.retention.hold_applied",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
return hold_id
async def release_retention_hold(
self,
package_id: UUID,
hold_id: UUID,
*,
reason: str,
actor: str,
now: datetime | None = None,
) -> RetentionStateRecord:
"""Release the active hold, emitting eligibility if the package is expired."""
clean_reason = _require_reason(reason)
current = await self.get_retention_state(package_id)
if current.active_hold_id != hold_id:
raise RetentionStateError(f"hold {hold_id} is not active on package {package_id}")
payload = cbor2.dumps(
{
"hold_id": str(hold_id),
"reason": clean_reason,
},
canonical=True,
)
release_event = make_event(
event_type="v1.retention.hold_released",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=payload,
)
effective_now = _ensure_aware(now or datetime.now(UTC))
expired_after_release = (
current.current_expires_at is not None
and _ensure_aware(current.current_expires_at) <= effective_now
and not current.eligible_for_deletion
)
async with self._engine.begin() as conn:
written_release = await write(conn, release_event)
await self._view_writer.apply(conn, written_release)
if expired_after_release:
eligible_event = make_event(
event_type="v1.retention.deletion_eligible",
subject_kind="retention",
subject_id=package_id,
actor=actor,
payload=_deletion_eligible_payload(
expires_at=current.current_expires_at,
reason="hold released after expiry",
),
)
written_eligible = await write(conn, eligible_event)
await self._view_writer.apply(conn, written_eligible)
return await self.get_retention_state(package_id)
async def sweep_deletion_eligibility(
self,
*,
now: datetime | None = None,
actor: str = "retention-sweeper",
) -> list[UUID]:
"""Mark expired, unheld packages as eligible for deletion."""
effective_now = _ensure_aware(now or datetime.now(UTC))
marked: list[UUID] = []
async with self._engine.begin() as conn:
rows = (await conn.execute(select(retention_state))).all()
for row in rows:
record = _retention_state_record_from_row(row)
if record.eligible_for_deletion or record.active_hold_id is not None:
continue
if record.current_expires_at is None:
continue
if _ensure_aware(record.current_expires_at) > effective_now:
continue
event = make_event(
event_type="v1.retention.deletion_eligible",
subject_kind="retention",
subject_id=record.package_id,
actor=actor,
payload=_deletion_eligible_payload(
expires_at=record.current_expires_at,
reason="retention expiry reached",
),
)
written = await write(conn, event)
await self._view_writer.apply(conn, written)
marked.append(record.package_id)
return marked
async def retention_history(self, package_id: UUID) -> list[Event]:
"""Return all retention events for a package, ordered by sequence."""
await self.get_package(package_id)
async with self._engine.connect() as conn:
rows = (
await conn.execute(
select(events_t)
.where(
events_t.c.subject_id == package_id,
events_t.c.event_type.in_(_RETENTION_EVENT_TYPES),
)
.order_by(events_t.c.sequence)
)
).all()
return [
Event(
event_type=r.event_type,
subject_kind=r.subject_kind,
subject_id=r.subject_id,
actor=r.actor,
payload=r.payload,
payload_digest=r.payload_digest,
sequence=r.sequence,
created_at=r.created_at,
)
for r in rows
]
async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes:
"""Return the finalised manifest. ``format`` is ``cbor`` (canonical
CBOR, the wire form) or ``json`` (the JCS projection)."""
@@ -289,20 +715,27 @@ class Registry:
return jcs_projection(manifest_decode(payload_bytes))
raise ValueError(f"unknown manifest format: {format!r} (expected 'cbor' or 'json')")
async def get_file(self, file_id: UUID) -> AsyncIterator[bytes]:
async def get_file(
self,
file_id: UUID,
*,
byte_range: tuple[int, int] | None = None,
) -> AsyncIterator[bytes]:
"""Return an async byte iterator for the bytes of a stored file."""
record = await self.get_file_metadata(file_id)
ca = ContentAddress(record.content_address)
return await self._dataplane.serve_object(ca, byte_range=byte_range)
async def fetch_events(
self,
*,
since_sequence: int = 0,
limit: int = 100,
) -> list[Event]:
"""Fetch one ordered batch of events with sequence greater than
``since_sequence``."""
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(storage_locations)
.where(storage_locations.c.artifact_file_id == file_id)
.limit(1)
)
).first()
if row is None:
raise FileNotFoundError(f"file not found: {file_id}")
ca = ContentAddress(row.content_address)
return await self._dataplane.serve_object(ca)
return await fetch_since(conn, since_sequence=since_sequence, limit=limit)
def tail_events(
self,
@@ -342,25 +775,83 @@ class Registry:
# ---- internals ----------------------------------------------------------
async def _validate_retention_class(self, retention_class: str) -> None:
async def _get_retention_class(self, retention_class: str) -> RetentionClassRecord:
async with self._engine.connect() as conn:
row = (
await conn.execute(
select(retention_classes.c.class_id).where(
retention_classes.c.class_id == retention_class
)
select(retention_classes).where(retention_classes.c.class_id == retention_class)
)
).first()
if row is None:
raise ValueError(f"unknown retention class: {retention_class!r}")
return RetentionClassRecord(
class_id=row.class_id,
default_duration_seconds=row.default_duration_seconds,
deletion_strategy=row.deletion_strategy,
)
def _iso(value: datetime | None) -> str | None:
if value is None:
return None
return _ensure_aware(value).isoformat()
def _ensure_aware(value: datetime) -> datetime:
if value.tzinfo is None:
value = value.replace(tzinfo=UTC)
return value.isoformat()
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
def _parse_iso_datetime(value: str | None) -> datetime | None:
if value is None:
return None
return _ensure_aware(datetime.fromisoformat(value))
def _require_reason(reason: str) -> str:
clean = reason.strip()
if not clean:
raise RetentionStateError("reason is required")
return clean
def _deletion_eligible_payload(*, expires_at: datetime | None, reason: str) -> bytes:
return cbor2.dumps(
{
"expires_at": _iso(expires_at),
"reason": reason,
},
canonical=True,
)
def _package_record_from_row(row: Any) -> PackageRecord:
return PackageRecord(
id=row.id,
name=row.name,
producer=row.producer,
subject=row.subject,
retention_class=row.retention_class,
metadata=dict(row.metadata) if row.metadata else {},
status=row.status,
manifest_digest_hex=row.manifest_digest.hex() if row.manifest_digest else None,
created_at=row.created_at,
finalized_at=row.finalized_at,
expires_at=row.expires_at,
last_event_sequence=row.last_event_sequence,
metadata_schema_id=row.metadata_schema_id,
)
def _retention_state_record_from_row(row: Any) -> RetentionStateRecord:
return RetentionStateRecord(
package_id=row.package_id,
current_expires_at=row.current_expires_at,
effective_class=row.effective_class,
active_hold_id=row.active_hold_id,
eligible_for_deletion=bool(row.eligible_for_deletion),
)
def _build_manifest(