From 864f7f203c0b606998266769d7d84bb2bd059786 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 16 May 2026 23:26:03 +0200 Subject: [PATCH] Add S3 backend and storage verification --- .env.example | 21 ++ docs/OPERATOR.md | 58 +++- pyproject.toml | 3 + src/artifactstore/api/http/__init__.py | 11 +- src/artifactstore/app.py | 69 ++++- src/artifactstore/cli/__init__.py | 59 +++- src/artifactstore/config.py | 89 ++++++ src/artifactstore/dataplane/inproc.py | 63 ++++- src/artifactstore/dataplane/spi.py | 15 +- src/artifactstore/events/views.py | 16 ++ src/artifactstore/registry/__init__.py | 122 +++++++- src/artifactstore/storage/__init__.py | 3 + src/artifactstore/storage/backends/s3.py | 261 ++++++++++++++++++ tests/integration/test_cli_commands.py | 58 +++- tests/unit/test_config_storage.py | 39 +++ tests/unit/test_dataplane_inproc.py | 18 ++ tests/unit/test_storage_s3.py | 196 +++++++++++++ ...ACT-STORE-WP-0004-s3-compatible-backend.md | 24 +- 18 files changed, 1085 insertions(+), 40 deletions(-) create mode 100644 src/artifactstore/storage/backends/s3.py create mode 100644 tests/unit/test_config_storage.py create mode 100644 tests/unit/test_storage_s3.py diff --git a/.env.example b/.env.example index 29f1309..6cd7f90 100644 --- a/.env.example +++ b/.env.example @@ -31,3 +31,24 @@ ARTIFACTSTORE_RETENTION_CONFIG_PATH= # Default interval for external schedulers that run `artifactstore retention sweep`. ARTIFACTSTORE_RETENTION_SWEEP_INTERVAL_SECONDS=3600 + +# Storage backend selection. `local` is always available; `s3` requires the +# optional S3 dependency and the S3 settings below. +ARTIFACTSTORE_STORAGE_BACKENDS=local +ARTIFACTSTORE_STORAGE_DEFAULT_BACKEND=local +# Format: producer:retention_class=backend_id, with * wildcards. +# Example: guide-board:release-evidence=s3,*:*=local +ARTIFACTSTORE_STORAGE_BACKEND_ROUTES= + +# S3-compatible backend settings for Ceph RGW / MinIO / AWS S3. +ARTIFACTSTORE_S3_ENDPOINT_URL= +ARTIFACTSTORE_S3_REGION=us-east-1 +ARTIFACTSTORE_S3_BUCKET= +ARTIFACTSTORE_S3_KEY_PREFIX=artifact-store +# Secret refs must be env:NAME or file:/mounted/path. +ARTIFACTSTORE_S3_ACCESS_KEY_REF= +ARTIFACTSTORE_S3_SECRET_KEY_REF= +ARTIFACTSTORE_S3_STORAGE_CLASS= +ARTIFACTSTORE_S3_SSE= +ARTIFACTSTORE_S3_MULTIPART_THRESHOLD_BYTES=67108864 +ARTIFACTSTORE_S3_MULTIPART_CHUNK_BYTES=8388608 diff --git a/docs/OPERATOR.md b/docs/OPERATOR.md index 729e4a3..73e451c 100644 --- a/docs/OPERATOR.md +++ b/docs/OPERATOR.md @@ -54,6 +54,19 @@ All settings are prefixed with ``ARTIFACTSTORE_`` and read by | `ARTIFACTSTORE_API_TOKEN` | empty | Default bearer token used by HTTP-backed CLI commands. | | `ARTIFACTSTORE_RETENTION_CONFIG_PATH` | empty | Optional TOML file overriding retention-class default durations. | | `ARTIFACTSTORE_RETENTION_SWEEP_INTERVAL_SECONDS` | `3600` | Default interval for external schedulers that invoke the retention sweeper. | +| `ARTIFACTSTORE_STORAGE_BACKENDS` | `local` | Comma-separated backend IDs to configure (`local`, `s3`). | +| `ARTIFACTSTORE_STORAGE_DEFAULT_BACKEND` | `local` | Backend used when no routing rule matches. | +| `ARTIFACTSTORE_STORAGE_BACKEND_ROUTES` | empty | Comma-separated `producer:retention_class=backend_id` rules; `*` is a wildcard. | +| `ARTIFACTSTORE_S3_ENDPOINT_URL` | empty | S3-compatible endpoint URL for Ceph RGW / MinIO / AWS S3. | +| `ARTIFACTSTORE_S3_REGION` | `us-east-1` | S3 signing region. | +| `ARTIFACTSTORE_S3_BUCKET` | empty | Bucket/container for artifact objects. | +| `ARTIFACTSTORE_S3_KEY_PREFIX` | empty | Optional object-key prefix before `/`. | +| `ARTIFACTSTORE_S3_ACCESS_KEY_REF` | empty | Access key reference, `env:NAME` or `file:/mounted/path`. | +| `ARTIFACTSTORE_S3_SECRET_KEY_REF` | empty | Secret key reference, `env:NAME` or `file:/mounted/path`. | +| `ARTIFACTSTORE_S3_STORAGE_CLASS` | empty | Optional storage class sent on writes. | +| `ARTIFACTSTORE_S3_SSE` | empty | Optional server-side encryption value, e.g. `AES256`. | +| `ARTIFACTSTORE_S3_MULTIPART_THRESHOLD_BYTES` | `67108864` | Multipart threshold for the S3 backend. | +| `ARTIFACTSTORE_S3_MULTIPART_CHUNK_BYTES` | `8388608` | Multipart part size for the S3 backend. | See [`.env.example`](../.env.example) for the canonical template. @@ -131,8 +144,48 @@ Objects are addressed by content (`blake3:`) and laid out as //// ``` -with atomic writes (tmpfile + fsync + rename). The S3-compatible backend -lands in WP-0004. +with atomic writes (tmpfile + fsync + rename). + +### S3-compatible backend + +The `s3` backend targets Ceph RGW first, with MinIO as the development +stand-in and AWS S3 as an interoperability check. Install the optional S3 +dependency before enabling it: + +```sh +uv sync --all-extras --extra s3 +``` + +Ceph RGW example: + +```sh +export ARTIFACTSTORE_STORAGE_BACKENDS=local,s3 +export ARTIFACTSTORE_STORAGE_DEFAULT_BACKEND=s3 +export ARTIFACTSTORE_STORAGE_BACKEND_ROUTES='guide-board:release-evidence=s3,*:*=local' +export ARTIFACTSTORE_S3_ENDPOINT_URL=https://rgw.example.internal +export ARTIFACTSTORE_S3_REGION=us-east-1 +export ARTIFACTSTORE_S3_BUCKET=artifact-store +export ARTIFACTSTORE_S3_KEY_PREFIX=prod/artifact-store +export ARTIFACTSTORE_S3_ACCESS_KEY_REF=env:ARTIFACTSTORE_RGW_ACCESS_KEY +export ARTIFACTSTORE_S3_SECRET_KEY_REF=file:/run/secrets/artifactstore-rgw-secret +export ARTIFACTSTORE_S3_STORAGE_CLASS=STANDARD +export ARTIFACTSTORE_S3_SSE=AES256 +``` + +Manual smoke against Ceph RGW: + +```sh +artifactstore health +artifactstore push ./fixtures/smoke \ + --producer guide-board \ + --subject rgw-smoke \ + --retention-class release-evidence +artifactstore storage verify --backend s3 +``` + +The verification command re-reads stored objects, recomputes the primary +digest, emits `v1.storage.location_verified`, and marks failed locations as +`failed`. A nonzero failed-location count degrades `/health`. ## CLI reference @@ -147,6 +200,7 @@ lands in WP-0004. | `artifactstore push ` | Push a directory through the HTTP API and finalize the package. | | `artifactstore manifest ` | Fetch the JSON manifest projection through the HTTP API. | | `artifactstore retention sweep` | Run one deletion-eligibility sweep against the configured DB. | +| `artifactstore storage verify --backend ` | Re-read stored objects for a backend and record verification events. | The CLI is a thin client over `artifactstore.registry.Registry` (see [ADR-0005](adr/0005-v1-tech-stack.md)). diff --git a/pyproject.toml b/pyproject.toml index 24fcafc..febb0e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,9 @@ dev = [ postgres = [ "psycopg[binary] >= 3.2", ] +s3 = [ + "aioboto3 >= 13.0", +] [project.scripts] artifactstore = "artifactstore.cli:app" diff --git a/src/artifactstore/api/http/__init__.py b/src/artifactstore/api/http/__init__.py index 351f474..da8f26a 100644 --- a/src/artifactstore/api/http/__init__.py +++ b/src/artifactstore/api/http/__init__.py @@ -176,7 +176,12 @@ def create_app(settings: Settings | None = None) -> FastAPI: async def health(registry: Registry = Depends(get_registry)) -> dict[str, Any]: db_ok, db_detail = await registry.db_health() backend_status = await registry.backend_health() - overall = "ok" if db_ok and backend_status.healthy else "degraded" + failed_storage_locations = await registry.failed_storage_locations_count() + overall = ( + "ok" + if db_ok and backend_status.healthy and failed_storage_locations == 0 + else "degraded" + ) return { "service": "artifact-store", "version": __version__, @@ -189,6 +194,7 @@ def create_app(settings: Settings | None = None) -> FastAPI: "free_bytes": backend_status.free_bytes, "total_bytes": backend_status.total_bytes, }, + "storage": {"failed_locations": failed_storage_locations}, } @application.get("/backends") @@ -196,7 +202,7 @@ def create_app(settings: Settings | None = None) -> FastAPI: _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: - backend_status = await registry.backend_health() + statuses = await registry.backend_health_all() return { "backends": [ { @@ -206,6 +212,7 @@ def create_app(settings: Settings | None = None) -> FastAPI: "free_bytes": backend_status.free_bytes, "total_bytes": backend_status.total_bytes, } + for backend_status in statuses ] } diff --git a/src/artifactstore/app.py b/src/artifactstore/app.py index 6aeddb0..0ddbcfc 100644 --- a/src/artifactstore/app.py +++ b/src/artifactstore/app.py @@ -7,13 +7,15 @@ control-plane consumers stay thin (per ADR-0004). from __future__ import annotations -from artifactstore.config import Settings, get_settings +from collections.abc import Callable + +from artifactstore.config import Settings, get_settings, resolve_secret_ref from artifactstore.dataplane import InProcessDataPlane from artifactstore.db.engine import create_engine from artifactstore.events import RegistryViewWriter from artifactstore.registry import Registry from artifactstore.retention import RetentionPolicy -from artifactstore.storage import LocalBackend +from artifactstore.storage import LocalBackend, S3Backend, S3BackendConfig, StorageBackend __all__ = ["build_registry"] @@ -22,8 +24,65 @@ def build_registry(settings: Settings | None = None) -> Registry: """Wire engine, local FS backend, in-process data plane, and registry.""" effective = settings or get_settings() engine = create_engine(effective) - backend = LocalBackend(effective.storage_local_root, backend_id="local") - dataplane = InProcessDataPlane(backend) + backends = _build_backends(effective) + dataplane = InProcessDataPlane( + backends, + default_backend_id=effective.storage_default_backend, + ) view_writer = RegistryViewWriter() retention_policy = RetentionPolicy.from_toml(effective.retention_config_path) - return Registry(engine, dataplane, view_writer, retention_policy) + return Registry( + engine, + dataplane, + view_writer, + retention_policy, + backend_selector=_backend_selector(effective), + ) + + +def _build_backends(settings: Settings) -> dict[str, StorageBackend]: + configured = settings.configured_backend_ids or ("local",) + backends: dict[str, StorageBackend] = {} + if "local" in configured: + backends["local"] = LocalBackend(settings.storage_local_root, backend_id="local") + if "s3" in configured: + access_key = ( + resolve_secret_ref(settings.s3_access_key_ref) + if settings.s3_access_key_ref + else None + ) + secret_key = ( + resolve_secret_ref(settings.s3_secret_key_ref) + if settings.s3_secret_key_ref + else None + ) + backends["s3"] = S3Backend( + S3BackendConfig( + endpoint_url=settings.s3_endpoint_url, + region=settings.s3_region, + bucket=settings.s3_bucket, + key_prefix=settings.s3_key_prefix, + access_key_id=access_key, + secret_access_key=secret_key, + storage_class=settings.s3_storage_class or None, + sse=settings.s3_sse or None, + multipart_threshold_bytes=settings.s3_multipart_threshold_bytes, + multipart_chunk_bytes=settings.s3_multipart_chunk_bytes, + ) + ) + unknown = set(configured) - set(backends) + if unknown: + raise ValueError(f"unknown storage backend ids: {sorted(unknown)}") + return backends + + +def _backend_selector(settings: Settings) -> Callable[[str, str], str | None]: + routes = settings.backend_routes + + def select(producer: str, retention_class: str) -> str | None: + for route in routes: + if route.matches(producer=producer, retention_class=retention_class): + return route.backend_id + return settings.storage_default_backend + + return select diff --git a/src/artifactstore/cli/__init__.py b/src/artifactstore/cli/__init__.py index 41097f3..b1a33fe 100644 --- a/src/artifactstore/cli/__init__.py +++ b/src/artifactstore/cli/__init__.py @@ -34,7 +34,9 @@ app = typer.Typer( no_args_is_help=True, ) retention_app = typer.Typer(help="Retention lifecycle commands", no_args_is_help=True) +storage_app = typer.Typer(help="Storage backend commands", no_args_is_help=True) app.add_typer(retention_app, name="retention") +app.add_typer(storage_app, name="storage") @app.callback() @@ -182,6 +184,30 @@ def retention_sweep() -> None: typer.echo(json.dumps({"marked_package_ids": marked, "marked_count": len(marked)}, indent=2)) +@storage_app.command("verify") +def storage_verify( + backend: str | None = typer.Option( + None, + "--backend", + help="Backend id to verify; omit to verify every storage location.", + ), +) -> None: + """Re-read storage locations, recompute digests, and record verification events.""" + settings = get_settings() + results = asyncio.run(_storage_verify_async(settings, backend_id=backend)) + failed = [r for r in results if not r["verified"]] + typer.echo( + json.dumps( + { + "verified_count": len(results) - len(failed), + "failed_count": len(failed), + "results": results, + }, + indent=2, + ) + ) + + # ---- internals ------------------------------------------------------------- @@ -200,9 +226,14 @@ async def _health_async(settings: Settings) -> dict[str, Any]: try: db_ok, db_detail = await registry.db_health() backend_status = await registry.backend_health() + failed_storage_locations = await registry.failed_storage_locations_count() finally: await registry.dispose() - overall = "ok" if db_ok and backend_status.healthy else "degraded" + overall = ( + "ok" + if db_ok and backend_status.healthy and failed_storage_locations == 0 + else "degraded" + ) return { "service": "artifact-store", "version": __version__, @@ -215,6 +246,7 @@ async def _health_async(settings: Settings) -> dict[str, Any]: "free_bytes": backend_status.free_bytes, "total_bytes": backend_status.total_bytes, }, + "storage": {"failed_locations": failed_storage_locations}, } @@ -229,6 +261,31 @@ async def _retention_sweep_async(settings: Settings) -> list[str]: return [str(package_id) for package_id in marked] +async def _storage_verify_async( + settings: Settings, + *, + backend_id: str | None, +) -> list[dict[str, Any]]: + from artifactstore.app import build_registry + + registry: Registry = build_registry(settings) + try: + results = await registry.verify_storage_locations(backend_id=backend_id) + finally: + await registry.dispose() + return [ + { + "storage_location_id": str(result.storage_location_id), + "file_id": str(result.file_id), + "backend_id": result.backend_id, + "content_address": result.content_address, + "verified": result.verified, + "mismatch": result.mismatch, + } + for result in results + ] + + def _http_json( method: str, base_url: str, diff --git a/src/artifactstore/config.py b/src/artifactstore/config.py index bca4d43..43a72ee 100644 --- a/src/artifactstore/config.py +++ b/src/artifactstore/config.py @@ -7,8 +7,32 @@ local development; see ``.env.example``. from __future__ import annotations +from dataclasses import dataclass + from pydantic_settings import BaseSettings, SettingsConfigDict +__all__ = [ + "BackendRoute", + "Settings", + "get_settings", + "parse_backend_routes", + "resolve_secret_ref", +] + + +@dataclass(frozen=True, slots=True) +class BackendRoute: + """Route one producer/retention-class pair to a storage backend.""" + + producer: str + retention_class: str + backend_id: str + + def matches(self, *, producer: str, retention_class: str) -> bool: + producer_match = self.producer == "*" or self.producer == producer + retention_match = self.retention_class == "*" or self.retention_class == retention_class + return producer_match and retention_match + class Settings(BaseSettings): """Top-level service configuration.""" @@ -29,6 +53,19 @@ class Settings(BaseSettings): api_token: str = "" retention_config_path: str = "" retention_sweep_interval_seconds: int = 3600 + storage_backends: str = "local" + storage_default_backend: str = "local" + storage_backend_routes: str = "" + s3_endpoint_url: str = "" + s3_region: str = "us-east-1" + s3_bucket: str = "" + s3_key_prefix: str = "" + s3_access_key_ref: str = "" + s3_secret_key_ref: str = "" + s3_storage_class: str = "" + s3_sse: str = "" + s3_multipart_threshold_bytes: int = 64 * 1024 * 1024 + s3_multipart_chunk_bytes: int = 8 * 1024 * 1024 @property def bearer_tokens(self) -> frozenset[str]: @@ -39,6 +76,58 @@ class Settings(BaseSettings): if token.strip() ) + @property + def configured_backend_ids(self) -> tuple[str, ...]: + return tuple( + backend_id.strip() + for backend_id in self.storage_backends.split(",") + if backend_id.strip() + ) + + @property + def backend_routes(self) -> tuple[BackendRoute, ...]: + return parse_backend_routes(self.storage_backend_routes) + + +def parse_backend_routes(value: str) -> tuple[BackendRoute, ...]: + """Parse ``producer:retention_class=backend`` route entries.""" + routes: list[BackendRoute] = [] + for raw_entry in value.split(","): + entry = raw_entry.strip() + if not entry: + continue + selector, sep, backend_id = entry.partition("=") + if sep == "" or not backend_id.strip(): + raise ValueError(f"invalid storage backend route: {entry!r}") + producer, selector_sep, retention_class = selector.partition(":") + if selector_sep == "": + raise ValueError(f"invalid storage backend route selector: {selector!r}") + routes.append( + BackendRoute( + producer=producer.strip() or "*", + retention_class=retention_class.strip() or "*", + backend_id=backend_id.strip(), + ) + ) + return tuple(routes) + + +def resolve_secret_ref(ref: str) -> str: + """Resolve a secret reference from ``env:NAME`` or ``file:/path``.""" + import os + from pathlib import Path + + if ref.startswith("env:"): + name = ref.removeprefix("env:") + try: + return os.environ[name] + except KeyError as exc: + raise ValueError(f"environment variable {name!r} is not set") from exc + if ref.startswith("file:"): + path = Path(ref.removeprefix("file:")) + return path.read_text(encoding="utf-8").strip() + raise ValueError("secret references must use env:NAME or file:/path") + def get_settings() -> Settings: """Return a freshly-loaded :class:`Settings` instance.""" diff --git a/src/artifactstore/dataplane/inproc.py b/src/artifactstore/dataplane/inproc.py index 3546532..b567f2d 100644 --- a/src/artifactstore/dataplane/inproc.py +++ b/src/artifactstore/dataplane/inproc.py @@ -11,7 +11,7 @@ from __future__ import annotations import os import tempfile -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Mapping from pathlib import Path from artifactstore.dataplane.spi import ( @@ -39,22 +39,36 @@ _DEFAULT_CHUNK_SIZE = 64 * 1024 class InProcessDataPlane: - """The v1 data plane: wraps one :class:`StorageBackend`.""" + """The v1 data plane: wraps one or more :class:`StorageBackend` instances.""" def __init__( self, - backend: StorageBackend, + backend: StorageBackend | Mapping[str, StorageBackend], *, + default_backend_id: str | None = None, tmp_dir: str | os.PathLike[str] | None = None, chunk_size: int = _DEFAULT_CHUNK_SIZE, ) -> None: - self._backend = backend + if isinstance(backend, Mapping): + self._backends = dict(backend) + if not self._backends: + raise ValueError("at least one storage backend is required") + self._default_backend_id = default_backend_id or sorted(self._backends)[0] + else: + self._backends = {backend.backend_id: backend} + self._default_backend_id = default_backend_id or backend.backend_id + if self._default_backend_id not in self._backends: + raise ValueError(f"unknown default backend: {self._default_backend_id!r}") self._tmp_dir = Path(tmp_dir) if tmp_dir is not None else None self._chunk_size = chunk_size @property def backend(self) -> StorageBackend: - return self._backend + return self._select_backend(None) + + @property + def backends(self) -> Mapping[str, StorageBackend]: + return dict(self._backends) async def ingest_stream( self, @@ -64,6 +78,7 @@ class InProcessDataPlane: ) -> IngestResult: effective_hints = hints or IngestHints() primary_name = effective_hints.primary_algorithm or PRIMARY_ALGORITHM + backend = self._select_backend(effective_hints.backend_id) primary_h = get_algorithm(primary_name)() sha_h = get_algorithm(INTEROP_ALGORITHM)() @@ -88,7 +103,7 @@ class InProcessDataPlane: sha_digest = Digest(algorithm=INTEROP_ALGORITHM, hex=sha_h.hexdigest()) content_address = primary_digest.content_address - receipt = await self._backend.put( + receipt = await backend.put( content_address, _file_chunks(tmp_path, self._chunk_size), size_hint=size, @@ -108,16 +123,23 @@ class InProcessDataPlane: content_address: ContentAddress, *, byte_range: tuple[int, int] | None = None, + backend_id: str | None = None, ) -> AsyncIterator[bytes]: - return await self._backend.get(content_address, byte_range=byte_range) + return await self._select_backend(backend_id).get(content_address, byte_range=byte_range) - async def verify_object(self, content_address: ContentAddress) -> VerifyResult: + async def verify_object( + self, + content_address: ContentAddress, + *, + backend_id: str | None = None, + ) -> VerifyResult: expected = content_address.to_digest() primary_name = expected.algorithm - stream = await self._backend.get(content_address) + backend = self._select_backend(backend_id) + stream = await backend.get(content_address) pair = await digest_stream(stream, primary=primary_name) - head = await self._backend.head(content_address) + head = await backend.head(content_address) actual_size = head.size_bytes if pair.primary.hex == expected.hex: @@ -138,11 +160,26 @@ class InProcessDataPlane: mismatch=(f"primary digest mismatch: expected {expected.hex}, got {pair.primary.hex}"), ) - async def delete_object(self, content_address: ContentAddress) -> DeletionResult: - return await self._backend.delete(content_address) + async def delete_object( + self, + content_address: ContentAddress, + *, + backend_id: str | None = None, + ) -> DeletionResult: + return await self._select_backend(backend_id).delete(content_address) async def backend_health(self) -> BackendStatus: - return await self._backend.health() + return await self._select_backend(None).health() + + async def backend_health_all(self) -> list[BackendStatus]: + return [await backend.health() for backend in self._backends.values()] + + def _select_backend(self, backend_id: str | None) -> StorageBackend: + selected = backend_id or self._default_backend_id + try: + return self._backends[selected] + except KeyError as exc: + raise KeyError(f"unknown storage backend: {selected!r}") from exc async def _file_chunks(path: Path, chunk_size: int) -> AsyncIterator[bytes]: diff --git a/src/artifactstore/dataplane/spi.py b/src/artifactstore/dataplane/spi.py index a06675c..9001148 100644 --- a/src/artifactstore/dataplane/spi.py +++ b/src/artifactstore/dataplane/spi.py @@ -84,10 +84,21 @@ class DataPlane(Protocol): content_address: ContentAddress, *, byte_range: tuple[int, int] | None = None, + backend_id: str | None = None, ) -> AsyncIterator[bytes]: ... - async def verify_object(self, content_address: ContentAddress) -> VerifyResult: ... + async def verify_object( + self, + content_address: ContentAddress, + *, + backend_id: str | None = None, + ) -> VerifyResult: ... - async def delete_object(self, content_address: ContentAddress) -> DeletionResult: ... + async def delete_object( + self, + content_address: ContentAddress, + *, + backend_id: str | None = None, + ) -> DeletionResult: ... async def backend_health(self) -> BackendStatus: ... diff --git a/src/artifactstore/events/views.py b/src/artifactstore/events/views.py index afecaac..40c9511 100644 --- a/src/artifactstore/events/views.py +++ b/src/artifactstore/events/views.py @@ -250,6 +250,21 @@ async def _apply_retention_deletion_eligible( ) +async def _apply_storage_location_verified( + connection: AsyncConnection, + event: Event, +) -> None: + payload = cbor2.loads(event.payload) + await connection.execute( + update(storage_locations) + .where(storage_locations.c.id == UUID(payload["storage_location_id"])) + .values( + status="verified" if payload["verified"] else "failed", + last_verified_at=event.created_at, + ) + ) + + def _parse_iso(value: str | None) -> datetime | None: if value is None: return None @@ -268,4 +283,5 @@ _HANDLERS = { "v1.retention.hold_applied": _apply_retention_hold_applied, "v1.retention.hold_released": _apply_retention_hold_released, "v1.retention.deletion_eligible": _apply_retention_deletion_eligible, + "v1.storage.location_verified": _apply_storage_location_verified, } diff --git a/src/artifactstore/registry/__init__.py b/src/artifactstore/registry/__init__.py index f3c359d..83d8df3 100644 --- a/src/artifactstore/registry/__init__.py +++ b/src/artifactstore/registry/__init__.py @@ -19,17 +19,17 @@ source of truth (ADR-0002). from __future__ import annotations import uuid -from collections.abc import AsyncIterator, Sequence +from collections.abc import AsyncIterator, Awaitable, Callable, Sequence from dataclasses import dataclass from datetime import UTC, datetime -from typing import Any +from typing import Any, cast from uuid import UUID import cbor2 from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncEngine -from artifactstore.dataplane.spi import DataPlane +from artifactstore.dataplane.spi import DataPlane, IngestHints from artifactstore.db.schema import ( artifact_files, artifact_packages, @@ -76,6 +76,7 @@ __all__ = [ "RetentionClassRecord", "RetentionStateError", "RetentionStateRecord", + "StorageVerificationRecord", ] @@ -158,6 +159,18 @@ class RetentionStateRecord: eligible_for_deletion: bool +@dataclass(frozen=True, slots=True) +class StorageVerificationRecord: + """Result of verifying one storage location.""" + + storage_location_id: UUID + file_id: UUID + backend_id: str + content_address: str + verified: bool + mismatch: str | None + + _RETENTION_EVENT_TYPES = ( "v1.retention.default_applied", "v1.retention.extended", @@ -176,11 +189,13 @@ class Registry: dataplane: DataPlane, view_writer: RegistryViewWriter | None = None, retention_policy: RetentionPolicy | None = None, + backend_selector: Callable[[str, str], str | None] | None = None, ) -> None: self._engine = engine self._dataplane = dataplane self._view_writer = view_writer or RegistryViewWriter() self._retention_policy = retention_policy or RetentionPolicy() + self._backend_selector = backend_selector # ---- mutating operations ------------------------------------------------ @@ -279,7 +294,15 @@ class Registry: f"relative_path {relative_path!r} already exists in package {package_id}" ) - ingest = await self._dataplane.ingest_stream(stream) + selected_backend = ( + self._backend_selector(pkg_row.producer, pkg_row.retention_class) + if self._backend_selector is not None + else None + ) + ingest = await self._dataplane.ingest_stream( + stream, + hints=IngestHints(backend_id=selected_backend), + ) file_id = uuid.uuid4() storage_location_id = uuid.uuid4() @@ -691,6 +714,73 @@ class Registry: for r in rows ] + async def verify_storage_locations( + self, + *, + backend_id: str | None = None, + actor: str = "storage-verifier", + ) -> list[StorageVerificationRecord]: + """Re-read storage locations and emit verification events.""" + stmt = select(storage_locations) + if backend_id is not None: + stmt = stmt.where(storage_locations.c.backend_id == backend_id) + async with self._engine.connect() as conn: + rows = (await conn.execute(stmt.order_by(storage_locations.c.id))).all() + + results: list[StorageVerificationRecord] = [] + for row in rows: + ca = ContentAddress(row.content_address) + verified = False + mismatch: str | None = None + actual_size_bytes: int | None = None + actual_primary_hex: str | None = None + actual_sha256_hex: str | None = None + try: + result = await self._dataplane.verify_object(ca, backend_id=row.backend_id) + verified = result.verified + mismatch = result.mismatch + actual_size_bytes = result.actual_size_bytes + actual_primary_hex = result.actual_primary_digest.hex + actual_sha256_hex = result.actual_sha256_digest.hex + except Exception as exc: + mismatch = f"{type(exc).__name__}: {exc}" + + payload = cbor2.dumps( + { + "storage_location_id": str(row.id), + "file_id": str(row.artifact_file_id), + "backend_id": row.backend_id, + "content_address": row.content_address, + "verified": verified, + "mismatch": mismatch, + "actual_size_bytes": actual_size_bytes, + "actual_primary_hex": actual_primary_hex, + "actual_sha256_hex": actual_sha256_hex, + }, + canonical=True, + ) + event = make_event( + event_type="v1.storage.location_verified", + subject_kind="storage", + subject_id=row.artifact_file_id, + actor=actor, + payload=payload, + ) + async with self._engine.begin() as conn: + written = await write(conn, event) + await self._view_writer.apply(conn, written) + results.append( + StorageVerificationRecord( + storage_location_id=row.id, + file_id=row.artifact_file_id, + backend_id=row.backend_id, + content_address=row.content_address, + verified=verified, + mismatch=mismatch, + ) + ) + return results + async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes: """Return the finalised manifest. ``format`` is ``cbor`` (canonical CBOR, the wire form) or ``json`` (the JCS projection).""" @@ -724,7 +814,11 @@ class Registry: """Return an async byte iterator for the bytes of a stored file.""" record = await self.get_file_metadata(file_id) ca = ContentAddress(record.content_address) - return await self._dataplane.serve_object(ca, byte_range=byte_range) + return await self._dataplane.serve_object( + ca, + byte_range=byte_range, + backend_id=record.backend_id, + ) async def fetch_events( self, @@ -769,6 +863,24 @@ class Registry: """Probe the configured storage backend through the data plane.""" return await self._dataplane.backend_health() + async def backend_health_all(self) -> list[BackendStatus]: + """Probe every configured storage backend when the data plane supports it.""" + probe_all = getattr(self._dataplane, "backend_health_all", None) + if probe_all is None: + return [await self.backend_health()] + typed_probe = cast(Callable[[], Awaitable[list[BackendStatus]]], probe_all) + return await typed_probe() + + async def failed_storage_locations_count(self) -> int: + """Count storage locations currently marked failed.""" + async with self._engine.connect() as conn: + rows = ( + await conn.execute( + select(storage_locations.c.id).where(storage_locations.c.status == "failed") + ) + ).all() + return len(rows) + async def dispose(self) -> None: """Release the engine's connection pool. Idempotent.""" await self._engine.dispose() diff --git a/src/artifactstore/storage/__init__.py b/src/artifactstore/storage/__init__.py index e9ad81e..191f771 100644 --- a/src/artifactstore/storage/__init__.py +++ b/src/artifactstore/storage/__init__.py @@ -6,6 +6,7 @@ adding adapters never touches the registry or API layers. """ from artifactstore.storage.backends.local import LocalBackend +from artifactstore.storage.backends.s3 import S3Backend, S3BackendConfig from artifactstore.storage.registry import ( clear as clear_backends, ) @@ -32,6 +33,8 @@ __all__ = [ "DeletionResult", "LocalBackend", "ObjectNotFoundError", + "S3Backend", + "S3BackendConfig", "StorageBackend", "StorageObjectMetadata", "StorageReceipt", diff --git a/src/artifactstore/storage/backends/s3.py b/src/artifactstore/storage/backends/s3.py new file mode 100644 index 0000000..87bf3d7 --- /dev/null +++ b/src/artifactstore/storage/backends/s3.py @@ -0,0 +1,261 @@ +"""S3-compatible storage backend (Ceph RGW / MinIO / AWS S3).""" + +from __future__ import annotations + +from collections.abc import AsyncIterator, Callable +from contextlib import AbstractAsyncContextManager +from dataclasses import dataclass +from typing import Any, cast + +from artifactstore.identity import ContentAddress +from artifactstore.storage.spi import ( + BackendStatus, + DeletionResult, + ObjectNotFoundError, + StorageObjectMetadata, + StorageReceipt, +) + +__all__ = ["S3Backend", "S3BackendConfig"] + +_DEFAULT_CHUNK_SIZE = 64 * 1024 + + +@dataclass(frozen=True, slots=True) +class S3BackendConfig: + endpoint_url: str + region: str + bucket: str + key_prefix: str = "" + access_key_id: str | None = None + secret_access_key: str | None = None + storage_class: str | None = None + sse: str | None = None + multipart_threshold_bytes: int = 64 * 1024 * 1024 + multipart_chunk_bytes: int = 8 * 1024 * 1024 + + +ClientFactory = Callable[[], AbstractAsyncContextManager[Any]] + + +class S3Backend: + """Storage SPI implementation over an S3-compatible object store.""" + + def __init__( + self, + config: S3BackendConfig, + *, + backend_id: str = "s3", + client_factory: ClientFactory | None = None, + chunk_size: int = _DEFAULT_CHUNK_SIZE, + ) -> None: + if not config.bucket: + raise ValueError("S3 bucket is required") + self._config = config + self._backend_id = backend_id + self._client_factory = client_factory + self._chunk_size = chunk_size + + @property + def backend_id(self) -> str: + return self._backend_id + + def _client(self) -> AbstractAsyncContextManager[Any]: + if self._client_factory is not None: + return self._client_factory() + try: + import aioboto3 # type: ignore[import-not-found] + except ModuleNotFoundError as exc: + raise RuntimeError( + "S3Backend requires the 'aioboto3' package; install artifactstore[s3]" + ) from exc + session = aioboto3.Session( + aws_access_key_id=self._config.access_key_id, + aws_secret_access_key=self._config.secret_access_key, + region_name=self._config.region, + ) + return cast( + AbstractAsyncContextManager[Any], + session.client("s3", endpoint_url=self._config.endpoint_url), + ) + + def _object_key(self, content_address: ContentAddress) -> str: + digest = content_address.to_digest() + if len(digest.hex) < 4: + raise ValueError(f"digest hex too short for sharded layout: {content_address}") + key = f"{digest.algorithm}/{digest.hex[0:2]}/{digest.hex[2:4]}/{digest.hex}" + prefix = self._config.key_prefix.strip("/") + return f"{prefix}/{key}" if prefix else key + + async def put( + self, + content_address: ContentAddress, + stream: AsyncIterator[bytes], + *, + size_hint: int | None = None, + ) -> StorageReceipt: + key = self._object_key(content_address) + if size_hint is not None and size_hint >= self._config.multipart_threshold_bytes: + return await self._put_multipart(content_address, key, stream, size_hint=size_hint) + + data = bytearray() + async for chunk in stream: + data.extend(chunk) + async with self._client() as client: + await client.put_object( + Bucket=self._config.bucket, + Key=key, + Body=bytes(data), + **self._put_options(), + ) + return StorageReceipt( + backend_id=self._backend_id, + content_address=content_address, + object_key=key, + size_bytes=len(data), + ) + + async def _put_multipart( + self, + content_address: ContentAddress, + key: str, + stream: AsyncIterator[bytes], + *, + size_hint: int, + ) -> StorageReceipt: + async with self._client() as client: + created = await client.create_multipart_upload( + Bucket=self._config.bucket, + Key=key, + **self._put_options(), + ) + upload_id = created["UploadId"] + parts: list[dict[str, Any]] = [] + part_number = 1 + buffered = bytearray() + try: + async for chunk in stream: + buffered.extend(chunk) + while len(buffered) >= self._config.multipart_chunk_bytes: + part = bytes(buffered[: self._config.multipart_chunk_bytes]) + del buffered[: self._config.multipart_chunk_bytes] + uploaded = await client.upload_part( + Bucket=self._config.bucket, + Key=key, + UploadId=upload_id, + PartNumber=part_number, + Body=part, + ) + parts.append({"ETag": uploaded["ETag"], "PartNumber": part_number}) + part_number += 1 + if buffered: + uploaded = await client.upload_part( + Bucket=self._config.bucket, + Key=key, + UploadId=upload_id, + PartNumber=part_number, + Body=bytes(buffered), + ) + parts.append({"ETag": uploaded["ETag"], "PartNumber": part_number}) + await client.complete_multipart_upload( + Bucket=self._config.bucket, + Key=key, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, + ) + except Exception: + await client.abort_multipart_upload( + Bucket=self._config.bucket, + Key=key, + UploadId=upload_id, + ) + raise + return StorageReceipt( + backend_id=self._backend_id, + content_address=content_address, + object_key=key, + size_bytes=size_hint, + ) + + async def get( + self, + content_address: ContentAddress, + *, + byte_range: tuple[int, int] | None = None, + ) -> AsyncIterator[bytes]: + key = self._object_key(content_address) + + async def iterator() -> AsyncIterator[bytes]: + kwargs: dict[str, Any] = {"Bucket": self._config.bucket, "Key": key} + if byte_range is not None: + start, end = byte_range + kwargs["Range"] = f"bytes={start}-{end}" + async with self._client() as client: + try: + response = await client.get_object(**kwargs) + except Exception as exc: + if _is_not_found(exc): + raise ObjectNotFoundError(str(content_address)) from exc + raise + body = response["Body"] + while True: + chunk = await body.read(self._chunk_size) + if not chunk: + break + yield chunk + + return iterator() + + async def head(self, content_address: ContentAddress) -> StorageObjectMetadata: + key = self._object_key(content_address) + async with self._client() as client: + try: + response = await client.head_object(Bucket=self._config.bucket, Key=key) + except Exception as exc: + if _is_not_found(exc): + raise ObjectNotFoundError(str(content_address)) from exc + raise + return StorageObjectMetadata( + backend_id=self._backend_id, + content_address=content_address, + object_key=key, + size_bytes=int(response["ContentLength"]), + ) + + async def delete(self, content_address: ContentAddress) -> DeletionResult: + key = self._object_key(content_address) + async with self._client() as client: + await client.delete_object(Bucket=self._config.bucket, Key=key) + return DeletionResult( + backend_id=self._backend_id, + content_address=content_address, + deleted=True, + ) + + async def health(self) -> BackendStatus: + try: + async with self._client() as client: + await client.head_bucket(Bucket=self._config.bucket) + except Exception as exc: + return BackendStatus( + backend_id=self._backend_id, + healthy=False, + detail=f"{type(exc).__name__}: {exc}", + ) + return BackendStatus(backend_id=self._backend_id, healthy=True, detail="ok") + + def _put_options(self) -> dict[str, str]: + options: dict[str, str] = {} + if self._config.storage_class: + options["StorageClass"] = self._config.storage_class + if self._config.sse: + options["ServerSideEncryption"] = self._config.sse + return options + + +def _is_not_found(exc: Exception) -> bool: + response = getattr(exc, "response", None) + if isinstance(response, dict): + code = response.get("Error", {}).get("Code") + return str(code) in {"404", "NoSuchKey", "NotFound"} + return False diff --git a/tests/integration/test_cli_commands.py b/tests/integration/test_cli_commands.py index 1922069..bfce208 100644 --- a/tests/integration/test_cli_commands.py +++ b/tests/integration/test_cli_commands.py @@ -8,11 +8,11 @@ from pathlib import Path from typing import Any import pytest -from sqlalchemy import create_engine, insert, inspect +from sqlalchemy import create_engine, insert, inspect, select from typer.testing import CliRunner from artifactstore.cli import app as cli_app -from artifactstore.db.schema import metadata, retention_classes +from artifactstore.db.schema import metadata, retention_classes, storage_locations from artifactstore.db.seed import RETENTION_CLASS_SEEDS REPO_ROOT = Path(__file__).resolve().parents[2] @@ -252,3 +252,57 @@ def test_cli_retention_sweep_marks_expired_package( assert result.exit_code == 0, result.output payload = json.loads(result.output) assert payload == {"marked_package_ids": [package_id], "marked_count": 1} + + +def test_cli_storage_verify_marks_local_location_verified( + runner: CliRunner, + env_db: Path, +) -> None: + sync_engine = create_engine(f"sqlite:///{env_db}", future=True) + metadata.create_all(sync_engine) + with sync_engine.begin() as conn: + conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS]) + sync_engine.dispose() + + async def create_stored_file() -> None: + from collections.abc import AsyncIterator + + from artifactstore.app import build_registry + from artifactstore.config import get_settings + + async def stream() -> AsyncIterator[bytes]: + yield b"verify-me" + + registry = build_registry(get_settings()) + try: + package_id = await registry.create_package( + name="verify", + producer="tests", + subject="storage", + retention_class="raw-evidence", + actor="ops", + ) + await registry.ingest_file( + package_id, + relative_path="verify.txt", + media_type="text/plain", + stream=stream(), + actor="ops", + ) + finally: + await registry.dispose() + + asyncio.run(create_stored_file()) + result = runner.invoke(cli_app, ["storage", "verify", "--backend", "local"]) + + assert result.exit_code == 0, result.output + payload = json.loads(result.output) + assert payload["verified_count"] == 1 + assert payload["failed_count"] == 0 + assert payload["results"][0]["verified"] is True + + sync_engine = create_engine(f"sqlite:///{env_db}", future=True) + with sync_engine.connect() as conn: + status = conn.execute(select(storage_locations.c.status)).scalar_one() + sync_engine.dispose() + assert status == "verified" diff --git a/tests/unit/test_config_storage.py b/tests/unit/test_config_storage.py new file mode 100644 index 0000000..48102a2 --- /dev/null +++ b/tests/unit/test_config_storage.py @@ -0,0 +1,39 @@ +"""Storage configuration parsing tests.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from artifactstore.config import parse_backend_routes, resolve_secret_ref + + +def test_parse_backend_routes_supports_wildcards() -> None: + routes = parse_backend_routes("guide-board:release-evidence=s3,*:*=local") + assert len(routes) == 2 + assert routes[0].matches(producer="guide-board", retention_class="release-evidence") + assert not routes[0].matches(producer="guide-board", retention_class="raw-evidence") + assert routes[1].matches(producer="anything", retention_class="raw-evidence") + assert routes[0].backend_id == "s3" + + +def test_parse_backend_routes_rejects_invalid_entry() -> None: + with pytest.raises(ValueError): + parse_backend_routes("guide-board=s3") + + +def test_resolve_secret_ref_from_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("ARTIFACTSTORE_TEST_SECRET", "secret") + assert resolve_secret_ref("env:ARTIFACTSTORE_TEST_SECRET") == "secret" + + +def test_resolve_secret_ref_from_file(tmp_path: Path) -> None: + secret_file = tmp_path / "secret" + secret_file.write_text("secret\n", encoding="utf-8") + assert resolve_secret_ref(f"file:{secret_file}") == "secret" + + +def test_resolve_secret_ref_rejects_literal() -> None: + with pytest.raises(ValueError): + resolve_secret_ref("literal-secret") diff --git a/tests/unit/test_dataplane_inproc.py b/tests/unit/test_dataplane_inproc.py index 5f50df4..630d5d8 100644 --- a/tests/unit/test_dataplane_inproc.py +++ b/tests/unit/test_dataplane_inproc.py @@ -158,6 +158,24 @@ async def test_ingest_hints_override_primary_algorithm( assert result.primary_digest.hex == result.sha256_digest.hex +async def test_ingest_hints_route_to_named_backend(tmp_path: Path) -> None: + local = LocalBackend(tmp_path / "local", backend_id="local") + archive = LocalBackend(tmp_path / "archive", backend_id="archive") + dp = InProcessDataPlane( + {"local": local, "archive": archive}, + default_backend_id="local", + ) + + result = await dp.ingest_stream( + _stream(b"route-me"), + hints=IngestHints(backend_id="archive"), + ) + + assert result.receipt.backend_id == "archive" + assert not (local.root / result.receipt.object_key).exists() + assert (archive.root / result.receipt.object_key).exists() + + async def test_serve_missing_object_propagates_object_not_found( dataplane: InProcessDataPlane, ) -> None: diff --git a/tests/unit/test_storage_s3.py b/tests/unit/test_storage_s3.py new file mode 100644 index 0000000..dc23543 --- /dev/null +++ b/tests/unit/test_storage_s3.py @@ -0,0 +1,196 @@ +"""S3-compatible backend tests (ARTIFACT-STORE-WP-0004).""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Any + +import pytest + +from artifactstore.identity import ContentAddress, digest_bytes +from artifactstore.storage import ObjectNotFoundError, S3Backend, S3BackendConfig + + +async def _stream(data: bytes, chunk_size: int = 4) -> AsyncIterator[bytes]: + for i in range(0, len(data), chunk_size): + yield data[i : i + chunk_size] + + +async def _consume(stream: AsyncIterator[bytes]) -> bytes: + out = bytearray() + async for chunk in stream: + out.extend(chunk) + return bytes(out) + + +def _ca(data: bytes) -> ContentAddress: + return digest_bytes(data).primary.content_address + + +class FakeNotFoundError(Exception): + def __init__(self) -> None: + super().__init__("not found") + self.response = {"Error": {"Code": "NoSuchKey"}} + + +class FakeBody: + def __init__(self, data: bytes) -> None: + self._data = data + self._offset = 0 + + async def read(self, size: int) -> bytes: + if self._offset >= len(self._data): + return b"" + chunk = self._data[self._offset : self._offset + size] + self._offset += len(chunk) + return chunk + + +class FakeS3Client: + def __init__(self) -> None: + self.objects: dict[str, bytes] = {} + self.calls: list[tuple[str, dict[str, Any]]] = [] + self.uploads: dict[str, list[tuple[int, bytes]]] = {} + + async def __aenter__(self) -> FakeS3Client: + return self + + async def __aexit__(self, *_exc: object) -> None: + return None + + async def put_object(self, **kwargs: Any) -> None: + self.calls.append(("put_object", kwargs)) + self.objects[kwargs["Key"]] = kwargs["Body"] + + async def create_multipart_upload(self, **kwargs: Any) -> dict[str, str]: + self.calls.append(("create_multipart_upload", kwargs)) + upload_id = f"upload-{len(self.uploads) + 1}" + self.uploads[upload_id] = [] + return {"UploadId": upload_id} + + async def upload_part(self, **kwargs: Any) -> dict[str, str]: + self.calls.append(("upload_part", kwargs)) + self.uploads[kwargs["UploadId"]].append((kwargs["PartNumber"], kwargs["Body"])) + return {"ETag": f"etag-{kwargs['PartNumber']}"} + + async def complete_multipart_upload(self, **kwargs: Any) -> None: + self.calls.append(("complete_multipart_upload", kwargs)) + parts = self.uploads[kwargs["UploadId"]] + self.objects[kwargs["Key"]] = b"".join(part for _num, part in sorted(parts)) + + async def abort_multipart_upload(self, **kwargs: Any) -> None: + self.calls.append(("abort_multipart_upload", kwargs)) + + async def get_object(self, **kwargs: Any) -> dict[str, FakeBody]: + self.calls.append(("get_object", kwargs)) + try: + data = self.objects[kwargs["Key"]] + except KeyError as exc: + raise FakeNotFoundError from exc + range_header = kwargs.get("Range") + if range_header: + bounds = str(range_header).removeprefix("bytes=").split("-", maxsplit=1) + start = int(bounds[0]) + end = int(bounds[1]) + data = data[start : end + 1] + return {"Body": FakeBody(data)} + + async def head_object(self, **kwargs: Any) -> dict[str, int]: + self.calls.append(("head_object", kwargs)) + try: + data = self.objects[kwargs["Key"]] + except KeyError as exc: + raise FakeNotFoundError from exc + return {"ContentLength": len(data)} + + async def delete_object(self, **kwargs: Any) -> None: + self.calls.append(("delete_object", kwargs)) + self.objects.pop(kwargs["Key"], None) + + async def head_bucket(self, **kwargs: Any) -> None: + self.calls.append(("head_bucket", kwargs)) + + +@pytest.fixture +def fake_client() -> FakeS3Client: + return FakeS3Client() + + +@pytest.fixture +def backend(fake_client: FakeS3Client) -> S3Backend: + return S3Backend( + S3BackendConfig( + endpoint_url="http://minio.test", + region="us-east-1", + bucket="artifacts", + key_prefix="artifact-store", + storage_class="STANDARD", + sse="AES256", + multipart_threshold_bytes=8, + multipart_chunk_bytes=5, + ), + client_factory=lambda: fake_client, + chunk_size=3, + ) + + +async def test_put_get_head_delete_round_trip( + backend: S3Backend, + fake_client: FakeS3Client, +) -> None: + data = b"abc" + ca = _ca(data) + receipt = await backend.put(ca, _stream(data), size_hint=len(data)) + digest = ca.to_digest() + assert receipt.object_key == ( + f"artifact-store/{digest.algorithm}/{digest.hex[:2]}/{digest.hex[2:4]}/{digest.hex}" + ) + assert fake_client.calls[0][0] == "put_object" + assert fake_client.calls[0][1]["StorageClass"] == "STANDARD" + assert fake_client.calls[0][1]["ServerSideEncryption"] == "AES256" + + meta = await backend.head(ca) + assert meta.size_bytes == len(data) + + stream = await backend.get(ca) + assert await _consume(stream) == data + + await backend.delete(ca) + with pytest.raises(ObjectNotFoundError): + await backend.head(ca) + + +async def test_get_supports_range(backend: S3Backend, fake_client: FakeS3Client) -> None: + data = b"0123456789" + ca = _ca(data) + await backend.put(ca, _stream(data), size_hint=len(data)) + + stream = await backend.get(ca, byte_range=(2, 5)) + assert await _consume(stream) == b"2345" + assert fake_client.calls[-1][1]["Range"] == "bytes=2-5" + + +async def test_put_uses_multipart_above_threshold( + backend: S3Backend, + fake_client: FakeS3Client, +) -> None: + data = b"abcdefghijkl" + ca = _ca(data) + receipt = await backend.put(ca, _stream(data), size_hint=len(data)) + + assert receipt.size_bytes == len(data) + assert [name for name, _kwargs in fake_client.calls] == [ + "create_multipart_upload", + "upload_part", + "upload_part", + "upload_part", + "complete_multipart_upload", + ] + stream = await backend.get(ca) + assert await _consume(stream) == data + + +async def test_health_uses_head_bucket(backend: S3Backend) -> None: + status = await backend.health() + assert status.healthy is True + assert status.backend_id == "s3" diff --git a/workplans/ARTIFACT-STORE-WP-0004-s3-compatible-backend.md b/workplans/ARTIFACT-STORE-WP-0004-s3-compatible-backend.md index 318499f..98e569d 100644 --- a/workplans/ARTIFACT-STORE-WP-0004-s3-compatible-backend.md +++ b/workplans/ARTIFACT-STORE-WP-0004-s3-compatible-backend.md @@ -4,13 +4,13 @@ type: workplan title: "S3-Compatible Backend (Ceph RGW Target)" repo: artifact-store domain: stack -status: planned +status: active owner: codex topic_slug: stack planning_priority: medium planning_order: 4 created: "2026-05-15" -updated: "2026-05-15" +updated: "2026-05-16" state_hub_workstream_id: "d0526cfc-e532-431f-970d-f3e548d27a80" --- @@ -38,9 +38,9 @@ registry. ```task id: ARTIFACT-STORE-WP-0004-T001 -status: cancelled +status: done priority: high -state_hub_task_id: "7b980a55-2364-48c3-98ac-081629a8d2b7" +state_hub_task_id: "1db0d548-cdac-4b07-962b-bcafa3aae30e" ``` Acceptance: @@ -58,7 +58,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0004-T002 -status: todo +status: done priority: high state_hub_task_id: "14b50595-5820-4369-b037-b015fcbddcc4" ``` @@ -75,11 +75,15 @@ Acceptance: - `head`, `delete`, `health` implemented. - `delete` is idempotent (delete-of-missing returns success). +Decision: use `aioboto3` as the optional S3 client dependency. The +backend imports it lazily so local-only deployments do not need S3 +dependencies installed. + ## D4.3 - Backend Selection And Routing ```task id: ARTIFACT-STORE-WP-0004-T003 -status: todo +status: done priority: medium state_hub_task_id: "725dafd6-3337-4f81-b221-bb9f3a564d7e" ``` @@ -97,7 +101,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0004-T004 -status: todo +status: blocked priority: high state_hub_task_id: "4fd7b73b-7058-4edd-b5e3-edca396760d4" ``` @@ -111,11 +115,15 @@ Acceptance: endpoint; results recorded in `docs/OPERATOR.md`. - No CI dependency on a live Ceph or AWS account. +Blocked note: Docker is available, but this environment does not have +`aioboto3`, `boto3`, `testcontainers`, `uv`, or `pip`; MinIO container +tests need dependency/bootstrap support before they can be run honestly. + ## D4.5 - Verification Pass ```task id: ARTIFACT-STORE-WP-0004-T005 -status: todo +status: done priority: medium state_hub_task_id: "5a55546f-288f-4da0-a646-3d9319908279" ```