Add S3 backend and storage verification

This commit is contained in:
2026-05-16 23:26:03 +02:00
parent b7ceaf7682
commit 864f7f203c
18 changed files with 1085 additions and 40 deletions

View File

@@ -176,7 +176,12 @@ def create_app(settings: Settings | None = None) -> FastAPI:
async def health(registry: Registry = Depends(get_registry)) -> dict[str, Any]:
db_ok, db_detail = await registry.db_health()
backend_status = await registry.backend_health()
overall = "ok" if db_ok and backend_status.healthy else "degraded"
failed_storage_locations = await registry.failed_storage_locations_count()
overall = (
"ok"
if db_ok and backend_status.healthy and failed_storage_locations == 0
else "degraded"
)
return {
"service": "artifact-store",
"version": __version__,
@@ -189,6 +194,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
"free_bytes": backend_status.free_bytes,
"total_bytes": backend_status.total_bytes,
},
"storage": {"failed_locations": failed_storage_locations},
}
@application.get("/backends")
@@ -196,7 +202,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
_actor: str = Depends(require_read_auth),
registry: Registry = Depends(get_registry),
) -> dict[str, Any]:
backend_status = await registry.backend_health()
statuses = await registry.backend_health_all()
return {
"backends": [
{
@@ -206,6 +212,7 @@ def create_app(settings: Settings | None = None) -> FastAPI:
"free_bytes": backend_status.free_bytes,
"total_bytes": backend_status.total_bytes,
}
for backend_status in statuses
]
}

View File

@@ -7,13 +7,15 @@ control-plane consumers stay thin (per ADR-0004).
from __future__ import annotations
from artifactstore.config import Settings, get_settings
from collections.abc import Callable
from artifactstore.config import Settings, get_settings, resolve_secret_ref
from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.engine import create_engine
from artifactstore.events import RegistryViewWriter
from artifactstore.registry import Registry
from artifactstore.retention import RetentionPolicy
from artifactstore.storage import LocalBackend
from artifactstore.storage import LocalBackend, S3Backend, S3BackendConfig, StorageBackend
__all__ = ["build_registry"]
@@ -22,8 +24,65 @@ def build_registry(settings: Settings | None = None) -> Registry:
"""Wire engine, local FS backend, in-process data plane, and registry."""
effective = settings or get_settings()
engine = create_engine(effective)
backend = LocalBackend(effective.storage_local_root, backend_id="local")
dataplane = InProcessDataPlane(backend)
backends = _build_backends(effective)
dataplane = InProcessDataPlane(
backends,
default_backend_id=effective.storage_default_backend,
)
view_writer = RegistryViewWriter()
retention_policy = RetentionPolicy.from_toml(effective.retention_config_path)
return Registry(engine, dataplane, view_writer, retention_policy)
return Registry(
engine,
dataplane,
view_writer,
retention_policy,
backend_selector=_backend_selector(effective),
)
def _build_backends(settings: Settings) -> dict[str, StorageBackend]:
configured = settings.configured_backend_ids or ("local",)
backends: dict[str, StorageBackend] = {}
if "local" in configured:
backends["local"] = LocalBackend(settings.storage_local_root, backend_id="local")
if "s3" in configured:
access_key = (
resolve_secret_ref(settings.s3_access_key_ref)
if settings.s3_access_key_ref
else None
)
secret_key = (
resolve_secret_ref(settings.s3_secret_key_ref)
if settings.s3_secret_key_ref
else None
)
backends["s3"] = S3Backend(
S3BackendConfig(
endpoint_url=settings.s3_endpoint_url,
region=settings.s3_region,
bucket=settings.s3_bucket,
key_prefix=settings.s3_key_prefix,
access_key_id=access_key,
secret_access_key=secret_key,
storage_class=settings.s3_storage_class or None,
sse=settings.s3_sse or None,
multipart_threshold_bytes=settings.s3_multipart_threshold_bytes,
multipart_chunk_bytes=settings.s3_multipart_chunk_bytes,
)
)
unknown = set(configured) - set(backends)
if unknown:
raise ValueError(f"unknown storage backend ids: {sorted(unknown)}")
return backends
def _backend_selector(settings: Settings) -> Callable[[str, str], str | None]:
routes = settings.backend_routes
def select(producer: str, retention_class: str) -> str | None:
for route in routes:
if route.matches(producer=producer, retention_class=retention_class):
return route.backend_id
return settings.storage_default_backend
return select

View File

@@ -34,7 +34,9 @@ app = typer.Typer(
no_args_is_help=True,
)
retention_app = typer.Typer(help="Retention lifecycle commands", no_args_is_help=True)
storage_app = typer.Typer(help="Storage backend commands", no_args_is_help=True)
app.add_typer(retention_app, name="retention")
app.add_typer(storage_app, name="storage")
@app.callback()
@@ -182,6 +184,30 @@ def retention_sweep() -> None:
typer.echo(json.dumps({"marked_package_ids": marked, "marked_count": len(marked)}, indent=2))
@storage_app.command("verify")
def storage_verify(
backend: str | None = typer.Option(
None,
"--backend",
help="Backend id to verify; omit to verify every storage location.",
),
) -> None:
"""Re-read storage locations, recompute digests, and record verification events."""
settings = get_settings()
results = asyncio.run(_storage_verify_async(settings, backend_id=backend))
failed = [r for r in results if not r["verified"]]
typer.echo(
json.dumps(
{
"verified_count": len(results) - len(failed),
"failed_count": len(failed),
"results": results,
},
indent=2,
)
)
# ---- internals -------------------------------------------------------------
@@ -200,9 +226,14 @@ async def _health_async(settings: Settings) -> dict[str, Any]:
try:
db_ok, db_detail = await registry.db_health()
backend_status = await registry.backend_health()
failed_storage_locations = await registry.failed_storage_locations_count()
finally:
await registry.dispose()
overall = "ok" if db_ok and backend_status.healthy else "degraded"
overall = (
"ok"
if db_ok and backend_status.healthy and failed_storage_locations == 0
else "degraded"
)
return {
"service": "artifact-store",
"version": __version__,
@@ -215,6 +246,7 @@ async def _health_async(settings: Settings) -> dict[str, Any]:
"free_bytes": backend_status.free_bytes,
"total_bytes": backend_status.total_bytes,
},
"storage": {"failed_locations": failed_storage_locations},
}
@@ -229,6 +261,31 @@ async def _retention_sweep_async(settings: Settings) -> list[str]:
return [str(package_id) for package_id in marked]
async def _storage_verify_async(
settings: Settings,
*,
backend_id: str | None,
) -> list[dict[str, Any]]:
from artifactstore.app import build_registry
registry: Registry = build_registry(settings)
try:
results = await registry.verify_storage_locations(backend_id=backend_id)
finally:
await registry.dispose()
return [
{
"storage_location_id": str(result.storage_location_id),
"file_id": str(result.file_id),
"backend_id": result.backend_id,
"content_address": result.content_address,
"verified": result.verified,
"mismatch": result.mismatch,
}
for result in results
]
def _http_json(
method: str,
base_url: str,

View File

@@ -7,8 +7,32 @@ local development; see ``.env.example``.
from __future__ import annotations
from dataclasses import dataclass
from pydantic_settings import BaseSettings, SettingsConfigDict
__all__ = [
"BackendRoute",
"Settings",
"get_settings",
"parse_backend_routes",
"resolve_secret_ref",
]
@dataclass(frozen=True, slots=True)
class BackendRoute:
"""Route one producer/retention-class pair to a storage backend."""
producer: str
retention_class: str
backend_id: str
def matches(self, *, producer: str, retention_class: str) -> bool:
producer_match = self.producer == "*" or self.producer == producer
retention_match = self.retention_class == "*" or self.retention_class == retention_class
return producer_match and retention_match
class Settings(BaseSettings):
"""Top-level service configuration."""
@@ -29,6 +53,19 @@ class Settings(BaseSettings):
api_token: str = ""
retention_config_path: str = ""
retention_sweep_interval_seconds: int = 3600
storage_backends: str = "local"
storage_default_backend: str = "local"
storage_backend_routes: str = ""
s3_endpoint_url: str = ""
s3_region: str = "us-east-1"
s3_bucket: str = ""
s3_key_prefix: str = ""
s3_access_key_ref: str = ""
s3_secret_key_ref: str = ""
s3_storage_class: str = ""
s3_sse: str = ""
s3_multipart_threshold_bytes: int = 64 * 1024 * 1024
s3_multipart_chunk_bytes: int = 8 * 1024 * 1024
@property
def bearer_tokens(self) -> frozenset[str]:
@@ -39,6 +76,58 @@ class Settings(BaseSettings):
if token.strip()
)
@property
def configured_backend_ids(self) -> tuple[str, ...]:
return tuple(
backend_id.strip()
for backend_id in self.storage_backends.split(",")
if backend_id.strip()
)
@property
def backend_routes(self) -> tuple[BackendRoute, ...]:
return parse_backend_routes(self.storage_backend_routes)
def parse_backend_routes(value: str) -> tuple[BackendRoute, ...]:
"""Parse ``producer:retention_class=backend`` route entries."""
routes: list[BackendRoute] = []
for raw_entry in value.split(","):
entry = raw_entry.strip()
if not entry:
continue
selector, sep, backend_id = entry.partition("=")
if sep == "" or not backend_id.strip():
raise ValueError(f"invalid storage backend route: {entry!r}")
producer, selector_sep, retention_class = selector.partition(":")
if selector_sep == "":
raise ValueError(f"invalid storage backend route selector: {selector!r}")
routes.append(
BackendRoute(
producer=producer.strip() or "*",
retention_class=retention_class.strip() or "*",
backend_id=backend_id.strip(),
)
)
return tuple(routes)
def resolve_secret_ref(ref: str) -> str:
"""Resolve a secret reference from ``env:NAME`` or ``file:/path``."""
import os
from pathlib import Path
if ref.startswith("env:"):
name = ref.removeprefix("env:")
try:
return os.environ[name]
except KeyError as exc:
raise ValueError(f"environment variable {name!r} is not set") from exc
if ref.startswith("file:"):
path = Path(ref.removeprefix("file:"))
return path.read_text(encoding="utf-8").strip()
raise ValueError("secret references must use env:NAME or file:/path")
def get_settings() -> Settings:
"""Return a freshly-loaded :class:`Settings` instance."""

View File

@@ -11,7 +11,7 @@ from __future__ import annotations
import os
import tempfile
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Mapping
from pathlib import Path
from artifactstore.dataplane.spi import (
@@ -39,22 +39,36 @@ _DEFAULT_CHUNK_SIZE = 64 * 1024
class InProcessDataPlane:
"""The v1 data plane: wraps one :class:`StorageBackend`."""
"""The v1 data plane: wraps one or more :class:`StorageBackend` instances."""
def __init__(
self,
backend: StorageBackend,
backend: StorageBackend | Mapping[str, StorageBackend],
*,
default_backend_id: str | None = None,
tmp_dir: str | os.PathLike[str] | None = None,
chunk_size: int = _DEFAULT_CHUNK_SIZE,
) -> None:
self._backend = backend
if isinstance(backend, Mapping):
self._backends = dict(backend)
if not self._backends:
raise ValueError("at least one storage backend is required")
self._default_backend_id = default_backend_id or sorted(self._backends)[0]
else:
self._backends = {backend.backend_id: backend}
self._default_backend_id = default_backend_id or backend.backend_id
if self._default_backend_id not in self._backends:
raise ValueError(f"unknown default backend: {self._default_backend_id!r}")
self._tmp_dir = Path(tmp_dir) if tmp_dir is not None else None
self._chunk_size = chunk_size
@property
def backend(self) -> StorageBackend:
return self._backend
return self._select_backend(None)
@property
def backends(self) -> Mapping[str, StorageBackend]:
return dict(self._backends)
async def ingest_stream(
self,
@@ -64,6 +78,7 @@ class InProcessDataPlane:
) -> IngestResult:
effective_hints = hints or IngestHints()
primary_name = effective_hints.primary_algorithm or PRIMARY_ALGORITHM
backend = self._select_backend(effective_hints.backend_id)
primary_h = get_algorithm(primary_name)()
sha_h = get_algorithm(INTEROP_ALGORITHM)()
@@ -88,7 +103,7 @@ class InProcessDataPlane:
sha_digest = Digest(algorithm=INTEROP_ALGORITHM, hex=sha_h.hexdigest())
content_address = primary_digest.content_address
receipt = await self._backend.put(
receipt = await backend.put(
content_address,
_file_chunks(tmp_path, self._chunk_size),
size_hint=size,
@@ -108,16 +123,23 @@ class InProcessDataPlane:
content_address: ContentAddress,
*,
byte_range: tuple[int, int] | None = None,
backend_id: str | None = None,
) -> AsyncIterator[bytes]:
return await self._backend.get(content_address, byte_range=byte_range)
return await self._select_backend(backend_id).get(content_address, byte_range=byte_range)
async def verify_object(self, content_address: ContentAddress) -> VerifyResult:
async def verify_object(
self,
content_address: ContentAddress,
*,
backend_id: str | None = None,
) -> VerifyResult:
expected = content_address.to_digest()
primary_name = expected.algorithm
stream = await self._backend.get(content_address)
backend = self._select_backend(backend_id)
stream = await backend.get(content_address)
pair = await digest_stream(stream, primary=primary_name)
head = await self._backend.head(content_address)
head = await backend.head(content_address)
actual_size = head.size_bytes
if pair.primary.hex == expected.hex:
@@ -138,11 +160,26 @@ class InProcessDataPlane:
mismatch=(f"primary digest mismatch: expected {expected.hex}, got {pair.primary.hex}"),
)
async def delete_object(self, content_address: ContentAddress) -> DeletionResult:
return await self._backend.delete(content_address)
async def delete_object(
self,
content_address: ContentAddress,
*,
backend_id: str | None = None,
) -> DeletionResult:
return await self._select_backend(backend_id).delete(content_address)
async def backend_health(self) -> BackendStatus:
return await self._backend.health()
return await self._select_backend(None).health()
async def backend_health_all(self) -> list[BackendStatus]:
return [await backend.health() for backend in self._backends.values()]
def _select_backend(self, backend_id: str | None) -> StorageBackend:
selected = backend_id or self._default_backend_id
try:
return self._backends[selected]
except KeyError as exc:
raise KeyError(f"unknown storage backend: {selected!r}") from exc
async def _file_chunks(path: Path, chunk_size: int) -> AsyncIterator[bytes]:

View File

@@ -84,10 +84,21 @@ class DataPlane(Protocol):
content_address: ContentAddress,
*,
byte_range: tuple[int, int] | None = None,
backend_id: str | None = None,
) -> AsyncIterator[bytes]: ...
async def verify_object(self, content_address: ContentAddress) -> VerifyResult: ...
async def verify_object(
self,
content_address: ContentAddress,
*,
backend_id: str | None = None,
) -> VerifyResult: ...
async def delete_object(self, content_address: ContentAddress) -> DeletionResult: ...
async def delete_object(
self,
content_address: ContentAddress,
*,
backend_id: str | None = None,
) -> DeletionResult: ...
async def backend_health(self) -> BackendStatus: ...

View File

@@ -250,6 +250,21 @@ async def _apply_retention_deletion_eligible(
)
async def _apply_storage_location_verified(
connection: AsyncConnection,
event: Event,
) -> None:
payload = cbor2.loads(event.payload)
await connection.execute(
update(storage_locations)
.where(storage_locations.c.id == UUID(payload["storage_location_id"]))
.values(
status="verified" if payload["verified"] else "failed",
last_verified_at=event.created_at,
)
)
def _parse_iso(value: str | None) -> datetime | None:
if value is None:
return None
@@ -268,4 +283,5 @@ _HANDLERS = {
"v1.retention.hold_applied": _apply_retention_hold_applied,
"v1.retention.hold_released": _apply_retention_hold_released,
"v1.retention.deletion_eligible": _apply_retention_deletion_eligible,
"v1.storage.location_verified": _apply_storage_location_verified,
}

View File

@@ -19,17 +19,17 @@ source of truth (ADR-0002).
from __future__ import annotations
import uuid
from collections.abc import AsyncIterator, Sequence
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
from typing import Any, cast
from uuid import UUID
import cbor2
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncEngine
from artifactstore.dataplane.spi import DataPlane
from artifactstore.dataplane.spi import DataPlane, IngestHints
from artifactstore.db.schema import (
artifact_files,
artifact_packages,
@@ -76,6 +76,7 @@ __all__ = [
"RetentionClassRecord",
"RetentionStateError",
"RetentionStateRecord",
"StorageVerificationRecord",
]
@@ -158,6 +159,18 @@ class RetentionStateRecord:
eligible_for_deletion: bool
@dataclass(frozen=True, slots=True)
class StorageVerificationRecord:
"""Result of verifying one storage location."""
storage_location_id: UUID
file_id: UUID
backend_id: str
content_address: str
verified: bool
mismatch: str | None
_RETENTION_EVENT_TYPES = (
"v1.retention.default_applied",
"v1.retention.extended",
@@ -176,11 +189,13 @@ class Registry:
dataplane: DataPlane,
view_writer: RegistryViewWriter | None = None,
retention_policy: RetentionPolicy | None = None,
backend_selector: Callable[[str, str], str | None] | None = None,
) -> None:
self._engine = engine
self._dataplane = dataplane
self._view_writer = view_writer or RegistryViewWriter()
self._retention_policy = retention_policy or RetentionPolicy()
self._backend_selector = backend_selector
# ---- mutating operations ------------------------------------------------
@@ -279,7 +294,15 @@ class Registry:
f"relative_path {relative_path!r} already exists in package {package_id}"
)
ingest = await self._dataplane.ingest_stream(stream)
selected_backend = (
self._backend_selector(pkg_row.producer, pkg_row.retention_class)
if self._backend_selector is not None
else None
)
ingest = await self._dataplane.ingest_stream(
stream,
hints=IngestHints(backend_id=selected_backend),
)
file_id = uuid.uuid4()
storage_location_id = uuid.uuid4()
@@ -691,6 +714,73 @@ class Registry:
for r in rows
]
async def verify_storage_locations(
self,
*,
backend_id: str | None = None,
actor: str = "storage-verifier",
) -> list[StorageVerificationRecord]:
"""Re-read storage locations and emit verification events."""
stmt = select(storage_locations)
if backend_id is not None:
stmt = stmt.where(storage_locations.c.backend_id == backend_id)
async with self._engine.connect() as conn:
rows = (await conn.execute(stmt.order_by(storage_locations.c.id))).all()
results: list[StorageVerificationRecord] = []
for row in rows:
ca = ContentAddress(row.content_address)
verified = False
mismatch: str | None = None
actual_size_bytes: int | None = None
actual_primary_hex: str | None = None
actual_sha256_hex: str | None = None
try:
result = await self._dataplane.verify_object(ca, backend_id=row.backend_id)
verified = result.verified
mismatch = result.mismatch
actual_size_bytes = result.actual_size_bytes
actual_primary_hex = result.actual_primary_digest.hex
actual_sha256_hex = result.actual_sha256_digest.hex
except Exception as exc:
mismatch = f"{type(exc).__name__}: {exc}"
payload = cbor2.dumps(
{
"storage_location_id": str(row.id),
"file_id": str(row.artifact_file_id),
"backend_id": row.backend_id,
"content_address": row.content_address,
"verified": verified,
"mismatch": mismatch,
"actual_size_bytes": actual_size_bytes,
"actual_primary_hex": actual_primary_hex,
"actual_sha256_hex": actual_sha256_hex,
},
canonical=True,
)
event = make_event(
event_type="v1.storage.location_verified",
subject_kind="storage",
subject_id=row.artifact_file_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
results.append(
StorageVerificationRecord(
storage_location_id=row.id,
file_id=row.artifact_file_id,
backend_id=row.backend_id,
content_address=row.content_address,
verified=verified,
mismatch=mismatch,
)
)
return results
async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes:
"""Return the finalised manifest. ``format`` is ``cbor`` (canonical
CBOR, the wire form) or ``json`` (the JCS projection)."""
@@ -724,7 +814,11 @@ class Registry:
"""Return an async byte iterator for the bytes of a stored file."""
record = await self.get_file_metadata(file_id)
ca = ContentAddress(record.content_address)
return await self._dataplane.serve_object(ca, byte_range=byte_range)
return await self._dataplane.serve_object(
ca,
byte_range=byte_range,
backend_id=record.backend_id,
)
async def fetch_events(
self,
@@ -769,6 +863,24 @@ class Registry:
"""Probe the configured storage backend through the data plane."""
return await self._dataplane.backend_health()
async def backend_health_all(self) -> list[BackendStatus]:
"""Probe every configured storage backend when the data plane supports it."""
probe_all = getattr(self._dataplane, "backend_health_all", None)
if probe_all is None:
return [await self.backend_health()]
typed_probe = cast(Callable[[], Awaitable[list[BackendStatus]]], probe_all)
return await typed_probe()
async def failed_storage_locations_count(self) -> int:
"""Count storage locations currently marked failed."""
async with self._engine.connect() as conn:
rows = (
await conn.execute(
select(storage_locations.c.id).where(storage_locations.c.status == "failed")
)
).all()
return len(rows)
async def dispose(self) -> None:
"""Release the engine's connection pool. Idempotent."""
await self._engine.dispose()

View File

@@ -6,6 +6,7 @@ adding adapters never touches the registry or API layers.
"""
from artifactstore.storage.backends.local import LocalBackend
from artifactstore.storage.backends.s3 import S3Backend, S3BackendConfig
from artifactstore.storage.registry import (
clear as clear_backends,
)
@@ -32,6 +33,8 @@ __all__ = [
"DeletionResult",
"LocalBackend",
"ObjectNotFoundError",
"S3Backend",
"S3BackendConfig",
"StorageBackend",
"StorageObjectMetadata",
"StorageReceipt",

View File

@@ -0,0 +1,261 @@
"""S3-compatible storage backend (Ceph RGW / MinIO / AWS S3)."""
from __future__ import annotations
from collections.abc import AsyncIterator, Callable
from contextlib import AbstractAsyncContextManager
from dataclasses import dataclass
from typing import Any, cast
from artifactstore.identity import ContentAddress
from artifactstore.storage.spi import (
BackendStatus,
DeletionResult,
ObjectNotFoundError,
StorageObjectMetadata,
StorageReceipt,
)
__all__ = ["S3Backend", "S3BackendConfig"]
_DEFAULT_CHUNK_SIZE = 64 * 1024
@dataclass(frozen=True, slots=True)
class S3BackendConfig:
endpoint_url: str
region: str
bucket: str
key_prefix: str = ""
access_key_id: str | None = None
secret_access_key: str | None = None
storage_class: str | None = None
sse: str | None = None
multipart_threshold_bytes: int = 64 * 1024 * 1024
multipart_chunk_bytes: int = 8 * 1024 * 1024
ClientFactory = Callable[[], AbstractAsyncContextManager[Any]]
class S3Backend:
"""Storage SPI implementation over an S3-compatible object store."""
def __init__(
self,
config: S3BackendConfig,
*,
backend_id: str = "s3",
client_factory: ClientFactory | None = None,
chunk_size: int = _DEFAULT_CHUNK_SIZE,
) -> None:
if not config.bucket:
raise ValueError("S3 bucket is required")
self._config = config
self._backend_id = backend_id
self._client_factory = client_factory
self._chunk_size = chunk_size
@property
def backend_id(self) -> str:
return self._backend_id
def _client(self) -> AbstractAsyncContextManager[Any]:
if self._client_factory is not None:
return self._client_factory()
try:
import aioboto3 # type: ignore[import-not-found]
except ModuleNotFoundError as exc:
raise RuntimeError(
"S3Backend requires the 'aioboto3' package; install artifactstore[s3]"
) from exc
session = aioboto3.Session(
aws_access_key_id=self._config.access_key_id,
aws_secret_access_key=self._config.secret_access_key,
region_name=self._config.region,
)
return cast(
AbstractAsyncContextManager[Any],
session.client("s3", endpoint_url=self._config.endpoint_url),
)
def _object_key(self, content_address: ContentAddress) -> str:
digest = content_address.to_digest()
if len(digest.hex) < 4:
raise ValueError(f"digest hex too short for sharded layout: {content_address}")
key = f"{digest.algorithm}/{digest.hex[0:2]}/{digest.hex[2:4]}/{digest.hex}"
prefix = self._config.key_prefix.strip("/")
return f"{prefix}/{key}" if prefix else key
async def put(
self,
content_address: ContentAddress,
stream: AsyncIterator[bytes],
*,
size_hint: int | None = None,
) -> StorageReceipt:
key = self._object_key(content_address)
if size_hint is not None and size_hint >= self._config.multipart_threshold_bytes:
return await self._put_multipart(content_address, key, stream, size_hint=size_hint)
data = bytearray()
async for chunk in stream:
data.extend(chunk)
async with self._client() as client:
await client.put_object(
Bucket=self._config.bucket,
Key=key,
Body=bytes(data),
**self._put_options(),
)
return StorageReceipt(
backend_id=self._backend_id,
content_address=content_address,
object_key=key,
size_bytes=len(data),
)
async def _put_multipart(
self,
content_address: ContentAddress,
key: str,
stream: AsyncIterator[bytes],
*,
size_hint: int,
) -> StorageReceipt:
async with self._client() as client:
created = await client.create_multipart_upload(
Bucket=self._config.bucket,
Key=key,
**self._put_options(),
)
upload_id = created["UploadId"]
parts: list[dict[str, Any]] = []
part_number = 1
buffered = bytearray()
try:
async for chunk in stream:
buffered.extend(chunk)
while len(buffered) >= self._config.multipart_chunk_bytes:
part = bytes(buffered[: self._config.multipart_chunk_bytes])
del buffered[: self._config.multipart_chunk_bytes]
uploaded = await client.upload_part(
Bucket=self._config.bucket,
Key=key,
UploadId=upload_id,
PartNumber=part_number,
Body=part,
)
parts.append({"ETag": uploaded["ETag"], "PartNumber": part_number})
part_number += 1
if buffered:
uploaded = await client.upload_part(
Bucket=self._config.bucket,
Key=key,
UploadId=upload_id,
PartNumber=part_number,
Body=bytes(buffered),
)
parts.append({"ETag": uploaded["ETag"], "PartNumber": part_number})
await client.complete_multipart_upload(
Bucket=self._config.bucket,
Key=key,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
except Exception:
await client.abort_multipart_upload(
Bucket=self._config.bucket,
Key=key,
UploadId=upload_id,
)
raise
return StorageReceipt(
backend_id=self._backend_id,
content_address=content_address,
object_key=key,
size_bytes=size_hint,
)
async def get(
self,
content_address: ContentAddress,
*,
byte_range: tuple[int, int] | None = None,
) -> AsyncIterator[bytes]:
key = self._object_key(content_address)
async def iterator() -> AsyncIterator[bytes]:
kwargs: dict[str, Any] = {"Bucket": self._config.bucket, "Key": key}
if byte_range is not None:
start, end = byte_range
kwargs["Range"] = f"bytes={start}-{end}"
async with self._client() as client:
try:
response = await client.get_object(**kwargs)
except Exception as exc:
if _is_not_found(exc):
raise ObjectNotFoundError(str(content_address)) from exc
raise
body = response["Body"]
while True:
chunk = await body.read(self._chunk_size)
if not chunk:
break
yield chunk
return iterator()
async def head(self, content_address: ContentAddress) -> StorageObjectMetadata:
key = self._object_key(content_address)
async with self._client() as client:
try:
response = await client.head_object(Bucket=self._config.bucket, Key=key)
except Exception as exc:
if _is_not_found(exc):
raise ObjectNotFoundError(str(content_address)) from exc
raise
return StorageObjectMetadata(
backend_id=self._backend_id,
content_address=content_address,
object_key=key,
size_bytes=int(response["ContentLength"]),
)
async def delete(self, content_address: ContentAddress) -> DeletionResult:
key = self._object_key(content_address)
async with self._client() as client:
await client.delete_object(Bucket=self._config.bucket, Key=key)
return DeletionResult(
backend_id=self._backend_id,
content_address=content_address,
deleted=True,
)
async def health(self) -> BackendStatus:
try:
async with self._client() as client:
await client.head_bucket(Bucket=self._config.bucket)
except Exception as exc:
return BackendStatus(
backend_id=self._backend_id,
healthy=False,
detail=f"{type(exc).__name__}: {exc}",
)
return BackendStatus(backend_id=self._backend_id, healthy=True, detail="ok")
def _put_options(self) -> dict[str, str]:
options: dict[str, str] = {}
if self._config.storage_class:
options["StorageClass"] = self._config.storage_class
if self._config.sse:
options["ServerSideEncryption"] = self._config.sse
return options
def _is_not_found(exc: Exception) -> bool:
response = getattr(exc, "response", None)
if isinstance(response, dict):
code = response.get("Error", {}).get("Code")
return str(code) in {"404", "NoSuchKey", "NotFound"}
return False