WP-0001-T003: storage adapter SPI and local filesystem backend

src/artifactstore/storage/:
- spi.py: StorageBackend Protocol (backend_id, put, get, head, delete,
  health) and result dataclasses (StorageReceipt, StorageObjectMetadata,
  DeletionResult, BackendStatus). ObjectNotFoundError exception type.
- registry.py: backend lookup by string ID (register/get/list_backends/
  clear) per ADR-0004.
- backends/local.py: LocalBackend implementation.
  * Object layout <root>/<algorithm>/<hex[0:2]>/<hex[2:4]>/<hex>.
  * Atomic writes: tmpfile + fsync + rename (idempotent re-puts drain the
    stream without rewriting).
  * Defence in depth: resolves the final path and asserts it remains under
    the configured root.
  * Range reads honour HTTP-style inclusive (start, end) tuples.
  * health() returns disk usage via shutil.disk_usage and surfaces an
    unhealthy status when the root has disappeared.
  * delete() cleans up emptied shard directories opportunistically.

tests/unit/test_storage_local.py (14 cases): put/get round-trip; object
key layout matches blueprint; head returns metadata; head/get missing
raise ObjectNotFoundError; put is idempotent; delete returns True then
False; range read returns subrange; range read rejects invalid range;
health reports disk usage; health reports unhealthy when root vanished;
ContentAddress validation blocks path-traversal-flavoured inputs;
registry register/get/list/clear round-trip; idempotent re-put leaves
bytes intact.

Gates: ruff clean, mypy --strict clean on 41 files, 59 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-16 02:01:25 +02:00
parent 0a49798699
commit 28ec2922d3
7 changed files with 558 additions and 3 deletions

View File

@@ -1,5 +1,42 @@
"""Storage adapter SPI and backend registry.
The SPI and local filesystem backend land in ARTIFACT-STORE-WP-0001-T003.
The S3-compatible backend lands in workplan WP-0004.
Backends address bytes by content address (ADR-0001). The SPI is small
(``put`` / ``get`` / ``head`` / ``delete`` / ``health``) so swapping or
adding adapters never touches the registry or API layers.
"""
from artifactstore.storage.backends.local import LocalBackend
from artifactstore.storage.registry import (
clear as clear_backends,
)
from artifactstore.storage.registry import (
get as get_backend,
)
from artifactstore.storage.registry import (
list_backends,
)
from artifactstore.storage.registry import (
register as register_backend,
)
from artifactstore.storage.spi import (
BackendStatus,
DeletionResult,
ObjectNotFoundError,
StorageBackend,
StorageObjectMetadata,
StorageReceipt,
)
__all__ = [
"BackendStatus",
"DeletionResult",
"LocalBackend",
"ObjectNotFoundError",
"StorageBackend",
"StorageObjectMetadata",
"StorageReceipt",
"clear_backends",
"get_backend",
"list_backends",
"register_backend",
]

View File

@@ -0,0 +1 @@
"""Concrete storage backend implementations."""

View File

@@ -0,0 +1,197 @@
"""Local filesystem storage backend (ARTIFACT-STORE-WP-0001-T003).
Object layout:
::
<root>/<algorithm>/<hex[0:2]>/<hex[2:4]>/<hex>
Writes are atomic (write to ``.tmp.<uuid>`` in the same directory, fsync,
rename). Path traversal is rejected at the boundary by validating the
``ContentAddress`` and resolving the final path against the configured root.
"""
from __future__ import annotations
import os
import shutil
import uuid
from collections.abc import AsyncIterator
from pathlib import Path
from artifactstore.identity import ContentAddress
from artifactstore.storage.spi import (
BackendStatus,
DeletionResult,
ObjectNotFoundError,
StorageObjectMetadata,
StorageReceipt,
)
__all__ = ["LocalBackend"]
_DEFAULT_CHUNK_SIZE = 64 * 1024
class LocalBackend:
"""Filesystem-backed :class:`StorageBackend` for dev and tests."""
def __init__(
self,
root: str | os.PathLike[str],
*,
backend_id: str = "local",
chunk_size: int = _DEFAULT_CHUNK_SIZE,
) -> None:
self._root = Path(root).resolve()
self._root.mkdir(parents=True, exist_ok=True)
self._backend_id = backend_id
self._chunk_size = chunk_size
@property
def backend_id(self) -> str:
return self._backend_id
@property
def root(self) -> Path:
return self._root
def _path_for(self, content_address: ContentAddress) -> Path:
digest = content_address.to_digest()
if len(digest.hex) < 4:
raise ValueError(f"digest hex too short for sharded layout: {content_address}")
path = self._root / digest.algorithm / digest.hex[0:2] / digest.hex[2:4] / digest.hex
# Defence in depth: the resolved path must live under the configured root.
resolved = path.resolve()
try:
resolved.relative_to(self._root)
except ValueError as exc:
raise ValueError(f"path traversal blocked for {content_address}") from exc
return path
def _object_key(self, path: Path) -> str:
return str(path.relative_to(self._root))
async def put(
self,
content_address: ContentAddress,
stream: AsyncIterator[bytes],
*,
size_hint: int | None = None,
) -> StorageReceipt:
path = self._path_for(content_address)
if path.is_file():
# Idempotent: content-addressed objects are immutable by definition.
# Drain the stream so the caller's generator can finalise.
async for _ in stream:
pass
return StorageReceipt(
backend_id=self._backend_id,
content_address=content_address,
object_key=self._object_key(path),
size_bytes=path.stat().st_size,
)
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_name(f"{path.name}.tmp.{uuid.uuid4().hex}")
size = 0
try:
with open(tmp, "wb") as fh:
async for chunk in stream:
fh.write(chunk)
size += len(chunk)
fh.flush()
os.fsync(fh.fileno())
tmp.replace(path)
finally:
tmp.unlink(missing_ok=True)
return StorageReceipt(
backend_id=self._backend_id,
content_address=content_address,
object_key=self._object_key(path),
size_bytes=size,
)
async def get(
self,
content_address: ContentAddress,
*,
byte_range: tuple[int, int] | None = None,
) -> AsyncIterator[bytes]:
path = self._path_for(content_address)
if not path.is_file():
raise ObjectNotFoundError(str(content_address))
return _stream_file(path, byte_range, self._chunk_size)
async def head(self, content_address: ContentAddress) -> StorageObjectMetadata:
path = self._path_for(content_address)
if not path.is_file():
raise ObjectNotFoundError(str(content_address))
return StorageObjectMetadata(
backend_id=self._backend_id,
content_address=content_address,
object_key=self._object_key(path),
size_bytes=path.stat().st_size,
)
async def delete(self, content_address: ContentAddress) -> DeletionResult:
path = self._path_for(content_address)
existed = path.is_file()
if existed:
path.unlink()
# Clean up empty parent directories opportunistically.
for parent in (path.parent, path.parent.parent, path.parent.parent.parent):
try:
if parent != self._root and not any(parent.iterdir()):
parent.rmdir()
except OSError:
break
return DeletionResult(
backend_id=self._backend_id,
content_address=content_address,
deleted=existed,
)
async def health(self) -> BackendStatus:
if not self._root.is_dir():
return BackendStatus(
backend_id=self._backend_id,
healthy=False,
detail=f"root is not a directory: {self._root}",
)
usage = shutil.disk_usage(self._root)
return BackendStatus(
backend_id=self._backend_id,
healthy=True,
detail="ok",
free_bytes=usage.free,
total_bytes=usage.total,
)
async def _stream_file(
path: Path,
byte_range: tuple[int, int] | None,
chunk_size: int,
) -> AsyncIterator[bytes]:
with open(path, "rb") as fh:
if byte_range is None:
while True:
chunk = fh.read(chunk_size)
if not chunk:
break
yield chunk
return
start, end_inclusive = byte_range
if start < 0 or end_inclusive < start:
raise ValueError(f"invalid byte range: {byte_range}")
fh.seek(start)
remaining = end_inclusive - start + 1
while remaining > 0:
chunk = fh.read(min(chunk_size, remaining))
if not chunk:
break
yield chunk
remaining -= len(chunk)

View File

@@ -0,0 +1,36 @@
"""Storage backend registry — lookup by ``backend_id`` (ADR-0004)."""
from __future__ import annotations
from artifactstore.storage.spi import StorageBackend
__all__ = ["clear", "get", "list_backends", "register"]
_backends: dict[str, StorageBackend] = {}
def register(backend: StorageBackend) -> None:
"""Register a backend instance under its ``backend_id``.
Re-registering the same ``backend_id`` replaces the prior instance.
"""
_backends[backend.backend_id] = backend
def get(backend_id: str) -> StorageBackend:
"""Return the backend registered as ``backend_id``."""
try:
return _backends[backend_id]
except KeyError as exc:
raise KeyError(f"unknown storage backend: {backend_id!r}") from exc
def list_backends() -> list[str]:
"""Return registered backend IDs, sorted."""
return sorted(_backends)
def clear() -> None:
"""Test helper: drop all registered backends."""
_backends.clear()

View File

@@ -0,0 +1,104 @@
"""Storage adapter SPI (ADR-0001 + ADR-0004).
A :class:`StorageBackend` addresses bytes by :class:`ContentAddress`; the
registry never sees backend-specific keys. The protocol surface is the
contract any future backend (S3-compatible, Ceph, cloud archive) must
satisfy.
``get`` is a coroutine that returns an :class:`AsyncIterator[bytes]`;
callers ``await`` the call and then ``async for`` the iterator.
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Protocol, runtime_checkable
from artifactstore.identity import ContentAddress
__all__ = [
"BackendStatus",
"DeletionResult",
"ObjectNotFoundError",
"StorageBackend",
"StorageObjectMetadata",
"StorageReceipt",
]
class ObjectNotFoundError(KeyError):
"""Raised when get/head/delete reference an object the backend does not hold."""
@dataclass(frozen=True, slots=True)
class StorageReceipt:
"""Record of a successful ``put``."""
backend_id: str
content_address: ContentAddress
object_key: str
size_bytes: int
@dataclass(frozen=True, slots=True)
class StorageObjectMetadata:
"""Object metadata returned by ``head``."""
backend_id: str
content_address: ContentAddress
object_key: str
size_bytes: int
@dataclass(frozen=True, slots=True)
class DeletionResult:
"""Outcome of ``delete``; ``deleted`` is ``False`` if the object did not exist."""
backend_id: str
content_address: ContentAddress
deleted: bool
@dataclass(frozen=True, slots=True)
class BackendStatus:
"""Result of ``health``."""
backend_id: str
healthy: bool
detail: str
free_bytes: int | None = None
total_bytes: int | None = None
@runtime_checkable
class StorageBackend(Protocol):
"""Backend contract.
Implementations capture configuration at construction time and expose
a stable, lowercase ``backend_id`` for registry lookup.
"""
@property
def backend_id(self) -> str: ...
async def put(
self,
content_address: ContentAddress,
stream: AsyncIterator[bytes],
*,
size_hint: int | None = None,
) -> StorageReceipt: ...
async def get(
self,
content_address: ContentAddress,
*,
byte_range: tuple[int, int] | None = None,
) -> AsyncIterator[bytes]: ...
async def head(self, content_address: ContentAddress) -> StorageObjectMetadata: ...
async def delete(self, content_address: ContentAddress) -> DeletionResult: ...
async def health(self) -> BackendStatus: ...