generated from coulomb/repo-seed
src/artifactstore/storage/:
- spi.py: StorageBackend Protocol (backend_id, put, get, head, delete,
health) and result dataclasses (StorageReceipt, StorageObjectMetadata,
DeletionResult, BackendStatus). ObjectNotFoundError exception type.
- registry.py: backend lookup by string ID (register/get/list_backends/
clear) per ADR-0004.
- backends/local.py: LocalBackend implementation.
* Object layout <root>/<algorithm>/<hex[0:2]>/<hex[2:4]>/<hex>.
* Atomic writes: tmpfile + fsync + rename (idempotent re-puts drain the
stream without rewriting).
* Defence in depth: resolves the final path and asserts it remains under
the configured root.
* Range reads honour HTTP-style inclusive (start, end) tuples.
* health() returns disk usage via shutil.disk_usage and surfaces an
unhealthy status when the root has disappeared.
* delete() cleans up emptied shard directories opportunistically.
tests/unit/test_storage_local.py (14 cases): put/get round-trip; object
key layout matches blueprint; head returns metadata; head/get missing
raise ObjectNotFoundError; put is idempotent; delete returns True then
False; range read returns subrange; range read rejects invalid range;
health reports disk usage; health reports unhealthy when root vanished;
ContentAddress validation blocks path-traversal-flavoured inputs;
registry register/get/list/clear round-trip; idempotent re-put leaves
bytes intact.
Gates: ruff clean, mypy --strict clean on 41 files, 59 tests pass.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
181 lines
5.3 KiB
Python
181 lines
5.3 KiB
Python
"""Local filesystem backend tests (ARTIFACT-STORE-WP-0001-T003)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from artifactstore.identity import ContentAddress, digest_bytes
|
|
from artifactstore.storage import (
|
|
LocalBackend,
|
|
ObjectNotFoundError,
|
|
clear_backends,
|
|
get_backend,
|
|
list_backends,
|
|
register_backend,
|
|
)
|
|
|
|
|
|
async def _stream(data: bytes, chunk_size: int = 16) -> AsyncIterator[bytes]:
    """Yield ``data`` as consecutive slices of at most ``chunk_size`` bytes.

    Args:
        data: Payload to split; empty input yields nothing.
        chunk_size: Maximum length of each yielded slice; must be positive.

    Raises:
        ValueError: On first iteration, if ``chunk_size`` is not positive.
    """
    # A negative step would make range() empty and silently drop the whole
    # payload, so reject it up front together with zero.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    for offset in range(0, len(data), chunk_size):
        yield data[offset : offset + chunk_size]
|
|
|
|
|
|
def _ca_of(data: bytes) -> ContentAddress:
    """Return the content address of the primary digest computed over ``data``."""
    primary_digest = digest_bytes(data).primary
    return primary_digest.content_address
|
|
|
|
|
|
@pytest.fixture
def backend(tmp_path: Path) -> LocalBackend:
    """Provide a ``LocalBackend`` with id ``"local"`` rooted under ``tmp_path``."""
    storage_root = tmp_path / "storage"
    return LocalBackend(storage_root, backend_id="local")
|
|
|
|
|
|
async def _consume(it: AsyncIterator[bytes]) -> bytes:
    """Drain ``it`` and return all of its chunks concatenated in order."""
    chunks = [chunk async for chunk in it]
    return b"".join(chunks)
|
|
|
|
|
|
async def test_put_then_get_round_trips(backend: LocalBackend) -> None:
    """Store a payload, check the receipt fields, then read the same bytes back."""
    payload = b"hello world" * 100
    address = _ca_of(payload)

    put_receipt = await backend.put(address, _stream(payload))
    assert put_receipt.backend_id == "local"
    assert put_receipt.content_address == address
    assert put_receipt.size_bytes == len(payload)

    fetched = await _consume(await backend.get(address))
    assert fetched == payload
|
|
|
|
|
|
async def test_object_key_layout_matches_blueprint(backend: LocalBackend) -> None:
    """The receipt's object key is ``<algorithm>/<hex[:2]>/<hex[2:4]>/<hex>``.

    Also checks that a regular file exists at that key relative to the
    backend root.
    """
    payload = b"layout-check"
    address = _ca_of(payload)
    put_receipt = await backend.put(address, _stream(payload))

    digest = address.to_digest()
    expected = f"{digest.algorithm}/{digest.hex[:2]}/{digest.hex[2:4]}/{digest.hex}"
    assert put_receipt.object_key == expected

    stored_file = backend.root / expected
    assert stored_file.is_file()
|
|
|
|
|
|
async def test_head_returns_metadata(backend: LocalBackend) -> None:
    """``head`` on a stored object reports its size, address and backend id."""
    payload = b"abcde"
    address = _ca_of(payload)
    await backend.put(address, _stream(payload))

    metadata = await backend.head(address)
    assert metadata.size_bytes == len(payload)
    assert metadata.content_address == address
    assert metadata.backend_id == "local"
|
|
|
|
|
|
async def test_head_missing_raises_object_not_found(backend: LocalBackend) -> None:
    """``head`` on an address that was never stored raises ``ObjectNotFoundError``."""
    absent_address = _ca_of(b"never-stored")
    with pytest.raises(ObjectNotFoundError):
        await backend.head(absent_address)
|
|
|
|
|
|
async def test_get_missing_raises_object_not_found(backend: LocalBackend) -> None:
    """Awaiting ``get`` for a never-stored address raises ``ObjectNotFoundError``."""
    absent_address = _ca_of(b"never-stored")
    with pytest.raises(ObjectNotFoundError):
        await backend.get(absent_address)
|
|
|
|
|
|
async def test_put_is_idempotent(backend: LocalBackend) -> None:
    """Two puts of the same content return receipts with equal size and address."""
    payload = b"idempotent"
    address = _ca_of(payload)

    first_receipt = await backend.put(address, _stream(payload))
    second_receipt = await backend.put(address, _stream(payload))

    assert first_receipt.size_bytes == second_receipt.size_bytes == len(payload)
    assert first_receipt.content_address == second_receipt.content_address
|
|
|
|
|
|
async def test_delete_returns_true_then_false(backend: LocalBackend) -> None:
    """The first delete reports ``deleted is True``; a repeat reports ``False``."""
    payload = b"to-delete"
    address = _ca_of(payload)
    await backend.put(address, _stream(payload))

    initial_result = await backend.delete(address)
    assert initial_result.deleted is True
    assert initial_result.content_address == address

    # The object is already gone, so the second call has nothing to remove.
    repeat_result = await backend.delete(address)
    assert repeat_result.deleted is False
|
|
|
|
|
|
async def test_range_read_returns_subrange(backend: LocalBackend) -> None:
    """``byte_range=(10, 19)`` returns bytes 10 through 19 inclusive (10 bytes)."""
    payload = bytes(range(100))  # byte values 0..99, so value == offset
    address = _ca_of(payload)
    await backend.put(address, _stream(payload))

    ranged_stream = await backend.get(address, byte_range=(10, 19))
    window = await _consume(ranged_stream)

    # The end of the requested range is inclusive, hence the slice up to 20.
    assert window == payload[10:20]
    assert len(window) == 10
|
|
|
|
|
|
async def test_range_read_rejects_invalid_range(backend: LocalBackend) -> None:
    """Consuming a read with ``byte_range=(20, 5)`` raises ``ValueError``."""
    payload = b"range-check"
    address = _ca_of(payload)
    await backend.put(address, _stream(payload))

    # get() is awaited outside the raises block: the test expects the error
    # to surface while the returned stream is consumed, not from get() itself.
    bad_stream = await backend.get(address, byte_range=(20, 5))
    with pytest.raises(ValueError):
        await _consume(bad_stream)
|
|
|
|
|
|
async def test_health_reports_disk_usage(backend: LocalBackend) -> None:
    """A fresh backend is healthy and reports free and total byte counts."""
    report = await backend.health()

    assert report.healthy is True
    # Both figures must be populated before the numeric comparison below.
    assert report.free_bytes is not None
    assert report.total_bytes is not None
    assert report.free_bytes >= 0
    assert report.backend_id == "local"
|
|
|
|
|
|
async def test_health_reports_unhealthy_if_root_missing(tmp_path: Path) -> None:
    """``health`` is unhealthy, with ``"root"`` in its detail, once the root is gone."""
    vanished_root = tmp_path / "vanished"
    local_backend = LocalBackend(vanished_root)

    # Remove the root directory after construction. rmdir() only succeeds on
    # an existing, empty directory, so this relies on the constructor having
    # created it.
    vanished_root.rmdir()

    report = await local_backend.health()
    assert report.healthy is False
    assert "root" in report.detail
|
|
|
|
|
|
async def test_content_address_validation_blocks_path_traversal() -> None:
    """Path-traversal-flavoured strings are rejected by ``ContentAddress``.

    The constructor raises ``ValueError`` for malformed input, so such values
    never reach the backend.
    """
    traversal_attempts = ("sha256:../etc/passwd", "../sha256:deadbeef")
    for malformed in traversal_attempts:
        with pytest.raises(ValueError):
            ContentAddress(malformed)
|
|
|
|
|
|
async def test_registry_register_get_list_clear(backend: LocalBackend) -> None:
    """Exercise the registry round-trip: register, look up, list and clear.

    Checks that a cleared registry lists nothing, that a registered backend is
    listed under its id and returned by identity, that an unknown id raises
    ``KeyError``, and that clearing empties the registry again.
    """
    clear_backends()
    # The registry is reached through plain functions rather than an object
    # owned by this test, so it presumably outlives the test. The finally
    # clause guarantees the backend is unregistered even when an assertion
    # below fails, keeping later tests from seeing a stale entry.
    try:
        assert list_backends() == []

        register_backend(backend)
        assert list_backends() == ["local"]
        assert get_backend("local") is backend

        with pytest.raises(KeyError):
            get_backend("nope")
    finally:
        clear_backends()

    assert list_backends() == []
|
|
|
|
|
|
async def test_idempotent_put_does_not_corrupt_existing(backend: LocalBackend) -> None:
    """After a repeated put of the same content, ``get`` still returns the bytes."""
    payload = b"original-bytes"
    address = _ca_of(payload)
    await backend.put(address, _stream(payload))

    # A re-put with the same content address yields the same bytes back.
    await backend.put(address, _stream(payload))

    stored = await _consume(await backend.get(address))
    assert stored == payload
|