From 28ec2922d3680b015b2842f5c9855589f274fdd6 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 16 May 2026 02:01:25 +0200 Subject: [PATCH] WP-0001-T003: storage adapter SPI and local filesystem backend src/artifactstore/storage/: - spi.py: StorageBackend Protocol (backend_id, put, get, head, delete, health) and result dataclasses (StorageReceipt, StorageObjectMetadata, DeletionResult, BackendStatus). ObjectNotFoundError exception type. - registry.py: backend lookup by string ID (register/get/list_backends/ clear) per ADR-0004. - backends/local.py: LocalBackend implementation. * Object layout ////. * Atomic writes: tmpfile + fsync + rename (idempotent re-puts drain the stream without rewriting). * Defence in depth: resolves the final path and asserts it remains under the configured root. * Range reads honour HTTP-style inclusive (start, end) tuples. * health() returns disk usage via shutil.disk_usage and surfaces an unhealthy status when the root has disappeared. * delete() cleans up emptied shard directories opportunistically. tests/unit/test_storage_local.py (14 cases): put/get round-trip; object key layout matches blueprint; head returns metadata; head/get missing raise ObjectNotFoundError; put is idempotent; delete returns True then False; range read returns subrange; range read rejects invalid range; health reports disk usage; health reports unhealthy when root vanished; ContentAddress validation blocks path-traversal-flavoured inputs; registry register/get/list/clear round-trip; idempotent re-put leaves bytes intact. Gates: ruff clean, mypy --strict clean on 41 files, 59 tests pass. Co-Authored-By: Claude Opus 4.7 --- src/artifactstore/storage/__init__.py | 41 +++- .../storage/backends/__init__.py | 1 + src/artifactstore/storage/backends/local.py | 197 ++++++++++++++++++ src/artifactstore/storage/registry.py | 36 ++++ src/artifactstore/storage/spi.py | 104 +++++++++ tests/unit/test_storage_local.py | 180 ++++++++++++++++ ...ARTIFACT-STORE-WP-0001-service-baseline.md | 2 +- 7 files changed, 558 insertions(+), 3 deletions(-) create mode 100644 src/artifactstore/storage/backends/__init__.py create mode 100644 src/artifactstore/storage/backends/local.py create mode 100644 src/artifactstore/storage/registry.py create mode 100644 src/artifactstore/storage/spi.py create mode 100644 tests/unit/test_storage_local.py diff --git a/src/artifactstore/storage/__init__.py b/src/artifactstore/storage/__init__.py index 434655c..e9ad81e 100644 --- a/src/artifactstore/storage/__init__.py +++ b/src/artifactstore/storage/__init__.py @@ -1,5 +1,42 @@ """Storage adapter SPI and backend registry. -The SPI and local filesystem backend land in ARTIFACT-STORE-WP-0001-T003. -The S3-compatible backend lands in workplan WP-0004. +Backends address bytes by content address (ADR-0001). The SPI is small +(``put`` / ``get`` / ``head`` / ``delete`` / ``health``) so swapping or +adding adapters never touches the registry or API layers. """ + +from artifactstore.storage.backends.local import LocalBackend +from artifactstore.storage.registry import ( + clear as clear_backends, +) +from artifactstore.storage.registry import ( + get as get_backend, +) +from artifactstore.storage.registry import ( + list_backends, +) +from artifactstore.storage.registry import ( + register as register_backend, +) +from artifactstore.storage.spi import ( + BackendStatus, + DeletionResult, + ObjectNotFoundError, + StorageBackend, + StorageObjectMetadata, + StorageReceipt, +) + +__all__ = [ + "BackendStatus", + "DeletionResult", + "LocalBackend", + "ObjectNotFoundError", + "StorageBackend", + "StorageObjectMetadata", + "StorageReceipt", + "clear_backends", + "get_backend", + "list_backends", + "register_backend", +] diff --git a/src/artifactstore/storage/backends/__init__.py b/src/artifactstore/storage/backends/__init__.py new file mode 100644 index 0000000..f9152cb --- /dev/null +++ b/src/artifactstore/storage/backends/__init__.py @@ -0,0 +1 @@ +"""Concrete storage backend implementations.""" diff --git a/src/artifactstore/storage/backends/local.py b/src/artifactstore/storage/backends/local.py new file mode 100644 index 0000000..f1b2e8f --- /dev/null +++ b/src/artifactstore/storage/backends/local.py @@ -0,0 +1,197 @@ +"""Local filesystem storage backend (ARTIFACT-STORE-WP-0001-T003). + +Object layout: + +:: + + //// + +Writes are atomic (write to ``.tmp.`` in the same directory, fsync, +rename). Path traversal is rejected at the boundary by validating the +``ContentAddress`` and resolving the final path against the configured root. +""" + +from __future__ import annotations + +import os +import shutil +import uuid +from collections.abc import AsyncIterator +from pathlib import Path + +from artifactstore.identity import ContentAddress +from artifactstore.storage.spi import ( + BackendStatus, + DeletionResult, + ObjectNotFoundError, + StorageObjectMetadata, + StorageReceipt, +) + +__all__ = ["LocalBackend"] + +_DEFAULT_CHUNK_SIZE = 64 * 1024 + + +class LocalBackend: + """Filesystem-backed :class:`StorageBackend` for dev and tests.""" + + def __init__( + self, + root: str | os.PathLike[str], + *, + backend_id: str = "local", + chunk_size: int = _DEFAULT_CHUNK_SIZE, + ) -> None: + self._root = Path(root).resolve() + self._root.mkdir(parents=True, exist_ok=True) + self._backend_id = backend_id + self._chunk_size = chunk_size + + @property + def backend_id(self) -> str: + return self._backend_id + + @property + def root(self) -> Path: + return self._root + + def _path_for(self, content_address: ContentAddress) -> Path: + digest = content_address.to_digest() + if len(digest.hex) < 4: + raise ValueError(f"digest hex too short for sharded layout: {content_address}") + path = self._root / digest.algorithm / digest.hex[0:2] / digest.hex[2:4] / digest.hex + # Defence in depth: the resolved path must live under the configured root. + resolved = path.resolve() + try: + resolved.relative_to(self._root) + except ValueError as exc: + raise ValueError(f"path traversal blocked for {content_address}") from exc + return path + + def _object_key(self, path: Path) -> str: + return str(path.relative_to(self._root)) + + async def put( + self, + content_address: ContentAddress, + stream: AsyncIterator[bytes], + *, + size_hint: int | None = None, + ) -> StorageReceipt: + path = self._path_for(content_address) + + if path.is_file(): + # Idempotent: content-addressed objects are immutable by definition. + # Drain the stream so the caller's generator can finalise. + async for _ in stream: + pass + return StorageReceipt( + backend_id=self._backend_id, + content_address=content_address, + object_key=self._object_key(path), + size_bytes=path.stat().st_size, + ) + + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_name(f"{path.name}.tmp.{uuid.uuid4().hex}") + size = 0 + try: + with open(tmp, "wb") as fh: + async for chunk in stream: + fh.write(chunk) + size += len(chunk) + fh.flush() + os.fsync(fh.fileno()) + tmp.replace(path) + finally: + tmp.unlink(missing_ok=True) + + return StorageReceipt( + backend_id=self._backend_id, + content_address=content_address, + object_key=self._object_key(path), + size_bytes=size, + ) + + async def get( + self, + content_address: ContentAddress, + *, + byte_range: tuple[int, int] | None = None, + ) -> AsyncIterator[bytes]: + path = self._path_for(content_address) + if not path.is_file(): + raise ObjectNotFoundError(str(content_address)) + return _stream_file(path, byte_range, self._chunk_size) + + async def head(self, content_address: ContentAddress) -> StorageObjectMetadata: + path = self._path_for(content_address) + if not path.is_file(): + raise ObjectNotFoundError(str(content_address)) + return StorageObjectMetadata( + backend_id=self._backend_id, + content_address=content_address, + object_key=self._object_key(path), + size_bytes=path.stat().st_size, + ) + + async def delete(self, content_address: ContentAddress) -> DeletionResult: + path = self._path_for(content_address) + existed = path.is_file() + if existed: + path.unlink() + # Clean up empty parent directories opportunistically. + for parent in (path.parent, path.parent.parent, path.parent.parent.parent): + try: + if parent != self._root and not any(parent.iterdir()): + parent.rmdir() + except OSError: + break + return DeletionResult( + backend_id=self._backend_id, + content_address=content_address, + deleted=existed, + ) + + async def health(self) -> BackendStatus: + if not self._root.is_dir(): + return BackendStatus( + backend_id=self._backend_id, + healthy=False, + detail=f"root is not a directory: {self._root}", + ) + usage = shutil.disk_usage(self._root) + return BackendStatus( + backend_id=self._backend_id, + healthy=True, + detail="ok", + free_bytes=usage.free, + total_bytes=usage.total, + ) + + +async def _stream_file( + path: Path, + byte_range: tuple[int, int] | None, + chunk_size: int, +) -> AsyncIterator[bytes]: + with open(path, "rb") as fh: + if byte_range is None: + while True: + chunk = fh.read(chunk_size) + if not chunk: + break + yield chunk + return + start, end_inclusive = byte_range + if start < 0 or end_inclusive < start: + raise ValueError(f"invalid byte range: {byte_range}") + fh.seek(start) + remaining = end_inclusive - start + 1 + while remaining > 0: + chunk = fh.read(min(chunk_size, remaining)) + if not chunk: + break + yield chunk + remaining -= len(chunk) diff --git a/src/artifactstore/storage/registry.py b/src/artifactstore/storage/registry.py new file mode 100644 index 0000000..12ef070 --- /dev/null +++ b/src/artifactstore/storage/registry.py @@ -0,0 +1,36 @@ +"""Storage backend registry — lookup by ``backend_id`` (ADR-0004).""" + +from __future__ import annotations + +from artifactstore.storage.spi import StorageBackend + +__all__ = ["clear", "get", "list_backends", "register"] + + +_backends: dict[str, StorageBackend] = {} + + +def register(backend: StorageBackend) -> None: + """Register a backend instance under its ``backend_id``. + + Re-registering the same ``backend_id`` replaces the prior instance. + """ + _backends[backend.backend_id] = backend + + +def get(backend_id: str) -> StorageBackend: + """Return the backend registered as ``backend_id``.""" + try: + return _backends[backend_id] + except KeyError as exc: + raise KeyError(f"unknown storage backend: {backend_id!r}") from exc + + +def list_backends() -> list[str]: + """Return registered backend IDs, sorted.""" + return sorted(_backends) + + +def clear() -> None: + """Test helper: drop all registered backends.""" + _backends.clear() diff --git a/src/artifactstore/storage/spi.py b/src/artifactstore/storage/spi.py new file mode 100644 index 0000000..f624a6b --- /dev/null +++ b/src/artifactstore/storage/spi.py @@ -0,0 +1,104 @@ +"""Storage adapter SPI (ADR-0001 + ADR-0004). + +A :class:`StorageBackend` addresses bytes by :class:`ContentAddress`; the +registry never sees backend-specific keys. The protocol surface is the +contract any future backend (S3-compatible, Ceph, cloud archive) must +satisfy. + +``get`` is a coroutine that returns an :class:`AsyncIterator[bytes]`; +callers ``await`` the call and then ``async for`` the iterator. +""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from dataclasses import dataclass +from typing import Protocol, runtime_checkable + +from artifactstore.identity import ContentAddress + +__all__ = [ + "BackendStatus", + "DeletionResult", + "ObjectNotFoundError", + "StorageBackend", + "StorageObjectMetadata", + "StorageReceipt", +] + + +class ObjectNotFoundError(KeyError): + """Raised when get/head/delete reference an object the backend does not hold.""" + + +@dataclass(frozen=True, slots=True) +class StorageReceipt: + """Record of a successful ``put``.""" + + backend_id: str + content_address: ContentAddress + object_key: str + size_bytes: int + + +@dataclass(frozen=True, slots=True) +class StorageObjectMetadata: + """Object metadata returned by ``head``.""" + + backend_id: str + content_address: ContentAddress + object_key: str + size_bytes: int + + +@dataclass(frozen=True, slots=True) +class DeletionResult: + """Outcome of ``delete``; ``deleted`` is ``False`` if the object did not exist.""" + + backend_id: str + content_address: ContentAddress + deleted: bool + + +@dataclass(frozen=True, slots=True) +class BackendStatus: + """Result of ``health``.""" + + backend_id: str + healthy: bool + detail: str + free_bytes: int | None = None + total_bytes: int | None = None + + +@runtime_checkable +class StorageBackend(Protocol): + """Backend contract. + + Implementations capture configuration at construction time and expose + a stable, lowercase ``backend_id`` for registry lookup. + """ + + @property + def backend_id(self) -> str: ... + + async def put( + self, + content_address: ContentAddress, + stream: AsyncIterator[bytes], + *, + size_hint: int | None = None, + ) -> StorageReceipt: ... + + async def get( + self, + content_address: ContentAddress, + *, + byte_range: tuple[int, int] | None = None, + ) -> AsyncIterator[bytes]: ... + + async def head(self, content_address: ContentAddress) -> StorageObjectMetadata: ... + + async def delete(self, content_address: ContentAddress) -> DeletionResult: ... + + async def health(self) -> BackendStatus: ... diff --git a/tests/unit/test_storage_local.py b/tests/unit/test_storage_local.py new file mode 100644 index 0000000..678f92c --- /dev/null +++ b/tests/unit/test_storage_local.py @@ -0,0 +1,180 @@ +"""Local filesystem backend tests (ARTIFACT-STORE-WP-0001-T003).""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from pathlib import Path + +import pytest + +from artifactstore.identity import ContentAddress, digest_bytes +from artifactstore.storage import ( + LocalBackend, + ObjectNotFoundError, + clear_backends, + get_backend, + list_backends, + register_backend, +) + + +async def _stream(data: bytes, chunk_size: int = 16) -> AsyncIterator[bytes]: + for i in range(0, len(data), chunk_size): + yield data[i : i + chunk_size] + + +def _ca_of(data: bytes) -> ContentAddress: + return digest_bytes(data).primary.content_address + + +@pytest.fixture +def backend(tmp_path: Path) -> LocalBackend: + return LocalBackend(tmp_path / "storage", backend_id="local") + + +async def _consume(it: AsyncIterator[bytes]) -> bytes: + out = bytearray() + async for chunk in it: + out.extend(chunk) + return bytes(out) + + +async def test_put_then_get_round_trips(backend: LocalBackend) -> None: + data = b"hello world" * 100 + ca = _ca_of(data) + + receipt = await backend.put(ca, _stream(data)) + assert receipt.backend_id == "local" + assert receipt.content_address == ca + assert receipt.size_bytes == len(data) + + stream = await backend.get(ca) + assert await _consume(stream) == data + + +async def test_object_key_layout_matches_blueprint(backend: LocalBackend) -> None: + data = b"layout-check" + ca = _ca_of(data) + receipt = await backend.put(ca, _stream(data)) + digest = ca.to_digest() + expected = f"{digest.algorithm}/{digest.hex[:2]}/{digest.hex[2:4]}/{digest.hex}" + assert receipt.object_key == expected + assert (backend.root / expected).is_file() + + +async def test_head_returns_metadata(backend: LocalBackend) -> None: + data = b"abcde" + ca = _ca_of(data) + await backend.put(ca, _stream(data)) + meta = await backend.head(ca) + assert meta.size_bytes == len(data) + assert meta.content_address == ca + assert meta.backend_id == "local" + + +async def test_head_missing_raises_object_not_found(backend: LocalBackend) -> None: + ca = _ca_of(b"never-stored") + with pytest.raises(ObjectNotFoundError): + await backend.head(ca) + + +async def test_get_missing_raises_object_not_found(backend: LocalBackend) -> None: + ca = _ca_of(b"never-stored") + with pytest.raises(ObjectNotFoundError): + await backend.get(ca) + + +async def test_put_is_idempotent(backend: LocalBackend) -> None: + data = b"idempotent" + ca = _ca_of(data) + r1 = await backend.put(ca, _stream(data)) + r2 = await backend.put(ca, _stream(data)) + assert r1.size_bytes == r2.size_bytes == len(data) + assert r1.content_address == r2.content_address + + +async def test_delete_returns_true_then_false(backend: LocalBackend) -> None: + data = b"to-delete" + ca = _ca_of(data) + await backend.put(ca, _stream(data)) + + first = await backend.delete(ca) + assert first.deleted is True + assert first.content_address == ca + + second = await backend.delete(ca) + assert second.deleted is False + + +async def test_range_read_returns_subrange(backend: LocalBackend) -> None: + data = bytes(range(100)) # 100 bytes 0..99 + ca = _ca_of(data) + await backend.put(ca, _stream(data)) + + stream = await backend.get(ca, byte_range=(10, 19)) + chunk = await _consume(stream) + assert chunk == data[10:20] + assert len(chunk) == 10 + + +async def test_range_read_rejects_invalid_range(backend: LocalBackend) -> None: + data = b"range-check" + ca = _ca_of(data) + await backend.put(ca, _stream(data)) + stream = await backend.get(ca, byte_range=(20, 5)) + with pytest.raises(ValueError): + await _consume(stream) + + +async def test_health_reports_disk_usage(backend: LocalBackend) -> None: + status = await backend.health() + assert status.healthy is True + assert status.free_bytes is not None + assert status.total_bytes is not None + assert status.free_bytes >= 0 + assert status.backend_id == "local" + + +async def test_health_reports_unhealthy_if_root_missing(tmp_path: Path) -> None: + root = tmp_path / "vanished" + backend = LocalBackend(root) + # Remove the root directory after construction. + root.rmdir() + status = await backend.health() + assert status.healthy is False + assert "root" in status.detail + + +async def test_content_address_validation_blocks_path_traversal() -> None: + # The ContentAddress constructor rejects malformed inputs before they + # ever reach the backend. + with pytest.raises(ValueError): + ContentAddress("sha256:../etc/passwd") + with pytest.raises(ValueError): + ContentAddress("../sha256:deadbeef") + + +async def test_registry_register_get_list_clear(backend: LocalBackend) -> None: + clear_backends() + assert list_backends() == [] + + register_backend(backend) + assert list_backends() == ["local"] + assert get_backend("local") is backend + + with pytest.raises(KeyError): + get_backend("nope") + + clear_backends() + assert list_backends() == [] + + +async def test_idempotent_put_does_not_corrupt_existing(backend: LocalBackend) -> None: + data = b"original-bytes" + ca = _ca_of(data) + await backend.put(ca, _stream(data)) + + # A re-put with the same content address yields the same bytes back. + await backend.put(ca, _stream(data)) + stream = await backend.get(ca) + assert await _consume(stream) == data diff --git a/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md b/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md index 40d3fcf..2af7609 100644 --- a/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md +++ b/workplans/ARTIFACT-STORE-WP-0001-service-baseline.md @@ -192,7 +192,7 @@ Acceptance: ```task id: ARTIFACT-STORE-WP-0001-T003 -status: todo +status: done priority: high state_hub_task_id: "68f9a752-0012-4cc1-8768-ec3f75295e7a" ```