diff --git a/src/artifactstore/dataplane/__init__.py b/src/artifactstore/dataplane/__init__.py index ce95833..0901640 100644 --- a/src/artifactstore/dataplane/__init__.py +++ b/src/artifactstore/dataplane/__init__.py @@ -1,5 +1,22 @@ -"""Data plane SPI and in-process implementation. +"""Data plane SPI and in-process implementation (ADR-0004). -The SPI lands in ARTIFACT-STORE-WP-0001-T012. See ADR-0004 for the -control-plane / data-plane contract that this module isolates. +The control plane never touches storage backends directly. All byte +movement crosses the data-plane SPI, which can be swapped to a remote +(future Rust) implementation without API churn. """ + +from artifactstore.dataplane.inproc import InProcessDataPlane +from artifactstore.dataplane.spi import ( + DataPlane, + IngestHints, + IngestResult, + VerifyResult, +) + +__all__ = [ + "DataPlane", + "InProcessDataPlane", + "IngestHints", + "IngestResult", + "VerifyResult", +] diff --git a/src/artifactstore/dataplane/inproc.py b/src/artifactstore/dataplane/inproc.py new file mode 100644 index 0000000..3546532 --- /dev/null +++ b/src/artifactstore/dataplane/inproc.py @@ -0,0 +1,154 @@ +"""In-process data plane implementation (ADR-0004). + +Ingestion is two-pass against a tempfile: stream the input once while +dual-hashing into BLAKE3 + SHA-256, then stream the tempfile into the +configured :class:`StorageBackend`. Two disk passes for a local backend +is acceptable for v1; an optimisation pass (backend-supplied writer) +is deferred to the Rust data-plane workplan. 
_DEFAULT_CHUNK_SIZE = 64 * 1024


class InProcessDataPlane:
    """The v1 data plane: wraps one :class:`StorageBackend`.

    Ingests are spooled to a temporary file while both digests are computed,
    then replayed into the backend under the resulting content address. All
    other operations delegate to the wrapped backend.
    """

    def __init__(
        self,
        backend: StorageBackend,
        *,
        tmp_dir: str | os.PathLike[str] | None = None,
        chunk_size: int = _DEFAULT_CHUNK_SIZE,
    ) -> None:
        """Create a data plane over ``backend``.

        Args:
            backend: Storage backend that receives and serves object bytes.
            tmp_dir: Directory for ingest spool files; it is created on the
                first ingest if missing. ``None`` lets :mod:`tempfile` pick
                its default location.
            chunk_size: Number of bytes per chunk when the spool file is
                replayed into the backend.

        Raises:
            ValueError: If ``chunk_size`` is not a positive integer.
        """
        # read(0) returns b"" immediately, so the replay would store an empty
        # object under the digest of non-empty content; a negative size makes
        # read() return the whole spool file as a single chunk.
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")
        self._backend = backend
        self._tmp_dir = Path(tmp_dir) if tmp_dir is not None else None
        self._chunk_size = chunk_size

    @property
    def backend(self) -> StorageBackend:
        """The storage backend this data plane wraps."""
        return self._backend

    async def ingest_stream(
        self,
        stream: AsyncIterator[bytes],
        *,
        hints: IngestHints | None = None,
    ) -> IngestResult:
        """Spool ``stream`` to disk while hashing it, then store it in the backend.

        Args:
            stream: Chunks of the object to ingest.
            hints: Optional ingest hints. Only ``primary_algorithm`` is read
                here; the size handed to the backend is the measured size.

        Returns:
            The primary and SHA-256 digests, the byte count and the backend's
            storage receipt.
        """
        effective_hints = hints or IngestHints()
        primary_name = effective_hints.primary_algorithm or PRIMARY_ALGORITHM

        primary_h = get_algorithm(primary_name)()
        sha_h = get_algorithm(INTEROP_ALGORITHM)()
        size = 0

        tmp_dir: str | None = None
        if self._tmp_dir is not None:
            self._tmp_dir.mkdir(parents=True, exist_ok=True)
            tmp_dir = str(self._tmp_dir)
        fd, tmp_path_str = tempfile.mkstemp(prefix="artifactstore-ingest-", dir=tmp_dir)
        tmp_path = Path(tmp_path_str)
        try:
            # NOTE(review): these writes and the fsync are synchronous and run
            # on the event loop; the module docstring accepts this for v1.
            with os.fdopen(fd, "wb") as fh:
                async for chunk in stream:
                    fh.write(chunk)
                    primary_h.update(chunk)
                    sha_h.update(chunk)
                    size += len(chunk)
                fh.flush()
                os.fsync(fh.fileno())

            primary_digest = Digest(algorithm=primary_name, hex=primary_h.hexdigest())
            sha_digest = Digest(algorithm=INTEROP_ALGORITHM, hex=sha_h.hexdigest())

            # The spool file is closed before it is replayed into the backend.
            receipt = await self._backend.put(
                primary_digest.content_address,
                _file_chunks(tmp_path, self._chunk_size),
                size_hint=size,
            )
        finally:
            # Remove the spool file on success and on failure alike.
            tmp_path.unlink(missing_ok=True)

        return IngestResult(
            primary_digest=primary_digest,
            sha256_digest=sha_digest,
            size_bytes=size,
            receipt=receipt,
        )

    async def serve_object(
        self,
        content_address: ContentAddress,
        *,
        byte_range: tuple[int, int] | None = None,
    ) -> AsyncIterator[bytes]:
        """Return the backend's byte stream for ``content_address``.

        ``byte_range`` is forwarded to the backend unchanged.
        """
        return await self._backend.get(content_address, byte_range=byte_range)

    async def verify_object(self, content_address: ContentAddress) -> VerifyResult:
        """Re-digest the stored bytes and compare with the address's digest.

        Returns:
            A :class:`VerifyResult` whose ``actual_*`` fields all describe the
            bytes that were read during this call; ``mismatch`` explains a
            failed verification and is ``None`` on success.
        """
        expected = content_address.to_digest()
        stream = await self._backend.get(content_address)

        actual_size = 0

        async def _counted() -> AsyncIterator[bytes]:
            # Measure the size from the bytes actually digested rather than
            # from separately fetched backend metadata.
            nonlocal actual_size
            async for chunk in stream:
                actual_size += len(chunk)
                yield chunk

        pair = await digest_stream(_counted(), primary=expected.algorithm)

        verified = pair.primary.hex == expected.hex
        mismatch = (
            None
            if verified
            else f"primary digest mismatch: expected {expected.hex}, got {pair.primary.hex}"
        )
        return VerifyResult(
            content_address=content_address,
            verified=verified,
            actual_primary_digest=pair.primary,
            actual_sha256_digest=pair.sha256,
            actual_size_bytes=actual_size,
            mismatch=mismatch,
        )

    async def delete_object(self, content_address: ContentAddress) -> DeletionResult:
        """Delete the object through the backend and return its result."""
        return await self._backend.delete(content_address)

    async def backend_health(self) -> BackendStatus:
        """Return the wrapped backend's health status."""
        return await self._backend.health()


async def _file_chunks(path: Path, chunk_size: int) -> AsyncIterator[bytes]:
    """Yield the contents of ``path`` in pieces of at most ``chunk_size`` bytes."""
    with open(path, "rb") as fh:
        while chunk := fh.read(chunk_size):
            yield chunk
+++ b/src/artifactstore/dataplane/spi.py @@ -0,0 +1,93 @@ +"""Data plane SPI (ADR-0004). + +The control plane (``registry`` / ``api`` / ``retention`` / ``audit``) +interacts with bytes exclusively through this surface. v1 ships an +in-process implementation backed by a :class:`StorageBackend`; a future +Rust daemon will satisfy the same protocol over a Unix socket. + +The SPI uses CBOR-serialisable input and output types so the in-process +implementation and a future RPC implementation share representations. +""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from dataclasses import dataclass +from typing import Protocol, runtime_checkable + +from artifactstore.identity import ContentAddress, Digest +from artifactstore.storage.spi import BackendStatus, DeletionResult, StorageReceipt + +__all__ = [ + "DataPlane", + "IngestHints", + "IngestResult", + "VerifyResult", +] + + +@dataclass(frozen=True, slots=True) +class IngestHints: + """Optional hints attached to an ingest call. + + ``size_hint`` lets backends pre-allocate or pick an upload strategy. + ``primary_algorithm`` overrides the default primary hash (BLAKE3). + ``backend_id`` lets the data plane select among configured backends + (ignored when only one backend is configured). + """ + + size_hint: int | None = None + primary_algorithm: str | None = None + backend_id: str | None = None + + +@dataclass(frozen=True, slots=True) +class IngestResult: + """Return value of :meth:`DataPlane.ingest_stream`.""" + + primary_digest: Digest + sha256_digest: Digest + size_bytes: int + receipt: StorageReceipt + + +@dataclass(frozen=True, slots=True) +class VerifyResult: + """Return value of :meth:`DataPlane.verify_object`. + + ``verified`` is ``True`` when the bytes currently held at + ``content_address`` re-digest to the expected primary digest. ``mismatch`` + is populated with a short explanation when verification fails. 
+ """ + + content_address: ContentAddress + verified: bool + actual_primary_digest: Digest + actual_sha256_digest: Digest + actual_size_bytes: int + mismatch: str | None = None + + +@runtime_checkable +class DataPlane(Protocol): + """Byte-handling surface used by the control plane.""" + + async def ingest_stream( + self, + stream: AsyncIterator[bytes], + *, + hints: IngestHints | None = None, + ) -> IngestResult: ... + + async def serve_object( + self, + content_address: ContentAddress, + *, + byte_range: tuple[int, int] | None = None, + ) -> AsyncIterator[bytes]: ... + + async def verify_object(self, content_address: ContentAddress) -> VerifyResult: ... + + async def delete_object(self, content_address: ContentAddress) -> DeletionResult: ... + + async def backend_health(self) -> BackendStatus: ... diff --git a/tests/unit/test_dataplane_inproc.py b/tests/unit/test_dataplane_inproc.py new file mode 100644 index 0000000..5f50df4 --- /dev/null +++ b/tests/unit/test_dataplane_inproc.py @@ -0,0 +1,199 @@ +"""In-process data plane tests (ARTIFACT-STORE-WP-0001-T012).""" + +from __future__ import annotations + +import ast +from collections.abc import AsyncIterator +from pathlib import Path + +import pytest + +from artifactstore.dataplane import ( + IngestHints, + InProcessDataPlane, +) +from artifactstore.identity import PRIMARY_ALGORITHM, digest_bytes +from artifactstore.storage import LocalBackend, ObjectNotFoundError + +REPO_ROOT = Path(__file__).resolve().parents[2] +SRC_DIR = REPO_ROOT / "src" / "artifactstore" + + +async def _stream(data: bytes, chunk: int = 16) -> AsyncIterator[bytes]: + for i in range(0, len(data), chunk): + yield data[i : i + chunk] + + +async def _consume(it: AsyncIterator[bytes]) -> bytes: + out = bytearray() + async for chunk in it: + out.extend(chunk) + return bytes(out) + + +@pytest.fixture +def backend(tmp_path: Path) -> LocalBackend: + return LocalBackend(tmp_path / "store", backend_id="local") + + +@pytest.fixture +def dataplane(backend: 
@pytest.fixture
def dataplane(backend: LocalBackend, tmp_path: Path) -> InProcessDataPlane:
    """Data plane under test, spooling ingests into a scratch dir under ``tmp_path``."""
    spool_dir = tmp_path / "dp-tmp"
    return InProcessDataPlane(backend, tmp_dir=spool_dir)


async def test_ingest_stream_computes_dual_digest_and_stores(
    dataplane: InProcessDataPlane,
    backend: LocalBackend,
) -> None:
    """Ingest reports both digests and the size, and the backend holds the bytes."""
    payload = b"the quick brown fox" * 500
    want = digest_bytes(payload)

    outcome = await dataplane.ingest_stream(_stream(payload))
    address = want.primary.content_address

    assert outcome.primary_digest == want.primary
    assert outcome.sha256_digest == want.sha256
    assert outcome.size_bytes == len(payload)
    assert outcome.receipt.content_address == address
    assert outcome.receipt.backend_id == backend.backend_id

    # Read back through the backend directly to prove the bytes landed there.
    stored = await _consume(await backend.get(address))
    assert stored == payload


async def test_ingest_stream_empty_input_is_handled(
    dataplane: InProcessDataPlane,
) -> None:
    """A zero-byte stream ingests cleanly and yields the empty-input digest."""
    outcome = await dataplane.ingest_stream(_stream(b""))
    assert outcome.size_bytes == 0
    assert outcome.primary_digest == digest_bytes(b"").primary


async def test_serve_object_returns_ingested_bytes(
    dataplane: InProcessDataPlane,
) -> None:
    """Serving an ingested object returns exactly the bytes that went in."""
    payload = b"serve-me" * 1000
    outcome = await dataplane.ingest_stream(_stream(payload))
    served = await dataplane.serve_object(outcome.receipt.content_address)
    assert await _consume(served) == payload


async def test_serve_object_supports_byte_range(
    dataplane: InProcessDataPlane,
) -> None:
    """A ``(50, 79)`` range yields the same bytes as the slice ``[50:80]``."""
    payload = bytes(range(200))
    outcome = await dataplane.ingest_stream(_stream(payload))
    served = await dataplane.serve_object(
        outcome.receipt.content_address,
        byte_range=(50, 79),
    )
    window = await _consume(served)
    assert window == payload[50:80]


async def test_verify_object_succeeds_for_intact_bytes(
    dataplane: InProcessDataPlane,
) -> None:
    """Verification of untouched bytes passes and echoes the ingest digests."""
    payload = b"verify-intact" * 100
    outcome = await dataplane.ingest_stream(_stream(payload))
    report = await dataplane.verify_object(outcome.receipt.content_address)
    assert report.verified is True
    assert report.mismatch is None
    assert report.actual_primary_digest == outcome.primary_digest
    assert report.actual_sha256_digest == outcome.sha256_digest
    assert report.actual_size_bytes == len(payload)


async def test_verify_object_detects_corruption(
    dataplane: InProcessDataPlane,
    backend: LocalBackend,
) -> None:
    """Bytes altered on disk make verification fail with an explanation."""
    payload = b"will-be-corrupted" * 50
    outcome = await dataplane.ingest_stream(_stream(payload))

    # Overwrite the stored file without going through the data plane.
    (backend.root / outcome.receipt.object_key).write_bytes(b"corrupted!!")

    report = await dataplane.verify_object(outcome.receipt.content_address)
    assert report.verified is False
    assert report.mismatch is not None
    assert "mismatch" in report.mismatch


async def test_delete_object_passes_through(
    dataplane: InProcessDataPlane,
) -> None:
    """The first delete removes the object; a repeat reports nothing deleted."""
    outcome = await dataplane.ingest_stream(_stream(b"to-be-deleted"))
    address = outcome.receipt.content_address

    first = await dataplane.delete_object(address)
    assert first.deleted is True

    # Deleting again must not raise; it simply reports ``deleted=False``.
    second = await dataplane.delete_object(address)
    assert second.deleted is False


async def test_backend_health_passes_through(
    dataplane: InProcessDataPlane,
) -> None:
    """Health is reported by the wrapped backend under its own id."""
    health = await dataplane.backend_health()
    assert health.healthy is True
    assert health.backend_id == "local"


async def test_ingest_hints_override_primary_algorithm(
    dataplane: InProcessDataPlane,
) -> None:
    """``primary_algorithm`` in the hints replaces the default primary hash."""
    sha_primary = IngestHints(primary_algorithm="sha256")
    outcome = await dataplane.ingest_stream(_stream(b"sha-as-primary"), hints=sha_primary)
    assert outcome.primary_digest.algorithm == "sha256"
    # The interop digest is SHA-256 regardless of the hint.
    assert outcome.sha256_digest.algorithm == "sha256"
    # With SHA-256 as primary, both digests carry the same hex value.
    assert outcome.primary_digest.hex == outcome.sha256_digest.hex
def _module_imports(path: Path) -> set[str]:
    """Return the dotted names that the Python file at ``path`` imports.

    ``import a.b`` contributes ``"a.b"``. ``from a.b import c`` contributes
    both ``"a.b"`` and ``"a.b.c"``, because ``c`` may itself be a submodule:
    ``from artifactstore.dataplane import inproc`` must be visible to a prefix
    check on ``artifactstore.dataplane.inproc``. Star imports contribute only
    the package name. ``from . import x`` (no module name) is not recorded.

    Args:
        path: UTF-8 encoded Python source file to inspect.

    Returns:
        The set of imported dotted names found anywhere in the file,
        including imports nested inside functions or conditionals.
    """
    tree = ast.parse(path.read_text(encoding="utf-8"))
    imports: set[str] = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            imports.update(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module:
            imports.add(node.module)
            # Also record each imported name qualified by its package, so a
            # submodule pulled in via ``from pkg import sub`` is not missed.
            imports.update(
                f"{node.module}.{alias.name}" for alias in node.names if alias.name != "*"
            )
    return imports