generated from coulomb/repo-seed
src/artifactstore/dataplane/: - spi.py: DataPlane Protocol with the five operations ingest_stream, serve_object, verify_object, delete_object, backend_health (ADR-0004). Dataclasses: IngestHints (size_hint, primary_algorithm, backend_id overrides), IngestResult (primary_digest + sha256_digest + size_bytes + StorageReceipt), VerifyResult (verified bool, mismatch reason, actual digests + size). - inproc.py: InProcessDataPlane wraps one StorageBackend. ingest_stream is two-pass against a tempfile (drain stream while dual-hashing into BLAKE3+SHA-256, then forward the tempfile to backend.put under the primary content address); fsync+cleanup on exception. serve_object passes byte ranges through; verify_object re-reads bytes via backend.get, re-digests with the stored algorithm, and reports mismatches. delete and health are thin pass-throughs. tests/unit/test_dataplane_inproc.py (11 cases): - ingest_stream computes correct dual digests, returns receipt, stores bytes at the content-addressed path. - empty-input ingest returns the BLAKE3/SHA-256 of empty. - serve_object round-trips ingested bytes; supports byte_range. - verify_object verifies intact bytes; detects on-disk corruption. - delete_object passes through (True then False). - backend_health passes through. - IngestHints override of primary_algorithm (sha256-as-primary path). - Missing-object serve raises ObjectNotFoundError. - Architectural test (ADR-0004 invariant): no control-plane module (api / registry / retention / audit) imports artifactstore.storage.backends.* or artifactstore.dataplane.inproc directly. Enforced via AST scan of every .py file in those packages. Gates: ruff clean, mypy --strict clean on 44 files, 70 tests pass. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
200 lines
6.5 KiB
Python
200 lines
6.5 KiB
Python
"""In-process data plane tests (ARTIFACT-STORE-WP-0001-T012)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from artifactstore.dataplane import (
|
|
IngestHints,
|
|
InProcessDataPlane,
|
|
)
|
|
from artifactstore.identity import PRIMARY_ALGORITHM, digest_bytes
|
|
from artifactstore.storage import LocalBackend, ObjectNotFoundError
|
|
|
|
# This file lives at <repo>/tests/unit/<this file>; two parents above its
# directory is the repository root.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Root of the importable ``artifactstore`` package (src/ layout).
SRC_DIR = REPO_ROOT.joinpath("src", "artifactstore")
|
|
|
|
|
|
async def _stream(data: bytes, chunk: int = 16) -> AsyncIterator[bytes]:
|
|
for i in range(0, len(data), chunk):
|
|
yield data[i : i + chunk]
|
|
|
|
|
|
async def _consume(it: AsyncIterator[bytes]) -> bytes:
|
|
out = bytearray()
|
|
async for chunk in it:
|
|
out.extend(chunk)
|
|
return bytes(out)
|
|
|
|
|
|
@pytest.fixture
def backend(tmp_path: Path) -> LocalBackend:
    """Provide a ``LocalBackend`` with id ``"local"`` rooted under the test's tmp dir."""
    store_root = tmp_path / "store"
    return LocalBackend(store_root, backend_id="local")
|
|
|
|
|
|
@pytest.fixture
def dataplane(backend: LocalBackend, tmp_path: Path) -> InProcessDataPlane:
    """Provide an ``InProcessDataPlane`` over the ``backend`` fixture.

    Its scratch files go to a separate ``dp-tmp`` directory, so they are kept
    apart from the backend's ``store`` root.
    """
    scratch_dir = tmp_path / "dp-tmp"
    return InProcessDataPlane(backend, tmp_dir=scratch_dir)
|
|
|
|
|
|
async def test_ingest_stream_computes_dual_digest_and_stores(
    dataplane: InProcessDataPlane,
    backend: LocalBackend,
) -> None:
    """Ingest reports both digests plus the size, and the backend holds the bytes."""
    payload = b"the quick brown fox" * 500
    want = digest_bytes(payload)

    ingested = await dataplane.ingest_stream(_stream(payload))

    assert ingested.primary_digest == want.primary
    assert ingested.sha256_digest == want.sha256
    assert ingested.size_bytes == len(payload)

    receipt = ingested.receipt
    assert receipt.content_address == want.primary.content_address
    assert receipt.backend_id == backend.backend_id

    # Read straight from the backend (bypassing the data plane) to prove the
    # object really landed under its content address.
    stored = await _consume(await backend.get(want.primary.content_address))
    assert stored == payload
|
|
|
|
|
|
async def test_ingest_stream_empty_input_is_handled(
    dataplane: InProcessDataPlane,
) -> None:
    """A zero-length stream ingests cleanly and yields the digests of ``b""``.

    ``_stream(b"")`` produces no chunks at all. This exercises the path where
    the drain loop never runs.
    """
    result = await dataplane.ingest_stream(_stream(b""))
    expected = digest_bytes(b"")

    assert result.size_bytes == 0
    assert result.primary_digest == expected.primary
    # The SHA-256 interop digest must also be the digest of empty input, not
    # just the primary one.
    assert result.sha256_digest == expected.sha256
    assert result.receipt.content_address == expected.primary.content_address
|
|
|
|
|
|
async def test_serve_object_returns_ingested_bytes(
    dataplane: InProcessDataPlane,
) -> None:
    """Serving an ingested object round-trips exactly the bytes that went in."""
    payload = b"serve-me" * 1000
    ingested = await dataplane.ingest_stream(_stream(payload))
    address = ingested.receipt.content_address

    served = await _consume(await dataplane.serve_object(address))
    assert served == payload
|
|
|
|
|
|
async def test_serve_object_supports_byte_range(
    dataplane: InProcessDataPlane,
) -> None:
    """``byte_range`` restricts the served bytes to the requested window."""
    payload = bytes(range(200))
    ingested = await dataplane.ingest_stream(_stream(payload))

    # Both bounds are inclusive (as in an HTTP ``Range: bytes=50-79`` header),
    # so (50, 79) selects 30 bytes: payload[50:80].
    first, last = 50, 79
    served = await _consume(
        await dataplane.serve_object(
            ingested.receipt.content_address,
            byte_range=(first, last),
        )
    )
    assert served == payload[first : last + 1]
|
|
|
|
|
|
async def test_verify_object_succeeds_for_intact_bytes(
    dataplane: InProcessDataPlane,
) -> None:
    """Untouched stored bytes verify, and the re-computed facts match ingest's."""
    payload = b"verify-intact" * 100
    ingested = await dataplane.ingest_stream(_stream(payload))

    report = await dataplane.verify_object(ingested.receipt.content_address)

    assert report.verified is True
    assert report.mismatch is None
    # Digests and size re-derived from the stored bytes equal the ingest result.
    assert report.actual_primary_digest == ingested.primary_digest
    assert report.actual_sha256_digest == ingested.sha256_digest
    assert report.actual_size_bytes == len(payload)
|
|
|
|
|
|
async def test_verify_object_detects_corruption(
    dataplane: InProcessDataPlane,
    backend: LocalBackend,
) -> None:
    """Bytes altered on disk after ingest are reported as a verification mismatch."""
    payload = b"will-be-corrupted" * 50
    ingested = await dataplane.ingest_stream(_stream(payload))

    # Overwrite the stored file behind the data plane's back. Its contents then
    # no longer hash to the content address the object is filed under.
    stored_file = backend.root / ingested.receipt.object_key
    stored_file.write_bytes(b"corrupted!!")

    report = await dataplane.verify_object(ingested.receipt.content_address)
    assert report.verified is False
    assert report.mismatch is not None
    assert "mismatch" in report.mismatch
|
|
|
|
|
|
async def test_delete_object_passes_through(
    dataplane: InProcessDataPlane,
) -> None:
    """Deletion is forwarded to the backend and is idempotent on repeat."""
    ingested = await dataplane.ingest_stream(_stream(b"to-be-deleted"))
    address = ingested.receipt.content_address

    # The first call removes the object. The second is idempotent: it does not
    # raise, it just reports that nothing was deleted.
    for expected_deleted in (True, False):
        outcome = await dataplane.delete_object(address)
        assert outcome.deleted is expected_deleted
|
|
|
|
|
|
async def test_backend_health_passes_through(
    dataplane: InProcessDataPlane,
) -> None:
    """Health is reported for the wrapped backend (the fixture's ``"local"``)."""
    health = await dataplane.backend_health()

    assert health.healthy is True
    assert health.backend_id == "local"
|
|
|
|
|
|
async def test_ingest_hints_override_primary_algorithm(
    dataplane: InProcessDataPlane,
) -> None:
    """``IngestHints.primary_algorithm`` selects the algorithm of the primary digest."""
    hints = IngestHints(primary_algorithm="sha256")
    ingested = await dataplane.ingest_stream(_stream(b"sha-as-primary"), hints=hints)

    primary, interop = ingested.primary_digest, ingested.sha256_digest
    assert primary.algorithm == "sha256"
    # The interop digest is SHA-256 regardless of the hint...
    assert interop.algorithm == "sha256"
    # ...so with SHA-256 as primary the two digests carry the same value.
    assert primary.hex == interop.hex
|
|
|
|
|
|
async def test_serve_missing_object_propagates_object_not_found(
    dataplane: InProcessDataPlane,
) -> None:
    """Serving an address that was never ingested surfaces ``ObjectNotFoundError``."""
    never_stored = digest_bytes(b"never-stored", primary=PRIMARY_ALGORITHM)
    address = never_stored.primary.content_address

    with pytest.raises(ObjectNotFoundError):
        await dataplane.serve_object(address)
|
|
|
|
|
|
# ----- Architectural test (ADR-0004) ----------------------------------------
|
|
|
|
|
|
def _module_imports(path: Path) -> set[str]:
|
|
tree = ast.parse(path.read_text(encoding="utf-8"))
|
|
imports: set[str] = set()
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.ImportFrom) and node.module:
|
|
imports.add(node.module)
|
|
elif isinstance(node, ast.Import):
|
|
for alias in node.names:
|
|
imports.add(alias.name)
|
|
return imports
|
|
|
|
|
|
def test_control_plane_does_not_import_backends_directly() -> None:
    """ADR-0004 invariant: registry / api / retention / audit talk to bytes
    only through the data plane and storage SPI surfaces, never through the
    concrete backend or in-process implementation modules."""
    control_plane = ["api", "registry", "retention", "audit"]
    forbidden_modules = (
        "artifactstore.storage.backends",
        "artifactstore.dataplane.inproc",
    )

    def is_forbidden(mod: str) -> bool:
        # Match the forbidden module itself or any submodule of it. A plain
        # ``startswith`` would also flag unrelated siblings that merely share
        # a textual prefix (e.g. ``artifactstore.dataplane.inproc_utils``).
        return any(
            mod == banned or mod.startswith(f"{banned}.") for banned in forbidden_modules
        )

    # NOTE(review): a control-plane package directory that does not exist
    # (yet) presumably yields no files from rglob, so this test passes
    # vacuously for it.
    offenders = [
        f"{py_file.relative_to(REPO_ROOT)} imports {mod}"
        for cp in control_plane
        # Sorted so the failure report is stable across runs/filesystems.
        for py_file in sorted((SRC_DIR / cp).rglob("*.py"))
        for mod in sorted(_module_imports(py_file))
        if is_forbidden(mod)
    ]
    assert not offenders, "\n".join(offenders)
|