WP-0001-T012: data plane SPI and in-process implementation

src/artifactstore/dataplane/:
- spi.py: DataPlane Protocol with the five operations ingest_stream,
  serve_object, verify_object, delete_object, backend_health
  (ADR-0004). Dataclasses: IngestHints (size_hint, primary_algorithm,
  backend_id overrides), IngestResult (primary_digest + sha256_digest +
  size_bytes + StorageReceipt), VerifyResult (verified bool, mismatch
  reason, actual digests + size).
- inproc.py: InProcessDataPlane wraps one StorageBackend. ingest_stream
  is two-pass against a tempfile (drain stream while dual-hashing into
  BLAKE3+SHA-256, then forward the tempfile to backend.put under the
  primary content address); fsync+cleanup on exception. serve_object
  passes byte ranges through; verify_object re-reads bytes via backend.get,
  re-digests with the stored algorithm, and reports mismatches. delete
  and health are thin pass-throughs.

tests/unit/test_dataplane_inproc.py (11 cases):
- ingest_stream computes correct dual digests, returns receipt, stores
  bytes at the content-addressed path.
- empty-input ingest returns the BLAKE3/SHA-256 of empty.
- serve_object round-trips ingested bytes; supports byte_range.
- verify_object verifies intact bytes; detects on-disk corruption.
- delete_object passes through (True then False).
- backend_health passes through.
- IngestHints override of primary_algorithm (sha256-as-primary path).
- Missing-object serve raises ObjectNotFoundError.
- Architectural test (ADR-0004 invariant): no control-plane module
  (api / registry / retention / audit) imports
  artifactstore.storage.backends.* or artifactstore.dataplane.inproc
  directly. Enforced via AST scan of every .py file in those packages.

Gates: ruff clean, mypy --strict clean on 44 files, 70 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-16 02:07:05 +02:00
parent 3d50564d12
commit 5ff02a2bf3
5 changed files with 467 additions and 4 deletions

View File

@@ -1,5 +1,22 @@
"""Data plane SPI and in-process implementation.
"""Data plane SPI and in-process implementation (ADR-0004).
The SPI lands in ARTIFACT-STORE-WP-0001-T012. See ADR-0004 for the
control-plane / data-plane contract that this module isolates.
The control plane never touches storage backends directly. All byte
movement crosses the data-plane SPI, which can be swapped to a remote
(future Rust) implementation without API churn.
"""
from artifactstore.dataplane.inproc import InProcessDataPlane
from artifactstore.dataplane.spi import (
DataPlane,
IngestHints,
IngestResult,
VerifyResult,
)
__all__ = [
"DataPlane",
"InProcessDataPlane",
"IngestHints",
"IngestResult",
"VerifyResult",
]

View File

@@ -0,0 +1,154 @@
"""In-process data plane implementation (ADR-0004).
Ingestion is two-pass against a tempfile: stream the input once while
dual-hashing into BLAKE3 + SHA-256, then stream the tempfile into the
configured :class:`StorageBackend`. Two disk passes for a local backend
is acceptable for v1; an optimisation pass (backend-supplied writer)
is deferred to the Rust data-plane workplan.
"""
from __future__ import annotations
import os
import tempfile
from collections.abc import AsyncIterator
from pathlib import Path
from artifactstore.dataplane.spi import (
IngestHints,
IngestResult,
VerifyResult,
)
from artifactstore.identity import (
INTEROP_ALGORITHM,
PRIMARY_ALGORITHM,
ContentAddress,
Digest,
digest_stream,
get_algorithm,
)
from artifactstore.storage.spi import (
BackendStatus,
DeletionResult,
StorageBackend,
)
__all__ = ["InProcessDataPlane"]
_DEFAULT_CHUNK_SIZE = 64 * 1024
class InProcessDataPlane:
"""The v1 data plane: wraps one :class:`StorageBackend`."""
def __init__(
self,
backend: StorageBackend,
*,
tmp_dir: str | os.PathLike[str] | None = None,
chunk_size: int = _DEFAULT_CHUNK_SIZE,
) -> None:
self._backend = backend
self._tmp_dir = Path(tmp_dir) if tmp_dir is not None else None
self._chunk_size = chunk_size
@property
def backend(self) -> StorageBackend:
return self._backend
async def ingest_stream(
self,
stream: AsyncIterator[bytes],
*,
hints: IngestHints | None = None,
) -> IngestResult:
effective_hints = hints or IngestHints()
primary_name = effective_hints.primary_algorithm or PRIMARY_ALGORITHM
primary_h = get_algorithm(primary_name)()
sha_h = get_algorithm(INTEROP_ALGORITHM)()
size = 0
if self._tmp_dir is not None:
self._tmp_dir.mkdir(parents=True, exist_ok=True)
tmp_dir = str(self._tmp_dir) if self._tmp_dir is not None else None
fd, tmp_path_str = tempfile.mkstemp(prefix="artifactstore-ingest-", dir=tmp_dir)
tmp_path = Path(tmp_path_str)
try:
with os.fdopen(fd, "wb") as fh:
async for chunk in stream:
fh.write(chunk)
primary_h.update(chunk)
sha_h.update(chunk)
size += len(chunk)
fh.flush()
os.fsync(fh.fileno())
primary_digest = Digest(algorithm=primary_name, hex=primary_h.hexdigest())
sha_digest = Digest(algorithm=INTEROP_ALGORITHM, hex=sha_h.hexdigest())
content_address = primary_digest.content_address
receipt = await self._backend.put(
content_address,
_file_chunks(tmp_path, self._chunk_size),
size_hint=size,
)
finally:
tmp_path.unlink(missing_ok=True)
return IngestResult(
primary_digest=primary_digest,
sha256_digest=sha_digest,
size_bytes=size,
receipt=receipt,
)
async def serve_object(
self,
content_address: ContentAddress,
*,
byte_range: tuple[int, int] | None = None,
) -> AsyncIterator[bytes]:
return await self._backend.get(content_address, byte_range=byte_range)
async def verify_object(self, content_address: ContentAddress) -> VerifyResult:
expected = content_address.to_digest()
primary_name = expected.algorithm
stream = await self._backend.get(content_address)
pair = await digest_stream(stream, primary=primary_name)
head = await self._backend.head(content_address)
actual_size = head.size_bytes
if pair.primary.hex == expected.hex:
return VerifyResult(
content_address=content_address,
verified=True,
actual_primary_digest=pair.primary,
actual_sha256_digest=pair.sha256,
actual_size_bytes=actual_size,
mismatch=None,
)
return VerifyResult(
content_address=content_address,
verified=False,
actual_primary_digest=pair.primary,
actual_sha256_digest=pair.sha256,
actual_size_bytes=actual_size,
mismatch=(f"primary digest mismatch: expected {expected.hex}, got {pair.primary.hex}"),
)
async def delete_object(self, content_address: ContentAddress) -> DeletionResult:
return await self._backend.delete(content_address)
async def backend_health(self) -> BackendStatus:
return await self._backend.health()
async def _file_chunks(path: Path, chunk_size: int) -> AsyncIterator[bytes]:
with open(path, "rb") as fh:
while True:
chunk = fh.read(chunk_size)
if not chunk:
break
yield chunk

View File

@@ -0,0 +1,93 @@
"""Data plane SPI (ADR-0004).
The control plane (``registry`` / ``api`` / ``retention`` / ``audit``)
interacts with bytes exclusively through this surface. v1 ships an
in-process implementation backed by a :class:`StorageBackend`; a future
Rust daemon will satisfy the same protocol over a Unix socket.
The SPI uses CBOR-serialisable input and output types so the in-process
implementation and a future RPC implementation share representations.
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Protocol, runtime_checkable
from artifactstore.identity import ContentAddress, Digest
from artifactstore.storage.spi import BackendStatus, DeletionResult, StorageReceipt
__all__ = [
"DataPlane",
"IngestHints",
"IngestResult",
"VerifyResult",
]
@dataclass(frozen=True, slots=True)
class IngestHints:
"""Optional hints attached to an ingest call.
``size_hint`` lets backends pre-allocate or pick an upload strategy.
``primary_algorithm`` overrides the default primary hash (BLAKE3).
``backend_id`` lets the data plane select among configured backends
(ignored when only one backend is configured).
"""
size_hint: int | None = None
primary_algorithm: str | None = None
backend_id: str | None = None
@dataclass(frozen=True, slots=True)
class IngestResult:
"""Return value of :meth:`DataPlane.ingest_stream`."""
primary_digest: Digest
sha256_digest: Digest
size_bytes: int
receipt: StorageReceipt
@dataclass(frozen=True, slots=True)
class VerifyResult:
"""Return value of :meth:`DataPlane.verify_object`.
``verified`` is ``True`` when the bytes currently held at
``content_address`` re-digest to the expected primary digest. ``mismatch``
is populated with a short explanation when verification fails.
"""
content_address: ContentAddress
verified: bool
actual_primary_digest: Digest
actual_sha256_digest: Digest
actual_size_bytes: int
mismatch: str | None = None
@runtime_checkable
class DataPlane(Protocol):
"""Byte-handling surface used by the control plane."""
async def ingest_stream(
self,
stream: AsyncIterator[bytes],
*,
hints: IngestHints | None = None,
) -> IngestResult: ...
async def serve_object(
self,
content_address: ContentAddress,
*,
byte_range: tuple[int, int] | None = None,
) -> AsyncIterator[bytes]: ...
async def verify_object(self, content_address: ContentAddress) -> VerifyResult: ...
async def delete_object(self, content_address: ContentAddress) -> DeletionResult: ...
async def backend_health(self) -> BackendStatus: ...