Files
artifact-store/tests/integration/test_garbage_collection.py

207 lines
6.2 KiB
Python

"""Garbage collection integration tests (ARTIFACT-STORE-WP-0006)."""
from __future__ import annotations
from collections.abc import AsyncIterator
from datetime import UTC, datetime, timedelta
from pathlib import Path
from uuid import UUID
import pytest
import pytest_asyncio
from sqlalchemy import insert, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.schema import (
metadata,
retention_classes,
storage_locations,
)
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
from artifactstore.events import RegistryViewWriter, replay
from artifactstore.registry import FileNotFoundError, Registry
from artifactstore.retention import RetentionPolicy
from artifactstore.storage import LocalBackend
@pytest_asyncio.fixture
async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
    """Yield an async SQLite engine with the schema created and retention classes seeded.

    The database is a file under the per-test ``tmp_path``. Schema creation and
    seeding run in one ``begin()`` block that completes before the engine is
    yielded. The engine is disposed on teardown, and also when setup itself
    fails, so a broken schema/seed step does not leak the connection pool.
    """
    db_path = tmp_path / "gc.db"
    eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
    try:
        async with eng.begin() as conn:
            await conn.run_sync(metadata.create_all)
            seed_stmt = insert(retention_classes)
            for seed in RETENTION_CLASS_SEEDS:
                await conn.execute(seed_stmt.values(**seed))
        yield eng
    finally:
        # Runs on normal fixture teardown and when setup above raises.
        await eng.dispose()
@pytest.fixture
def view_writer() -> RegistryViewWriter:
    """Provide the view writer that both the ``registry`` fixture and replay use."""
    writer = RegistryViewWriter()
    return writer
@pytest.fixture
def registry(
    engine: AsyncEngine,
    tmp_path: Path,
    view_writer: RegistryViewWriter,
) -> Registry:
    """Build a ``Registry`` over a local-disk data plane rooted in ``tmp_path``.

    Objects are stored under ``tmp_path / "storage"`` (backend id ``"local"``)
    and the data plane stages through ``tmp_path / "dp-tmp"``. The retention
    policy maps the ``transient`` class to ``0``. Presumably this is a zero
    retention period, which lets the tests sweep with a clock only one second
    ahead; confirm against ``RetentionPolicy``.
    """
    storage_root = tmp_path / "storage"
    backend = LocalBackend(storage_root, backend_id="local")
    scratch_dir = tmp_path / "dp-tmp"
    dataplane = InProcessDataPlane(backend, tmp_dir=scratch_dir)
    policy = RetentionPolicy({"transient": 0})
    return Registry(engine, dataplane, view_writer, policy)
async def _stream(data: bytes) -> AsyncIterator[bytes]:
    """Expose ``data`` as a single-chunk async byte stream (as ``ingest_file`` expects)."""
    for chunk in (data,):
        yield chunk
async def _consume(it: AsyncIterator[bytes]) -> bytes:
    """Drain an async byte stream and return all of its chunks concatenated."""
    return b"".join([chunk async for chunk in it])
async def _package_with_file(
    registry: Registry,
    *,
    name: str,
    retention_class: str,
    data: bytes,
) -> tuple[UUID, UUID]:
    """Create a package, ingest ``data`` into it as ``payload.bin``, and finalize it.

    The package is produced by ``"tests"``, uses ``name`` as both its name and
    subject, and every registry call is attributed to the ``"ops"`` actor.

    Returns:
        ``(package_id, file_id)`` for the finalized package and its single file.
    """
    actor = "ops"
    pkg = await registry.create_package(
        name=name,
        producer="tests",
        subject=name,
        retention_class=retention_class,
        actor=actor,
    )
    payload = _stream(data)
    fid = await registry.ingest_file(
        pkg,
        relative_path="payload.bin",
        media_type="application/octet-stream",
        stream=payload,
        actor=actor,
    )
    await registry.finalize_package(pkg, actor=actor)
    return pkg, fid
async def _location_statuses(engine: AsyncEngine) -> dict[UUID, str]:
    """Map each ``artifact_file_id`` in ``storage_locations`` to its ``status``.

    NOTE(review): if a file ever has more than one storage-location row, only
    one of them survives in the returned dict.
    """
    query = select(
        storage_locations.c.artifact_file_id,
        storage_locations.c.status,
    )
    async with engine.connect() as conn:
        result = await conn.execute(query)
        rows = result.all()
    return {file_id: status for file_id, status in rows}
async def test_gc_deletes_unique_expired_object_and_replays(
    registry: Registry,
    engine: AsyncEngine,
    view_writer: RegistryViewWriter,
) -> None:
    """GC deletes an object referenced only by one expired package; replay agrees.

    After the sweep marks the ``transient`` package, a single GC pass must:
    - release the only reference and delete the stored object;
    - leave the file unreadable;
    - mark the package ``garbage_collected``.

    A second pass is a no-op. Rebuilding the view from the event log
    (``replay(..., reset=True)``) must reproduce the same storage-location
    statuses and package status.
    """
    package_id, file_id = await _package_with_file(
        registry,
        name="unique",
        retention_class="transient",
        data=b"unique bytes",
    )
    # Sweep one second ahead of wall-clock time; the ``registry`` fixture maps
    # ``transient`` to 0 in its RetentionPolicy, so this package should be due.
    marked = await registry.sweep_deletion_eligibility(
        now=datetime.now(UTC) + timedelta(seconds=1)
    )
    assert marked == [package_id]

    results = await registry.collect_garbage()
    assert len(results) == 1
    result = results[0]
    assert result.package_id == package_id
    assert result.file_id == file_id
    # The only reference is released, so the object itself must be deleted.
    assert result.object_delete_attempted is True
    assert result.object_deleted is True
    assert result.ref_count_before == 1
    assert result.ref_count_after == 0

    # ``FileNotFoundError`` is the one imported from artifactstore.registry,
    # which shadows the builtin of the same name in this module.
    with pytest.raises(FileNotFoundError):
        await registry.get_file(file_id)
    assert await registry.collect_garbage() == []
    # Check the live GC path updated the package *before* replay rebuilds the
    # view, so a status that only appears after replay cannot pass unnoticed.
    assert (await registry.get_package(package_id)).status == "garbage_collected"

    pre = await _location_statuses(engine)
    await replay(engine, view_writer, reset=True)
    post = await _location_statuses(engine)
    assert pre == post == {file_id: "deleted"}
    package = await registry.get_package(package_id)
    assert package.status == "garbage_collected"
async def test_gc_releases_shared_reference_without_deleting_retained_bytes(
    registry: Registry,
    engine: AsyncEngine,
) -> None:
    """GC of an expired package must not delete bytes still used by a retained one.

    Two packages ingest identical bytes: one ``transient`` and one
    ``raw-evidence``, which the sweep does not mark. Collecting the expired
    package must:
    - only drop its reference (2 -> 1);
    - mark its own storage location ``deleted``;
    - leave the retained package ``finalized`` with its bytes readable.

    A second GC pass must be a no-op.
    """
    data = b"shared bytes"
    expired_package, expired_file = await _package_with_file(
        registry,
        name="expired",
        retention_class="transient",
        data=data,
    )
    retained_package, retained_file = await _package_with_file(
        registry,
        name="retained",
        retention_class="raw-evidence",
        data=data,
    )
    marked = await registry.sweep_deletion_eligibility(
        now=datetime.now(UTC) + timedelta(seconds=1)
    )
    assert marked == [expired_package]

    results = await registry.collect_garbage()
    assert len(results) == 1
    result = results[0]
    assert result.package_id == expired_package
    assert result.file_id == expired_file
    # The retained package still references the same bytes, so the object is
    # left in place and only the expired package's reference is released.
    assert result.object_delete_attempted is False
    assert result.object_deleted is False
    assert result.ref_count_before == 2
    assert result.ref_count_after == 1
    # A repeated pass must find nothing to do; releasing the expired reference
    # a second time would bring the count to 0 and could delete retained bytes.
    assert await registry.collect_garbage() == []

    statuses = await _location_statuses(engine)
    assert statuses[expired_file] == "deleted"
    assert statuses[retained_file] == "recorded"
    assert (await registry.get_package(expired_package)).status == "garbage_collected"
    assert (await registry.get_package(retained_package)).status == "finalized"
    # ``FileNotFoundError`` is the registry's exception, not the builtin.
    with pytest.raises(FileNotFoundError):
        await registry.get_file(expired_file)
    retained_stream = await registry.get_file(retained_file)
    assert await _consume(retained_stream) == data
async def test_gc_respects_active_hold(registry: Registry, engine: AsyncEngine) -> None:
    """An active retention hold keeps an otherwise-expired package out of sweep and GC.

    The held ``transient`` package is never marked eligible and GC collects
    nothing. Its storage location stays ``recorded`` and its bytes remain
    readable.
    """
    payload = b"held bytes"
    held_package, held_file = await _package_with_file(
        registry,
        name="held",
        retention_class="transient",
        data=payload,
    )
    await registry.apply_retention_hold(held_package, reason="legal hold", actor="ops")

    sweep_clock = datetime.now(UTC) + timedelta(seconds=1)
    assert await registry.sweep_deletion_eligibility(now=sweep_clock) == []
    assert await registry.collect_garbage() == []

    assert await _location_statuses(engine) == {held_file: "recorded"}
    content = await _consume(await registry.get_file(held_file))
    assert content == payload