generated from coulomb/repo-seed
218 lines
7.1 KiB
Python
218 lines
7.1 KiB
Python
"""In-process data plane tests (ARTIFACT-STORE-WP-0001-T012)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from artifactstore.dataplane import (
|
|
IngestHints,
|
|
InProcessDataPlane,
|
|
)
|
|
from artifactstore.identity import PRIMARY_ALGORITHM, digest_bytes
|
|
from artifactstore.storage import LocalBackend, ObjectNotFoundError
|
|
|
|
# Repository root: ``parents[2]`` means this file sits two directory levels
# below it (``<root>/<dir>/<dir>/<this file>``).
REPO_ROOT: Path = Path(__file__).resolve().parents[2]
# Source tree of the ``artifactstore`` package, in the ``src/`` layout.
SRC_DIR: Path = REPO_ROOT.joinpath("src", "artifactstore")
|
|
|
|
|
|
async def _stream(data: bytes, chunk: int = 16) -> AsyncIterator[bytes]:
|
|
for i in range(0, len(data), chunk):
|
|
yield data[i : i + chunk]
|
|
|
|
|
|
async def _consume(it: AsyncIterator[bytes]) -> bytes:
|
|
out = bytearray()
|
|
async for chunk in it:
|
|
out.extend(chunk)
|
|
return bytes(out)
|
|
|
|
|
|
@pytest.fixture
def backend(tmp_path: Path) -> LocalBackend:
    """Provide a ``LocalBackend`` with id ``"local"`` rooted at ``<tmp_path>/store``."""
    store_root = tmp_path / "store"
    return LocalBackend(store_root, backend_id="local")
|
|
|
|
|
|
@pytest.fixture
def dataplane(backend: LocalBackend, tmp_path: Path) -> InProcessDataPlane:
    """Provide an ``InProcessDataPlane`` wrapping the ``backend`` fixture.

    Its ``tmp_dir`` is placed under this test's ``tmp_path``.
    """
    scratch_dir = tmp_path / "dp-tmp"
    return InProcessDataPlane(backend, tmp_dir=scratch_dir)
|
|
|
|
|
|
async def test_ingest_stream_computes_dual_digest_and_stores(
    dataplane: InProcessDataPlane,
    backend: LocalBackend,
) -> None:
    """Ingest reports both digests, size and receipt, and persists the bytes."""
    payload = b"the quick brown fox" * 500
    want = digest_bytes(payload)

    result = await dataplane.ingest_stream(_stream(payload))
    receipt = result.receipt

    assert result.primary_digest == want.primary
    assert result.sha256_digest == want.sha256
    assert result.size_bytes == len(payload)
    assert receipt.content_address == want.primary.content_address
    assert receipt.backend_id == backend.backend_id

    # Read the object back from the backend directly (not via the data plane)
    # to prove the bytes really landed at the content address.
    stored = await _consume(await backend.get(want.primary.content_address))
    assert stored == payload
|
|
|
|
|
|
async def test_ingest_stream_empty_input_is_handled(
    dataplane: InProcessDataPlane,
) -> None:
    """A stream that yields no chunks at all still ingests cleanly.

    ``_stream(b"")`` produces zero chunks, so this exercises the data plane's
    handling of an iterator that is exhausted immediately. The reported size
    and both digests must match those of the empty byte string.
    """
    expected = digest_bytes(b"")

    result = await dataplane.ingest_stream(_stream(b""))

    assert result.size_bytes == 0
    assert result.primary_digest == expected.primary
    # The interop SHA-256 digest belongs to the same dual-digest contract that
    # is checked for non-empty input; pin it for the empty case as well.
    assert result.sha256_digest == expected.sha256
|
|
|
|
|
|
async def test_serve_object_returns_ingested_bytes(
    dataplane: InProcessDataPlane,
) -> None:
    """Serving an ingested object streams back exactly the ingested bytes."""
    payload = b"serve-me" * 1000
    ingested = await dataplane.ingest_stream(_stream(payload))
    address = ingested.receipt.content_address

    served = await _consume(await dataplane.serve_object(address))

    assert served == payload
|
|
|
|
|
|
async def test_serve_object_supports_byte_range(
    dataplane: InProcessDataPlane,
) -> None:
    """``byte_range`` is treated as inclusive at both ends.

    ``(50, 79)`` is expected to return the 30 bytes ``payload[50:80]``.
    """
    payload = bytes(range(200))
    first, last = 50, 79
    ingested = await dataplane.ingest_stream(_stream(payload))

    ranged = await dataplane.serve_object(
        ingested.receipt.content_address,
        byte_range=(first, last),
    )

    assert await _consume(ranged) == payload[first : last + 1]
|
|
|
|
|
|
async def test_verify_object_succeeds_for_intact_bytes(
    dataplane: InProcessDataPlane,
) -> None:
    """Verifying untouched bytes succeeds and reports the ingest-time digests."""
    payload = b"verify-intact" * 100
    ingested = await dataplane.ingest_stream(_stream(payload))

    report = await dataplane.verify_object(ingested.receipt.content_address)

    assert report.verified is True
    assert report.mismatch is None
    assert report.actual_primary_digest == ingested.primary_digest
    assert report.actual_sha256_digest == ingested.sha256_digest
    assert report.actual_size_bytes == len(payload)
|
|
|
|
|
|
async def test_verify_object_detects_corruption(
    dataplane: InProcessDataPlane,
    backend: LocalBackend,
) -> None:
    """Bytes altered on disk after ingest fail verification with a message."""
    payload = b"will-be-corrupted" * 50
    ingested = await dataplane.ingest_stream(_stream(payload))

    # Overwrite the stored file directly, bypassing the data plane entirely.
    tampered = backend.root / ingested.receipt.object_key
    tampered.write_bytes(b"corrupted!!")

    report = await dataplane.verify_object(ingested.receipt.content_address)

    assert report.verified is False
    assert report.mismatch is not None
    assert "mismatch" in report.mismatch
|
|
|
|
|
|
async def test_delete_object_passes_through(
    dataplane: InProcessDataPlane,
) -> None:
    """Deletion is forwarded to the backend and is safe to repeat."""
    ingested = await dataplane.ingest_stream(_stream(b"to-be-deleted"))
    address = ingested.receipt.content_address

    first = await dataplane.delete_object(address)
    assert first.deleted is True

    # A repeated delete does not raise; it reports that nothing was removed.
    second = await dataplane.delete_object(address)
    assert second.deleted is False
|
|
|
|
|
|
async def test_backend_health_passes_through(
    dataplane: InProcessDataPlane,
) -> None:
    """The data plane reports the health of its ``"local"`` backend."""
    health = await dataplane.backend_health()

    assert health.healthy is True
    assert health.backend_id == "local"
|
|
|
|
|
|
async def test_ingest_hints_override_primary_algorithm(
    dataplane: InProcessDataPlane,
) -> None:
    """``IngestHints.primary_algorithm`` selects the primary digest algorithm."""
    hints = IngestHints(primary_algorithm="sha256")
    result = await dataplane.ingest_stream(_stream(b"sha-as-primary"), hints=hints)
    primary, interop = result.primary_digest, result.sha256_digest

    assert primary.algorithm == "sha256"
    # The interop digest is SHA-256 regardless of the hint.
    assert interop.algorithm == "sha256"
    # With SHA-256 as primary, both digests are computed the same way.
    assert primary.hex == interop.hex
|
|
|
|
|
|
async def test_ingest_hints_route_to_named_backend(tmp_path: Path) -> None:
    """``IngestHints.backend_id`` routes the bytes to that backend only.

    The data plane is built over two independent LocalBackends with
    ``"local"`` as the default; the hint must override that default, and the
    object must appear under the archive root but not under the local root.
    """
    local = LocalBackend(tmp_path / "local", backend_id="local")
    archive = LocalBackend(tmp_path / "archive", backend_id="archive")
    dp = InProcessDataPlane(
        {"local": local, "archive": archive},
        default_backend_id="local",
        # Match the ``dataplane`` fixture: keep the data plane's tmp_dir inside
        # this test's tmp_path instead of relying on its default location.
        tmp_dir=tmp_path / "dp-tmp",
    )

    result = await dp.ingest_stream(
        _stream(b"route-me"),
        hints=IngestHints(backend_id="archive"),
    )

    assert result.receipt.backend_id == "archive"
    assert not (local.root / result.receipt.object_key).exists()
    assert (archive.root / result.receipt.object_key).exists()
|
|
|
|
|
|
async def test_serve_missing_object_propagates_object_not_found(
    dataplane: InProcessDataPlane,
) -> None:
    """Serving a never-ingested address surfaces the backend's not-found error."""
    absent_address = digest_bytes(
        b"never-stored", primary=PRIMARY_ALGORITHM
    ).primary.content_address

    with pytest.raises(ObjectNotFoundError):
        await dataplane.serve_object(absent_address)
|
|
|
|
|
|
# ----- Architectural test (ADR-0004) ----------------------------------------
|
|
|
|
|
|
def _module_imports(path: Path) -> set[str]:
|
|
tree = ast.parse(path.read_text(encoding="utf-8"))
|
|
imports: set[str] = set()
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.ImportFrom) and node.module:
|
|
imports.add(node.module)
|
|
elif isinstance(node, ast.Import):
|
|
for alias in node.names:
|
|
imports.add(alias.name)
|
|
return imports
|
|
|
|
|
|
def test_control_plane_does_not_import_backends_directly() -> None:
|
|
"""ADR-0004 invariant: registry / api / retention / audit talk to bytes
|
|
only through the data plane and storage SPI surfaces, never through the
|
|
concrete backend or in-process implementation modules."""
|
|
control_plane = ["api", "registry", "retention", "audit"]
|
|
forbidden_prefixes = (
|
|
"artifactstore.storage.backends",
|
|
"artifactstore.dataplane.inproc",
|
|
)
|
|
offenders: list[str] = []
|
|
for cp in control_plane:
|
|
for py_file in (SRC_DIR / cp).rglob("*.py"):
|
|
for mod in _module_imports(py_file):
|
|
if mod.startswith(forbidden_prefixes):
|
|
offenders.append(f"{py_file.relative_to(REPO_ROOT)} imports {mod}")
|
|
assert not offenders, "\n".join(offenders)
|