"""In-process data plane tests (ARTIFACT-STORE-WP-0001-T012)."""

from __future__ import annotations

import ast
from collections.abc import AsyncIterator
from pathlib import Path

import pytest

from artifactstore.dataplane import (
    IngestHints,
    InProcessDataPlane,
)
from artifactstore.identity import PRIMARY_ALGORITHM, digest_bytes
from artifactstore.storage import LocalBackend, ObjectNotFoundError

REPO_ROOT = Path(__file__).resolve().parents[2]
SRC_DIR = REPO_ROOT / "src" / "artifactstore"


async def _stream(data: bytes, chunk: int = 16) -> AsyncIterator[bytes]:
    """Yield *data* as consecutive slices of at most *chunk* bytes.

    Empty *data* yields nothing at all.
    """
    for i in range(0, len(data), chunk):
        yield data[i : i + chunk]


async def _consume(it: AsyncIterator[bytes]) -> bytes:
    """Drain the async iterator *it* and return its chunks concatenated."""
    out = bytearray()
    async for chunk in it:
        out.extend(chunk)
    return bytes(out)


@pytest.fixture
def backend(tmp_path: Path) -> LocalBackend:
    """Provide a ``LocalBackend`` with id ``"local"`` rooted under ``tmp_path``."""
    return LocalBackend(tmp_path / "store", backend_id="local")


@pytest.fixture
def dataplane(backend: LocalBackend, tmp_path: Path) -> InProcessDataPlane:
    """Provide an ``InProcessDataPlane`` wrapping the ``backend`` fixture."""
    return InProcessDataPlane(backend, tmp_dir=tmp_path / "dp-tmp")


async def test_ingest_stream_computes_dual_digest_and_stores(
    dataplane: InProcessDataPlane,
    backend: LocalBackend,
) -> None:
    """Ingest reports the digests of ``digest_bytes`` and stores the bytes."""
    data = b"the quick brown fox" * 500
    expected = digest_bytes(data)

    result = await dataplane.ingest_stream(_stream(data))

    assert result.primary_digest == expected.primary
    assert result.sha256_digest == expected.sha256
    assert result.size_bytes == len(data)
    assert result.receipt.content_address == expected.primary.content_address
    assert result.receipt.backend_id == backend.backend_id

    # Verify the backend actually stored the bytes at the content address.
    stream = await backend.get(expected.primary.content_address)
    assert await _consume(stream) == data


async def test_ingest_stream_empty_input_is_handled(
    dataplane: InProcessDataPlane,
) -> None:
    """Ingesting a stream that yields no chunks gives a zero-size result."""
    result = await dataplane.ingest_stream(_stream(b""))
    assert result.size_bytes == 0
    expected = digest_bytes(b"")
    assert result.primary_digest == expected.primary


async def test_serve_object_returns_ingested_bytes(
    dataplane: InProcessDataPlane,
) -> None:
    """Serving an ingested content address returns the original bytes."""
    data = b"serve-me" * 1000
    result = await dataplane.ingest_stream(_stream(data))

    stream = await dataplane.serve_object(result.receipt.content_address)
    assert await _consume(stream) == data


async def test_serve_object_supports_byte_range(
    dataplane: InProcessDataPlane,
) -> None:
    """A ``(50, 79)`` byte range is expected to return ``data[50:80]``."""
    data = bytes(range(200))
    result = await dataplane.ingest_stream(_stream(data))

    stream = await dataplane.serve_object(
        result.receipt.content_address,
        byte_range=(50, 79),
    )
    chunk = await _consume(stream)
    # The end offset is inclusive, hence the 80 in the expected slice.
    assert chunk == data[50:80]


async def test_verify_object_succeeds_for_intact_bytes(
    dataplane: InProcessDataPlane,
) -> None:
    """Verification of untouched bytes reports the digests seen at ingest."""
    data = b"verify-intact" * 100
    result = await dataplane.ingest_stream(_stream(data))

    verify = await dataplane.verify_object(result.receipt.content_address)
    assert verify.verified is True
    assert verify.mismatch is None
    assert verify.actual_primary_digest == result.primary_digest
    assert verify.actual_sha256_digest == result.sha256_digest
    assert verify.actual_size_bytes == len(data)


async def test_verify_object_detects_corruption(
    dataplane: InProcessDataPlane,
    backend: LocalBackend,
) -> None:
    """Verification fails once the stored file has been overwritten."""
    data = b"will-be-corrupted" * 50
    result = await dataplane.ingest_stream(_stream(data))

    # Corrupt the on-disk bytes directly behind the data plane's back.
    object_path = backend.root / result.receipt.object_key
    object_path.write_bytes(b"corrupted!!")

    verify = await dataplane.verify_object(result.receipt.content_address)
    assert verify.verified is False
    assert verify.mismatch is not None
    assert "mismatch" in verify.mismatch


async def test_delete_object_passes_through(
    dataplane: InProcessDataPlane,
) -> None:
    """The first delete reports ``deleted=True``; a repeat reports ``False``."""
    data = b"to-be-deleted"
    result = await dataplane.ingest_stream(_stream(data))

    deletion = await dataplane.delete_object(result.receipt.content_address)
    assert deletion.deleted is True

    # Second delete is idempotent.
    deletion2 = await dataplane.delete_object(result.receipt.content_address)
    assert deletion2.deleted is False


async def test_backend_health_passes_through(
    dataplane: InProcessDataPlane,
) -> None:
    """Health status is healthy and carries the fixture's backend id."""
    status = await dataplane.backend_health()
    assert status.healthy is True
    assert status.backend_id == "local"


async def test_ingest_hints_override_primary_algorithm(
    dataplane: InProcessDataPlane,
) -> None:
    """``IngestHints(primary_algorithm="sha256")`` selects the primary digest."""
    data = b"sha-as-primary"
    result = await dataplane.ingest_stream(
        _stream(data), hints=IngestHints(primary_algorithm="sha256")
    )
    assert result.primary_digest.algorithm == "sha256"
    # The interop digest is still SHA-256 by definition.
    assert result.sha256_digest.algorithm == "sha256"
    # In this special case both digests reduce to the same value.
    assert result.primary_digest.hex == result.sha256_digest.hex


async def test_ingest_hints_route_to_named_backend(tmp_path: Path) -> None:
    """``IngestHints(backend_id=...)`` stores only in the named backend."""
    local = LocalBackend(tmp_path / "local", backend_id="local")
    archive = LocalBackend(tmp_path / "archive", backend_id="archive")
    dp = InProcessDataPlane(
        {"local": local, "archive": archive},
        default_backend_id="local",
    )

    result = await dp.ingest_stream(
        _stream(b"route-me"),
        hints=IngestHints(backend_id="archive"),
    )
    assert result.receipt.backend_id == "archive"
    assert not (local.root / result.receipt.object_key).exists()
    assert (archive.root / result.receipt.object_key).exists()


async def test_serve_missing_object_propagates_object_not_found(
    dataplane: InProcessDataPlane,
) -> None:
    """Serving an address that was never ingested raises ``ObjectNotFoundError``."""
    ca = digest_bytes(b"never-stored", primary=PRIMARY_ALGORITHM).primary.content_address
    with pytest.raises(ObjectNotFoundError):
        await dataplane.serve_object(ca)


# ----- Architectural test (ADR-0004) ----------------------------------------


def _import_from_base(node: ast.ImportFrom, package: str | None) -> str | None:
    """Return the dotted module that a ``from ... import`` statement targets.

    Absolute imports return ``node.module`` unchanged.  Relative imports are
    resolved against *package* (the dotted package containing the importing
    file).  When *package* is not given, or the import climbs above the top of
    *package*, the unresolved ``node.module`` is returned instead, which is
    ``None`` for a bare ``from . import x``.
    """
    if node.level == 0 or not package:
        return node.module
    parts = package.split(".")
    # Level 1 means "this package", so each extra level drops one component.
    keep = len(parts) - (node.level - 1)
    if keep <= 0:
        return node.module
    anchor = parts[:keep]
    if node.module:
        anchor.append(node.module)
    return ".".join(anchor)


def _module_imports(path: Path, package: str | None = None) -> set[str]:
    """Return every dotted name imported anywhere in the module at *path*.

    ``import a.b`` contributes ``a.b``.  ``from a import b`` contributes both
    ``a`` and ``a.b``: ``b`` may itself be a submodule, and recording only
    ``a`` would hide an import of a forbidden submodule from a prefix check.
    Star imports contribute only the base module.

    Args:
        path: Python source file to parse.
        package: Dotted name of the package containing *path*.  When given,
            relative imports are resolved to absolute names; when omitted they
            are recorded unresolved, and ``from . import x`` is skipped.

    Returns:
        The set of imported dotted names, including imports nested inside
        functions or conditionals, since the whole tree is walked.
    """
    tree = ast.parse(path.read_text(encoding="utf-8"))
    imports: set[str] = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            imports.update(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom):
            base = _import_from_base(node, package)
            if base is None:
                continue
            imports.add(base)
            imports.update(
                f"{base}.{alias.name}" for alias in node.names if alias.name != "*"
            )
    return imports


def test_control_plane_does_not_import_backends_directly() -> None:
    """ADR-0004 invariant: registry / api / retention / audit talk to bytes
    only through the data plane and storage SPI surfaces, never through
    the concrete backend or in-process implementation modules."""
    control_plane = ["api", "registry", "retention", "audit"]
    forbidden_prefixes = (
        "artifactstore.storage.backends",
        "artifactstore.dataplane.inproc",
    )

    offenders: list[str] = []
    for cp in control_plane:
        # Sorted so that a failure lists offenders in a stable order.
        for py_file in sorted((SRC_DIR / cp).rglob("*.py")):
            # Dotted package of the file, e.g. "artifactstore.api", so that
            # relative imports can be resolved to absolute module names.
            package = ".".join(py_file.parent.relative_to(SRC_DIR.parent).parts)
            for mod in sorted(_module_imports(py_file, package=package)):
                if mod.startswith(forbidden_prefixes):
                    offenders.append(f"{py_file.relative_to(REPO_ROOT)} imports {mod}")

    assert not offenders, "\n".join(offenders)