From c33baa363555e276f3a9673f63599efbdda6f377 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 16 May 2026 23:10:21 +0200 Subject: [PATCH] Implement HTTP ingestion and retention lifecycle --- .env.example | 17 + docs/OPERATOR.md | 61 +- src/artifactstore/api/http/__init__.py | 802 +++++++++++++++++- src/artifactstore/app.py | 4 +- src/artifactstore/cli/__init__.py | 224 +++++ src/artifactstore/config.py | 15 + src/artifactstore/events/views.py | 118 +++ src/artifactstore/registry/__init__.py | 533 +++++++++++- src/artifactstore/retention/__init__.py | 88 +- tests/integration/test_cli_commands.py | 168 ++++ tests/integration/test_http_api.py | 240 ++++++ tests/integration/test_registry.py | 5 +- tests/integration/test_retention_lifecycle.py | 250 ++++++ .../ARTIFACT-STORE-WP-0002-ingestion-api.md | 18 +- ...IFACT-STORE-WP-0003-retention-lifecycle.md | 4 +- 15 files changed, 2478 insertions(+), 69 deletions(-) create mode 100644 tests/integration/test_http_api.py create mode 100644 tests/integration/test_retention_lifecycle.py diff --git a/.env.example b/.env.example index ffa1b09..29f1309 100644 --- a/.env.example +++ b/.env.example @@ -14,3 +14,20 @@ ARTIFACTSTORE_STORAGE_LOCAL_ROOT=./var/storage # Python logging level: DEBUG | INFO | WARNING | ERROR ARTIFACTSTORE_LOG_LEVEL=INFO + +# Shared-secret bearer tokens for the HTTP API. Comma- or newline-separated. +# Protected endpoints return 401 until at least one token is configured. +ARTIFACTSTORE_AUTH_TOKENS=dev-token + +# Read endpoints are authenticated by default. Set true only for local demos. +ARTIFACTSTORE_ANON_READ=false + +# Defaults used by `artifactstore push` and `artifactstore manifest`. +ARTIFACTSTORE_API_URL=http://127.0.0.1:8000 +ARTIFACTSTORE_API_TOKEN=dev-token + +# Optional TOML file overriding retention class default durations. +ARTIFACTSTORE_RETENTION_CONFIG_PATH= + +# Default interval for external schedulers that run `artifactstore retention sweep`. +ARTIFACTSTORE_RETENTION_SWEEP_INTERVAL_SECONDS=3600 diff --git a/docs/OPERATOR.md b/docs/OPERATOR.md index c58282d..729e4a3 100644 --- a/docs/OPERATOR.md +++ b/docs/OPERATOR.md @@ -1,12 +1,10 @@ # Operator Guide -Status: v0.1 (WP-0001 baseline) +Status: v0.1 (WP-0003 baseline) Updated: 2026-05-16 This guide is the user manual for running `artifact-store` v0.1 — the -library + CLI + minimal HTTP app that landed in WP-0001. Ingest, finalize, -and retrieve workflows go through the Python library today; the HTTP -upload API arrives in WP-0002. +library, CLI, HTTP ingestion API, manifest surface, and retention lifecycle. For architectural background see [ARCHITECTURE-BLUEPRINT.md](ARCHITECTURE-BLUEPRINT.md), the ADRs under @@ -50,9 +48,42 @@ All settings are prefixed with ``ARTIFACTSTORE_`` and read by | `ARTIFACTSTORE_DATABASE_URL` | `sqlite+aiosqlite:///./var/artifactstore.db` | SQLAlchemy async URL. Alembic translates `+aiosqlite` and `+asyncpg` to their sync drivers at migrate-time. | | `ARTIFACTSTORE_STORAGE_LOCAL_ROOT`| `./var/storage` | Root directory for the local filesystem storage backend. Created on first use. | | `ARTIFACTSTORE_LOG_LEVEL` | `INFO` | Python logging level (`DEBUG` / `INFO` / `WARNING` / `ERROR`). | +| `ARTIFACTSTORE_AUTH_TOKENS` | empty | Comma- or newline-separated shared-secret bearer tokens for the HTTP API. | +| `ARTIFACTSTORE_ANON_READ` | `false` | Set `true` only for local demos where read endpoints may be anonymous. | +| `ARTIFACTSTORE_API_URL` | `http://127.0.0.1:8000` | Default API base URL used by HTTP-backed CLI commands. | +| `ARTIFACTSTORE_API_TOKEN` | empty | Default bearer token used by HTTP-backed CLI commands. | +| `ARTIFACTSTORE_RETENTION_CONFIG_PATH` | empty | Optional TOML file overriding retention-class default durations. | +| `ARTIFACTSTORE_RETENTION_SWEEP_INTERVAL_SECONDS` | `3600` | Default interval for external schedulers that invoke the retention sweeper. | See [`.env.example`](../.env.example) for the canonical template. +### Retention policy TOML + +By default, retention durations come from the seeded `retention_classes` +rows. Operators can override the default duration per class with +`ARTIFACTSTORE_RETENTION_CONFIG_PATH`: + +```toml +[retention_classes.transient] +default_duration_seconds = 86400 + +[retention_classes."raw-evidence"] +default_duration_seconds = 7776000 + +[retention_classes."summary-evidence"] +default_duration_seconds = 31536000 + +[retention_classes."release-evidence"] +default_duration_seconds = 220752000 + +[retention_classes."permanent-record"] +# Omit default_duration_seconds for no expiry. +``` + +Run `artifactstore retention sweep` from cron or another scheduler to mark +expired, unheld packages eligible for deletion. This work only records +eligibility; it never deletes bytes. + ## Database backends ### SQLite (development default) @@ -113,21 +144,27 @@ lands in WP-0004. | `artifactstore migrate` | Run `alembic upgrade head` against the configured database. | | `artifactstore replay` | Truncate every materialised view and rebuild it from the event log; prints the highest sequence applied. | | `artifactstore health` | JSON liveness summary (db, backend, status). Same payload as the HTTP `/health` endpoint. | +| `artifactstore push ` | Push a directory through the HTTP API and finalize the package. | +| `artifactstore manifest ` | Fetch the JSON manifest projection through the HTTP API. | +| `artifactstore retention sweep` | Run one deletion-eligibility sweep against the configured DB. | The CLI is a thin client over `artifactstore.registry.Registry` (see [ADR-0005](adr/0005-v1-tech-stack.md)). ## HTTP reference (v0.1) -| Route | Purpose | -|----------------|---------| -| `GET /` | Service banner (scaffold marker). | -| `GET /health` | Liveness summary. Returns ``{status, db, backend, version}``. `status` is `ok` only when both the DB probe (`SELECT 1`) and the backend `health()` succeed. | -| `GET /docs` | FastAPI's interactive OpenAPI docs (`/openapi.json` underneath). | +| Route family | Purpose | +|-----------------------|---------| +| `GET /`, `GET /health` | Anonymous service banner and liveness summary. | +| `GET /docs`, `GET /openapi.json` | FastAPI's interactive OpenAPI docs and generated schema. | +| `/packages...` | Create, list, inspect, upload files to, finalize, and retrieve manifests for packages. | +| `/files...` | File metadata and byte downloads, including single-range reads. | +| `/uploads...` | Upload-session wire shape for whole-body v1 uploads. | +| `/packages/{id}/retention...` | Extend retention, apply/release holds, and read retention history. | +| `GET /events` | Long-poll event feed, CBOR by default or JSON with `Accept: application/json`. | -Package CRUD, file upload/download, manifest retrieval, retention controls, -and the event stream all land in WP-0002–WP-0003. Today they are reachable -via the Python library. +All non-health routes require a bearer token unless +`ARTIFACTSTORE_ANON_READ=true` is set for read endpoints. ## End-to-end smoke test (Python library) diff --git a/src/artifactstore/api/http/__init__.py b/src/artifactstore/api/http/__init__.py index 23a1605..351f474 100644 --- a/src/artifactstore/api/http/__init__.py +++ b/src/artifactstore/api/http/__init__.py @@ -1,41 +1,138 @@ -"""FastAPI application — HTTP surface for the registry. - -T014 ships a minimal app with two routes: - -* ``GET /`` — service banner. -* ``GET /health`` — registry liveness + DB connectivity + storage backend. - -Richer endpoints (package CRUD, file upload, manifest retrieval, event -stream) land in workplan WP-0002. The app is built through -:func:`create_app` so tests can inject their own settings. -""" +"""FastAPI application — HTTP surface for the registry.""" from __future__ import annotations -from contextlib import asynccontextmanager -from typing import Any +import asyncio +import re +import secrets +from collections.abc import AsyncIterator, Mapping +from contextlib import asynccontextmanager, suppress +from dataclasses import dataclass +from datetime import UTC, datetime +from email import policy +from email.parser import BytesParser +from http import HTTPStatus +from typing import Any, NoReturn, cast +from uuid import UUID, uuid4 -from fastapi import Depends, FastAPI, Request +import cbor2 +import jcs +from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request, Response, status +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse, StreamingResponse +from pydantic import BaseModel, Field from artifactstore import __version__ from artifactstore.app import build_registry -from artifactstore.config import Settings -from artifactstore.registry import Registry +from artifactstore.config import Settings, get_settings +from artifactstore.events.model import Event +from artifactstore.registry import ( + DuplicateRelativePathError, + FileNotFoundError, + FileRecord, + IllegalPackageStateError, + PackageNotFoundError, + PackageRecord, + Registry, + RetentionClassRecord, + RetentionStateError, + RetentionStateRecord, +) __all__ = ["app", "create_app"] +_RANGE_RE = re.compile(r"^bytes=(?P\d*)-(?P\d*)$") +_CONTENT_RANGE_RE = re.compile(r"^bytes (?P\d+)-(?P\d+)/(?P\d+|\*)$") +_MAX_EVENT_LIMIT = 500 + + +class PackageCreate(BaseModel): + name: str = Field(min_length=1) + producer: str = Field(min_length=1) + subject: str = Field(min_length=1) + retention_class: str = Field(min_length=1) + metadata: dict[str, Any] = Field(default_factory=dict) + + +class UploadCreate(BaseModel): + expected_size_bytes: int | None = Field(default=None, ge=0) + media_type: str | None = None + + +class UploadComplete(BaseModel): + package_id: UUID + relative_path: str = Field(min_length=1) + media_type: str | None = None + + +class RetentionExtensionCreate(BaseModel): + new_expires_at: datetime + reason: str = Field(min_length=1) + + +class RetentionHoldCreate(BaseModel): + reason: str = Field(min_length=1) + + +class RetentionHoldRelease(BaseModel): + reason: str = Field(min_length=1) + + +@dataclass(slots=True) +class MultipartFile: + filename: str + content_type: str + content: bytes + fields: dict[str, str] + + +@dataclass(slots=True) +class UploadSession: + upload_id: UUID + buffer: bytearray + expected_size_bytes: int | None + media_type: str | None + status: str = "open" + def get_registry(request: Request) -> Registry: return request.app.state.registry # type: ignore[no-any-return] +def get_http_settings(request: Request) -> Settings: + return request.app.state.settings # type: ignore[no-any-return] + + +async def require_read_auth( + request: Request, + authorization: str | None = Header(default=None, alias="Authorization"), +) -> str: + settings = get_http_settings(request) + if settings.anon_read: + return "anonymous" + return _require_bearer(settings, authorization) + + +async def require_write_auth( + request: Request, + authorization: str | None = Header(default=None, alias="Authorization"), +) -> str: + settings = get_http_settings(request) + return _require_bearer(settings, authorization) + + def create_app(settings: Settings | None = None) -> FastAPI: """Build the FastAPI app. Lifespan owns the registry instance.""" + effective_settings = settings or get_settings() + @asynccontextmanager - async def lifespan(application: FastAPI) -> Any: - registry = build_registry(settings) + async def lifespan(application: FastAPI) -> AsyncIterator[None]: + registry = build_registry(effective_settings) application.state.registry = registry + application.state.settings = effective_settings + application.state.upload_sessions = {} + application.state.upload_lock = asyncio.Lock() try: yield finally: @@ -47,6 +144,26 @@ def create_app(settings: Settings | None = None) -> FastAPI: lifespan=lifespan, ) + @application.exception_handler(HTTPException) + async def http_exception_handler(_request: Request, exc: HTTPException) -> JSONResponse: + return _problem_response( + status_code=exc.status_code, + title=_status_phrase(exc.status_code), + detail=str(exc.detail), + headers=exc.headers, + ) + + @application.exception_handler(RequestValidationError) + async def validation_exception_handler( + _request: Request, + exc: RequestValidationError, + ) -> JSONResponse: + return _problem_response( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + title="Validation error", + detail=str(exc.errors()), + ) + @application.get("/") def root() -> dict[str, str]: return { @@ -74,7 +191,656 @@ def create_app(settings: Settings | None = None) -> FastAPI: }, } + @application.get("/backends") + async def backends( + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + backend_status = await registry.backend_health() + return { + "backends": [ + { + "backend_id": backend_status.backend_id, + "healthy": backend_status.healthy, + "detail": backend_status.detail, + "free_bytes": backend_status.free_bytes, + "total_bytes": backend_status.total_bytes, + } + ] + } + + @application.get("/retention-classes") + async def retention_classes( + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + classes = await registry.list_retention_classes() + return {"retention_classes": [_retention_class_dict(c) for c in classes]} + + @application.post("/packages", status_code=status.HTTP_201_CREATED) + async def create_package( + body: PackageCreate, + actor: str = Depends(require_write_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + try: + package_id = await registry.create_package( + name=body.name, + producer=body.producer, + subject=body.subject, + retention_class=body.retention_class, + actor=actor, + metadata=body.metadata, + ) + return _package_dict(await registry.get_package(package_id)) + except ValueError as exc: + _raise_problem(status.HTTP_400_BAD_REQUEST, str(exc)) + + @application.get("/packages") + async def list_packages( + producer: str | None = None, + subject: str | None = None, + retention_class: str | None = None, + metadata_key: str | None = None, + metadata_value: str | None = None, + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + packages = await registry.list_packages( + producer=producer, + subject=subject, + retention_class=retention_class, + metadata_key=metadata_key, + metadata_value=metadata_value, + ) + return {"packages": [_package_dict(p) for p in packages]} + + @application.get("/packages/{package_id}") + async def get_package( + package_id: UUID, + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + try: + return _package_dict(await registry.get_package(package_id)) + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + + @application.post("/packages/{package_id}/files", status_code=status.HTTP_201_CREATED) + async def upload_package_file( + package_id: UUID, + request: Request, + actor: str = Depends(require_write_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + multipart = await _read_single_file_multipart(request) + relative_path = multipart.fields.get("relative_path") or multipart.filename + media_type = multipart.fields.get("media_type") or multipart.content_type + try: + file_id = await registry.ingest_file( + package_id, + relative_path=relative_path, + media_type=media_type or "application/octet-stream", + stream=_bytes_stream(multipart.content), + actor=actor, + ) + return _file_dict(await registry.get_file_metadata(file_id)) + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + except DuplicateRelativePathError as exc: + _raise_problem(status.HTTP_409_CONFLICT, str(exc)) + except IllegalPackageStateError as exc: + _raise_problem(status.HTTP_409_CONFLICT, str(exc)) + + @application.post("/packages/{package_id}/finalize") + async def finalize_package( + package_id: UUID, + actor: str = Depends(require_write_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + try: + await registry.finalize_package(package_id, actor=actor) + return _package_dict(await registry.get_package(package_id)) + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + except IllegalPackageStateError as exc: + _raise_problem(status.HTTP_409_CONFLICT, str(exc)) + + @application.post("/packages/{package_id}/retention/extensions") + async def extend_retention( + package_id: UUID, + body: RetentionExtensionCreate, + actor: str = Depends(require_write_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + try: + state = await registry.extend_retention( + package_id, + new_expires_at=body.new_expires_at, + reason=body.reason, + actor=actor, + ) + return _retention_state_dict(state) + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + except RetentionStateError as exc: + _raise_problem(status.HTTP_409_CONFLICT, str(exc)) + + @application.post("/packages/{package_id}/retention/holds", status_code=status.HTTP_201_CREATED) + async def apply_hold( + package_id: UUID, + body: RetentionHoldCreate, + actor: str = Depends(require_write_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + try: + hold_id = await registry.apply_retention_hold( + package_id, + reason=body.reason, + actor=actor, + ) + state = await registry.get_retention_state(package_id) + return {"hold_id": str(hold_id), "retention": _retention_state_dict(state)} + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + except RetentionStateError as exc: + _raise_problem(status.HTTP_409_CONFLICT, str(exc)) + + @application.post("/packages/{package_id}/retention/holds/{hold_id}/release") + async def release_hold( + package_id: UUID, + hold_id: UUID, + body: RetentionHoldRelease, + actor: str = Depends(require_write_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + try: + state = await registry.release_retention_hold( + package_id, + hold_id, + reason=body.reason, + actor=actor, + ) + return _retention_state_dict(state) + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + except RetentionStateError as exc: + _raise_problem(status.HTTP_409_CONFLICT, str(exc)) + + @application.get("/packages/{package_id}/retention/history") + async def retention_history( + package_id: UUID, + request: Request, + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> Response: + try: + history = await registry.retention_history(package_id) + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + payload = {"events": [_event_dict(event) for event in history]} + accept = request.headers.get("accept", "") + if "application/cbor" in accept: + return Response( + content=cbor2.dumps(payload, canonical=True), + media_type="application/cbor", + ) + return Response(content=jcs.canonicalize(payload), media_type="application/json") + + @application.get("/packages/{package_id}/manifest") + async def get_manifest_cbor( + package_id: UUID, + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> Response: + try: + payload = await registry.get_manifest_bytes(package_id, format="cbor") + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + return Response(content=payload, media_type="application/cbor") + + @application.get("/packages/{package_id}/manifest.json") + async def get_manifest_json( + package_id: UUID, + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> Response: + try: + payload = await registry.get_manifest_bytes(package_id, format="json") + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + return Response(content=payload, media_type="application/json") + + @application.get("/files/{file_id}") + async def get_file_metadata( + file_id: UUID, + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + try: + return _file_dict(await registry.get_file_metadata(file_id)) + except FileNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + + @application.get("/files/{file_id}/download") + async def download_file( + file_id: UUID, + range_header: str | None = Header(default=None, alias="Range"), + if_none_match: str | None = Header(default=None, alias="If-None-Match"), + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> Response: + try: + file_record = await registry.get_file_metadata(file_id) + except FileNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + etag = _etag(file_record.content_address) + headers = { + "Accept-Ranges": "bytes", + "ETag": etag, + "Content-Type": file_record.media_type, + } + if if_none_match and _etag_matches(if_none_match, file_record.content_address): + return Response(status_code=status.HTTP_304_NOT_MODIFIED, headers=headers) + + byte_range = _parse_range(range_header, file_record.size_bytes) + status_code = status.HTTP_200_OK + if byte_range is None: + headers["Content-Length"] = str(file_record.size_bytes) + else: + start, end = byte_range + headers["Content-Range"] = f"bytes {start}-{end}/{file_record.size_bytes}" + headers["Content-Length"] = str(end - start + 1) + status_code = status.HTTP_206_PARTIAL_CONTENT + + stream = await registry.get_file(file_id, byte_range=byte_range) + return StreamingResponse(stream, status_code=status_code, headers=headers) + + @application.post("/uploads", status_code=status.HTTP_201_CREATED) + async def open_upload( + body: UploadCreate, + request: Request, + _actor: str = Depends(require_write_auth), + ) -> dict[str, Any]: + upload_id = uuid4() + session = UploadSession( + upload_id=upload_id, + buffer=bytearray(), + expected_size_bytes=body.expected_size_bytes, + media_type=body.media_type, + ) + async with request.app.state.upload_lock: + request.app.state.upload_sessions[str(upload_id)] = session + return { + "upload_id": str(upload_id), + "status": session.status, + "content_upload_url": f"/uploads/{upload_id}", + } + + @application.patch("/uploads/{upload_id}") + async def patch_upload( + upload_id: UUID, + request: Request, + content_range: str | None = Header(default=None, alias="Content-Range"), + _actor: str = Depends(require_write_auth), + ) -> dict[str, Any]: + if content_range is None: + _raise_problem(status.HTTP_400_BAD_REQUEST, "Content-Range header is required") + body = await request.body() + start, end, total = _parse_content_range(content_range) + if end - start + 1 != len(body): + _raise_problem( + status.HTTP_400_BAD_REQUEST, + "Content-Range length does not match request body length", + ) + async with request.app.state.upload_lock: + session = _upload_session(request, upload_id) + if start != len(session.buffer): + _raise_problem( + status.HTTP_409_CONFLICT, + f"upload offset mismatch: expected {len(session.buffer)}, got {start}", + ) + session.buffer.extend(body) + if total is not None and len(session.buffer) > total: + _raise_problem(status.HTTP_400_BAD_REQUEST, "upload exceeds declared total size") + session.status = ( + "uploaded" if total is not None and len(session.buffer) == total else "open" + ) + return { + "upload_id": str(upload_id), + "status": session.status, + "received_bytes": len(session.buffer), + "expected_size_bytes": total, + } + + @application.post("/uploads/{upload_id}/complete", status_code=status.HTTP_201_CREATED) + async def complete_upload( + upload_id: UUID, + body: UploadComplete, + request: Request, + actor: str = Depends(require_write_auth), + registry: Registry = Depends(get_registry), + ) -> dict[str, Any]: + async with request.app.state.upload_lock: + session = _upload_session(request, upload_id) + payload = bytes(session.buffer) + media_type = body.media_type or session.media_type or "application/octet-stream" + try: + file_id = await registry.ingest_file( + body.package_id, + relative_path=body.relative_path, + media_type=media_type, + stream=_bytes_stream(payload), + actor=actor, + ) + except PackageNotFoundError as exc: + _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) + except DuplicateRelativePathError as exc: + _raise_problem(status.HTTP_409_CONFLICT, str(exc)) + except IllegalPackageStateError as exc: + _raise_problem(status.HTTP_409_CONFLICT, str(exc)) + async with request.app.state.upload_lock: + request.app.state.upload_sessions.pop(str(upload_id), None) + return _file_dict(await registry.get_file_metadata(file_id)) + + @application.get("/events") + async def events( + request: Request, + since: int = Query(default=0, ge=0), + limit: int = Query(default=100, ge=1, le=_MAX_EVENT_LIMIT), + wait_seconds: float = Query(default=1.0, ge=0.0, le=30.0), + _actor: str = Depends(require_read_auth), + registry: Registry = Depends(get_registry), + ) -> Response: + batch = await _long_poll_events( + registry, + since_sequence=since, + limit=limit, + wait_seconds=wait_seconds, + ) + payload = {"events": [_event_dict(e) for e in batch]} + accept = request.headers.get("accept", "") + if "application/json" in accept: + return Response(content=jcs.canonicalize(payload), media_type="application/json") + return Response(content=cbor2.dumps(payload, canonical=True), media_type="application/cbor") + return application +def _require_bearer(settings: Settings, authorization: str | None) -> str: + tokens = settings.bearer_tokens + if not tokens: + _raise_problem( + status.HTTP_401_UNAUTHORIZED, + "No bearer tokens are configured", + headers={"WWW-Authenticate": "Bearer"}, + ) + if authorization is None or not authorization.lower().startswith("bearer "): + _raise_problem( + status.HTTP_401_UNAUTHORIZED, + "Bearer token required", + headers={"WWW-Authenticate": "Bearer"}, + ) + supplied = authorization.partition(" ")[2].strip() + if not any(secrets.compare_digest(supplied, token) for token in tokens): + _raise_problem( + status.HTTP_401_UNAUTHORIZED, + "Invalid bearer token", + headers={"WWW-Authenticate": "Bearer"}, + ) + return "bearer" + + +def _raise_problem( + status_code: int, + detail: str, + *, + headers: Mapping[str, str] | None = None, +) -> NoReturn: + raise HTTPException(status_code=status_code, detail=detail, headers=headers) + + +def _problem_response( + *, + status_code: int, + title: str, + detail: str, + headers: Mapping[str, str] | None = None, +) -> JSONResponse: + return JSONResponse( + status_code=status_code, + content={ + "type": "about:blank", + "title": title, + "status": status_code, + "detail": detail, + }, + media_type="application/problem+json", + headers=headers, + ) + + +def _status_phrase(status_code: int) -> str: + try: + return HTTPStatus(status_code).phrase + except ValueError: + return "HTTP error" + + +def _package_dict(record: PackageRecord) -> dict[str, Any]: + return { + "id": str(record.id), + "name": record.name, + "producer": record.producer, + "subject": record.subject, + "retention_class": record.retention_class, + "metadata_schema_id": str(record.metadata_schema_id) if record.metadata_schema_id else None, + "metadata": record.metadata, + "status": record.status, + "manifest_digest": f"blake3:{record.manifest_digest_hex}" + if record.manifest_digest_hex + else None, + "created_at": _iso(record.created_at), + "finalized_at": _iso(record.finalized_at), + "expires_at": _iso(record.expires_at), + "last_event_sequence": record.last_event_sequence, + } + + +def _file_dict(record: FileRecord) -> dict[str, Any]: + return { + "id": str(record.id), + "package_id": str(record.package_id), + "relative_path": record.relative_path, + "media_type": record.media_type, + "size_bytes": record.size_bytes, + "digest_algorithm": record.digest_algorithm, + "digest_primary_hex": record.digest_primary_hex, + "digest_sha256_hex": record.digest_sha256_hex, + "content_address": record.content_address, + "created_at": _iso(record.created_at), + "storage": { + "backend_id": record.backend_id, + "object_key": record.object_key, + "retrieval_tier": record.retrieval_tier, + "status": record.storage_status, + }, + } + + +def _retention_class_dict(record: RetentionClassRecord) -> dict[str, Any]: + return { + "class_id": record.class_id, + "default_duration_seconds": record.default_duration_seconds, + "deletion_strategy": record.deletion_strategy, + } + + +def _retention_state_dict(record: RetentionStateRecord) -> dict[str, Any]: + return { + "package_id": str(record.package_id), + "current_expires_at": _iso(record.current_expires_at), + "effective_class": record.effective_class, + "active_hold_id": str(record.active_hold_id) if record.active_hold_id else None, + "eligible_for_deletion": record.eligible_for_deletion, + } + + +def _event_dict(event: Event) -> dict[str, Any]: + payload = cbor2.loads(event.payload) + return { + "sequence": event.sequence, + "created_at": _iso(event.created_at), + "event_type": event.event_type, + "subject_kind": event.subject_kind, + "subject_id": str(event.subject_id) if event.subject_id else None, + "actor": event.actor, + "payload_digest_hex": event.payload_digest.hex(), + "payload": payload, + } + + +def _iso(value: datetime | None) -> str | None: + if value is None: + return None + if value.tzinfo is None: + value = value.replace(tzinfo=UTC) + return value.isoformat() + + +async def _read_single_file_multipart(request: Request) -> MultipartFile: + content_type = request.headers.get("content-type", "") + if not content_type.lower().startswith("multipart/form-data"): + _raise_problem(status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, "multipart/form-data is required") + raw = await request.body() + message = cast( + Any, + BytesParser(policy=policy.default).parsebytes( + f"Content-Type: {content_type}\r\nMIME-Version: 1.0\r\n\r\n".encode() + raw + ), + ) + if not message.is_multipart(): + _raise_problem(status.HTTP_400_BAD_REQUEST, "request body is not multipart") + + fields: dict[str, str] = {} + chosen: tuple[str, str, bytes] | None = None + for part in message.iter_parts(): + if part.get_content_disposition() != "form-data": + continue + name = cast(str | None, part.get_param("name", header="content-disposition")) + filename = part.get_filename() + payload = cast(bytes, part.get_payload(decode=True) or b"") + if filename is None: + if name is not None: + fields[name] = payload.decode(part.get_content_charset() or "utf-8") + continue + if chosen is None or name == "file": + chosen = (filename, part.get_content_type(), payload) + + if chosen is None: + _raise_problem(status.HTTP_400_BAD_REQUEST, "multipart file part is required") + filename, part_content_type, payload = chosen + return MultipartFile( + filename=filename, + content_type=part_content_type or "application/octet-stream", + content=payload, + fields=fields, + ) + + +async def _bytes_stream(data: bytes) -> AsyncIterator[bytes]: + yield data + + +def _etag(content_address: str) -> str: + return f'"{content_address}"' + + +def _etag_matches(header_value: str, content_address: str) -> bool: + candidates = [v.strip() for v in header_value.split(",")] + return ( + "*" in candidates + or content_address in candidates + or _etag(content_address) in candidates + ) + + +def _parse_range(header_value: str | None, size_bytes: int) -> tuple[int, int] | None: + if header_value is None: + return None + match = _RANGE_RE.match(header_value.strip()) + if match is None: + _raise_problem( + status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, + "only a single bytes range is supported", + ) + start_raw = match.group("start") + end_raw = match.group("end") + if size_bytes <= 0: + _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "empty file has no range") + if start_raw == "": + if end_raw == "": + _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "invalid range") + suffix = int(end_raw) + if suffix <= 0: + _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "invalid suffix range") + start = max(size_bytes - suffix, 0) + end = size_bytes - 1 + else: + start = int(start_raw) + end = int(end_raw) if end_raw else size_bytes - 1 + if start >= size_bytes or end < start: + _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "range not satisfiable") + return start, min(end, size_bytes - 1) + + +def _parse_content_range(header_value: str) -> tuple[int, int, int | None]: + match = _CONTENT_RANGE_RE.match(header_value.strip()) + if match is None: + _raise_problem(status.HTTP_400_BAD_REQUEST, "invalid Content-Range header") + start = int(match.group("start")) + end = int(match.group("end")) + total_raw = match.group("total") + total = None if total_raw == "*" else int(total_raw) + if end < start: + _raise_problem(status.HTTP_400_BAD_REQUEST, "invalid Content-Range bounds") + if total is not None and end >= total: + _raise_problem(status.HTTP_400_BAD_REQUEST, "Content-Range exceeds total size") + return start, end, total + + +def _upload_session(request: Request, upload_id: UUID) -> UploadSession: + session = request.app.state.upload_sessions.get(str(upload_id)) + if session is None: + _raise_problem(status.HTTP_404_NOT_FOUND, f"upload not found: {upload_id}") + return session # type: ignore[no-any-return] + + +async def _long_poll_events( + registry: Registry, + *, + since_sequence: int, + limit: int, + wait_seconds: float, +) -> list[Event]: + batch = await registry.fetch_events(since_sequence=since_sequence, limit=limit) + if batch or wait_seconds == 0: + return batch + + collected: list[Event] = [] + + async def collect() -> None: + async for event in registry.tail_events( + since_sequence=since_sequence, + poll_interval_seconds=0.05, + ): + collected.append(event) + if len(collected) >= limit: + break + + with suppress(TimeoutError): + await asyncio.wait_for(collect(), timeout=wait_seconds) + return collected + + app = create_app() diff --git a/src/artifactstore/app.py b/src/artifactstore/app.py index 27c11ad..6aeddb0 100644 --- a/src/artifactstore/app.py +++ b/src/artifactstore/app.py @@ -12,6 +12,7 @@ from artifactstore.dataplane import InProcessDataPlane from artifactstore.db.engine import create_engine from artifactstore.events import RegistryViewWriter from artifactstore.registry import Registry +from artifactstore.retention import RetentionPolicy from artifactstore.storage import LocalBackend __all__ = ["build_registry"] @@ -24,4 +25,5 @@ def build_registry(settings: Settings | None = None) -> Registry: backend = LocalBackend(effective.storage_local_root, backend_id="local") dataplane = InProcessDataPlane(backend) view_writer = RegistryViewWriter() - return Registry(engine, dataplane, view_writer) + retention_policy = RetentionPolicy.from_toml(effective.retention_config_path) + return Registry(engine, dataplane, view_writer, retention_policy) diff --git a/src/artifactstore/cli/__init__.py b/src/artifactstore/cli/__init__.py index ce7d7b1..41097f3 100644 --- a/src/artifactstore/cli/__init__.py +++ b/src/artifactstore/cli/__init__.py @@ -10,9 +10,14 @@ from __future__ import annotations import asyncio import json +import mimetypes +import urllib.error +import urllib.request +import uuid from pathlib import Path from typing import Any +import click import typer from artifactstore import __version__ @@ -28,6 +33,8 @@ app = typer.Typer( help="artifact-store: artifact registry and storage gateway", no_args_is_help=True, ) +retention_app = typer.Typer(help="Retention lifecycle commands", no_args_is_help=True) +app.add_typer(retention_app, name="retention") @app.callback() @@ -74,6 +81,107 @@ def health() -> None: typer.echo(json.dumps(payload, indent=2)) +@app.command() +def push( + directory: Path = typer.Argument( + ..., + exists=True, + file_okay=False, + dir_okay=True, + readable=True, + help="Directory to push as one artifact package.", + ), + producer: str = typer.Option(..., "--producer", help="Producer slug for the package."), + subject: str = typer.Option(..., "--subject", help="Subject identifier for the package."), + retention_class: str = typer.Option( + "raw-evidence", + "--retention-class", + help="Retention class id to apply.", + ), + name: str | None = typer.Option(None, "--name", help="Package name; defaults to dir name."), + api_url: str | None = typer.Option(None, "--api-url", help="artifact-store base URL."), + token: str | None = typer.Option(None, "--token", help="Bearer token for the HTTP API."), +) -> None: + """Push a directory through the HTTP API and finalize the package.""" + settings = get_settings() + base_url = api_url or settings.api_url + bearer = token or settings.api_token + if not bearer: + raise click.ClickException("provide --token or ARTIFACTSTORE_API_TOKEN") + + files = sorted(path for path in directory.rglob("*") if path.is_file()) + package = _http_json( + "POST", + base_url, + "/packages", + bearer, + { + "name": name or directory.name, + "producer": producer, + "subject": subject, + "retention_class": retention_class, + "metadata": {"source_directory": str(directory)}, + }, + ) + package_id = str(package["id"]) + + for path in files: + rel_path = path.relative_to(directory).as_posix() + media_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream" + _http_multipart( + base_url, + f"/packages/{package_id}/files", + bearer, + fields={"relative_path": rel_path, "media_type": media_type}, + file_field="file", + file_name=path.name, + file_content_type=media_type, + file_bytes=path.read_bytes(), + ) + + finalized = _http_json("POST", base_url, f"/packages/{package_id}/finalize", bearer, {}) + typer.echo( + json.dumps( + { + "package_id": package_id, + "manifest_digest": finalized.get("manifest_digest"), + "files": len(files), + }, + indent=2, + ) + ) + + +@app.command() +def manifest( + package_id: str = typer.Argument(..., help="Package UUID."), + api_url: str | None = typer.Option(None, "--api-url", help="artifact-store base URL."), + token: str | None = typer.Option(None, "--token", help="Bearer token for the HTTP API."), +) -> None: + """Print a package manifest JSON projection from the HTTP API.""" + settings = get_settings() + base_url = api_url or settings.api_url + bearer = token or settings.api_token + if not bearer: + raise click.ClickException("provide --token or ARTIFACTSTORE_API_TOKEN") + payload = _http_bytes( + "GET", + base_url, + f"/packages/{package_id}/manifest.json", + bearer, + headers={"Accept": "application/json"}, + ) + typer.echo(payload.decode("utf-8")) + + +@retention_app.command("sweep") +def retention_sweep() -> None: + """Run the deletion-eligibility sweeper once against the configured DB.""" + settings = get_settings() + marked = asyncio.run(_retention_sweep_async(settings)) + typer.echo(json.dumps({"marked_package_ids": marked, "marked_count": len(marked)}, indent=2)) + + # ---- internals ------------------------------------------------------------- @@ -110,5 +218,121 @@ async def _health_async(settings: Settings) -> dict[str, Any]: } +async def _retention_sweep_async(settings: Settings) -> list[str]: + from artifactstore.app import build_registry + + registry: Registry = build_registry(settings) + try: + marked = await registry.sweep_deletion_eligibility() + finally: + await registry.dispose() + return [str(package_id) for package_id in marked] + + +def _http_json( + method: str, + base_url: str, + path: str, + token: str, + payload: dict[str, Any], +) -> dict[str, Any]: + response = _http_bytes( + method, + base_url, + path, + token, + body=json.dumps(payload).encode("utf-8"), + headers={"Content-Type": "application/json", "Accept": "application/json"}, + ) + decoded = json.loads(response) + if not isinstance(decoded, dict): + raise click.ClickException(f"expected JSON object from {path}") + return decoded + + +def _http_multipart( + base_url: str, + path: str, + token: str, + *, + fields: dict[str, str], + file_field: str, + file_name: str, + file_content_type: str, + file_bytes: bytes, +) -> dict[str, Any]: + boundary = f"artifactstore-{uuid.uuid4().hex}" + body = bytearray() + for name, value in fields.items(): + body.extend(f"--{boundary}\r\n".encode("ascii")) + body.extend( + f'Content-Disposition: form-data; name="{_quote_header_value(name)}"\r\n\r\n'.encode() + ) + body.extend(value.encode()) + body.extend(b"\r\n") + body.extend(f"--{boundary}\r\n".encode("ascii")) + body.extend( + ( + f'Content-Disposition: form-data; name="{_quote_header_value(file_field)}"; ' + f'filename="{_quote_header_value(file_name)}"\r\n' + f"Content-Type: {file_content_type}\r\n\r\n" + ).encode() + ) + body.extend(file_bytes) + body.extend(b"\r\n") + body.extend(f"--{boundary}--\r\n".encode("ascii")) + + response = _http_bytes( + "POST", + base_url, + path, + token, + body=bytes(body), + headers={ + "Content-Type": f"multipart/form-data; boundary={boundary}", + "Accept": "application/json", + }, + ) + decoded = json.loads(response) + if not isinstance(decoded, dict): + raise click.ClickException(f"expected JSON object from {path}") + return decoded + + +def _http_bytes( + method: str, + base_url: str, + path: str, + token: str, + *, + body: bytes | None = None, + headers: dict[str, str] | None = None, +) -> bytes: + url = f"{base_url.rstrip('/')}/{path.lstrip('/')}" + effective_headers = dict(headers or {}) + effective_headers["Authorization"] = f"Bearer {token}" + request = urllib.request.Request( + url, + data=body, + headers=effective_headers, + method=method, + ) + try: + with urllib.request.urlopen(request, timeout=60) as response: + data = response.read() + if isinstance(data, bytes): + return data + return bytes(data) + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace") + raise click.ClickException(f"HTTP {exc.code} from {path}: {detail}") from exc + except urllib.error.URLError as exc: + raise click.ClickException(f"could not reach {url}: {exc.reason}") from exc + + +def _quote_header_value(value: str) -> str: + return value.replace("\\", "\\\\").replace('"', '\\"') + + if __name__ == "__main__": # pragma: no cover app() diff --git a/src/artifactstore/config.py b/src/artifactstore/config.py index a209301..bca4d43 100644 --- a/src/artifactstore/config.py +++ b/src/artifactstore/config.py @@ -23,6 +23,21 @@ class Settings(BaseSettings): database_url: str = "sqlite+aiosqlite:///./var/artifactstore.db" storage_local_root: str = "./var/storage" log_level: str = "INFO" + auth_tokens: str = "" + anon_read: bool = False + api_url: str = "http://127.0.0.1:8000" + api_token: str = "" + retention_config_path: str = "" + retention_sweep_interval_seconds: int = 3600 + + @property + def bearer_tokens(self) -> frozenset[str]: + """Configured shared-secret bearer tokens, parsed from CSV / newline text.""" + return frozenset( + token.strip() + for token in self.auth_tokens.replace("\n", ",").split(",") + if token.strip() + ) def get_settings() -> Settings: diff --git a/src/artifactstore/events/views.py b/src/artifactstore/events/views.py index 31ca335..afecaac 100644 --- a/src/artifactstore/events/views.py +++ b/src/artifactstore/events/views.py @@ -19,6 +19,7 @@ workplans without changing this module's public surface. from __future__ import annotations +from datetime import UTC, datetime from uuid import UUID import cbor2 @@ -146,8 +147,125 @@ async def _apply_package_finalized(connection: AsyncConnection, event: Event) -> ) +async def _apply_retention_default_applied( + connection: AsyncConnection, + event: Event, +) -> None: + if event.subject_id is None: + raise ValueError("v1.retention.default_applied event must have subject_id") + payload = cbor2.loads(event.payload) + expires_at = _parse_iso(payload["expires_at"]) + await connection.execute( + update(artifact_packages) + .where(artifact_packages.c.id == event.subject_id) + .values( + expires_at=expires_at, + last_event_sequence=event.sequence, + ) + ) + await connection.execute( + update(retention_state) + .where(retention_state.c.package_id == event.subject_id) + .values( + current_expires_at=expires_at, + effective_class=payload["retention_class"], + active_hold_id=None, + eligible_for_deletion=bool(payload.get("eligible_for_deletion", False)), + ) + ) + + +async def _apply_retention_extended(connection: AsyncConnection, event: Event) -> None: + if event.subject_id is None: + raise ValueError("v1.retention.extended event must have subject_id") + payload = cbor2.loads(event.payload) + expires_at = _parse_iso(payload["new_expires_at"]) + await connection.execute( + update(artifact_packages) + .where(artifact_packages.c.id == event.subject_id) + .values( + expires_at=expires_at, + last_event_sequence=event.sequence, + ) + ) + await connection.execute( + update(retention_state) + .where(retention_state.c.package_id == event.subject_id) + .values( + current_expires_at=expires_at, + eligible_for_deletion=False, + ) + ) + + +async def _apply_retention_hold_applied(connection: AsyncConnection, event: Event) -> None: + if event.subject_id is None: + raise ValueError("v1.retention.hold_applied event must have subject_id") + payload = cbor2.loads(event.payload) + await connection.execute( + update(retention_state) + .where(retention_state.c.package_id == event.subject_id) + .values( + active_hold_id=UUID(payload["hold_id"]), + eligible_for_deletion=False, + ) + ) + await connection.execute( + update(artifact_packages) + .where(artifact_packages.c.id == event.subject_id) + .values(last_event_sequence=event.sequence) + ) + + +async def _apply_retention_hold_released(connection: AsyncConnection, event: Event) -> None: + if event.subject_id is None: + raise ValueError("v1.retention.hold_released event must have subject_id") + await connection.execute( + update(retention_state) + .where(retention_state.c.package_id == event.subject_id) + .values(active_hold_id=None) + ) + await connection.execute( + update(artifact_packages) + .where(artifact_packages.c.id == event.subject_id) + .values(last_event_sequence=event.sequence) + ) + + +async def _apply_retention_deletion_eligible( + connection: AsyncConnection, + event: Event, +) -> None: + if event.subject_id is None: + raise ValueError("v1.retention.deletion_eligible event must have subject_id") + await connection.execute( + update(retention_state) + .where(retention_state.c.package_id == event.subject_id) + .values(eligible_for_deletion=True) + ) + await connection.execute( + update(artifact_packages) + .where(artifact_packages.c.id == event.subject_id) + .values(last_event_sequence=event.sequence) + ) + + +def _parse_iso(value: str | None) -> datetime | None: + if value is None: + return None + parsed = datetime.fromisoformat(value) + if parsed.tzinfo is None: + return parsed.replace(tzinfo=UTC) + return parsed.astimezone(UTC) + + _HANDLERS = { "v1.package.created": _apply_package_created, "v1.file.ingested": _apply_file_ingested, "v1.package.finalized": _apply_package_finalized, + "v1.retention.default_applied": _apply_retention_default_applied, + "v1.retention.extended": _apply_retention_extended, + "v1.retention.hold_applied": _apply_retention_hold_applied, + "v1.retention.hold_released": _apply_retention_hold_released, + "v1.retention.deletion_eligible": _apply_retention_deletion_eligible, } diff --git a/src/artifactstore/registry/__init__.py b/src/artifactstore/registry/__init__.py index 9d04ef4..f3c359d 100644 --- a/src/artifactstore/registry/__init__.py +++ b/src/artifactstore/registry/__init__.py @@ -20,6 +20,7 @@ from __future__ import annotations import uuid from collections.abc import AsyncIterator, Sequence +from dataclasses import dataclass from datetime import UTC, datetime from typing import Any from uuid import UUID @@ -39,7 +40,7 @@ from artifactstore.db.schema import ( from artifactstore.db.schema import ( events as events_t, ) -from artifactstore.events import RegistryViewWriter, make_event, tail, write +from artifactstore.events import RegistryViewWriter, fetch_since, make_event, tail, write from artifactstore.events.model import Event from artifactstore.identity import ContentAddress from artifactstore.manifest import ( @@ -61,9 +62,21 @@ from artifactstore.manifest import ( ) from artifactstore.manifest.codec import decode as manifest_decode from artifactstore.manifest.projection import jcs_projection +from artifactstore.retention import RetentionPolicy from artifactstore.storage.spi import BackendStatus -__all__ = ["Registry"] +__all__ = [ + "DuplicateRelativePathError", + "FileNotFoundError", + "FileRecord", + "IllegalPackageStateError", + "PackageNotFoundError", + "PackageRecord", + "Registry", + "RetentionClassRecord", + "RetentionStateError", + "RetentionStateRecord", +] class PackageNotFoundError(KeyError): @@ -82,6 +95,78 @@ class DuplicateRelativePathError(ValueError): """Raised when ingest_file would create a duplicate ``relative_path`` in a package.""" +class RetentionStateError(ValueError): + """Raised when a retention lifecycle operation is invalid.""" + + +@dataclass(frozen=True, slots=True) +class PackageRecord: + """Materialised package row projected into the registry API.""" + + id: UUID + name: str + producer: str + subject: str + retention_class: str + metadata: dict[str, Any] + status: str + manifest_digest_hex: str | None + created_at: datetime | None + finalized_at: datetime | None + expires_at: datetime | None + last_event_sequence: int + metadata_schema_id: UUID | None = None + + +@dataclass(frozen=True, slots=True) +class FileRecord: + """Materialised file row plus its primary storage location.""" + + id: UUID + package_id: UUID + relative_path: str + media_type: str + size_bytes: int + digest_algorithm: str + digest_primary_hex: str + digest_sha256_hex: str + created_at: datetime | None + backend_id: str + content_address: str + object_key: str + retrieval_tier: str + storage_status: str + + +@dataclass(frozen=True, slots=True) +class RetentionClassRecord: + """Configured retention-class row.""" + + class_id: str + default_duration_seconds: int | None + deletion_strategy: str + + +@dataclass(frozen=True, slots=True) +class RetentionStateRecord: + """Materialised retention state for one package.""" + + package_id: UUID + current_expires_at: datetime | None + effective_class: str + active_hold_id: UUID | None + eligible_for_deletion: bool + + +_RETENTION_EVENT_TYPES = ( + "v1.retention.default_applied", + "v1.retention.extended", + "v1.retention.hold_applied", + "v1.retention.hold_released", + "v1.retention.deletion_eligible", +) + + class Registry: """Library-shaped orchestrator over events, dataplane, and views.""" @@ -90,10 +175,12 @@ class Registry: engine: AsyncEngine, dataplane: DataPlane, view_writer: RegistryViewWriter | None = None, + retention_policy: RetentionPolicy | None = None, ) -> None: self._engine = engine self._dataplane = dataplane self._view_writer = view_writer or RegistryViewWriter() + self._retention_policy = retention_policy or RetentionPolicy() # ---- mutating operations ------------------------------------------------ @@ -108,7 +195,7 @@ class Registry: metadata: dict[str, Any] | None = None, ) -> UUID: """Create a new package; returns its ``UUID``.""" - await self._validate_retention_class(retention_class) + retention_class_row = await self._get_retention_class(retention_class) package_id = uuid.uuid4() payload = cbor2.dumps( { @@ -130,6 +217,31 @@ class Registry: async with self._engine.begin() as conn: written = await write(conn, event) await self._view_writer.apply(conn, written) + if written.created_at is None: + raise RuntimeError("created package event missing created_at") + decision = self._retention_policy.default_for( + retention_class=retention_class, + class_default_seconds=retention_class_row.default_duration_seconds, + base_time=_ensure_aware(written.created_at), + ) + default_payload = cbor2.dumps( + { + "retention_class": decision.retention_class, + "default_duration_seconds": decision.default_duration_seconds, + "expires_at": _iso(decision.expires_at), + "eligible_for_deletion": decision.eligible_for_deletion, + }, + canonical=True, + ) + default_event = make_event( + event_type="v1.retention.default_applied", + subject_kind="retention", + subject_id=package_id, + actor=actor, + payload=default_payload, + ) + written_default = await write(conn, default_event) + await self._view_writer.apply(conn, written_default) return package_id async def ingest_file( @@ -265,6 +377,320 @@ class Registry: # ---- read-only operations ---------------------------------------------- + async def list_packages( + self, + *, + producer: str | None = None, + subject: str | None = None, + retention_class: str | None = None, + metadata_key: str | None = None, + metadata_value: str | None = None, + ) -> list[PackageRecord]: + """List package materialised views, optionally filtered for producer + workflows. Metadata key filtering is performed after the portable SQL + filters so the same behavior works on SQLite and PostgreSQL.""" + stmt = select(artifact_packages).order_by( + artifact_packages.c.created_at, + artifact_packages.c.id, + ) + if producer is not None: + stmt = stmt.where(artifact_packages.c.producer == producer) + if subject is not None: + stmt = stmt.where(artifact_packages.c.subject == subject) + if retention_class is not None: + stmt = stmt.where(artifact_packages.c.retention_class == retention_class) + async with self._engine.connect() as conn: + rows = (await conn.execute(stmt)).all() + + records = [_package_record_from_row(r) for r in rows] + if metadata_key is None: + return records + filtered: list[PackageRecord] = [] + for record in records: + if metadata_key not in record.metadata: + continue + if metadata_value is not None and str(record.metadata[metadata_key]) != metadata_value: + continue + filtered.append(record) + return filtered + + async def get_package(self, package_id: UUID) -> PackageRecord: + """Return one package materialised view.""" + async with self._engine.connect() as conn: + row = ( + await conn.execute( + select(artifact_packages).where(artifact_packages.c.id == package_id) + ) + ).first() + if row is None: + raise PackageNotFoundError(f"package not found: {package_id}") + return _package_record_from_row(row) + + async def get_file_metadata(self, file_id: UUID) -> FileRecord: + """Return file metadata plus the primary storage location.""" + stmt = ( + select( + artifact_files.c.id.label("file_id"), + artifact_files.c.package_id, + artifact_files.c.relative_path, + artifact_files.c.media_type, + artifact_files.c.size_bytes, + artifact_files.c.digest_algorithm, + artifact_files.c.digest_primary, + artifact_files.c.digest_sha256, + artifact_files.c.created_at, + storage_locations.c.backend_id, + storage_locations.c.content_address, + storage_locations.c.object_key, + storage_locations.c.retrieval_tier, + storage_locations.c.status.label("storage_status"), + ) + .join( + storage_locations, + storage_locations.c.artifact_file_id == artifact_files.c.id, + ) + .where(artifact_files.c.id == file_id) + .limit(1) + ) + async with self._engine.connect() as conn: + row = (await conn.execute(stmt)).first() + if row is None: + raise FileNotFoundError(f"file not found: {file_id}") + return FileRecord( + id=row.file_id, + package_id=row.package_id, + relative_path=row.relative_path, + media_type=row.media_type, + size_bytes=row.size_bytes, + digest_algorithm=row.digest_algorithm, + digest_primary_hex=row.digest_primary.hex(), + digest_sha256_hex=row.digest_sha256.hex(), + created_at=row.created_at, + backend_id=row.backend_id, + content_address=row.content_address, + object_key=row.object_key, + retrieval_tier=row.retrieval_tier, + storage_status=row.storage_status, + ) + + async def list_retention_classes(self) -> list[RetentionClassRecord]: + """Return configured retention classes.""" + async with self._engine.connect() as conn: + rows = ( + await conn.execute( + select(retention_classes).order_by(retention_classes.c.class_id) + ) + ).all() + return [ + RetentionClassRecord( + class_id=r.class_id, + default_duration_seconds=r.default_duration_seconds, + deletion_strategy=r.deletion_strategy, + ) + for r in rows + ] + + async def get_retention_state(self, package_id: UUID) -> RetentionStateRecord: + """Return the retention materialised view for one package.""" + async with self._engine.connect() as conn: + row = ( + await conn.execute( + select(retention_state).where(retention_state.c.package_id == package_id) + ) + ).first() + if row is None: + raise PackageNotFoundError(f"package not found: {package_id}") + return _retention_state_record_from_row(row) + + async def extend_retention( + self, + package_id: UUID, + *, + new_expires_at: datetime, + reason: str, + actor: str, + ) -> RetentionStateRecord: + """Extend package retention to a strictly later expiry.""" + clean_reason = _require_reason(reason) + target_expires_at = _ensure_aware(new_expires_at) + current = await self.get_retention_state(package_id) + if current.current_expires_at is None: + raise RetentionStateError( + f"package {package_id} has no current expiry to extend" + ) + current_expires_at = _ensure_aware(current.current_expires_at) + if target_expires_at <= current_expires_at: + raise RetentionStateError("new_expires_at must be strictly later than current expiry") + + payload = cbor2.dumps( + { + "previous_expires_at": _iso(current_expires_at), + "new_expires_at": _iso(target_expires_at), + "reason": clean_reason, + }, + canonical=True, + ) + event = make_event( + event_type="v1.retention.extended", + subject_kind="retention", + subject_id=package_id, + actor=actor, + payload=payload, + ) + async with self._engine.begin() as conn: + written = await write(conn, event) + await self._view_writer.apply(conn, written) + return await self.get_retention_state(package_id) + + async def apply_retention_hold( + self, + package_id: UUID, + *, + reason: str, + actor: str, + ) -> UUID: + """Apply one active hold to a package and return the hold id.""" + clean_reason = _require_reason(reason) + current = await self.get_retention_state(package_id) + if current.active_hold_id is not None: + raise RetentionStateError( + f"package {package_id} already has active hold {current.active_hold_id}" + ) + hold_id = uuid.uuid4() + payload = cbor2.dumps( + { + "hold_id": str(hold_id), + "reason": clean_reason, + }, + canonical=True, + ) + event = make_event( + event_type="v1.retention.hold_applied", + subject_kind="retention", + subject_id=package_id, + actor=actor, + payload=payload, + ) + async with self._engine.begin() as conn: + written = await write(conn, event) + await self._view_writer.apply(conn, written) + return hold_id + + async def release_retention_hold( + self, + package_id: UUID, + hold_id: UUID, + *, + reason: str, + actor: str, + now: datetime | None = None, + ) -> RetentionStateRecord: + """Release the active hold, emitting eligibility if the package is expired.""" + clean_reason = _require_reason(reason) + current = await self.get_retention_state(package_id) + if current.active_hold_id != hold_id: + raise RetentionStateError(f"hold {hold_id} is not active on package {package_id}") + + payload = cbor2.dumps( + { + "hold_id": str(hold_id), + "reason": clean_reason, + }, + canonical=True, + ) + release_event = make_event( + event_type="v1.retention.hold_released", + subject_kind="retention", + subject_id=package_id, + actor=actor, + payload=payload, + ) + effective_now = _ensure_aware(now or datetime.now(UTC)) + expired_after_release = ( + current.current_expires_at is not None + and _ensure_aware(current.current_expires_at) <= effective_now + and not current.eligible_for_deletion + ) + async with self._engine.begin() as conn: + written_release = await write(conn, release_event) + await self._view_writer.apply(conn, written_release) + if expired_after_release: + eligible_event = make_event( + event_type="v1.retention.deletion_eligible", + subject_kind="retention", + subject_id=package_id, + actor=actor, + payload=_deletion_eligible_payload( + expires_at=current.current_expires_at, + reason="hold released after expiry", + ), + ) + written_eligible = await write(conn, eligible_event) + await self._view_writer.apply(conn, written_eligible) + return await self.get_retention_state(package_id) + + async def sweep_deletion_eligibility( + self, + *, + now: datetime | None = None, + actor: str = "retention-sweeper", + ) -> list[UUID]: + """Mark expired, unheld packages as eligible for deletion.""" + effective_now = _ensure_aware(now or datetime.now(UTC)) + marked: list[UUID] = [] + async with self._engine.begin() as conn: + rows = (await conn.execute(select(retention_state))).all() + for row in rows: + record = _retention_state_record_from_row(row) + if record.eligible_for_deletion or record.active_hold_id is not None: + continue + if record.current_expires_at is None: + continue + if _ensure_aware(record.current_expires_at) > effective_now: + continue + event = make_event( + event_type="v1.retention.deletion_eligible", + subject_kind="retention", + subject_id=record.package_id, + actor=actor, + payload=_deletion_eligible_payload( + expires_at=record.current_expires_at, + reason="retention expiry reached", + ), + ) + written = await write(conn, event) + await self._view_writer.apply(conn, written) + marked.append(record.package_id) + return marked + + async def retention_history(self, package_id: UUID) -> list[Event]: + """Return all retention events for a package, ordered by sequence.""" + await self.get_package(package_id) + async with self._engine.connect() as conn: + rows = ( + await conn.execute( + select(events_t) + .where( + events_t.c.subject_id == package_id, + events_t.c.event_type.in_(_RETENTION_EVENT_TYPES), + ) + .order_by(events_t.c.sequence) + ) + ).all() + return [ + Event( + event_type=r.event_type, + subject_kind=r.subject_kind, + subject_id=r.subject_id, + actor=r.actor, + payload=r.payload, + payload_digest=r.payload_digest, + sequence=r.sequence, + created_at=r.created_at, + ) + for r in rows + ] + async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes: """Return the finalised manifest. ``format`` is ``cbor`` (canonical CBOR, the wire form) or ``json`` (the JCS projection).""" @@ -289,20 +715,27 @@ class Registry: return jcs_projection(manifest_decode(payload_bytes)) raise ValueError(f"unknown manifest format: {format!r} (expected 'cbor' or 'json')") - async def get_file(self, file_id: UUID) -> AsyncIterator[bytes]: + async def get_file( + self, + file_id: UUID, + *, + byte_range: tuple[int, int] | None = None, + ) -> AsyncIterator[bytes]: """Return an async byte iterator for the bytes of a stored file.""" + record = await self.get_file_metadata(file_id) + ca = ContentAddress(record.content_address) + return await self._dataplane.serve_object(ca, byte_range=byte_range) + + async def fetch_events( + self, + *, + since_sequence: int = 0, + limit: int = 100, + ) -> list[Event]: + """Fetch one ordered batch of events with sequence greater than + ``since_sequence``.""" async with self._engine.connect() as conn: - row = ( - await conn.execute( - select(storage_locations) - .where(storage_locations.c.artifact_file_id == file_id) - .limit(1) - ) - ).first() - if row is None: - raise FileNotFoundError(f"file not found: {file_id}") - ca = ContentAddress(row.content_address) - return await self._dataplane.serve_object(ca) + return await fetch_since(conn, since_sequence=since_sequence, limit=limit) def tail_events( self, @@ -342,25 +775,83 @@ class Registry: # ---- internals ---------------------------------------------------------- - async def _validate_retention_class(self, retention_class: str) -> None: + async def _get_retention_class(self, retention_class: str) -> RetentionClassRecord: async with self._engine.connect() as conn: row = ( await conn.execute( - select(retention_classes.c.class_id).where( - retention_classes.c.class_id == retention_class - ) + select(retention_classes).where(retention_classes.c.class_id == retention_class) ) ).first() if row is None: raise ValueError(f"unknown retention class: {retention_class!r}") + return RetentionClassRecord( + class_id=row.class_id, + default_duration_seconds=row.default_duration_seconds, + deletion_strategy=row.deletion_strategy, + ) def _iso(value: datetime | None) -> str | None: if value is None: return None + return _ensure_aware(value).isoformat() + + +def _ensure_aware(value: datetime) -> datetime: if value.tzinfo is None: - value = value.replace(tzinfo=UTC) - return value.isoformat() + return value.replace(tzinfo=UTC) + return value.astimezone(UTC) + + +def _parse_iso_datetime(value: str | None) -> datetime | None: + if value is None: + return None + return _ensure_aware(datetime.fromisoformat(value)) + + +def _require_reason(reason: str) -> str: + clean = reason.strip() + if not clean: + raise RetentionStateError("reason is required") + return clean + + +def _deletion_eligible_payload(*, expires_at: datetime | None, reason: str) -> bytes: + return cbor2.dumps( + { + "expires_at": _iso(expires_at), + "reason": reason, + }, + canonical=True, + ) + + +def _package_record_from_row(row: Any) -> PackageRecord: + return PackageRecord( + id=row.id, + name=row.name, + producer=row.producer, + subject=row.subject, + retention_class=row.retention_class, + metadata=dict(row.metadata) if row.metadata else {}, + status=row.status, + manifest_digest_hex=row.manifest_digest.hex() if row.manifest_digest else None, + created_at=row.created_at, + finalized_at=row.finalized_at, + expires_at=row.expires_at, + last_event_sequence=row.last_event_sequence, + metadata_schema_id=row.metadata_schema_id, + ) + + +def _retention_state_record_from_row(row: Any) -> RetentionStateRecord: + return RetentionStateRecord( + package_id=row.package_id, + current_expires_at=row.current_expires_at, + effective_class=row.effective_class, + active_hold_id=row.active_hold_id, + eligible_for_deletion=bool(row.eligible_for_deletion), + ) def _build_manifest( diff --git a/src/artifactstore/retention/__init__.py b/src/artifactstore/retention/__init__.py index 06b2abc..355723c 100644 --- a/src/artifactstore/retention/__init__.py +++ b/src/artifactstore/retention/__init__.py @@ -1,5 +1,85 @@ -"""Retention policy engine. +"""Retention policy engine.""" -Seed classes land in ARTIFACT-STORE-WP-0001-T002 (data model). Active policy -operations (extensions, holds, sweeper) land in workplan WP-0003. -""" +from __future__ import annotations + +import tomllib +from collections.abc import Mapping +from dataclasses import dataclass +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any + +__all__ = ["RetentionDecision", "RetentionPolicy"] + + +@dataclass(frozen=True, slots=True) +class RetentionDecision: + """Computed default retention outcome for a package.""" + + retention_class: str + default_duration_seconds: int | None + expires_at: datetime | None + eligible_for_deletion: bool = False + + +class RetentionPolicy: + """Applies operator-configured default retention durations. + + The database stores seed defaults for each retention class. Operators may + override those defaults through a TOML file whose shape is documented in + ``docs/OPERATOR.md``. + """ + + def __init__(self, overrides: Mapping[str, int | None] | None = None) -> None: + self._overrides = dict(overrides or {}) + + @classmethod + def from_toml(cls, path: str | Path | None) -> RetentionPolicy: + if path is None or str(path).strip() == "": + return cls() + config_path = Path(path) + with config_path.open("rb") as fh: + raw = tomllib.load(fh) + return cls(_parse_duration_overrides(raw)) + + def default_for( + self, + *, + retention_class: str, + class_default_seconds: int | None, + base_time: datetime, + ) -> RetentionDecision: + duration_seconds = self._overrides.get(retention_class, class_default_seconds) + expires_at = ( + None + if duration_seconds is None + else base_time + timedelta(seconds=duration_seconds) + ) + return RetentionDecision( + retention_class=retention_class, + default_duration_seconds=duration_seconds, + expires_at=expires_at, + eligible_for_deletion=False, + ) + + +def _parse_duration_overrides(raw: Mapping[str, Any]) -> dict[str, int | None]: + section = raw.get("retention_classes", {}) + if not isinstance(section, dict): + raise ValueError("retention_classes must be a TOML table") + overrides: dict[str, int | None] = {} + for class_id, value in section.items(): + duration = value.get("default_duration_seconds") if isinstance(value, dict) else value + if duration is None: + overrides[class_id] = None + continue + if not isinstance(duration, int): + raise ValueError( + f"retention_classes.{class_id}.default_duration_seconds must be an integer" + ) + if duration < 0: + raise ValueError( + f"retention_classes.{class_id}.default_duration_seconds must be non-negative" + ) + overrides[class_id] = duration + return overrides diff --git a/tests/integration/test_cli_commands.py b/tests/integration/test_cli_commands.py index ebf01d4..1922069 100644 --- a/tests/integration/test_cli_commands.py +++ b/tests/integration/test_cli_commands.py @@ -2,8 +2,10 @@ from __future__ import annotations +import asyncio import json from pathlib import Path +from typing import Any import pytest from sqlalchemy import create_engine, insert, inspect @@ -84,3 +86,169 @@ def test_cli_health_reports_ok( assert payload["status"] == "ok" assert payload["db"]["healthy"] is True assert payload["backend"]["healthy"] is True + + +def test_cli_push_uses_http_api( + runner: CliRunner, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + source = tmp_path / "source" + source.mkdir() + (source / "a.txt").write_text("alpha", encoding="utf-8") + calls: list[tuple[str, str, dict[str, Any]]] = [] + multipart_calls: list[tuple[str, dict[str, str], bytes]] = [] + + def fake_http_json( + method: str, + base_url: str, + path: str, + token: str, + payload: dict[str, Any], + ) -> dict[str, Any]: + calls.append((method, path, payload)) + assert base_url == "http://api.test" + assert token == "secret" + if path == "/packages": + return {"id": "pkg-1"} + if path == "/packages/pkg-1/finalize": + return {"manifest_digest": "blake3:abc"} + raise AssertionError(f"unexpected JSON request: {method} {path}") + + def fake_http_multipart( + base_url: str, + path: str, + token: str, + *, + fields: dict[str, str], + file_field: str, + file_name: str, + file_content_type: str, + file_bytes: bytes, + ) -> dict[str, Any]: + assert base_url == "http://api.test" + assert token == "secret" + assert path == "/packages/pkg-1/files" + assert file_field == "file" + assert file_name == "a.txt" + assert file_content_type == "text/plain" + multipart_calls.append((path, fields, file_bytes)) + return {"id": "file-1"} + + monkeypatch.setattr("artifactstore.cli._http_json", fake_http_json) + monkeypatch.setattr("artifactstore.cli._http_multipart", fake_http_multipart) + + result = runner.invoke( + cli_app, + [ + "push", + str(source), + "--producer", + "prod", + "--subject", + "sub", + "--api-url", + "http://api.test", + "--token", + "secret", + ], + ) + + assert result.exit_code == 0, result.output + assert json.loads(result.output) == { + "package_id": "pkg-1", + "manifest_digest": "blake3:abc", + "files": 1, + } + assert calls[0][1] == "/packages" + assert calls[1][1] == "/packages/pkg-1/finalize" + assert multipart_calls == [ + ( + "/packages/pkg-1/files", + {"relative_path": "a.txt", "media_type": "text/plain"}, + b"alpha", + ) + ] + + +def test_cli_manifest_fetches_json_projection( + runner: CliRunner, + monkeypatch: pytest.MonkeyPatch, +) -> None: + def fake_http_bytes( + method: str, + base_url: str, + path: str, + token: str, + *, + body: bytes | None = None, + headers: dict[str, str] | None = None, + ) -> bytes: + assert method == "GET" + assert base_url == "http://api.test" + assert path == "/packages/pkg-1/manifest.json" + assert token == "secret" + assert body is None + assert headers == {"Accept": "application/json"} + return b'{"manifest_version":1}' + + monkeypatch.setattr("artifactstore.cli._http_bytes", fake_http_bytes) + + result = runner.invoke( + cli_app, + [ + "manifest", + "pkg-1", + "--api-url", + "http://api.test", + "--token", + "secret", + ], + ) + + assert result.exit_code == 0, result.output + assert json.loads(result.output) == {"manifest_version": 1} + + +def test_cli_retention_sweep_marks_expired_package( + runner: CliRunner, + env_db: Path, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + sync_engine = create_engine(f"sqlite:///{env_db}", future=True) + metadata.create_all(sync_engine) + with sync_engine.begin() as conn: + conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS]) + sync_engine.dispose() + + retention_config = tmp_path / "retention.toml" + retention_config.write_text( + '[retention_classes.transient]\ndefault_duration_seconds = 0\n', + encoding="utf-8", + ) + monkeypatch.setenv("ARTIFACTSTORE_RETENTION_CONFIG_PATH", str(retention_config)) + + async def create_expired_package() -> str: + from artifactstore.app import build_registry + from artifactstore.config import get_settings + + registry = build_registry(get_settings()) + try: + package_id = await registry.create_package( + name="expired", + producer="tests", + subject="cli-sweep", + retention_class="transient", + actor="ops", + ) + finally: + await registry.dispose() + return str(package_id) + + package_id = asyncio.run(create_expired_package()) + result = runner.invoke(cli_app, ["retention", "sweep"]) + + assert result.exit_code == 0, result.output + payload = json.loads(result.output) + assert payload == {"marked_package_ids": [package_id], "marked_count": 1} diff --git a/tests/integration/test_http_api.py b/tests/integration/test_http_api.py new file mode 100644 index 0000000..6e387e1 --- /dev/null +++ b/tests/integration/test_http_api.py @@ -0,0 +1,240 @@ +"""HTTP API integration tests for ARTIFACT-STORE-WP-0002.""" + +from __future__ import annotations + +import tempfile +from pathlib import Path +from typing import Any + +import cbor2 +from fastapi.testclient import TestClient +from hypothesis import HealthCheck, given +from hypothesis import settings as hypothesis_settings +from hypothesis import strategies as st +from sqlalchemy import create_engine, insert + +from artifactstore.api.http import create_app +from artifactstore.config import Settings +from artifactstore.db.schema import metadata, retention_classes +from artifactstore.db.seed import RETENTION_CLASS_SEEDS +from artifactstore.identity import digest_bytes + +AUTH = {"Authorization": "Bearer test-token"} + + +def _settings(root: Path) -> Settings: + db_path = root / "http-api.db" + storage_root = root / "storage" + storage_root.mkdir(parents=True, exist_ok=True) + sync_engine = create_engine(f"sqlite:///{db_path}", future=True) + metadata.create_all(sync_engine) + with sync_engine.begin() as conn: + conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS]) + sync_engine.dispose() + return Settings( + database_url=f"sqlite+aiosqlite:///{db_path}", + storage_local_root=str(storage_root), + log_level="INFO", + auth_tokens="test-token", + ) + + +def _create_package(client: TestClient, *, name: str = "pkg") -> str: + resp = client.post( + "/packages", + headers=AUTH, + json={ + "name": name, + "producer": "guide-board", + "subject": "run-42", + "retention_class": "raw-evidence", + "metadata": {"run_id": "r-42", "kind": "integration"}, + }, + ) + assert resp.status_code == 201, resp.text + return str(resp.json()["id"]) + + +def _upload_file(client: TestClient, package_id: str, rel_path: str, data: bytes) -> dict[str, Any]: + resp = client.post( + f"/packages/{package_id}/files", + headers=AUTH, + data={"relative_path": rel_path, "media_type": "application/octet-stream"}, + files={"file": (Path(rel_path).name, data, "application/octet-stream")}, + ) + assert resp.status_code == 201, resp.text + return dict(resp.json()) + + +def test_http_surface_ingest_finalize_download_and_events(tmp_path: Path) -> None: + app = create_app(_settings(tmp_path)) + with TestClient(app) as client: + unauth = client.get("/packages") + assert unauth.status_code == 401 + assert unauth.headers["content-type"].startswith("application/problem+json") + + assert client.get("/openapi.json").status_code == 200 + assert client.get("/docs").status_code == 200 + assert client.get("/backends", headers=AUTH).json()["backends"][0]["backend_id"] == "local" + assert client.get("/retention-classes", headers=AUTH).json()["retention_classes"] + + package_id = _create_package(client) + listing = client.get( + "/packages", + headers=AUTH, + params={ + "producer": "guide-board", + "subject": "run-42", + "retention_class": "raw-evidence", + "metadata_key": "run_id", + "metadata_value": "r-42", + }, + ) + assert listing.status_code == 200 + assert [p["id"] for p in listing.json()["packages"]] == [package_id] + + data = b"hello artifact-store http api" * 64 + file_record = _upload_file(client, package_id, "reports/hello.bin", data) + assert file_record["size_bytes"] == len(data) + assert file_record["digest_primary_hex"] == digest_bytes(data).primary.hex + + finalized = client.post(f"/packages/{package_id}/finalize", headers=AUTH) + assert finalized.status_code == 200, finalized.text + assert finalized.json()["status"] == "finalized" + assert finalized.json()["manifest_digest"].startswith("blake3:") + + manifest_cbor = client.get( + f"/packages/{package_id}/manifest", + headers={**AUTH, "Accept": "application/cbor"}, + ) + assert manifest_cbor.status_code == 200 + manifest_payload = cbor2.loads(manifest_cbor.content) + assert manifest_payload["manifest_version"] == 1 + assert manifest_payload["package"]["id"] == package_id + + manifest_json = client.get(f"/packages/{package_id}/manifest.json", headers=AUTH) + assert manifest_json.status_code == 200 + assert manifest_json.json()["files"][0]["relative_path"] == "reports/hello.bin" + + file_id = file_record["id"] + metadata_resp = client.get(f"/files/{file_id}", headers=AUTH) + assert metadata_resp.status_code == 200 + content_address = metadata_resp.json()["content_address"] + + download = client.get(f"/files/{file_id}/download", headers=AUTH) + assert download.status_code == 200 + assert download.content == data + assert download.headers["etag"] == f'"{content_address}"' + + partial = client.get( + f"/files/{file_id}/download", + headers={**AUTH, "Range": "bytes=6-17"}, + ) + assert partial.status_code == 206 + assert partial.headers["content-range"] == f"bytes 6-17/{len(data)}" + assert partial.content == data[6:18] + + not_modified = client.get( + f"/files/{file_id}/download", + headers={**AUTH, "If-None-Match": f'"{content_address}"'}, + ) + assert not_modified.status_code == 304 + + events_json = client.get( + "/events", + headers={**AUTH, "Accept": "application/json"}, + params={"since": 0, "limit": 10, "wait_seconds": 0}, + ) + assert events_json.status_code == 200 + assert [e["event_type"] for e in events_json.json()["events"]] == [ + "v1.package.created", + "v1.retention.default_applied", + "v1.file.ingested", + "v1.package.finalized", + ] + + events_cbor = client.get( + "/events", + headers={**AUTH, "Accept": "application/cbor"}, + params={"since": 0, "limit": 10, "wait_seconds": 0}, + ) + assert events_cbor.status_code == 200 + assert cbor2.loads(events_cbor.content)["events"][0]["sequence"] == 1 + + +def test_http_scripted_50_file_package_flow(tmp_path: Path) -> None: + app = create_app(_settings(tmp_path)) + with TestClient(app) as client: + package_id = _create_package(client, name="fifty") + uploaded: list[tuple[str, bytes, dict[str, Any]]] = [] + for idx in range(50): + rel_path = f"bundle/file-{idx:02d}.bin" + payload = f"payload {idx:02d}:".encode() + bytes([idx]) * (idx + 1) + record = _upload_file(client, package_id, rel_path, payload) + uploaded.append((rel_path, payload, record)) + + finalized = client.post(f"/packages/{package_id}/finalize", headers=AUTH) + assert finalized.status_code == 200, finalized.text + + for rel_path, payload, record in uploaded: + assert record["relative_path"] == rel_path + assert record["digest_primary_hex"] == digest_bytes(payload).primary.hex + downloaded = client.get(f"/files/{record['id']}/download", headers=AUTH) + assert downloaded.status_code == 200 + assert downloaded.content == payload + + events = client.get( + "/events", + headers={**AUTH, "Accept": "application/json"}, + params={"since": 0, "limit": 100, "wait_seconds": 0}, + ) + assert events.status_code == 200 + assert len(events.json()["events"]) == 53 + assert events.json()["events"][-1]["event_type"] == "v1.package.finalized" + + +@given( + data=st.binary(min_size=1, max_size=512), + stem=st.text(alphabet=list("abcdefghijklmnopqrstuvwxyz0123456789_-"), min_size=1, max_size=24), +) +@hypothesis_settings( + max_examples=12, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], +) +def test_upload_session_lifecycle_property(data: bytes, stem: str) -> None: + with tempfile.TemporaryDirectory() as tmp: + app = create_app(_settings(Path(tmp))) + with TestClient(app) as client: + package_id = _create_package(client, name="upload-session") + opened = client.post( + "/uploads", + headers=AUTH, + json={"expected_size_bytes": len(data), "media_type": "application/octet-stream"}, + ) + assert opened.status_code == 201, opened.text + upload_url = opened.json()["content_upload_url"] + + patched = client.patch( + upload_url, + headers={**AUTH, "Content-Range": f"bytes 0-{len(data) - 1}/{len(data)}"}, + content=data, + ) + assert patched.status_code == 200, patched.text + assert patched.json()["received_bytes"] == len(data) + + completed = client.post( + f"{upload_url}/complete", + headers=AUTH, + json={ + "package_id": package_id, + "relative_path": f"uploads/{stem}.bin", + "media_type": "application/octet-stream", + }, + ) + assert completed.status_code == 201, completed.text + file_id = completed.json()["id"] + + downloaded = client.get(f"/files/{file_id}/download", headers=AUTH) + assert downloaded.status_code == 200 + assert downloaded.content == data diff --git a/tests/integration/test_registry.py b/tests/integration/test_registry.py index 4f23855..7420582 100644 --- a/tests/integration/test_registry.py +++ b/tests/integration/test_registry.py @@ -243,18 +243,19 @@ async def test_end_to_end_ingest_finalize_replay( stream = await registry.get_file(fid) assert await _consume(stream) == expected - # Tail events: 1 created + 3 ingested + 1 finalized = 5. + # Tail events: 1 created + 1 default retention + 3 ingested + 1 finalized = 6. collected = [] async def _consume_tail() -> None: async for evt in registry.tail_events(since_sequence=0, poll_interval_seconds=0.01): collected.append(evt) - if len(collected) >= 5: + if len(collected) >= 6: break await asyncio.wait_for(_consume_tail(), timeout=5.0) assert [e.event_type for e in collected] == [ "v1.package.created", + "v1.retention.default_applied", "v1.file.ingested", "v1.file.ingested", "v1.file.ingested", diff --git a/tests/integration/test_retention_lifecycle.py b/tests/integration/test_retention_lifecycle.py new file mode 100644 index 0000000..773f0a6 --- /dev/null +++ b/tests/integration/test_retention_lifecycle.py @@ -0,0 +1,250 @@ +"""Retention lifecycle integration tests for ARTIFACT-STORE-WP-0003.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from datetime import datetime, timedelta +from pathlib import Path + +import cbor2 +import pytest +import pytest_asyncio +from fastapi.testclient import TestClient +from sqlalchemy import insert +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from artifactstore.api.http import create_app +from artifactstore.config import Settings +from artifactstore.dataplane import InProcessDataPlane +from artifactstore.db.schema import metadata, retention_classes +from artifactstore.db.seed import RETENTION_CLASS_SEEDS +from artifactstore.events import RegistryViewWriter +from artifactstore.registry import Registry, RetentionStateError +from artifactstore.retention import RetentionPolicy +from artifactstore.storage import LocalBackend + +AUTH = {"Authorization": "Bearer test-token"} + + +@pytest_asyncio.fixture +async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]: + db_path = tmp_path / "retention.db" + eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}") + async with eng.begin() as conn: + await conn.run_sync(metadata.create_all) + for seed in RETENTION_CLASS_SEEDS: + await conn.execute(insert(retention_classes).values(**seed)) + yield eng + await eng.dispose() + + +@pytest.fixture +def registry(engine: AsyncEngine, tmp_path: Path) -> Registry: + backend = LocalBackend(tmp_path / "store", backend_id="local") + dataplane = InProcessDataPlane(backend, tmp_dir=tmp_path / "dp-tmp") + return Registry( + engine, + dataplane, + RegistryViewWriter(), + RetentionPolicy({"transient": 0}), + ) + + +def _http_settings(tmp_path: Path, retention_config_path: Path | None = None) -> Settings: + db_path = tmp_path / "retention-http.db" + storage_root = tmp_path / "storage" + storage_root.mkdir(parents=True, exist_ok=True) + from sqlalchemy import create_engine + + sync_engine = create_engine(f"sqlite:///{db_path}", future=True) + metadata.create_all(sync_engine) + with sync_engine.begin() as conn: + conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS]) + sync_engine.dispose() + return Settings( + database_url=f"sqlite+aiosqlite:///{db_path}", + storage_local_root=str(storage_root), + log_level="INFO", + auth_tokens="test-token", + retention_config_path=str(retention_config_path or ""), + ) + + +def _create_package( + client: TestClient, + *, + retention_class: str = "raw-evidence", +) -> dict[str, object]: + resp = client.post( + "/packages", + headers=AUTH, + json={ + "name": "retention", + "producer": "tests", + "subject": "retention-subject", + "retention_class": retention_class, + "metadata": {}, + }, + ) + assert resp.status_code == 201, resp.text + return dict(resp.json()) + + +async def test_default_retention_and_permanent_record(registry: Registry) -> None: + transient_id = await registry.create_package( + name="short", + producer="tests", + subject="transient", + retention_class="transient", + actor="ops", + ) + transient_state = await registry.get_retention_state(transient_id) + assert transient_state.current_expires_at is not None + assert transient_state.eligible_for_deletion is False + + permanent_id = await registry.create_package( + name="forever", + producer="tests", + subject="permanent", + retention_class="permanent-record", + actor="ops", + ) + permanent_state = await registry.get_retention_state(permanent_id) + assert permanent_state.current_expires_at is None + assert permanent_state.eligible_for_deletion is False + + history = await registry.retention_history(transient_id) + assert [event.event_type for event in history] == ["v1.retention.default_applied"] + assert cbor2.loads(history[0].payload)["default_duration_seconds"] == 0 + + +async def test_retention_extension_requires_later_expiry(registry: Registry) -> None: + package_id = await registry.create_package( + name="extend", + producer="tests", + subject="extension", + retention_class="transient", + actor="ops", + ) + current = await registry.get_retention_state(package_id) + assert current.current_expires_at is not None + + with pytest.raises(RetentionStateError, match="strictly later"): + await registry.extend_retention( + package_id, + new_expires_at=current.current_expires_at, + reason="not later", + actor="ops", + ) + + new_expiry = current.current_expires_at + timedelta(days=1) + extended = await registry.extend_retention( + package_id, + new_expires_at=new_expiry, + reason="needed for quarterly review", + actor="ops", + ) + assert extended.current_expires_at == new_expiry + history = await registry.retention_history(package_id) + assert [event.event_type for event in history] == [ + "v1.retention.default_applied", + "v1.retention.extended", + ] + + +async def test_hold_release_and_sweeper_eligibility_transition(registry: Registry) -> None: + package_id = await registry.create_package( + name="held", + producer="tests", + subject="hold-release", + retention_class="transient", + actor="ops", + ) + initial = await registry.get_retention_state(package_id) + assert initial.current_expires_at is not None + after_expiry = initial.current_expires_at + timedelta(seconds=5) + + hold_id = await registry.apply_retention_hold( + package_id, + reason="quarterly hold", + actor="ops", + ) + held = await registry.get_retention_state(package_id) + assert held.active_hold_id == hold_id + + assert await registry.sweep_deletion_eligibility(now=after_expiry) == [] + still_held = await registry.get_retention_state(package_id) + assert still_held.eligible_for_deletion is False + + released = await registry.release_retention_hold( + package_id, + hold_id, + reason="hold complete", + actor="ops", + now=after_expiry, + ) + assert released.active_hold_id is None + assert released.eligible_for_deletion is True + + assert await registry.sweep_deletion_eligibility(now=after_expiry) == [] + history = await registry.retention_history(package_id) + assert [event.event_type for event in history] == [ + "v1.retention.default_applied", + "v1.retention.hold_applied", + "v1.retention.hold_released", + "v1.retention.deletion_eligible", + ] + + +def test_http_retention_controls_and_history_formats(tmp_path: Path) -> None: + app = create_app(_http_settings(tmp_path)) + with TestClient(app) as client: + package = _create_package(client) + package_id = str(package["id"]) + current_expires_at = datetime.fromisoformat(str(package["expires_at"])) + new_expiry = current_expires_at + timedelta(days=7) + + extended = client.post( + f"/packages/{package_id}/retention/extensions", + headers=AUTH, + json={ + "new_expires_at": new_expiry.isoformat(), + "reason": "retain for release signoff", + }, + ) + assert extended.status_code == 200, extended.text + assert extended.json()["current_expires_at"] == new_expiry.isoformat() + + hold = client.post( + f"/packages/{package_id}/retention/holds", + headers=AUTH, + json={"reason": "external audit"}, + ) + assert hold.status_code == 201, hold.text + hold_id = hold.json()["hold_id"] + + released = client.post( + f"/packages/{package_id}/retention/holds/{hold_id}/release", + headers=AUTH, + json={"reason": "audit complete"}, + ) + assert released.status_code == 200, released.text + assert released.json()["active_hold_id"] is None + + history_json = client.get(f"/packages/{package_id}/retention/history", headers=AUTH) + assert history_json.status_code == 200 + assert [event["event_type"] for event in history_json.json()["events"]] == [ + "v1.retention.default_applied", + "v1.retention.extended", + "v1.retention.hold_applied", + "v1.retention.hold_released", + ] + + history_cbor = client.get( + f"/packages/{package_id}/retention/history", + headers={**AUTH, "Accept": "application/cbor"}, + ) + assert history_cbor.status_code == 200 + assert cbor2.loads(history_cbor.content)["events"][1]["event_type"] == ( + "v1.retention.extended" + ) diff --git a/workplans/ARTIFACT-STORE-WP-0002-ingestion-api.md b/workplans/ARTIFACT-STORE-WP-0002-ingestion-api.md index 0f31279..e082031 100644 --- a/workplans/ARTIFACT-STORE-WP-0002-ingestion-api.md +++ b/workplans/ARTIFACT-STORE-WP-0002-ingestion-api.md @@ -4,13 +4,13 @@ type: workplan title: "Ingestion API And Manifest Surface" repo: artifact-store domain: stack -status: planned +status: done owner: codex topic_slug: stack planning_priority: high planning_order: 2 created: "2026-05-15" -updated: "2026-05-15" +updated: "2026-05-16" state_hub_workstream_id: "cedbfe03-363c-43fd-a5cb-bef52b29af7e" --- @@ -37,9 +37,9 @@ download files, and tail the event stream. ```task id: ARTIFACT-STORE-WP-0002-T001 -status: cancelled +status: done priority: high -state_hub_task_id: "e3879111-4be9-4731-8aea-15abb874f960" +state_hub_task_id: "197e22ff-0003-433d-bfa0-2323152b85dc" ``` Acceptance: @@ -58,7 +58,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0002-T002 -status: todo +status: done priority: high state_hub_task_id: "9c8c3853-2090-42be-9995-0b8ce4a76104" ``` @@ -78,7 +78,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0002-T003 -status: todo +status: done priority: medium state_hub_task_id: "710bbd2f-9bc1-4395-bbd1-2b22c1b7eb37" ``` @@ -99,7 +99,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0002-T004 -status: todo +status: done priority: medium state_hub_task_id: "d848bc41-edfa-48fc-bb2c-f2526f422c50" ``` @@ -117,7 +117,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0002-T005 -status: todo +status: done priority: medium state_hub_task_id: "27d33e90-6b31-4c1f-832b-870cd2c5fbe5" ``` @@ -134,7 +134,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0002-T006 -status: todo +status: done priority: high state_hub_task_id: "f422696f-a206-4030-be05-c342f94e9efd" ``` diff --git a/workplans/ARTIFACT-STORE-WP-0003-retention-lifecycle.md b/workplans/ARTIFACT-STORE-WP-0003-retention-lifecycle.md index 3cbf66d..b25111e 100644 --- a/workplans/ARTIFACT-STORE-WP-0003-retention-lifecycle.md +++ b/workplans/ARTIFACT-STORE-WP-0003-retention-lifecycle.md @@ -39,9 +39,9 @@ WP-0006). ```task id: ARTIFACT-STORE-WP-0003-T001 -status: cancelled +status: done priority: high -state_hub_task_id: "2d6cbd83-c348-45ad-a223-7870a3412225" +state_hub_task_id: "25531837-d2ff-4252-b0d0-31283597737f" ``` Acceptance: