diff --git a/.env.example b/.env.example
index ffa1b09..29f1309 100644
--- a/.env.example
+++ b/.env.example
@@ -14,3 +14,20 @@ ARTIFACTSTORE_STORAGE_LOCAL_ROOT=./var/storage
# Python logging level: DEBUG | INFO | WARNING | ERROR
ARTIFACTSTORE_LOG_LEVEL=INFO
+
+# Shared-secret bearer tokens for the HTTP API. Comma- or newline-separated.
+# Protected endpoints return 401 until at least one token is configured.
+ARTIFACTSTORE_AUTH_TOKENS=dev-token
+
+# Read endpoints are authenticated by default. Set true only for local demos.
+ARTIFACTSTORE_ANON_READ=false
+
+# Defaults used by `artifactstore push` and `artifactstore manifest`.
+ARTIFACTSTORE_API_URL=http://127.0.0.1:8000
+ARTIFACTSTORE_API_TOKEN=dev-token
+
+# Optional TOML file overriding retention class default durations.
+ARTIFACTSTORE_RETENTION_CONFIG_PATH=
+
+# Default interval for external schedulers that run `artifactstore retention sweep`.
+ARTIFACTSTORE_RETENTION_SWEEP_INTERVAL_SECONDS=3600
diff --git a/docs/OPERATOR.md b/docs/OPERATOR.md
index c58282d..729e4a3 100644
--- a/docs/OPERATOR.md
+++ b/docs/OPERATOR.md
@@ -1,12 +1,10 @@
# Operator Guide
-Status: v0.1 (WP-0001 baseline)
+Status: v0.1 (WP-0003 baseline)
Updated: 2026-05-16
This guide is the user manual for running `artifact-store` v0.1 — the
-library + CLI + minimal HTTP app that landed in WP-0001. Ingest, finalize,
-and retrieve workflows go through the Python library today; the HTTP
-upload API arrives in WP-0002.
+library, CLI, HTTP ingestion API, manifest surface, and retention lifecycle.
For architectural background see
[ARCHITECTURE-BLUEPRINT.md](ARCHITECTURE-BLUEPRINT.md), the ADRs under
@@ -50,9 +48,42 @@ All settings are prefixed with ``ARTIFACTSTORE_`` and read by
| `ARTIFACTSTORE_DATABASE_URL` | `sqlite+aiosqlite:///./var/artifactstore.db` | SQLAlchemy async URL. Alembic translates `+aiosqlite` and `+asyncpg` to their sync drivers at migrate-time. |
| `ARTIFACTSTORE_STORAGE_LOCAL_ROOT`| `./var/storage` | Root directory for the local filesystem storage backend. Created on first use. |
| `ARTIFACTSTORE_LOG_LEVEL` | `INFO` | Python logging level (`DEBUG` / `INFO` / `WARNING` / `ERROR`). |
+| `ARTIFACTSTORE_AUTH_TOKENS` | empty | Comma- or newline-separated shared-secret bearer tokens for the HTTP API. |
+| `ARTIFACTSTORE_ANON_READ` | `false` | Set `true` only for local demos where read endpoints may be anonymous. |
+| `ARTIFACTSTORE_API_URL` | `http://127.0.0.1:8000` | Default API base URL used by HTTP-backed CLI commands. |
+| `ARTIFACTSTORE_API_TOKEN` | empty | Default bearer token used by HTTP-backed CLI commands. |
+| `ARTIFACTSTORE_RETENTION_CONFIG_PATH` | empty | Optional TOML file overriding retention-class default durations. |
+| `ARTIFACTSTORE_RETENTION_SWEEP_INTERVAL_SECONDS` | `3600` | Default interval for external schedulers that invoke the retention sweeper. |
See [`.env.example`](../.env.example) for the canonical template.
+### Retention policy TOML
+
+By default, retention durations come from the seeded `retention_classes`
+rows. Operators can override the default duration per class with
+`ARTIFACTSTORE_RETENTION_CONFIG_PATH`:
+
+```toml
+[retention_classes.transient]
+default_duration_seconds = 86400
+
+[retention_classes."raw-evidence"]
+default_duration_seconds = 7776000
+
+[retention_classes."summary-evidence"]
+default_duration_seconds = 31536000
+
+[retention_classes."release-evidence"]
+default_duration_seconds = 220752000
+
+[retention_classes."permanent-record"]
+# Omit default_duration_seconds for no expiry.
+```
+
+Run `artifactstore retention sweep` from cron or another scheduler to mark
+expired, unheld packages eligible for deletion. This work only records
+eligibility; it never deletes bytes.
+
## Database backends
### SQLite (development default)
@@ -113,21 +144,27 @@ lands in WP-0004.
| `artifactstore migrate` | Run `alembic upgrade head` against the configured database. |
| `artifactstore replay` | Truncate every materialised view and rebuild it from the event log; prints the highest sequence applied. |
| `artifactstore health` | JSON liveness summary (db, backend, status). Same payload as the HTTP `/health` endpoint. |
+| `artifactstore push
` | Push a directory through the HTTP API and finalize the package. |
+| `artifactstore manifest ` | Fetch the JSON manifest projection through the HTTP API. |
+| `artifactstore retention sweep` | Run one deletion-eligibility sweep against the configured DB. |
The CLI is a thin client over `artifactstore.registry.Registry`
(see [ADR-0005](adr/0005-v1-tech-stack.md)).
## HTTP reference (v0.1)
-| Route | Purpose |
-|----------------|---------|
-| `GET /` | Service banner (scaffold marker). |
-| `GET /health` | Liveness summary. Returns ``{status, db, backend, version}``. `status` is `ok` only when both the DB probe (`SELECT 1`) and the backend `health()` succeed. |
-| `GET /docs` | FastAPI's interactive OpenAPI docs (`/openapi.json` underneath). |
+| Route family | Purpose |
+|-----------------------|---------|
+| `GET /`, `GET /health` | Anonymous service banner and liveness summary. |
+| `GET /docs`, `GET /openapi.json` | FastAPI's interactive OpenAPI docs and generated schema. |
+| `/packages...` | Create, list, inspect, upload files to, finalize, and retrieve manifests for packages. |
+| `/files...` | File metadata and byte downloads, including single-range reads. |
+| `/uploads...` | Upload-session wire shape for whole-body v1 uploads. |
+| `/packages/{id}/retention...` | Extend retention, apply/release holds, and read retention history. |
+| `GET /events` | Long-poll event feed, CBOR by default or JSON with `Accept: application/json`. |
-Package CRUD, file upload/download, manifest retrieval, retention controls,
-and the event stream all land in WP-0002–WP-0003. Today they are reachable
-via the Python library.
+All non-health routes require a bearer token unless
+`ARTIFACTSTORE_ANON_READ=true` is set for read endpoints.
## End-to-end smoke test (Python library)
diff --git a/src/artifactstore/api/http/__init__.py b/src/artifactstore/api/http/__init__.py
index 23a1605..351f474 100644
--- a/src/artifactstore/api/http/__init__.py
+++ b/src/artifactstore/api/http/__init__.py
@@ -1,41 +1,138 @@
-"""FastAPI application — HTTP surface for the registry.
-
-T014 ships a minimal app with two routes:
-
-* ``GET /`` — service banner.
-* ``GET /health`` — registry liveness + DB connectivity + storage backend.
-
-Richer endpoints (package CRUD, file upload, manifest retrieval, event
-stream) land in workplan WP-0002. The app is built through
-:func:`create_app` so tests can inject their own settings.
-"""
+"""FastAPI application — HTTP surface for the registry."""
from __future__ import annotations
-from contextlib import asynccontextmanager
-from typing import Any
+import asyncio
+import re
+import secrets
+from collections.abc import AsyncIterator, Mapping
+from contextlib import asynccontextmanager, suppress
+from dataclasses import dataclass
+from datetime import UTC, datetime
+from email import policy
+from email.parser import BytesParser
+from http import HTTPStatus
+from typing import Any, NoReturn, cast
+from uuid import UUID, uuid4
-from fastapi import Depends, FastAPI, Request
+import cbor2
+import jcs
+from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request, Response, status
+from fastapi.exceptions import RequestValidationError
+from fastapi.responses import JSONResponse, StreamingResponse
+from pydantic import BaseModel, Field
from artifactstore import __version__
from artifactstore.app import build_registry
-from artifactstore.config import Settings
-from artifactstore.registry import Registry
+from artifactstore.config import Settings, get_settings
+from artifactstore.events.model import Event
+from artifactstore.registry import (
+ DuplicateRelativePathError,
+ FileNotFoundError,
+ FileRecord,
+ IllegalPackageStateError,
+ PackageNotFoundError,
+ PackageRecord,
+ Registry,
+ RetentionClassRecord,
+ RetentionStateError,
+ RetentionStateRecord,
+)
__all__ = ["app", "create_app"]
+_RANGE_RE = re.compile(r"^bytes=(?P\d*)-(?P\d*)$")
+_CONTENT_RANGE_RE = re.compile(r"^bytes (?P\d+)-(?P\d+)/(?P\d+|\*)$")
+_MAX_EVENT_LIMIT = 500
+
+
+class PackageCreate(BaseModel):
+ name: str = Field(min_length=1)
+ producer: str = Field(min_length=1)
+ subject: str = Field(min_length=1)
+ retention_class: str = Field(min_length=1)
+ metadata: dict[str, Any] = Field(default_factory=dict)
+
+
+class UploadCreate(BaseModel):
+ expected_size_bytes: int | None = Field(default=None, ge=0)
+ media_type: str | None = None
+
+
+class UploadComplete(BaseModel):
+ package_id: UUID
+ relative_path: str = Field(min_length=1)
+ media_type: str | None = None
+
+
+class RetentionExtensionCreate(BaseModel):
+ new_expires_at: datetime
+ reason: str = Field(min_length=1)
+
+
+class RetentionHoldCreate(BaseModel):
+ reason: str = Field(min_length=1)
+
+
+class RetentionHoldRelease(BaseModel):
+ reason: str = Field(min_length=1)
+
+
+@dataclass(slots=True)
+class MultipartFile:
+ filename: str
+ content_type: str
+ content: bytes
+ fields: dict[str, str]
+
+
+@dataclass(slots=True)
+class UploadSession:
+ upload_id: UUID
+ buffer: bytearray
+ expected_size_bytes: int | None
+ media_type: str | None
+ status: str = "open"
+
def get_registry(request: Request) -> Registry:
return request.app.state.registry # type: ignore[no-any-return]
+def get_http_settings(request: Request) -> Settings:
+ return request.app.state.settings # type: ignore[no-any-return]
+
+
+async def require_read_auth(
+ request: Request,
+ authorization: str | None = Header(default=None, alias="Authorization"),
+) -> str:
+ settings = get_http_settings(request)
+ if settings.anon_read:
+ return "anonymous"
+ return _require_bearer(settings, authorization)
+
+
+async def require_write_auth(
+ request: Request,
+ authorization: str | None = Header(default=None, alias="Authorization"),
+) -> str:
+ settings = get_http_settings(request)
+ return _require_bearer(settings, authorization)
+
+
def create_app(settings: Settings | None = None) -> FastAPI:
"""Build the FastAPI app. Lifespan owns the registry instance."""
+ effective_settings = settings or get_settings()
+
@asynccontextmanager
- async def lifespan(application: FastAPI) -> Any:
- registry = build_registry(settings)
+ async def lifespan(application: FastAPI) -> AsyncIterator[None]:
+ registry = build_registry(effective_settings)
application.state.registry = registry
+ application.state.settings = effective_settings
+ application.state.upload_sessions = {}
+ application.state.upload_lock = asyncio.Lock()
try:
yield
finally:
@@ -47,6 +144,26 @@ def create_app(settings: Settings | None = None) -> FastAPI:
lifespan=lifespan,
)
+ @application.exception_handler(HTTPException)
+ async def http_exception_handler(_request: Request, exc: HTTPException) -> JSONResponse:
+ return _problem_response(
+ status_code=exc.status_code,
+ title=_status_phrase(exc.status_code),
+ detail=str(exc.detail),
+ headers=exc.headers,
+ )
+
+ @application.exception_handler(RequestValidationError)
+ async def validation_exception_handler(
+ _request: Request,
+ exc: RequestValidationError,
+ ) -> JSONResponse:
+ return _problem_response(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ title="Validation error",
+ detail=str(exc.errors()),
+ )
+
@application.get("/")
def root() -> dict[str, str]:
return {
@@ -74,7 +191,656 @@ def create_app(settings: Settings | None = None) -> FastAPI:
},
}
+ @application.get("/backends")
+ async def backends(
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ backend_status = await registry.backend_health()
+ return {
+ "backends": [
+ {
+ "backend_id": backend_status.backend_id,
+ "healthy": backend_status.healthy,
+ "detail": backend_status.detail,
+ "free_bytes": backend_status.free_bytes,
+ "total_bytes": backend_status.total_bytes,
+ }
+ ]
+ }
+
+ @application.get("/retention-classes")
+ async def retention_classes(
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ classes = await registry.list_retention_classes()
+ return {"retention_classes": [_retention_class_dict(c) for c in classes]}
+
+ @application.post("/packages", status_code=status.HTTP_201_CREATED)
+ async def create_package(
+ body: PackageCreate,
+ actor: str = Depends(require_write_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ try:
+ package_id = await registry.create_package(
+ name=body.name,
+ producer=body.producer,
+ subject=body.subject,
+ retention_class=body.retention_class,
+ actor=actor,
+ metadata=body.metadata,
+ )
+ return _package_dict(await registry.get_package(package_id))
+ except ValueError as exc:
+ _raise_problem(status.HTTP_400_BAD_REQUEST, str(exc))
+
+ @application.get("/packages")
+ async def list_packages(
+ producer: str | None = None,
+ subject: str | None = None,
+ retention_class: str | None = None,
+ metadata_key: str | None = None,
+ metadata_value: str | None = None,
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ packages = await registry.list_packages(
+ producer=producer,
+ subject=subject,
+ retention_class=retention_class,
+ metadata_key=metadata_key,
+ metadata_value=metadata_value,
+ )
+ return {"packages": [_package_dict(p) for p in packages]}
+
+ @application.get("/packages/{package_id}")
+ async def get_package(
+ package_id: UUID,
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ try:
+ return _package_dict(await registry.get_package(package_id))
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+
+ @application.post("/packages/{package_id}/files", status_code=status.HTTP_201_CREATED)
+ async def upload_package_file(
+ package_id: UUID,
+ request: Request,
+ actor: str = Depends(require_write_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ multipart = await _read_single_file_multipart(request)
+ relative_path = multipart.fields.get("relative_path") or multipart.filename
+ media_type = multipart.fields.get("media_type") or multipart.content_type
+ try:
+ file_id = await registry.ingest_file(
+ package_id,
+ relative_path=relative_path,
+ media_type=media_type or "application/octet-stream",
+ stream=_bytes_stream(multipart.content),
+ actor=actor,
+ )
+ return _file_dict(await registry.get_file_metadata(file_id))
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ except DuplicateRelativePathError as exc:
+ _raise_problem(status.HTTP_409_CONFLICT, str(exc))
+ except IllegalPackageStateError as exc:
+ _raise_problem(status.HTTP_409_CONFLICT, str(exc))
+
+ @application.post("/packages/{package_id}/finalize")
+ async def finalize_package(
+ package_id: UUID,
+ actor: str = Depends(require_write_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ try:
+ await registry.finalize_package(package_id, actor=actor)
+ return _package_dict(await registry.get_package(package_id))
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ except IllegalPackageStateError as exc:
+ _raise_problem(status.HTTP_409_CONFLICT, str(exc))
+
+ @application.post("/packages/{package_id}/retention/extensions")
+ async def extend_retention(
+ package_id: UUID,
+ body: RetentionExtensionCreate,
+ actor: str = Depends(require_write_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ try:
+ state = await registry.extend_retention(
+ package_id,
+ new_expires_at=body.new_expires_at,
+ reason=body.reason,
+ actor=actor,
+ )
+ return _retention_state_dict(state)
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ except RetentionStateError as exc:
+ _raise_problem(status.HTTP_409_CONFLICT, str(exc))
+
+ @application.post("/packages/{package_id}/retention/holds", status_code=status.HTTP_201_CREATED)
+ async def apply_hold(
+ package_id: UUID,
+ body: RetentionHoldCreate,
+ actor: str = Depends(require_write_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ try:
+ hold_id = await registry.apply_retention_hold(
+ package_id,
+ reason=body.reason,
+ actor=actor,
+ )
+ state = await registry.get_retention_state(package_id)
+ return {"hold_id": str(hold_id), "retention": _retention_state_dict(state)}
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ except RetentionStateError as exc:
+ _raise_problem(status.HTTP_409_CONFLICT, str(exc))
+
+ @application.post("/packages/{package_id}/retention/holds/{hold_id}/release")
+ async def release_hold(
+ package_id: UUID,
+ hold_id: UUID,
+ body: RetentionHoldRelease,
+ actor: str = Depends(require_write_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ try:
+ state = await registry.release_retention_hold(
+ package_id,
+ hold_id,
+ reason=body.reason,
+ actor=actor,
+ )
+ return _retention_state_dict(state)
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ except RetentionStateError as exc:
+ _raise_problem(status.HTTP_409_CONFLICT, str(exc))
+
+ @application.get("/packages/{package_id}/retention/history")
+ async def retention_history(
+ package_id: UUID,
+ request: Request,
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> Response:
+ try:
+ history = await registry.retention_history(package_id)
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ payload = {"events": [_event_dict(event) for event in history]}
+ accept = request.headers.get("accept", "")
+ if "application/cbor" in accept:
+ return Response(
+ content=cbor2.dumps(payload, canonical=True),
+ media_type="application/cbor",
+ )
+ return Response(content=jcs.canonicalize(payload), media_type="application/json")
+
+ @application.get("/packages/{package_id}/manifest")
+ async def get_manifest_cbor(
+ package_id: UUID,
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> Response:
+ try:
+ payload = await registry.get_manifest_bytes(package_id, format="cbor")
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ return Response(content=payload, media_type="application/cbor")
+
+ @application.get("/packages/{package_id}/manifest.json")
+ async def get_manifest_json(
+ package_id: UUID,
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> Response:
+ try:
+ payload = await registry.get_manifest_bytes(package_id, format="json")
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ return Response(content=payload, media_type="application/json")
+
+ @application.get("/files/{file_id}")
+ async def get_file_metadata(
+ file_id: UUID,
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ try:
+ return _file_dict(await registry.get_file_metadata(file_id))
+ except FileNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+
+ @application.get("/files/{file_id}/download")
+ async def download_file(
+ file_id: UUID,
+ range_header: str | None = Header(default=None, alias="Range"),
+ if_none_match: str | None = Header(default=None, alias="If-None-Match"),
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> Response:
+ try:
+ file_record = await registry.get_file_metadata(file_id)
+ except FileNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ etag = _etag(file_record.content_address)
+ headers = {
+ "Accept-Ranges": "bytes",
+ "ETag": etag,
+ "Content-Type": file_record.media_type,
+ }
+ if if_none_match and _etag_matches(if_none_match, file_record.content_address):
+ return Response(status_code=status.HTTP_304_NOT_MODIFIED, headers=headers)
+
+ byte_range = _parse_range(range_header, file_record.size_bytes)
+ status_code = status.HTTP_200_OK
+ if byte_range is None:
+ headers["Content-Length"] = str(file_record.size_bytes)
+ else:
+ start, end = byte_range
+ headers["Content-Range"] = f"bytes {start}-{end}/{file_record.size_bytes}"
+ headers["Content-Length"] = str(end - start + 1)
+ status_code = status.HTTP_206_PARTIAL_CONTENT
+
+ stream = await registry.get_file(file_id, byte_range=byte_range)
+ return StreamingResponse(stream, status_code=status_code, headers=headers)
+
+ @application.post("/uploads", status_code=status.HTTP_201_CREATED)
+ async def open_upload(
+ body: UploadCreate,
+ request: Request,
+ _actor: str = Depends(require_write_auth),
+ ) -> dict[str, Any]:
+ upload_id = uuid4()
+ session = UploadSession(
+ upload_id=upload_id,
+ buffer=bytearray(),
+ expected_size_bytes=body.expected_size_bytes,
+ media_type=body.media_type,
+ )
+ async with request.app.state.upload_lock:
+ request.app.state.upload_sessions[str(upload_id)] = session
+ return {
+ "upload_id": str(upload_id),
+ "status": session.status,
+ "content_upload_url": f"/uploads/{upload_id}",
+ }
+
+ @application.patch("/uploads/{upload_id}")
+ async def patch_upload(
+ upload_id: UUID,
+ request: Request,
+ content_range: str | None = Header(default=None, alias="Content-Range"),
+ _actor: str = Depends(require_write_auth),
+ ) -> dict[str, Any]:
+ if content_range is None:
+ _raise_problem(status.HTTP_400_BAD_REQUEST, "Content-Range header is required")
+ body = await request.body()
+ start, end, total = _parse_content_range(content_range)
+ if end - start + 1 != len(body):
+ _raise_problem(
+ status.HTTP_400_BAD_REQUEST,
+ "Content-Range length does not match request body length",
+ )
+ async with request.app.state.upload_lock:
+ session = _upload_session(request, upload_id)
+ if start != len(session.buffer):
+ _raise_problem(
+ status.HTTP_409_CONFLICT,
+ f"upload offset mismatch: expected {len(session.buffer)}, got {start}",
+ )
+ session.buffer.extend(body)
+ if total is not None and len(session.buffer) > total:
+ _raise_problem(status.HTTP_400_BAD_REQUEST, "upload exceeds declared total size")
+ session.status = (
+ "uploaded" if total is not None and len(session.buffer) == total else "open"
+ )
+ return {
+ "upload_id": str(upload_id),
+ "status": session.status,
+ "received_bytes": len(session.buffer),
+ "expected_size_bytes": total,
+ }
+
+ @application.post("/uploads/{upload_id}/complete", status_code=status.HTTP_201_CREATED)
+ async def complete_upload(
+ upload_id: UUID,
+ body: UploadComplete,
+ request: Request,
+ actor: str = Depends(require_write_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> dict[str, Any]:
+ async with request.app.state.upload_lock:
+ session = _upload_session(request, upload_id)
+ payload = bytes(session.buffer)
+ media_type = body.media_type or session.media_type or "application/octet-stream"
+ try:
+ file_id = await registry.ingest_file(
+ body.package_id,
+ relative_path=body.relative_path,
+ media_type=media_type,
+ stream=_bytes_stream(payload),
+ actor=actor,
+ )
+ except PackageNotFoundError as exc:
+ _raise_problem(status.HTTP_404_NOT_FOUND, str(exc))
+ except DuplicateRelativePathError as exc:
+ _raise_problem(status.HTTP_409_CONFLICT, str(exc))
+ except IllegalPackageStateError as exc:
+ _raise_problem(status.HTTP_409_CONFLICT, str(exc))
+ async with request.app.state.upload_lock:
+ request.app.state.upload_sessions.pop(str(upload_id), None)
+ return _file_dict(await registry.get_file_metadata(file_id))
+
+ @application.get("/events")
+ async def events(
+ request: Request,
+ since: int = Query(default=0, ge=0),
+ limit: int = Query(default=100, ge=1, le=_MAX_EVENT_LIMIT),
+ wait_seconds: float = Query(default=1.0, ge=0.0, le=30.0),
+ _actor: str = Depends(require_read_auth),
+ registry: Registry = Depends(get_registry),
+ ) -> Response:
+ batch = await _long_poll_events(
+ registry,
+ since_sequence=since,
+ limit=limit,
+ wait_seconds=wait_seconds,
+ )
+ payload = {"events": [_event_dict(e) for e in batch]}
+ accept = request.headers.get("accept", "")
+ if "application/json" in accept:
+ return Response(content=jcs.canonicalize(payload), media_type="application/json")
+ return Response(content=cbor2.dumps(payload, canonical=True), media_type="application/cbor")
+
return application
+def _require_bearer(settings: Settings, authorization: str | None) -> str:
+ tokens = settings.bearer_tokens
+ if not tokens:
+ _raise_problem(
+ status.HTTP_401_UNAUTHORIZED,
+ "No bearer tokens are configured",
+ headers={"WWW-Authenticate": "Bearer"},
+ )
+ if authorization is None or not authorization.lower().startswith("bearer "):
+ _raise_problem(
+ status.HTTP_401_UNAUTHORIZED,
+ "Bearer token required",
+ headers={"WWW-Authenticate": "Bearer"},
+ )
+ supplied = authorization.partition(" ")[2].strip()
+ if not any(secrets.compare_digest(supplied, token) for token in tokens):
+ _raise_problem(
+ status.HTTP_401_UNAUTHORIZED,
+ "Invalid bearer token",
+ headers={"WWW-Authenticate": "Bearer"},
+ )
+ return "bearer"
+
+
+def _raise_problem(
+ status_code: int,
+ detail: str,
+ *,
+ headers: Mapping[str, str] | None = None,
+) -> NoReturn:
+ raise HTTPException(status_code=status_code, detail=detail, headers=headers)
+
+
+def _problem_response(
+ *,
+ status_code: int,
+ title: str,
+ detail: str,
+ headers: Mapping[str, str] | None = None,
+) -> JSONResponse:
+ return JSONResponse(
+ status_code=status_code,
+ content={
+ "type": "about:blank",
+ "title": title,
+ "status": status_code,
+ "detail": detail,
+ },
+ media_type="application/problem+json",
+ headers=headers,
+ )
+
+
+def _status_phrase(status_code: int) -> str:
+ try:
+ return HTTPStatus(status_code).phrase
+ except ValueError:
+ return "HTTP error"
+
+
+def _package_dict(record: PackageRecord) -> dict[str, Any]:
+ return {
+ "id": str(record.id),
+ "name": record.name,
+ "producer": record.producer,
+ "subject": record.subject,
+ "retention_class": record.retention_class,
+ "metadata_schema_id": str(record.metadata_schema_id) if record.metadata_schema_id else None,
+ "metadata": record.metadata,
+ "status": record.status,
+ "manifest_digest": f"blake3:{record.manifest_digest_hex}"
+ if record.manifest_digest_hex
+ else None,
+ "created_at": _iso(record.created_at),
+ "finalized_at": _iso(record.finalized_at),
+ "expires_at": _iso(record.expires_at),
+ "last_event_sequence": record.last_event_sequence,
+ }
+
+
+def _file_dict(record: FileRecord) -> dict[str, Any]:
+ return {
+ "id": str(record.id),
+ "package_id": str(record.package_id),
+ "relative_path": record.relative_path,
+ "media_type": record.media_type,
+ "size_bytes": record.size_bytes,
+ "digest_algorithm": record.digest_algorithm,
+ "digest_primary_hex": record.digest_primary_hex,
+ "digest_sha256_hex": record.digest_sha256_hex,
+ "content_address": record.content_address,
+ "created_at": _iso(record.created_at),
+ "storage": {
+ "backend_id": record.backend_id,
+ "object_key": record.object_key,
+ "retrieval_tier": record.retrieval_tier,
+ "status": record.storage_status,
+ },
+ }
+
+
+def _retention_class_dict(record: RetentionClassRecord) -> dict[str, Any]:
+ return {
+ "class_id": record.class_id,
+ "default_duration_seconds": record.default_duration_seconds,
+ "deletion_strategy": record.deletion_strategy,
+ }
+
+
+def _retention_state_dict(record: RetentionStateRecord) -> dict[str, Any]:
+ return {
+ "package_id": str(record.package_id),
+ "current_expires_at": _iso(record.current_expires_at),
+ "effective_class": record.effective_class,
+ "active_hold_id": str(record.active_hold_id) if record.active_hold_id else None,
+ "eligible_for_deletion": record.eligible_for_deletion,
+ }
+
+
+def _event_dict(event: Event) -> dict[str, Any]:
+ payload = cbor2.loads(event.payload)
+ return {
+ "sequence": event.sequence,
+ "created_at": _iso(event.created_at),
+ "event_type": event.event_type,
+ "subject_kind": event.subject_kind,
+ "subject_id": str(event.subject_id) if event.subject_id else None,
+ "actor": event.actor,
+ "payload_digest_hex": event.payload_digest.hex(),
+ "payload": payload,
+ }
+
+
+def _iso(value: datetime | None) -> str | None:
+ if value is None:
+ return None
+ if value.tzinfo is None:
+ value = value.replace(tzinfo=UTC)
+ return value.isoformat()
+
+
+async def _read_single_file_multipart(request: Request) -> MultipartFile:
+ content_type = request.headers.get("content-type", "")
+ if not content_type.lower().startswith("multipart/form-data"):
+ _raise_problem(status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, "multipart/form-data is required")
+ raw = await request.body()
+ message = cast(
+ Any,
+ BytesParser(policy=policy.default).parsebytes(
+ f"Content-Type: {content_type}\r\nMIME-Version: 1.0\r\n\r\n".encode() + raw
+ ),
+ )
+ if not message.is_multipart():
+ _raise_problem(status.HTTP_400_BAD_REQUEST, "request body is not multipart")
+
+ fields: dict[str, str] = {}
+ chosen: tuple[str, str, bytes] | None = None
+ for part in message.iter_parts():
+ if part.get_content_disposition() != "form-data":
+ continue
+ name = cast(str | None, part.get_param("name", header="content-disposition"))
+ filename = part.get_filename()
+ payload = cast(bytes, part.get_payload(decode=True) or b"")
+ if filename is None:
+ if name is not None:
+ fields[name] = payload.decode(part.get_content_charset() or "utf-8")
+ continue
+ if chosen is None or name == "file":
+ chosen = (filename, part.get_content_type(), payload)
+
+ if chosen is None:
+ _raise_problem(status.HTTP_400_BAD_REQUEST, "multipart file part is required")
+ filename, part_content_type, payload = chosen
+ return MultipartFile(
+ filename=filename,
+ content_type=part_content_type or "application/octet-stream",
+ content=payload,
+ fields=fields,
+ )
+
+
+async def _bytes_stream(data: bytes) -> AsyncIterator[bytes]:
+ yield data
+
+
+def _etag(content_address: str) -> str:
+ return f'"{content_address}"'
+
+
+def _etag_matches(header_value: str, content_address: str) -> bool:
+ candidates = [v.strip() for v in header_value.split(",")]
+ return (
+ "*" in candidates
+ or content_address in candidates
+ or _etag(content_address) in candidates
+ )
+
+
+def _parse_range(header_value: str | None, size_bytes: int) -> tuple[int, int] | None:
+ if header_value is None:
+ return None
+ match = _RANGE_RE.match(header_value.strip())
+ if match is None:
+ _raise_problem(
+ status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
+ "only a single bytes range is supported",
+ )
+ start_raw = match.group("start")
+ end_raw = match.group("end")
+ if size_bytes <= 0:
+ _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "empty file has no range")
+ if start_raw == "":
+ if end_raw == "":
+ _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "invalid range")
+ suffix = int(end_raw)
+ if suffix <= 0:
+ _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "invalid suffix range")
+ start = max(size_bytes - suffix, 0)
+ end = size_bytes - 1
+ else:
+ start = int(start_raw)
+ end = int(end_raw) if end_raw else size_bytes - 1
+ if start >= size_bytes or end < start:
+ _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "range not satisfiable")
+ return start, min(end, size_bytes - 1)
+
+
+def _parse_content_range(header_value: str) -> tuple[int, int, int | None]:
+ match = _CONTENT_RANGE_RE.match(header_value.strip())
+ if match is None:
+ _raise_problem(status.HTTP_400_BAD_REQUEST, "invalid Content-Range header")
+ start = int(match.group("start"))
+ end = int(match.group("end"))
+ total_raw = match.group("total")
+ total = None if total_raw == "*" else int(total_raw)
+ if end < start:
+ _raise_problem(status.HTTP_400_BAD_REQUEST, "invalid Content-Range bounds")
+ if total is not None and end >= total:
+ _raise_problem(status.HTTP_400_BAD_REQUEST, "Content-Range exceeds total size")
+ return start, end, total
+
+
+def _upload_session(request: Request, upload_id: UUID) -> UploadSession:
+ session = request.app.state.upload_sessions.get(str(upload_id))
+ if session is None:
+ _raise_problem(status.HTTP_404_NOT_FOUND, f"upload not found: {upload_id}")
+ return session # type: ignore[no-any-return]
+
+
+async def _long_poll_events(
+ registry: Registry,
+ *,
+ since_sequence: int,
+ limit: int,
+ wait_seconds: float,
+) -> list[Event]:
+ batch = await registry.fetch_events(since_sequence=since_sequence, limit=limit)
+ if batch or wait_seconds == 0:
+ return batch
+
+ collected: list[Event] = []
+
+ async def collect() -> None:
+ async for event in registry.tail_events(
+ since_sequence=since_sequence,
+ poll_interval_seconds=0.05,
+ ):
+ collected.append(event)
+ if len(collected) >= limit:
+ break
+
+ with suppress(TimeoutError):
+ await asyncio.wait_for(collect(), timeout=wait_seconds)
+ return collected
+
+
app = create_app()
diff --git a/src/artifactstore/app.py b/src/artifactstore/app.py
index 27c11ad..6aeddb0 100644
--- a/src/artifactstore/app.py
+++ b/src/artifactstore/app.py
@@ -12,6 +12,7 @@ from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.engine import create_engine
from artifactstore.events import RegistryViewWriter
from artifactstore.registry import Registry
+from artifactstore.retention import RetentionPolicy
from artifactstore.storage import LocalBackend
__all__ = ["build_registry"]
@@ -24,4 +25,5 @@ def build_registry(settings: Settings | None = None) -> Registry:
backend = LocalBackend(effective.storage_local_root, backend_id="local")
dataplane = InProcessDataPlane(backend)
view_writer = RegistryViewWriter()
- return Registry(engine, dataplane, view_writer)
+ retention_policy = RetentionPolicy.from_toml(effective.retention_config_path)
+ return Registry(engine, dataplane, view_writer, retention_policy)
diff --git a/src/artifactstore/cli/__init__.py b/src/artifactstore/cli/__init__.py
index ce7d7b1..41097f3 100644
--- a/src/artifactstore/cli/__init__.py
+++ b/src/artifactstore/cli/__init__.py
@@ -10,9 +10,14 @@ from __future__ import annotations
import asyncio
import json
+import mimetypes
+import urllib.error
+import urllib.request
+import uuid
from pathlib import Path
from typing import Any
+import click
import typer
from artifactstore import __version__
@@ -28,6 +33,8 @@ app = typer.Typer(
help="artifact-store: artifact registry and storage gateway",
no_args_is_help=True,
)
+retention_app = typer.Typer(help="Retention lifecycle commands", no_args_is_help=True)
+app.add_typer(retention_app, name="retention")
@app.callback()
@@ -74,6 +81,107 @@ def health() -> None:
typer.echo(json.dumps(payload, indent=2))
+@app.command()
+def push(
+ directory: Path = typer.Argument(
+ ...,
+ exists=True,
+ file_okay=False,
+ dir_okay=True,
+ readable=True,
+ help="Directory to push as one artifact package.",
+ ),
+ producer: str = typer.Option(..., "--producer", help="Producer slug for the package."),
+ subject: str = typer.Option(..., "--subject", help="Subject identifier for the package."),
+ retention_class: str = typer.Option(
+ "raw-evidence",
+ "--retention-class",
+ help="Retention class id to apply.",
+ ),
+ name: str | None = typer.Option(None, "--name", help="Package name; defaults to dir name."),
+ api_url: str | None = typer.Option(None, "--api-url", help="artifact-store base URL."),
+ token: str | None = typer.Option(None, "--token", help="Bearer token for the HTTP API."),
+) -> None:
+ """Push a directory through the HTTP API and finalize the package."""
+ settings = get_settings()
+ base_url = api_url or settings.api_url
+ bearer = token or settings.api_token
+ if not bearer:
+ raise click.ClickException("provide --token or ARTIFACTSTORE_API_TOKEN")
+
+ files = sorted(path for path in directory.rglob("*") if path.is_file())
+ package = _http_json(
+ "POST",
+ base_url,
+ "/packages",
+ bearer,
+ {
+ "name": name or directory.name,
+ "producer": producer,
+ "subject": subject,
+ "retention_class": retention_class,
+ "metadata": {"source_directory": str(directory)},
+ },
+ )
+ package_id = str(package["id"])
+
+ for path in files:
+ rel_path = path.relative_to(directory).as_posix()
+ media_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
+ _http_multipart(
+ base_url,
+ f"/packages/{package_id}/files",
+ bearer,
+ fields={"relative_path": rel_path, "media_type": media_type},
+ file_field="file",
+ file_name=path.name,
+ file_content_type=media_type,
+ file_bytes=path.read_bytes(),
+ )
+
+ finalized = _http_json("POST", base_url, f"/packages/{package_id}/finalize", bearer, {})
+ typer.echo(
+ json.dumps(
+ {
+ "package_id": package_id,
+ "manifest_digest": finalized.get("manifest_digest"),
+ "files": len(files),
+ },
+ indent=2,
+ )
+ )
+
+
+@app.command()
+def manifest(
+ package_id: str = typer.Argument(..., help="Package UUID."),
+ api_url: str | None = typer.Option(None, "--api-url", help="artifact-store base URL."),
+ token: str | None = typer.Option(None, "--token", help="Bearer token for the HTTP API."),
+) -> None:
+ """Print a package manifest JSON projection from the HTTP API."""
+ settings = get_settings()
+ base_url = api_url or settings.api_url
+ bearer = token or settings.api_token
+ if not bearer:
+ raise click.ClickException("provide --token or ARTIFACTSTORE_API_TOKEN")
+ payload = _http_bytes(
+ "GET",
+ base_url,
+ f"/packages/{package_id}/manifest.json",
+ bearer,
+ headers={"Accept": "application/json"},
+ )
+ typer.echo(payload.decode("utf-8"))
+
+
+@retention_app.command("sweep")
+def retention_sweep() -> None:
+ """Run the deletion-eligibility sweeper once against the configured DB."""
+ settings = get_settings()
+ marked = asyncio.run(_retention_sweep_async(settings))
+ typer.echo(json.dumps({"marked_package_ids": marked, "marked_count": len(marked)}, indent=2))
+
+
# ---- internals -------------------------------------------------------------
@@ -110,5 +218,121 @@ async def _health_async(settings: Settings) -> dict[str, Any]:
}
+async def _retention_sweep_async(settings: Settings) -> list[str]:
+ from artifactstore.app import build_registry
+
+ registry: Registry = build_registry(settings)
+ try:
+ marked = await registry.sweep_deletion_eligibility()
+ finally:
+ await registry.dispose()
+ return [str(package_id) for package_id in marked]
+
+
+def _http_json(
+ method: str,
+ base_url: str,
+ path: str,
+ token: str,
+ payload: dict[str, Any],
+) -> dict[str, Any]:
+ response = _http_bytes(
+ method,
+ base_url,
+ path,
+ token,
+ body=json.dumps(payload).encode("utf-8"),
+ headers={"Content-Type": "application/json", "Accept": "application/json"},
+ )
+ decoded = json.loads(response)
+ if not isinstance(decoded, dict):
+ raise click.ClickException(f"expected JSON object from {path}")
+ return decoded
+
+
+def _http_multipart(
+ base_url: str,
+ path: str,
+ token: str,
+ *,
+ fields: dict[str, str],
+ file_field: str,
+ file_name: str,
+ file_content_type: str,
+ file_bytes: bytes,
+) -> dict[str, Any]:
+ boundary = f"artifactstore-{uuid.uuid4().hex}"
+ body = bytearray()
+ for name, value in fields.items():
+ body.extend(f"--{boundary}\r\n".encode("ascii"))
+ body.extend(
+ f'Content-Disposition: form-data; name="{_quote_header_value(name)}"\r\n\r\n'.encode()
+ )
+ body.extend(value.encode())
+ body.extend(b"\r\n")
+ body.extend(f"--{boundary}\r\n".encode("ascii"))
+ body.extend(
+ (
+ f'Content-Disposition: form-data; name="{_quote_header_value(file_field)}"; '
+ f'filename="{_quote_header_value(file_name)}"\r\n'
+ f"Content-Type: {file_content_type}\r\n\r\n"
+ ).encode()
+ )
+ body.extend(file_bytes)
+ body.extend(b"\r\n")
+ body.extend(f"--{boundary}--\r\n".encode("ascii"))
+
+ response = _http_bytes(
+ "POST",
+ base_url,
+ path,
+ token,
+ body=bytes(body),
+ headers={
+ "Content-Type": f"multipart/form-data; boundary={boundary}",
+ "Accept": "application/json",
+ },
+ )
+ decoded = json.loads(response)
+ if not isinstance(decoded, dict):
+ raise click.ClickException(f"expected JSON object from {path}")
+ return decoded
+
+
+def _http_bytes(
+ method: str,
+ base_url: str,
+ path: str,
+ token: str,
+ *,
+ body: bytes | None = None,
+ headers: dict[str, str] | None = None,
+) -> bytes:
+ url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
+ effective_headers = dict(headers or {})
+ effective_headers["Authorization"] = f"Bearer {token}"
+ request = urllib.request.Request(
+ url,
+ data=body,
+ headers=effective_headers,
+ method=method,
+ )
+ try:
+ with urllib.request.urlopen(request, timeout=60) as response:
+ data = response.read()
+ if isinstance(data, bytes):
+ return data
+ return bytes(data)
+ except urllib.error.HTTPError as exc:
+ detail = exc.read().decode("utf-8", errors="replace")
+ raise click.ClickException(f"HTTP {exc.code} from {path}: {detail}") from exc
+ except urllib.error.URLError as exc:
+ raise click.ClickException(f"could not reach {url}: {exc.reason}") from exc
+
+
+def _quote_header_value(value: str) -> str:
+ return value.replace("\\", "\\\\").replace('"', '\\"')
+
+
if __name__ == "__main__": # pragma: no cover
app()
diff --git a/src/artifactstore/config.py b/src/artifactstore/config.py
index a209301..bca4d43 100644
--- a/src/artifactstore/config.py
+++ b/src/artifactstore/config.py
@@ -23,6 +23,21 @@ class Settings(BaseSettings):
database_url: str = "sqlite+aiosqlite:///./var/artifactstore.db"
storage_local_root: str = "./var/storage"
log_level: str = "INFO"
+ auth_tokens: str = ""
+ anon_read: bool = False
+ api_url: str = "http://127.0.0.1:8000"
+ api_token: str = ""
+ retention_config_path: str = ""
+ retention_sweep_interval_seconds: int = 3600
+
+ @property
+ def bearer_tokens(self) -> frozenset[str]:
+ """Configured shared-secret bearer tokens, parsed from CSV / newline text."""
+ return frozenset(
+ token.strip()
+ for token in self.auth_tokens.replace("\n", ",").split(",")
+ if token.strip()
+ )
def get_settings() -> Settings:
diff --git a/src/artifactstore/events/views.py b/src/artifactstore/events/views.py
index 31ca335..afecaac 100644
--- a/src/artifactstore/events/views.py
+++ b/src/artifactstore/events/views.py
@@ -19,6 +19,7 @@ workplans without changing this module's public surface.
from __future__ import annotations
+from datetime import UTC, datetime
from uuid import UUID
import cbor2
@@ -146,8 +147,125 @@ async def _apply_package_finalized(connection: AsyncConnection, event: Event) ->
)
+async def _apply_retention_default_applied(
+ connection: AsyncConnection,
+ event: Event,
+) -> None:
+ if event.subject_id is None:
+ raise ValueError("v1.retention.default_applied event must have subject_id")
+ payload = cbor2.loads(event.payload)
+ expires_at = _parse_iso(payload["expires_at"])
+ await connection.execute(
+ update(artifact_packages)
+ .where(artifact_packages.c.id == event.subject_id)
+ .values(
+ expires_at=expires_at,
+ last_event_sequence=event.sequence,
+ )
+ )
+ await connection.execute(
+ update(retention_state)
+ .where(retention_state.c.package_id == event.subject_id)
+ .values(
+ current_expires_at=expires_at,
+ effective_class=payload["retention_class"],
+ active_hold_id=None,
+ eligible_for_deletion=bool(payload.get("eligible_for_deletion", False)),
+ )
+ )
+
+
+async def _apply_retention_extended(connection: AsyncConnection, event: Event) -> None:
+ if event.subject_id is None:
+ raise ValueError("v1.retention.extended event must have subject_id")
+ payload = cbor2.loads(event.payload)
+ expires_at = _parse_iso(payload["new_expires_at"])
+ await connection.execute(
+ update(artifact_packages)
+ .where(artifact_packages.c.id == event.subject_id)
+ .values(
+ expires_at=expires_at,
+ last_event_sequence=event.sequence,
+ )
+ )
+ await connection.execute(
+ update(retention_state)
+ .where(retention_state.c.package_id == event.subject_id)
+ .values(
+ current_expires_at=expires_at,
+ eligible_for_deletion=False,
+ )
+ )
+
+
+async def _apply_retention_hold_applied(connection: AsyncConnection, event: Event) -> None:
+ if event.subject_id is None:
+ raise ValueError("v1.retention.hold_applied event must have subject_id")
+ payload = cbor2.loads(event.payload)
+ await connection.execute(
+ update(retention_state)
+ .where(retention_state.c.package_id == event.subject_id)
+ .values(
+ active_hold_id=UUID(payload["hold_id"]),
+ eligible_for_deletion=False,
+ )
+ )
+ await connection.execute(
+ update(artifact_packages)
+ .where(artifact_packages.c.id == event.subject_id)
+ .values(last_event_sequence=event.sequence)
+ )
+
+
+async def _apply_retention_hold_released(connection: AsyncConnection, event: Event) -> None:
+ if event.subject_id is None:
+ raise ValueError("v1.retention.hold_released event must have subject_id")
+ await connection.execute(
+ update(retention_state)
+ .where(retention_state.c.package_id == event.subject_id)
+ .values(active_hold_id=None)
+ )
+ await connection.execute(
+ update(artifact_packages)
+ .where(artifact_packages.c.id == event.subject_id)
+ .values(last_event_sequence=event.sequence)
+ )
+
+
+async def _apply_retention_deletion_eligible(
+ connection: AsyncConnection,
+ event: Event,
+) -> None:
+ if event.subject_id is None:
+ raise ValueError("v1.retention.deletion_eligible event must have subject_id")
+ await connection.execute(
+ update(retention_state)
+ .where(retention_state.c.package_id == event.subject_id)
+ .values(eligible_for_deletion=True)
+ )
+ await connection.execute(
+ update(artifact_packages)
+ .where(artifact_packages.c.id == event.subject_id)
+ .values(last_event_sequence=event.sequence)
+ )
+
+
+def _parse_iso(value: str | None) -> datetime | None:
+ if value is None:
+ return None
+ parsed = datetime.fromisoformat(value)
+ if parsed.tzinfo is None:
+ return parsed.replace(tzinfo=UTC)
+ return parsed.astimezone(UTC)
+
+
_HANDLERS = {
"v1.package.created": _apply_package_created,
"v1.file.ingested": _apply_file_ingested,
"v1.package.finalized": _apply_package_finalized,
+ "v1.retention.default_applied": _apply_retention_default_applied,
+ "v1.retention.extended": _apply_retention_extended,
+ "v1.retention.hold_applied": _apply_retention_hold_applied,
+ "v1.retention.hold_released": _apply_retention_hold_released,
+ "v1.retention.deletion_eligible": _apply_retention_deletion_eligible,
}
diff --git a/src/artifactstore/registry/__init__.py b/src/artifactstore/registry/__init__.py
index 9d04ef4..f3c359d 100644
--- a/src/artifactstore/registry/__init__.py
+++ b/src/artifactstore/registry/__init__.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import uuid
from collections.abc import AsyncIterator, Sequence
+from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
@@ -39,7 +40,7 @@ from artifactstore.db.schema import (
from artifactstore.db.schema import (
events as events_t,
)
-from artifactstore.events import RegistryViewWriter, make_event, tail, write
+from artifactstore.events import RegistryViewWriter, fetch_since, make_event, tail, write
from artifactstore.events.model import Event
from artifactstore.identity import ContentAddress
from artifactstore.manifest import (
@@ -61,9 +62,21 @@ from artifactstore.manifest import (
)
from artifactstore.manifest.codec import decode as manifest_decode
from artifactstore.manifest.projection import jcs_projection
+from artifactstore.retention import RetentionPolicy
from artifactstore.storage.spi import BackendStatus
-__all__ = ["Registry"]
+__all__ = [
+ "DuplicateRelativePathError",
+ "FileNotFoundError",
+ "FileRecord",
+ "IllegalPackageStateError",
+ "PackageNotFoundError",
+ "PackageRecord",
+ "Registry",
+ "RetentionClassRecord",
+ "RetentionStateError",
+ "RetentionStateRecord",
+]
class PackageNotFoundError(KeyError):
@@ -82,6 +95,78 @@ class DuplicateRelativePathError(ValueError):
"""Raised when ingest_file would create a duplicate ``relative_path`` in a package."""
+class RetentionStateError(ValueError):
+ """Raised when a retention lifecycle operation is invalid."""
+
+
+@dataclass(frozen=True, slots=True)
+class PackageRecord:
+ """Materialised package row projected into the registry API."""
+
+ id: UUID
+ name: str
+ producer: str
+ subject: str
+ retention_class: str
+ metadata: dict[str, Any]
+ status: str
+ manifest_digest_hex: str | None
+ created_at: datetime | None
+ finalized_at: datetime | None
+ expires_at: datetime | None
+ last_event_sequence: int
+ metadata_schema_id: UUID | None = None
+
+
+@dataclass(frozen=True, slots=True)
+class FileRecord:
+ """Materialised file row plus its primary storage location."""
+
+ id: UUID
+ package_id: UUID
+ relative_path: str
+ media_type: str
+ size_bytes: int
+ digest_algorithm: str
+ digest_primary_hex: str
+ digest_sha256_hex: str
+ created_at: datetime | None
+ backend_id: str
+ content_address: str
+ object_key: str
+ retrieval_tier: str
+ storage_status: str
+
+
+@dataclass(frozen=True, slots=True)
+class RetentionClassRecord:
+ """Configured retention-class row."""
+
+ class_id: str
+ default_duration_seconds: int | None
+ deletion_strategy: str
+
+
+@dataclass(frozen=True, slots=True)
+class RetentionStateRecord:
+ """Materialised retention state for one package."""
+
+ package_id: UUID
+ current_expires_at: datetime | None
+ effective_class: str
+ active_hold_id: UUID | None
+ eligible_for_deletion: bool
+
+
+_RETENTION_EVENT_TYPES = (
+ "v1.retention.default_applied",
+ "v1.retention.extended",
+ "v1.retention.hold_applied",
+ "v1.retention.hold_released",
+ "v1.retention.deletion_eligible",
+)
+
+
class Registry:
"""Library-shaped orchestrator over events, dataplane, and views."""
@@ -90,10 +175,12 @@ class Registry:
engine: AsyncEngine,
dataplane: DataPlane,
view_writer: RegistryViewWriter | None = None,
+ retention_policy: RetentionPolicy | None = None,
) -> None:
self._engine = engine
self._dataplane = dataplane
self._view_writer = view_writer or RegistryViewWriter()
+ self._retention_policy = retention_policy or RetentionPolicy()
# ---- mutating operations ------------------------------------------------
@@ -108,7 +195,7 @@ class Registry:
metadata: dict[str, Any] | None = None,
) -> UUID:
"""Create a new package; returns its ``UUID``."""
- await self._validate_retention_class(retention_class)
+ retention_class_row = await self._get_retention_class(retention_class)
package_id = uuid.uuid4()
payload = cbor2.dumps(
{
@@ -130,6 +217,31 @@ class Registry:
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
+ if written.created_at is None:
+ raise RuntimeError("created package event missing created_at")
+ decision = self._retention_policy.default_for(
+ retention_class=retention_class,
+ class_default_seconds=retention_class_row.default_duration_seconds,
+ base_time=_ensure_aware(written.created_at),
+ )
+ default_payload = cbor2.dumps(
+ {
+ "retention_class": decision.retention_class,
+ "default_duration_seconds": decision.default_duration_seconds,
+ "expires_at": _iso(decision.expires_at),
+ "eligible_for_deletion": decision.eligible_for_deletion,
+ },
+ canonical=True,
+ )
+ default_event = make_event(
+ event_type="v1.retention.default_applied",
+ subject_kind="retention",
+ subject_id=package_id,
+ actor=actor,
+ payload=default_payload,
+ )
+ written_default = await write(conn, default_event)
+ await self._view_writer.apply(conn, written_default)
return package_id
async def ingest_file(
@@ -265,6 +377,320 @@ class Registry:
# ---- read-only operations ----------------------------------------------
+ async def list_packages(
+ self,
+ *,
+ producer: str | None = None,
+ subject: str | None = None,
+ retention_class: str | None = None,
+ metadata_key: str | None = None,
+ metadata_value: str | None = None,
+ ) -> list[PackageRecord]:
+ """List package materialised views, optionally filtered for producer
+ workflows. Metadata key filtering is performed after the portable SQL
+ filters so the same behavior works on SQLite and PostgreSQL."""
+ stmt = select(artifact_packages).order_by(
+ artifact_packages.c.created_at,
+ artifact_packages.c.id,
+ )
+ if producer is not None:
+ stmt = stmt.where(artifact_packages.c.producer == producer)
+ if subject is not None:
+ stmt = stmt.where(artifact_packages.c.subject == subject)
+ if retention_class is not None:
+ stmt = stmt.where(artifact_packages.c.retention_class == retention_class)
+ async with self._engine.connect() as conn:
+ rows = (await conn.execute(stmt)).all()
+
+ records = [_package_record_from_row(r) for r in rows]
+ if metadata_key is None:
+ return records
+ filtered: list[PackageRecord] = []
+ for record in records:
+ if metadata_key not in record.metadata:
+ continue
+ if metadata_value is not None and str(record.metadata[metadata_key]) != metadata_value:
+ continue
+ filtered.append(record)
+ return filtered
+
+ async def get_package(self, package_id: UUID) -> PackageRecord:
+ """Return one package materialised view."""
+ async with self._engine.connect() as conn:
+ row = (
+ await conn.execute(
+ select(artifact_packages).where(artifact_packages.c.id == package_id)
+ )
+ ).first()
+ if row is None:
+ raise PackageNotFoundError(f"package not found: {package_id}")
+ return _package_record_from_row(row)
+
+ async def get_file_metadata(self, file_id: UUID) -> FileRecord:
+ """Return file metadata plus the primary storage location."""
+ stmt = (
+ select(
+ artifact_files.c.id.label("file_id"),
+ artifact_files.c.package_id,
+ artifact_files.c.relative_path,
+ artifact_files.c.media_type,
+ artifact_files.c.size_bytes,
+ artifact_files.c.digest_algorithm,
+ artifact_files.c.digest_primary,
+ artifact_files.c.digest_sha256,
+ artifact_files.c.created_at,
+ storage_locations.c.backend_id,
+ storage_locations.c.content_address,
+ storage_locations.c.object_key,
+ storage_locations.c.retrieval_tier,
+ storage_locations.c.status.label("storage_status"),
+ )
+ .join(
+ storage_locations,
+ storage_locations.c.artifact_file_id == artifact_files.c.id,
+ )
+ .where(artifact_files.c.id == file_id)
+ .limit(1)
+ )
+ async with self._engine.connect() as conn:
+ row = (await conn.execute(stmt)).first()
+ if row is None:
+ raise FileNotFoundError(f"file not found: {file_id}")
+ return FileRecord(
+ id=row.file_id,
+ package_id=row.package_id,
+ relative_path=row.relative_path,
+ media_type=row.media_type,
+ size_bytes=row.size_bytes,
+ digest_algorithm=row.digest_algorithm,
+ digest_primary_hex=row.digest_primary.hex(),
+ digest_sha256_hex=row.digest_sha256.hex(),
+ created_at=row.created_at,
+ backend_id=row.backend_id,
+ content_address=row.content_address,
+ object_key=row.object_key,
+ retrieval_tier=row.retrieval_tier,
+ storage_status=row.storage_status,
+ )
+
+ async def list_retention_classes(self) -> list[RetentionClassRecord]:
+ """Return configured retention classes."""
+ async with self._engine.connect() as conn:
+ rows = (
+ await conn.execute(
+ select(retention_classes).order_by(retention_classes.c.class_id)
+ )
+ ).all()
+ return [
+ RetentionClassRecord(
+ class_id=r.class_id,
+ default_duration_seconds=r.default_duration_seconds,
+ deletion_strategy=r.deletion_strategy,
+ )
+ for r in rows
+ ]
+
+ async def get_retention_state(self, package_id: UUID) -> RetentionStateRecord:
+ """Return the retention materialised view for one package."""
+ async with self._engine.connect() as conn:
+ row = (
+ await conn.execute(
+ select(retention_state).where(retention_state.c.package_id == package_id)
+ )
+ ).first()
+ if row is None:
+ raise PackageNotFoundError(f"package not found: {package_id}")
+ return _retention_state_record_from_row(row)
+
+ async def extend_retention(
+ self,
+ package_id: UUID,
+ *,
+ new_expires_at: datetime,
+ reason: str,
+ actor: str,
+ ) -> RetentionStateRecord:
+ """Extend package retention to a strictly later expiry."""
+ clean_reason = _require_reason(reason)
+ target_expires_at = _ensure_aware(new_expires_at)
+ current = await self.get_retention_state(package_id)
+ if current.current_expires_at is None:
+ raise RetentionStateError(
+ f"package {package_id} has no current expiry to extend"
+ )
+ current_expires_at = _ensure_aware(current.current_expires_at)
+ if target_expires_at <= current_expires_at:
+ raise RetentionStateError("new_expires_at must be strictly later than current expiry")
+
+ payload = cbor2.dumps(
+ {
+ "previous_expires_at": _iso(current_expires_at),
+ "new_expires_at": _iso(target_expires_at),
+ "reason": clean_reason,
+ },
+ canonical=True,
+ )
+ event = make_event(
+ event_type="v1.retention.extended",
+ subject_kind="retention",
+ subject_id=package_id,
+ actor=actor,
+ payload=payload,
+ )
+ async with self._engine.begin() as conn:
+ written = await write(conn, event)
+ await self._view_writer.apply(conn, written)
+ return await self.get_retention_state(package_id)
+
+ async def apply_retention_hold(
+ self,
+ package_id: UUID,
+ *,
+ reason: str,
+ actor: str,
+ ) -> UUID:
+ """Apply one active hold to a package and return the hold id."""
+ clean_reason = _require_reason(reason)
+ current = await self.get_retention_state(package_id)
+ if current.active_hold_id is not None:
+ raise RetentionStateError(
+ f"package {package_id} already has active hold {current.active_hold_id}"
+ )
+ hold_id = uuid.uuid4()
+ payload = cbor2.dumps(
+ {
+ "hold_id": str(hold_id),
+ "reason": clean_reason,
+ },
+ canonical=True,
+ )
+ event = make_event(
+ event_type="v1.retention.hold_applied",
+ subject_kind="retention",
+ subject_id=package_id,
+ actor=actor,
+ payload=payload,
+ )
+ async with self._engine.begin() as conn:
+ written = await write(conn, event)
+ await self._view_writer.apply(conn, written)
+ return hold_id
+
+ async def release_retention_hold(
+ self,
+ package_id: UUID,
+ hold_id: UUID,
+ *,
+ reason: str,
+ actor: str,
+ now: datetime | None = None,
+ ) -> RetentionStateRecord:
+ """Release the active hold, emitting eligibility if the package is expired."""
+ clean_reason = _require_reason(reason)
+ current = await self.get_retention_state(package_id)
+ if current.active_hold_id != hold_id:
+ raise RetentionStateError(f"hold {hold_id} is not active on package {package_id}")
+
+ payload = cbor2.dumps(
+ {
+ "hold_id": str(hold_id),
+ "reason": clean_reason,
+ },
+ canonical=True,
+ )
+ release_event = make_event(
+ event_type="v1.retention.hold_released",
+ subject_kind="retention",
+ subject_id=package_id,
+ actor=actor,
+ payload=payload,
+ )
+ effective_now = _ensure_aware(now or datetime.now(UTC))
+ expired_after_release = (
+ current.current_expires_at is not None
+ and _ensure_aware(current.current_expires_at) <= effective_now
+ and not current.eligible_for_deletion
+ )
+ async with self._engine.begin() as conn:
+ written_release = await write(conn, release_event)
+ await self._view_writer.apply(conn, written_release)
+ if expired_after_release:
+ eligible_event = make_event(
+ event_type="v1.retention.deletion_eligible",
+ subject_kind="retention",
+ subject_id=package_id,
+ actor=actor,
+ payload=_deletion_eligible_payload(
+ expires_at=current.current_expires_at,
+ reason="hold released after expiry",
+ ),
+ )
+ written_eligible = await write(conn, eligible_event)
+ await self._view_writer.apply(conn, written_eligible)
+ return await self.get_retention_state(package_id)
+
+ async def sweep_deletion_eligibility(
+ self,
+ *,
+ now: datetime | None = None,
+ actor: str = "retention-sweeper",
+ ) -> list[UUID]:
+ """Mark expired, unheld packages as eligible for deletion."""
+ effective_now = _ensure_aware(now or datetime.now(UTC))
+ marked: list[UUID] = []
+ async with self._engine.begin() as conn:
+ rows = (await conn.execute(select(retention_state))).all()
+ for row in rows:
+ record = _retention_state_record_from_row(row)
+ if record.eligible_for_deletion or record.active_hold_id is not None:
+ continue
+ if record.current_expires_at is None:
+ continue
+ if _ensure_aware(record.current_expires_at) > effective_now:
+ continue
+ event = make_event(
+ event_type="v1.retention.deletion_eligible",
+ subject_kind="retention",
+ subject_id=record.package_id,
+ actor=actor,
+ payload=_deletion_eligible_payload(
+ expires_at=record.current_expires_at,
+ reason="retention expiry reached",
+ ),
+ )
+ written = await write(conn, event)
+ await self._view_writer.apply(conn, written)
+ marked.append(record.package_id)
+ return marked
+
+ async def retention_history(self, package_id: UUID) -> list[Event]:
+ """Return all retention events for a package, ordered by sequence."""
+ await self.get_package(package_id)
+ async with self._engine.connect() as conn:
+ rows = (
+ await conn.execute(
+ select(events_t)
+ .where(
+ events_t.c.subject_id == package_id,
+ events_t.c.event_type.in_(_RETENTION_EVENT_TYPES),
+ )
+ .order_by(events_t.c.sequence)
+ )
+ ).all()
+ return [
+ Event(
+ event_type=r.event_type,
+ subject_kind=r.subject_kind,
+ subject_id=r.subject_id,
+ actor=r.actor,
+ payload=r.payload,
+ payload_digest=r.payload_digest,
+ sequence=r.sequence,
+ created_at=r.created_at,
+ )
+ for r in rows
+ ]
+
async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes:
"""Return the finalised manifest. ``format`` is ``cbor`` (canonical
CBOR, the wire form) or ``json`` (the JCS projection)."""
@@ -289,20 +715,27 @@ class Registry:
return jcs_projection(manifest_decode(payload_bytes))
raise ValueError(f"unknown manifest format: {format!r} (expected 'cbor' or 'json')")
- async def get_file(self, file_id: UUID) -> AsyncIterator[bytes]:
+ async def get_file(
+ self,
+ file_id: UUID,
+ *,
+ byte_range: tuple[int, int] | None = None,
+ ) -> AsyncIterator[bytes]:
"""Return an async byte iterator for the bytes of a stored file."""
+ record = await self.get_file_metadata(file_id)
+ ca = ContentAddress(record.content_address)
+ return await self._dataplane.serve_object(ca, byte_range=byte_range)
+
+ async def fetch_events(
+ self,
+ *,
+ since_sequence: int = 0,
+ limit: int = 100,
+ ) -> list[Event]:
+ """Fetch one ordered batch of events with sequence greater than
+ ``since_sequence``."""
async with self._engine.connect() as conn:
- row = (
- await conn.execute(
- select(storage_locations)
- .where(storage_locations.c.artifact_file_id == file_id)
- .limit(1)
- )
- ).first()
- if row is None:
- raise FileNotFoundError(f"file not found: {file_id}")
- ca = ContentAddress(row.content_address)
- return await self._dataplane.serve_object(ca)
+ return await fetch_since(conn, since_sequence=since_sequence, limit=limit)
def tail_events(
self,
@@ -342,25 +775,83 @@ class Registry:
# ---- internals ----------------------------------------------------------
- async def _validate_retention_class(self, retention_class: str) -> None:
+ async def _get_retention_class(self, retention_class: str) -> RetentionClassRecord:
async with self._engine.connect() as conn:
row = (
await conn.execute(
- select(retention_classes.c.class_id).where(
- retention_classes.c.class_id == retention_class
- )
+ select(retention_classes).where(retention_classes.c.class_id == retention_class)
)
).first()
if row is None:
raise ValueError(f"unknown retention class: {retention_class!r}")
+ return RetentionClassRecord(
+ class_id=row.class_id,
+ default_duration_seconds=row.default_duration_seconds,
+ deletion_strategy=row.deletion_strategy,
+ )
def _iso(value: datetime | None) -> str | None:
if value is None:
return None
+ return _ensure_aware(value).isoformat()
+
+
+def _ensure_aware(value: datetime) -> datetime:
if value.tzinfo is None:
- value = value.replace(tzinfo=UTC)
- return value.isoformat()
+ return value.replace(tzinfo=UTC)
+ return value.astimezone(UTC)
+
+
+def _parse_iso_datetime(value: str | None) -> datetime | None:
+ if value is None:
+ return None
+ return _ensure_aware(datetime.fromisoformat(value))
+
+
+def _require_reason(reason: str) -> str:
+ clean = reason.strip()
+ if not clean:
+ raise RetentionStateError("reason is required")
+ return clean
+
+
+def _deletion_eligible_payload(*, expires_at: datetime | None, reason: str) -> bytes:
+ return cbor2.dumps(
+ {
+ "expires_at": _iso(expires_at),
+ "reason": reason,
+ },
+ canonical=True,
+ )
+
+
+def _package_record_from_row(row: Any) -> PackageRecord:
+ return PackageRecord(
+ id=row.id,
+ name=row.name,
+ producer=row.producer,
+ subject=row.subject,
+ retention_class=row.retention_class,
+ metadata=dict(row.metadata) if row.metadata else {},
+ status=row.status,
+ manifest_digest_hex=row.manifest_digest.hex() if row.manifest_digest else None,
+ created_at=row.created_at,
+ finalized_at=row.finalized_at,
+ expires_at=row.expires_at,
+ last_event_sequence=row.last_event_sequence,
+ metadata_schema_id=row.metadata_schema_id,
+ )
+
+
+def _retention_state_record_from_row(row: Any) -> RetentionStateRecord:
+ return RetentionStateRecord(
+ package_id=row.package_id,
+ current_expires_at=row.current_expires_at,
+ effective_class=row.effective_class,
+ active_hold_id=row.active_hold_id,
+ eligible_for_deletion=bool(row.eligible_for_deletion),
+ )
def _build_manifest(
diff --git a/src/artifactstore/retention/__init__.py b/src/artifactstore/retention/__init__.py
index 06b2abc..355723c 100644
--- a/src/artifactstore/retention/__init__.py
+++ b/src/artifactstore/retention/__init__.py
@@ -1,5 +1,85 @@
-"""Retention policy engine.
+"""Retention policy engine."""
-Seed classes land in ARTIFACT-STORE-WP-0001-T002 (data model). Active policy
-operations (extensions, holds, sweeper) land in workplan WP-0003.
-"""
+from __future__ import annotations
+
+import tomllib
+from collections.abc import Mapping
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Any
+
+__all__ = ["RetentionDecision", "RetentionPolicy"]
+
+
+@dataclass(frozen=True, slots=True)
+class RetentionDecision:
+ """Computed default retention outcome for a package."""
+
+ retention_class: str
+ default_duration_seconds: int | None
+ expires_at: datetime | None
+ eligible_for_deletion: bool = False
+
+
+class RetentionPolicy:
+ """Applies operator-configured default retention durations.
+
+ The database stores seed defaults for each retention class. Operators may
+ override those defaults through a TOML file whose shape is documented in
+ ``docs/OPERATOR.md``.
+ """
+
+ def __init__(self, overrides: Mapping[str, int | None] | None = None) -> None:
+ self._overrides = dict(overrides or {})
+
+ @classmethod
+ def from_toml(cls, path: str | Path | None) -> RetentionPolicy:
+ if path is None or str(path).strip() == "":
+ return cls()
+ config_path = Path(path)
+ with config_path.open("rb") as fh:
+ raw = tomllib.load(fh)
+ return cls(_parse_duration_overrides(raw))
+
+ def default_for(
+ self,
+ *,
+ retention_class: str,
+ class_default_seconds: int | None,
+ base_time: datetime,
+ ) -> RetentionDecision:
+ duration_seconds = self._overrides.get(retention_class, class_default_seconds)
+ expires_at = (
+ None
+ if duration_seconds is None
+ else base_time + timedelta(seconds=duration_seconds)
+ )
+ return RetentionDecision(
+ retention_class=retention_class,
+ default_duration_seconds=duration_seconds,
+ expires_at=expires_at,
+ eligible_for_deletion=False,
+ )
+
+
+def _parse_duration_overrides(raw: Mapping[str, Any]) -> dict[str, int | None]:
+ section = raw.get("retention_classes", {})
+ if not isinstance(section, dict):
+ raise ValueError("retention_classes must be a TOML table")
+ overrides: dict[str, int | None] = {}
+ for class_id, value in section.items():
+ duration = value.get("default_duration_seconds") if isinstance(value, dict) else value
+ if duration is None:
+ overrides[class_id] = None
+ continue
+ if not isinstance(duration, int):
+ raise ValueError(
+ f"retention_classes.{class_id}.default_duration_seconds must be an integer"
+ )
+ if duration < 0:
+ raise ValueError(
+ f"retention_classes.{class_id}.default_duration_seconds must be non-negative"
+ )
+ overrides[class_id] = duration
+ return overrides
diff --git a/tests/integration/test_cli_commands.py b/tests/integration/test_cli_commands.py
index ebf01d4..1922069 100644
--- a/tests/integration/test_cli_commands.py
+++ b/tests/integration/test_cli_commands.py
@@ -2,8 +2,10 @@
from __future__ import annotations
+import asyncio
import json
from pathlib import Path
+from typing import Any
import pytest
from sqlalchemy import create_engine, insert, inspect
@@ -84,3 +86,169 @@ def test_cli_health_reports_ok(
assert payload["status"] == "ok"
assert payload["db"]["healthy"] is True
assert payload["backend"]["healthy"] is True
+
+
+def test_cli_push_uses_http_api(
+ runner: CliRunner,
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ source = tmp_path / "source"
+ source.mkdir()
+ (source / "a.txt").write_text("alpha", encoding="utf-8")
+ calls: list[tuple[str, str, dict[str, Any]]] = []
+ multipart_calls: list[tuple[str, dict[str, str], bytes]] = []
+
+ def fake_http_json(
+ method: str,
+ base_url: str,
+ path: str,
+ token: str,
+ payload: dict[str, Any],
+ ) -> dict[str, Any]:
+ calls.append((method, path, payload))
+ assert base_url == "http://api.test"
+ assert token == "secret"
+ if path == "/packages":
+ return {"id": "pkg-1"}
+ if path == "/packages/pkg-1/finalize":
+ return {"manifest_digest": "blake3:abc"}
+ raise AssertionError(f"unexpected JSON request: {method} {path}")
+
+ def fake_http_multipart(
+ base_url: str,
+ path: str,
+ token: str,
+ *,
+ fields: dict[str, str],
+ file_field: str,
+ file_name: str,
+ file_content_type: str,
+ file_bytes: bytes,
+ ) -> dict[str, Any]:
+ assert base_url == "http://api.test"
+ assert token == "secret"
+ assert path == "/packages/pkg-1/files"
+ assert file_field == "file"
+ assert file_name == "a.txt"
+ assert file_content_type == "text/plain"
+ multipart_calls.append((path, fields, file_bytes))
+ return {"id": "file-1"}
+
+ monkeypatch.setattr("artifactstore.cli._http_json", fake_http_json)
+ monkeypatch.setattr("artifactstore.cli._http_multipart", fake_http_multipart)
+
+ result = runner.invoke(
+ cli_app,
+ [
+ "push",
+ str(source),
+ "--producer",
+ "prod",
+ "--subject",
+ "sub",
+ "--api-url",
+ "http://api.test",
+ "--token",
+ "secret",
+ ],
+ )
+
+ assert result.exit_code == 0, result.output
+ assert json.loads(result.output) == {
+ "package_id": "pkg-1",
+ "manifest_digest": "blake3:abc",
+ "files": 1,
+ }
+ assert calls[0][1] == "/packages"
+ assert calls[1][1] == "/packages/pkg-1/finalize"
+ assert multipart_calls == [
+ (
+ "/packages/pkg-1/files",
+ {"relative_path": "a.txt", "media_type": "text/plain"},
+ b"alpha",
+ )
+ ]
+
+
+def test_cli_manifest_fetches_json_projection(
+ runner: CliRunner,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ def fake_http_bytes(
+ method: str,
+ base_url: str,
+ path: str,
+ token: str,
+ *,
+ body: bytes | None = None,
+ headers: dict[str, str] | None = None,
+ ) -> bytes:
+ assert method == "GET"
+ assert base_url == "http://api.test"
+ assert path == "/packages/pkg-1/manifest.json"
+ assert token == "secret"
+ assert body is None
+ assert headers == {"Accept": "application/json"}
+ return b'{"manifest_version":1}'
+
+ monkeypatch.setattr("artifactstore.cli._http_bytes", fake_http_bytes)
+
+ result = runner.invoke(
+ cli_app,
+ [
+ "manifest",
+ "pkg-1",
+ "--api-url",
+ "http://api.test",
+ "--token",
+ "secret",
+ ],
+ )
+
+ assert result.exit_code == 0, result.output
+ assert json.loads(result.output) == {"manifest_version": 1}
+
+
+def test_cli_retention_sweep_marks_expired_package(
+ runner: CliRunner,
+ env_db: Path,
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
+ metadata.create_all(sync_engine)
+ with sync_engine.begin() as conn:
+ conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
+ sync_engine.dispose()
+
+ retention_config = tmp_path / "retention.toml"
+ retention_config.write_text(
+ '[retention_classes.transient]\ndefault_duration_seconds = 0\n',
+ encoding="utf-8",
+ )
+ monkeypatch.setenv("ARTIFACTSTORE_RETENTION_CONFIG_PATH", str(retention_config))
+
+ async def create_expired_package() -> str:
+ from artifactstore.app import build_registry
+ from artifactstore.config import get_settings
+
+ registry = build_registry(get_settings())
+ try:
+ package_id = await registry.create_package(
+ name="expired",
+ producer="tests",
+ subject="cli-sweep",
+ retention_class="transient",
+ actor="ops",
+ )
+ finally:
+ await registry.dispose()
+ return str(package_id)
+
+ package_id = asyncio.run(create_expired_package())
+ result = runner.invoke(cli_app, ["retention", "sweep"])
+
+ assert result.exit_code == 0, result.output
+ payload = json.loads(result.output)
+ assert payload == {"marked_package_ids": [package_id], "marked_count": 1}
diff --git a/tests/integration/test_http_api.py b/tests/integration/test_http_api.py
new file mode 100644
index 0000000..6e387e1
--- /dev/null
+++ b/tests/integration/test_http_api.py
@@ -0,0 +1,240 @@
+"""HTTP API integration tests for ARTIFACT-STORE-WP-0002."""
+
+from __future__ import annotations
+
+import tempfile
+from pathlib import Path
+from typing import Any
+
+import cbor2
+from fastapi.testclient import TestClient
+from hypothesis import HealthCheck, given
+from hypothesis import settings as hypothesis_settings
+from hypothesis import strategies as st
+from sqlalchemy import create_engine, insert
+
+from artifactstore.api.http import create_app
+from artifactstore.config import Settings
+from artifactstore.db.schema import metadata, retention_classes
+from artifactstore.db.seed import RETENTION_CLASS_SEEDS
+from artifactstore.identity import digest_bytes
+
+AUTH = {"Authorization": "Bearer test-token"}
+
+
+def _settings(root: Path) -> Settings:
+ db_path = root / "http-api.db"
+ storage_root = root / "storage"
+ storage_root.mkdir(parents=True, exist_ok=True)
+ sync_engine = create_engine(f"sqlite:///{db_path}", future=True)
+ metadata.create_all(sync_engine)
+ with sync_engine.begin() as conn:
+ conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
+ sync_engine.dispose()
+ return Settings(
+ database_url=f"sqlite+aiosqlite:///{db_path}",
+ storage_local_root=str(storage_root),
+ log_level="INFO",
+ auth_tokens="test-token",
+ )
+
+
+def _create_package(client: TestClient, *, name: str = "pkg") -> str:
+ resp = client.post(
+ "/packages",
+ headers=AUTH,
+ json={
+ "name": name,
+ "producer": "guide-board",
+ "subject": "run-42",
+ "retention_class": "raw-evidence",
+ "metadata": {"run_id": "r-42", "kind": "integration"},
+ },
+ )
+ assert resp.status_code == 201, resp.text
+ return str(resp.json()["id"])
+
+
+def _upload_file(client: TestClient, package_id: str, rel_path: str, data: bytes) -> dict[str, Any]:
+ resp = client.post(
+ f"/packages/{package_id}/files",
+ headers=AUTH,
+ data={"relative_path": rel_path, "media_type": "application/octet-stream"},
+ files={"file": (Path(rel_path).name, data, "application/octet-stream")},
+ )
+ assert resp.status_code == 201, resp.text
+ return dict(resp.json())
+
+
+def test_http_surface_ingest_finalize_download_and_events(tmp_path: Path) -> None:
+ app = create_app(_settings(tmp_path))
+ with TestClient(app) as client:
+ unauth = client.get("/packages")
+ assert unauth.status_code == 401
+ assert unauth.headers["content-type"].startswith("application/problem+json")
+
+ assert client.get("/openapi.json").status_code == 200
+ assert client.get("/docs").status_code == 200
+ assert client.get("/backends", headers=AUTH).json()["backends"][0]["backend_id"] == "local"
+ assert client.get("/retention-classes", headers=AUTH).json()["retention_classes"]
+
+ package_id = _create_package(client)
+ listing = client.get(
+ "/packages",
+ headers=AUTH,
+ params={
+ "producer": "guide-board",
+ "subject": "run-42",
+ "retention_class": "raw-evidence",
+ "metadata_key": "run_id",
+ "metadata_value": "r-42",
+ },
+ )
+ assert listing.status_code == 200
+ assert [p["id"] for p in listing.json()["packages"]] == [package_id]
+
+ data = b"hello artifact-store http api" * 64
+ file_record = _upload_file(client, package_id, "reports/hello.bin", data)
+ assert file_record["size_bytes"] == len(data)
+ assert file_record["digest_primary_hex"] == digest_bytes(data).primary.hex
+
+ finalized = client.post(f"/packages/{package_id}/finalize", headers=AUTH)
+ assert finalized.status_code == 200, finalized.text
+ assert finalized.json()["status"] == "finalized"
+ assert finalized.json()["manifest_digest"].startswith("blake3:")
+
+ manifest_cbor = client.get(
+ f"/packages/{package_id}/manifest",
+ headers={**AUTH, "Accept": "application/cbor"},
+ )
+ assert manifest_cbor.status_code == 200
+ manifest_payload = cbor2.loads(manifest_cbor.content)
+ assert manifest_payload["manifest_version"] == 1
+ assert manifest_payload["package"]["id"] == package_id
+
+ manifest_json = client.get(f"/packages/{package_id}/manifest.json", headers=AUTH)
+ assert manifest_json.status_code == 200
+ assert manifest_json.json()["files"][0]["relative_path"] == "reports/hello.bin"
+
+ file_id = file_record["id"]
+ metadata_resp = client.get(f"/files/{file_id}", headers=AUTH)
+ assert metadata_resp.status_code == 200
+ content_address = metadata_resp.json()["content_address"]
+
+ download = client.get(f"/files/{file_id}/download", headers=AUTH)
+ assert download.status_code == 200
+ assert download.content == data
+ assert download.headers["etag"] == f'"{content_address}"'
+
+ partial = client.get(
+ f"/files/{file_id}/download",
+ headers={**AUTH, "Range": "bytes=6-17"},
+ )
+ assert partial.status_code == 206
+ assert partial.headers["content-range"] == f"bytes 6-17/{len(data)}"
+ assert partial.content == data[6:18]
+
+ not_modified = client.get(
+ f"/files/{file_id}/download",
+ headers={**AUTH, "If-None-Match": f'"{content_address}"'},
+ )
+ assert not_modified.status_code == 304
+
+ events_json = client.get(
+ "/events",
+ headers={**AUTH, "Accept": "application/json"},
+ params={"since": 0, "limit": 10, "wait_seconds": 0},
+ )
+ assert events_json.status_code == 200
+ assert [e["event_type"] for e in events_json.json()["events"]] == [
+ "v1.package.created",
+ "v1.retention.default_applied",
+ "v1.file.ingested",
+ "v1.package.finalized",
+ ]
+
+ events_cbor = client.get(
+ "/events",
+ headers={**AUTH, "Accept": "application/cbor"},
+ params={"since": 0, "limit": 10, "wait_seconds": 0},
+ )
+ assert events_cbor.status_code == 200
+ assert cbor2.loads(events_cbor.content)["events"][0]["sequence"] == 1
+
+
+def test_http_scripted_50_file_package_flow(tmp_path: Path) -> None:
+ app = create_app(_settings(tmp_path))
+ with TestClient(app) as client:
+ package_id = _create_package(client, name="fifty")
+ uploaded: list[tuple[str, bytes, dict[str, Any]]] = []
+ for idx in range(50):
+ rel_path = f"bundle/file-{idx:02d}.bin"
+ payload = f"payload {idx:02d}:".encode() + bytes([idx]) * (idx + 1)
+ record = _upload_file(client, package_id, rel_path, payload)
+ uploaded.append((rel_path, payload, record))
+
+ finalized = client.post(f"/packages/{package_id}/finalize", headers=AUTH)
+ assert finalized.status_code == 200, finalized.text
+
+ for rel_path, payload, record in uploaded:
+ assert record["relative_path"] == rel_path
+ assert record["digest_primary_hex"] == digest_bytes(payload).primary.hex
+ downloaded = client.get(f"/files/{record['id']}/download", headers=AUTH)
+ assert downloaded.status_code == 200
+ assert downloaded.content == payload
+
+ events = client.get(
+ "/events",
+ headers={**AUTH, "Accept": "application/json"},
+ params={"since": 0, "limit": 100, "wait_seconds": 0},
+ )
+ assert events.status_code == 200
+ assert len(events.json()["events"]) == 53
+ assert events.json()["events"][-1]["event_type"] == "v1.package.finalized"
+
+
+@given(
+ data=st.binary(min_size=1, max_size=512),
+ stem=st.text(alphabet=list("abcdefghijklmnopqrstuvwxyz0123456789_-"), min_size=1, max_size=24),
+)
+@hypothesis_settings(
+ max_examples=12,
+ deadline=None,
+ suppress_health_check=[HealthCheck.function_scoped_fixture],
+)
+def test_upload_session_lifecycle_property(data: bytes, stem: str) -> None:
+ with tempfile.TemporaryDirectory() as tmp:
+ app = create_app(_settings(Path(tmp)))
+ with TestClient(app) as client:
+ package_id = _create_package(client, name="upload-session")
+ opened = client.post(
+ "/uploads",
+ headers=AUTH,
+ json={"expected_size_bytes": len(data), "media_type": "application/octet-stream"},
+ )
+ assert opened.status_code == 201, opened.text
+ upload_url = opened.json()["content_upload_url"]
+
+ patched = client.patch(
+ upload_url,
+ headers={**AUTH, "Content-Range": f"bytes 0-{len(data) - 1}/{len(data)}"},
+ content=data,
+ )
+ assert patched.status_code == 200, patched.text
+ assert patched.json()["received_bytes"] == len(data)
+
+ completed = client.post(
+ f"{upload_url}/complete",
+ headers=AUTH,
+ json={
+ "package_id": package_id,
+ "relative_path": f"uploads/{stem}.bin",
+ "media_type": "application/octet-stream",
+ },
+ )
+ assert completed.status_code == 201, completed.text
+ file_id = completed.json()["id"]
+
+ downloaded = client.get(f"/files/{file_id}/download", headers=AUTH)
+ assert downloaded.status_code == 200
+ assert downloaded.content == data
diff --git a/tests/integration/test_registry.py b/tests/integration/test_registry.py
index 4f23855..7420582 100644
--- a/tests/integration/test_registry.py
+++ b/tests/integration/test_registry.py
@@ -243,18 +243,19 @@ async def test_end_to_end_ingest_finalize_replay(
stream = await registry.get_file(fid)
assert await _consume(stream) == expected
- # Tail events: 1 created + 3 ingested + 1 finalized = 5.
+ # Tail events: 1 created + 1 default retention + 3 ingested + 1 finalized = 6.
collected = []
async def _consume_tail() -> None:
async for evt in registry.tail_events(since_sequence=0, poll_interval_seconds=0.01):
collected.append(evt)
- if len(collected) >= 5:
+ if len(collected) >= 6:
break
await asyncio.wait_for(_consume_tail(), timeout=5.0)
assert [e.event_type for e in collected] == [
"v1.package.created",
+ "v1.retention.default_applied",
"v1.file.ingested",
"v1.file.ingested",
"v1.file.ingested",
diff --git a/tests/integration/test_retention_lifecycle.py b/tests/integration/test_retention_lifecycle.py
new file mode 100644
index 0000000..773f0a6
--- /dev/null
+++ b/tests/integration/test_retention_lifecycle.py
@@ -0,0 +1,250 @@
+"""Retention lifecycle integration tests for ARTIFACT-STORE-WP-0003."""
+
+from __future__ import annotations
+
+from collections.abc import AsyncIterator
+from datetime import datetime, timedelta
+from pathlib import Path
+
+import cbor2
+import pytest
+import pytest_asyncio
+from fastapi.testclient import TestClient
+from sqlalchemy import insert
+from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
+
+from artifactstore.api.http import create_app
+from artifactstore.config import Settings
+from artifactstore.dataplane import InProcessDataPlane
+from artifactstore.db.schema import metadata, retention_classes
+from artifactstore.db.seed import RETENTION_CLASS_SEEDS
+from artifactstore.events import RegistryViewWriter
+from artifactstore.registry import Registry, RetentionStateError
+from artifactstore.retention import RetentionPolicy
+from artifactstore.storage import LocalBackend
+
+AUTH = {"Authorization": "Bearer test-token"}
+
+
+@pytest_asyncio.fixture
+async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
+ db_path = tmp_path / "retention.db"
+ eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
+ async with eng.begin() as conn:
+ await conn.run_sync(metadata.create_all)
+ for seed in RETENTION_CLASS_SEEDS:
+ await conn.execute(insert(retention_classes).values(**seed))
+ yield eng
+ await eng.dispose()
+
+
+@pytest.fixture
+def registry(engine: AsyncEngine, tmp_path: Path) -> Registry:
+ backend = LocalBackend(tmp_path / "store", backend_id="local")
+ dataplane = InProcessDataPlane(backend, tmp_dir=tmp_path / "dp-tmp")
+ return Registry(
+ engine,
+ dataplane,
+ RegistryViewWriter(),
+ RetentionPolicy({"transient": 0}),
+ )
+
+
+def _http_settings(tmp_path: Path, retention_config_path: Path | None = None) -> Settings:
+ db_path = tmp_path / "retention-http.db"
+ storage_root = tmp_path / "storage"
+ storage_root.mkdir(parents=True, exist_ok=True)
+ from sqlalchemy import create_engine
+
+ sync_engine = create_engine(f"sqlite:///{db_path}", future=True)
+ metadata.create_all(sync_engine)
+ with sync_engine.begin() as conn:
+ conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
+ sync_engine.dispose()
+ return Settings(
+ database_url=f"sqlite+aiosqlite:///{db_path}",
+ storage_local_root=str(storage_root),
+ log_level="INFO",
+ auth_tokens="test-token",
+ retention_config_path=str(retention_config_path or ""),
+ )
+
+
+def _create_package(
+ client: TestClient,
+ *,
+ retention_class: str = "raw-evidence",
+) -> dict[str, object]:
+ resp = client.post(
+ "/packages",
+ headers=AUTH,
+ json={
+ "name": "retention",
+ "producer": "tests",
+ "subject": "retention-subject",
+ "retention_class": retention_class,
+ "metadata": {},
+ },
+ )
+ assert resp.status_code == 201, resp.text
+ return dict(resp.json())
+
+
+async def test_default_retention_and_permanent_record(registry: Registry) -> None:
+ transient_id = await registry.create_package(
+ name="short",
+ producer="tests",
+ subject="transient",
+ retention_class="transient",
+ actor="ops",
+ )
+ transient_state = await registry.get_retention_state(transient_id)
+ assert transient_state.current_expires_at is not None
+ assert transient_state.eligible_for_deletion is False
+
+ permanent_id = await registry.create_package(
+ name="forever",
+ producer="tests",
+ subject="permanent",
+ retention_class="permanent-record",
+ actor="ops",
+ )
+ permanent_state = await registry.get_retention_state(permanent_id)
+ assert permanent_state.current_expires_at is None
+ assert permanent_state.eligible_for_deletion is False
+
+ history = await registry.retention_history(transient_id)
+ assert [event.event_type for event in history] == ["v1.retention.default_applied"]
+ assert cbor2.loads(history[0].payload)["default_duration_seconds"] == 0
+
+
+async def test_retention_extension_requires_later_expiry(registry: Registry) -> None:
+ package_id = await registry.create_package(
+ name="extend",
+ producer="tests",
+ subject="extension",
+ retention_class="transient",
+ actor="ops",
+ )
+ current = await registry.get_retention_state(package_id)
+ assert current.current_expires_at is not None
+
+ with pytest.raises(RetentionStateError, match="strictly later"):
+ await registry.extend_retention(
+ package_id,
+ new_expires_at=current.current_expires_at,
+ reason="not later",
+ actor="ops",
+ )
+
+ new_expiry = current.current_expires_at + timedelta(days=1)
+ extended = await registry.extend_retention(
+ package_id,
+ new_expires_at=new_expiry,
+ reason="needed for quarterly review",
+ actor="ops",
+ )
+ assert extended.current_expires_at == new_expiry
+ history = await registry.retention_history(package_id)
+ assert [event.event_type for event in history] == [
+ "v1.retention.default_applied",
+ "v1.retention.extended",
+ ]
+
+
+async def test_hold_release_and_sweeper_eligibility_transition(registry: Registry) -> None:
+ package_id = await registry.create_package(
+ name="held",
+ producer="tests",
+ subject="hold-release",
+ retention_class="transient",
+ actor="ops",
+ )
+ initial = await registry.get_retention_state(package_id)
+ assert initial.current_expires_at is not None
+ after_expiry = initial.current_expires_at + timedelta(seconds=5)
+
+ hold_id = await registry.apply_retention_hold(
+ package_id,
+ reason="quarterly hold",
+ actor="ops",
+ )
+ held = await registry.get_retention_state(package_id)
+ assert held.active_hold_id == hold_id
+
+ assert await registry.sweep_deletion_eligibility(now=after_expiry) == []
+ still_held = await registry.get_retention_state(package_id)
+ assert still_held.eligible_for_deletion is False
+
+ released = await registry.release_retention_hold(
+ package_id,
+ hold_id,
+ reason="hold complete",
+ actor="ops",
+ now=after_expiry,
+ )
+ assert released.active_hold_id is None
+ assert released.eligible_for_deletion is True
+
+ assert await registry.sweep_deletion_eligibility(now=after_expiry) == []
+ history = await registry.retention_history(package_id)
+ assert [event.event_type for event in history] == [
+ "v1.retention.default_applied",
+ "v1.retention.hold_applied",
+ "v1.retention.hold_released",
+ "v1.retention.deletion_eligible",
+ ]
+
+
+def test_http_retention_controls_and_history_formats(tmp_path: Path) -> None:
+ app = create_app(_http_settings(tmp_path))
+ with TestClient(app) as client:
+ package = _create_package(client)
+ package_id = str(package["id"])
+ current_expires_at = datetime.fromisoformat(str(package["expires_at"]))
+ new_expiry = current_expires_at + timedelta(days=7)
+
+ extended = client.post(
+ f"/packages/{package_id}/retention/extensions",
+ headers=AUTH,
+ json={
+ "new_expires_at": new_expiry.isoformat(),
+ "reason": "retain for release signoff",
+ },
+ )
+ assert extended.status_code == 200, extended.text
+ assert extended.json()["current_expires_at"] == new_expiry.isoformat()
+
+ hold = client.post(
+ f"/packages/{package_id}/retention/holds",
+ headers=AUTH,
+ json={"reason": "external audit"},
+ )
+ assert hold.status_code == 201, hold.text
+ hold_id = hold.json()["hold_id"]
+
+ released = client.post(
+ f"/packages/{package_id}/retention/holds/{hold_id}/release",
+ headers=AUTH,
+ json={"reason": "audit complete"},
+ )
+ assert released.status_code == 200, released.text
+ assert released.json()["active_hold_id"] is None
+
+ history_json = client.get(f"/packages/{package_id}/retention/history", headers=AUTH)
+ assert history_json.status_code == 200
+ assert [event["event_type"] for event in history_json.json()["events"]] == [
+ "v1.retention.default_applied",
+ "v1.retention.extended",
+ "v1.retention.hold_applied",
+ "v1.retention.hold_released",
+ ]
+
+ history_cbor = client.get(
+ f"/packages/{package_id}/retention/history",
+ headers={**AUTH, "Accept": "application/cbor"},
+ )
+ assert history_cbor.status_code == 200
+ assert cbor2.loads(history_cbor.content)["events"][1]["event_type"] == (
+ "v1.retention.extended"
+ )
diff --git a/workplans/ARTIFACT-STORE-WP-0002-ingestion-api.md b/workplans/ARTIFACT-STORE-WP-0002-ingestion-api.md
index 0f31279..e082031 100644
--- a/workplans/ARTIFACT-STORE-WP-0002-ingestion-api.md
+++ b/workplans/ARTIFACT-STORE-WP-0002-ingestion-api.md
@@ -4,13 +4,13 @@ type: workplan
title: "Ingestion API And Manifest Surface"
repo: artifact-store
domain: stack
-status: planned
+status: done
owner: codex
topic_slug: stack
planning_priority: high
planning_order: 2
created: "2026-05-15"
-updated: "2026-05-15"
+updated: "2026-05-16"
state_hub_workstream_id: "cedbfe03-363c-43fd-a5cb-bef52b29af7e"
---
@@ -37,9 +37,9 @@ download files, and tail the event stream.
```task
id: ARTIFACT-STORE-WP-0002-T001
-status: cancelled
+status: done
priority: high
-state_hub_task_id: "e3879111-4be9-4731-8aea-15abb874f960"
+state_hub_task_id: "197e22ff-0003-433d-bfa0-2323152b85dc"
```
Acceptance:
@@ -58,7 +58,7 @@ Acceptance:
```task
id: ARTIFACT-STORE-WP-0002-T002
-status: todo
+status: done
priority: high
state_hub_task_id: "9c8c3853-2090-42be-9995-0b8ce4a76104"
```
@@ -78,7 +78,7 @@ Acceptance:
```task
id: ARTIFACT-STORE-WP-0002-T003
-status: todo
+status: done
priority: medium
state_hub_task_id: "710bbd2f-9bc1-4395-bbd1-2b22c1b7eb37"
```
@@ -99,7 +99,7 @@ Acceptance:
```task
id: ARTIFACT-STORE-WP-0002-T004
-status: todo
+status: done
priority: medium
state_hub_task_id: "d848bc41-edfa-48fc-bb2c-f2526f422c50"
```
@@ -117,7 +117,7 @@ Acceptance:
```task
id: ARTIFACT-STORE-WP-0002-T005
-status: todo
+status: done
priority: medium
state_hub_task_id: "27d33e90-6b31-4c1f-832b-870cd2c5fbe5"
```
@@ -134,7 +134,7 @@ Acceptance:
```task
id: ARTIFACT-STORE-WP-0002-T006
-status: todo
+status: done
priority: high
state_hub_task_id: "f422696f-a206-4030-be05-c342f94e9efd"
```
diff --git a/workplans/ARTIFACT-STORE-WP-0003-retention-lifecycle.md b/workplans/ARTIFACT-STORE-WP-0003-retention-lifecycle.md
index 3cbf66d..b25111e 100644
--- a/workplans/ARTIFACT-STORE-WP-0003-retention-lifecycle.md
+++ b/workplans/ARTIFACT-STORE-WP-0003-retention-lifecycle.md
@@ -39,9 +39,9 @@ WP-0006).
```task
id: ARTIFACT-STORE-WP-0003-T001
-status: cancelled
+status: done
priority: high
-state_hub_task_id: "2d6cbd83-c348-45ad-a223-7870a3412225"
+state_hub_task_id: "25531837-d2ff-4252-b0d0-31283597737f"
```
Acceptance: