diff --git a/docs/OPERATOR.md b/docs/OPERATOR.md index 472f291..e05e7e8 100644 --- a/docs/OPERATOR.md +++ b/docs/OPERATOR.md @@ -99,8 +99,20 @@ default_duration_seconds = 220752000 ``` Run `artifactstore retention sweep` from cron or another scheduler to mark -expired, unheld packages eligible for deletion. This work only records -eligibility; it never deletes bytes. +expired, unheld packages eligible for deletion. Then run +`artifactstore retention gc` to release the eligible packages' storage +locations and delete physical objects whose final reference has been +released: + +```sh +artifactstore retention sweep +artifactstore retention gc +``` + +GC is reference-counted by `(backend_id, content_address)`: shared bytes stay in +the backend until every non-deleted storage location has been released. Each +released location emits a `v1.storage.location_deleted` event. A package becomes +`garbage_collected` only after all of its storage locations are released. ## Database backends @@ -205,6 +217,7 @@ digest, emits `v1.storage.location_verified`, and marks failed locations as | `artifactstore push ` | Push a directory through the HTTP API and finalize the package. | | `artifactstore manifest ` | Fetch the JSON manifest projection through the HTTP API. | | `artifactstore retention sweep` | Run one deletion-eligibility sweep against the configured DB. | +| `artifactstore retention gc` | Run one reference-counted garbage-collection pass. | | `artifactstore storage verify --backend ` | Re-read stored objects for a backend and record verification events. | | `artifactstore guide-board ingest ` | Ingest one guide-board run directory as an artifact package. | diff --git a/src/artifactstore/cli/__init__.py b/src/artifactstore/cli/__init__.py index 649beb4..9e44e5a 100644 --- a/src/artifactstore/cli/__init__.py +++ b/src/artifactstore/cli/__init__.py @@ -186,6 +186,34 @@ def retention_sweep() -> None: typer.echo(json.dumps({"marked_package_ids": marked, "marked_count": len(marked)}, indent=2)) +@retention_app.command("gc") +def retention_gc() -> None: + """Run one garbage-collection pass for deletion-eligible packages.""" + settings = get_settings() + results = asyncio.run(_garbage_collect_async(settings)) + object_keys = { + (r["backend_id"], r["content_address"]) + for r in results + if r["object_delete_attempted"] + } + deleted_object_keys = { + (r["backend_id"], r["content_address"]) + for r in results + if r["object_delete_attempted"] and r["object_deleted"] + } + typer.echo( + json.dumps( + { + "released_location_count": len(results), + "delete_attempted_object_count": len(object_keys), + "deleted_object_count": len(deleted_object_keys), + "results": results, + }, + indent=2, + ) + ) + + @storage_app.command("verify") def storage_verify( backend: str | None = typer.Option( @@ -285,6 +313,30 @@ async def _retention_sweep_async(settings: Settings) -> list[str]: return [str(package_id) for package_id in marked] +async def _garbage_collect_async(settings: Settings) -> list[dict[str, Any]]: + from artifactstore.app import build_registry + + registry: Registry = build_registry(settings) + try: + results = await registry.collect_garbage() + finally: + await registry.dispose() + return [ + { + "storage_location_id": str(result.storage_location_id), + "file_id": str(result.file_id), + "package_id": str(result.package_id), + "backend_id": result.backend_id, + "content_address": result.content_address, + "object_delete_attempted": result.object_delete_attempted, + "object_deleted": result.object_deleted, + "ref_count_before": result.ref_count_before, + "ref_count_after": result.ref_count_after, + } + for result in results + ] + + async def _storage_verify_async( settings: Settings, *, diff --git a/src/artifactstore/events/views.py b/src/artifactstore/events/views.py index 080da00..a751e42 100644 --- a/src/artifactstore/events/views.py +++ b/src/artifactstore/events/views.py @@ -23,7 +23,7 @@ from datetime import UTC, datetime from uuid import UUID import cbor2 -from sqlalchemy import delete, insert, update +from sqlalchemy import delete, insert, select, update from sqlalchemy.ext.asyncio import AsyncConnection from artifactstore.db.schema import ( @@ -267,6 +267,47 @@ async def _apply_storage_location_verified( ) +async def _apply_storage_location_deleted( + connection: AsyncConnection, + event: Event, +) -> None: + if event.subject_id is None: + raise ValueError("v1.storage.location_deleted event must have subject_id") + payload = cbor2.loads(event.payload) + await connection.execute( + update(storage_locations) + .where(storage_locations.c.id == UUID(payload["storage_location_id"])) + .values( + status="deleted", + restore_status="object_deleted" + if payload.get("object_deleted") + else "reference_released", + ) + ) + remaining = ( + await connection.execute( + select(storage_locations.c.id) + .join( + artifact_files, + artifact_files.c.id == storage_locations.c.artifact_file_id, + ) + .where( + artifact_files.c.package_id == event.subject_id, + storage_locations.c.status != "deleted", + ) + .limit(1) + ) + ).first() + package_values: dict[str, object] = {"last_event_sequence": event.sequence} + if remaining is None: + package_values["status"] = "garbage_collected" + await connection.execute( + update(artifact_packages) + .where(artifact_packages.c.id == event.subject_id) + .values(**package_values) + ) + + def _parse_iso(value: str | None) -> datetime | None: if value is None: return None @@ -286,4 +327,5 @@ _HANDLERS = { "v1.retention.hold_released": _apply_retention_hold_released, "v1.retention.deletion_eligible": _apply_retention_deletion_eligible, "v1.storage.location_verified": _apply_storage_location_verified, + "v1.storage.location_deleted": _apply_storage_location_deleted, } diff --git a/src/artifactstore/registry/__init__.py b/src/artifactstore/registry/__init__.py index 54a5ba3..0e94af1 100644 --- a/src/artifactstore/registry/__init__.py +++ b/src/artifactstore/registry/__init__.py @@ -70,6 +70,7 @@ __all__ = [ "DuplicateRelativePathError", "FileNotFoundError", "FileRecord", + "GarbageCollectionRecord", "IllegalPackageStateError", "MetadataSchemaRecord", "PackageNotFoundError", @@ -183,6 +184,21 @@ class StorageVerificationRecord: mismatch: str | None +@dataclass(frozen=True, slots=True) +class GarbageCollectionRecord: + """Result of releasing one storage location during garbage collection.""" + + storage_location_id: UUID + file_id: UUID + package_id: UUID + backend_id: str + content_address: str + object_delete_attempted: bool + object_deleted: bool + ref_count_before: int + ref_count_after: int + + _RETENTION_EVENT_TYPES = ( "v1.retention.default_applied", "v1.retention.extended", @@ -785,6 +801,7 @@ class Registry: stmt = select(storage_locations) if backend_id is not None: stmt = stmt.where(storage_locations.c.backend_id == backend_id) + stmt = stmt.where(storage_locations.c.status != "deleted") async with self._engine.connect() as conn: rows = (await conn.execute(stmt.order_by(storage_locations.c.id))).all() @@ -842,6 +859,109 @@ class Registry: ) return results + async def collect_garbage( + self, + *, + actor: str = "garbage-collector", + ) -> list[GarbageCollectionRecord]: + """Release deletion-eligible storage locations and delete unreferenced bytes.""" + stmt = ( + select( + artifact_files.c.package_id, + artifact_files.c.id.label("file_id"), + storage_locations.c.id.label("storage_location_id"), + storage_locations.c.backend_id, + storage_locations.c.content_address, + ) + .join( + artifact_files, + artifact_files.c.id == storage_locations.c.artifact_file_id, + ) + .join( + retention_state, + retention_state.c.package_id == artifact_files.c.package_id, + ) + .where( + retention_state.c.eligible_for_deletion.is_(True), + retention_state.c.active_hold_id.is_(None), + storage_locations.c.status != "deleted", + ) + .order_by( + artifact_files.c.package_id, + artifact_files.c.relative_path, + storage_locations.c.id, + ) + ) + async with self._engine.connect() as conn: + rows = (await conn.execute(stmt)).all() + + groups: dict[tuple[str, str], list[Any]] = {} + for row in rows: + groups.setdefault((row.backend_id, row.content_address), []).append(row) + + results: list[GarbageCollectionRecord] = [] + for (backend_id, content_address), group_rows in groups.items(): + async with self._engine.connect() as conn: + active_refs = ( + await conn.execute( + select(storage_locations.c.id).where( + storage_locations.c.backend_id == backend_id, + storage_locations.c.content_address == content_address, + storage_locations.c.status != "deleted", + ) + ) + ).all() + ref_count_before = len(active_refs) + ref_count_after = max(ref_count_before - len(group_rows), 0) + object_delete_attempted = ref_count_after == 0 + object_deleted = False + if object_delete_attempted: + deletion = await self._dataplane.delete_object( + ContentAddress(content_address), + backend_id=backend_id, + ) + object_deleted = deletion.deleted + + for row in group_rows: + payload = cbor2.dumps( + { + "storage_location_id": str(row.storage_location_id), + "file_id": str(row.file_id), + "package_id": str(row.package_id), + "backend_id": backend_id, + "content_address": content_address, + "object_delete_attempted": object_delete_attempted, + "object_deleted": object_deleted, + "ref_count_before": ref_count_before, + "ref_count_after": ref_count_after, + }, + canonical=True, + ) + event = make_event( + event_type="v1.storage.location_deleted", + subject_kind="package", + subject_id=row.package_id, + actor=actor, + payload=payload, + ) + async with self._engine.begin() as conn: + written = await write(conn, event) + await self._view_writer.apply(conn, written) + results.append( + GarbageCollectionRecord( + storage_location_id=row.storage_location_id, + file_id=row.file_id, + package_id=row.package_id, + backend_id=backend_id, + content_address=content_address, + object_delete_attempted=object_delete_attempted, + object_deleted=object_deleted, + ref_count_before=ref_count_before, + ref_count_after=ref_count_after, + ) + ) + return results + async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes: """Return the finalised manifest. ``format`` is ``cbor`` (canonical CBOR, the wire form) or ``json`` (the JCS projection).""" @@ -874,6 +994,8 @@ class Registry: ) -> AsyncIterator[bytes]: """Return an async byte iterator for the bytes of a stored file.""" record = await self.get_file_metadata(file_id) + if record.storage_status == "deleted": + raise FileNotFoundError(f"file has been garbage collected: {file_id}") ca = ContentAddress(record.content_address) return await self._dataplane.serve_object( ca, diff --git a/tests/integration/test_cli_commands.py b/tests/integration/test_cli_commands.py index bfce208..e20a714 100644 --- a/tests/integration/test_cli_commands.py +++ b/tests/integration/test_cli_commands.py @@ -254,6 +254,75 @@ def test_cli_retention_sweep_marks_expired_package( assert payload == {"marked_package_ids": [package_id], "marked_count": 1} +def test_cli_retention_gc_collects_eligible_package( + runner: CliRunner, + env_db: Path, + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + sync_engine = create_engine(f"sqlite:///{env_db}", future=True) + metadata.create_all(sync_engine) + with sync_engine.begin() as conn: + conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS]) + sync_engine.dispose() + + retention_config = tmp_path / "retention.toml" + retention_config.write_text( + '[retention_classes.transient]\ndefault_duration_seconds = 0\n', + encoding="utf-8", + ) + monkeypatch.setenv("ARTIFACTSTORE_RETENTION_CONFIG_PATH", str(retention_config)) + + async def create_expired_file() -> None: + from collections.abc import AsyncIterator + from datetime import UTC, datetime, timedelta + + from artifactstore.app import build_registry + from artifactstore.config import get_settings + + async def stream() -> AsyncIterator[bytes]: + yield b"collect-me" + + registry = build_registry(get_settings()) + try: + package_id = await registry.create_package( + name="collect", + producer="tests", + subject="cli-gc", + retention_class="transient", + actor="ops", + ) + await registry.ingest_file( + package_id, + relative_path="collect.bin", + media_type="application/octet-stream", + stream=stream(), + actor="ops", + ) + await registry.finalize_package(package_id, actor="ops") + await registry.sweep_deletion_eligibility( + now=datetime.now(UTC) + timedelta(seconds=1) + ) + finally: + await registry.dispose() + + asyncio.run(create_expired_file()) + result = runner.invoke(cli_app, ["retention", "gc"]) + + assert result.exit_code == 0, result.output + payload = json.loads(result.output) + assert payload["released_location_count"] == 1 + assert payload["delete_attempted_object_count"] == 1 + assert payload["deleted_object_count"] == 1 + assert payload["results"][0]["object_deleted"] is True + + sync_engine = create_engine(f"sqlite:///{env_db}", future=True) + with sync_engine.connect() as conn: + status = conn.execute(select(storage_locations.c.status)).scalar_one() + sync_engine.dispose() + assert status == "deleted" + + def test_cli_storage_verify_marks_local_location_verified( runner: CliRunner, env_db: Path, diff --git a/tests/integration/test_garbage_collection.py b/tests/integration/test_garbage_collection.py new file mode 100644 index 0000000..6ce8bef --- /dev/null +++ b/tests/integration/test_garbage_collection.py @@ -0,0 +1,206 @@ +"""Garbage collection integration tests (ARTIFACT-STORE-WP-0006).""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from datetime import UTC, datetime, timedelta +from pathlib import Path +from uuid import UUID + +import pytest +import pytest_asyncio +from sqlalchemy import insert, select +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from artifactstore.dataplane import InProcessDataPlane +from artifactstore.db.schema import ( + metadata, + retention_classes, + storage_locations, +) +from artifactstore.db.seed import RETENTION_CLASS_SEEDS +from artifactstore.events import RegistryViewWriter, replay +from artifactstore.registry import FileNotFoundError, Registry +from artifactstore.retention import RetentionPolicy +from artifactstore.storage import LocalBackend + + +@pytest_asyncio.fixture +async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]: + db_path = tmp_path / "gc.db" + eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}") + async with eng.begin() as conn: + await conn.run_sync(metadata.create_all) + for seed in RETENTION_CLASS_SEEDS: + await conn.execute(insert(retention_classes).values(**seed)) + yield eng + await eng.dispose() + + +@pytest.fixture +def view_writer() -> RegistryViewWriter: + return RegistryViewWriter() + + +@pytest.fixture +def registry( + engine: AsyncEngine, + tmp_path: Path, + view_writer: RegistryViewWriter, +) -> Registry: + backend = LocalBackend(tmp_path / "storage", backend_id="local") + dataplane = InProcessDataPlane(backend, tmp_dir=tmp_path / "dp-tmp") + return Registry( + engine, + dataplane, + view_writer, + RetentionPolicy({"transient": 0}), + ) + + +async def _stream(data: bytes) -> AsyncIterator[bytes]: + yield data + + +async def _consume(it: AsyncIterator[bytes]) -> bytes: + out = bytearray() + async for chunk in it: + out.extend(chunk) + return bytes(out) + + +async def _package_with_file( + registry: Registry, + *, + name: str, + retention_class: str, + data: bytes, +) -> tuple[UUID, UUID]: + package_id = await registry.create_package( + name=name, + producer="tests", + subject=name, + retention_class=retention_class, + actor="ops", + ) + file_id = await registry.ingest_file( + package_id, + relative_path="payload.bin", + media_type="application/octet-stream", + stream=_stream(data), + actor="ops", + ) + await registry.finalize_package(package_id, actor="ops") + return package_id, file_id + + +async def _location_statuses(engine: AsyncEngine) -> dict[UUID, str]: + async with engine.connect() as conn: + rows = ( + await conn.execute( + select( + storage_locations.c.artifact_file_id, + storage_locations.c.status, + ) + ) + ).all() + return {row.artifact_file_id: row.status for row in rows} + + +async def test_gc_deletes_unique_expired_object_and_replays( + registry: Registry, + engine: AsyncEngine, + view_writer: RegistryViewWriter, +) -> None: + package_id, file_id = await _package_with_file( + registry, + name="unique", + retention_class="transient", + data=b"unique bytes", + ) + marked = await registry.sweep_deletion_eligibility( + now=datetime.now(UTC) + timedelta(seconds=1) + ) + assert marked == [package_id] + + results = await registry.collect_garbage() + assert len(results) == 1 + assert results[0].file_id == file_id + assert results[0].object_delete_attempted is True + assert results[0].object_deleted is True + assert results[0].ref_count_before == 1 + assert results[0].ref_count_after == 0 + + with pytest.raises(FileNotFoundError): + await registry.get_file(file_id) + assert await registry.collect_garbage() == [] + + pre = await _location_statuses(engine) + await replay(engine, view_writer, reset=True) + post = await _location_statuses(engine) + assert pre == post == {file_id: "deleted"} + + package = await registry.get_package(package_id) + assert package.status == "garbage_collected" + + +async def test_gc_releases_shared_reference_without_deleting_retained_bytes( + registry: Registry, + engine: AsyncEngine, +) -> None: + data = b"shared bytes" + expired_package, expired_file = await _package_with_file( + registry, + name="expired", + retention_class="transient", + data=data, + ) + retained_package, retained_file = await _package_with_file( + registry, + name="retained", + retention_class="raw-evidence", + data=data, + ) + marked = await registry.sweep_deletion_eligibility( + now=datetime.now(UTC) + timedelta(seconds=1) + ) + assert marked == [expired_package] + + results = await registry.collect_garbage() + assert len(results) == 1 + assert results[0].package_id == expired_package + assert results[0].object_delete_attempted is False + assert results[0].object_deleted is False + assert results[0].ref_count_before == 2 + assert results[0].ref_count_after == 1 + + statuses = await _location_statuses(engine) + assert statuses[expired_file] == "deleted" + assert statuses[retained_file] == "recorded" + assert (await registry.get_package(expired_package)).status == "garbage_collected" + assert (await registry.get_package(retained_package)).status == "finalized" + + with pytest.raises(FileNotFoundError): + await registry.get_file(expired_file) + retained_stream = await registry.get_file(retained_file) + assert await _consume(retained_stream) == data + + +async def test_gc_respects_active_hold(registry: Registry, engine: AsyncEngine) -> None: + package_id, file_id = await _package_with_file( + registry, + name="held", + retention_class="transient", + data=b"held bytes", + ) + await registry.apply_retention_hold(package_id, reason="legal hold", actor="ops") + + marked = await registry.sweep_deletion_eligibility( + now=datetime.now(UTC) + timedelta(seconds=1) + ) + assert marked == [] + assert await registry.collect_garbage() == [] + assert await _location_statuses(engine) == {file_id: "recorded"} + + stream = await registry.get_file(file_id) + assert await _consume(stream) == b"held bytes" diff --git a/workplans/ARTIFACT-STORE-WP-0006-garbage-collection.md b/workplans/ARTIFACT-STORE-WP-0006-garbage-collection.md new file mode 100644 index 0000000..e936ec2 --- /dev/null +++ b/workplans/ARTIFACT-STORE-WP-0006-garbage-collection.md @@ -0,0 +1,124 @@ +--- +id: ARTIFACT-STORE-WP-0006 +type: workplan +title: "Garbage Collection And Reference Counting" +repo: artifact-store +domain: stack +status: done +owner: codex +topic_slug: stack +planning_priority: high +planning_order: 6 +created: "2026-05-16" +updated: "2026-05-16" +state_hub_workstream_id: "ccef72e9-a160-45c0-9952-c64be7c8cfa4" +--- + +# ARTIFACT-STORE-WP-0006: Garbage Collection And Reference Counting + +## Purpose + +Turn WP-0003 deletion eligibility into actual byte reclamation while +preserving auditability and global content-addressed deduplication. GC +must never delete bytes still referenced by a non-deleted storage +location. + +## Constraints + +- ADR-0001: content-addressed storage with global deduplication. +- ADR-0002: event log is the source of truth; materialised views are + replayable. +- ADR-0004: byte deletion goes through the data plane, not through + registry-specific backend code. +- WP-0003 deletion eligibility and retention holds are the policy gate. + +## Prerequisites + +- WP-0001 through WP-0003 done. +- WP-0004 backend SPI delete exists for all configured backends. + +## D6.1 - Reference-Counted GC Planner + +```task +id: ARTIFACT-STORE-WP-0006-T001 +status: done +priority: high +state_hub_task_id: "438ed392-0f07-46cb-a6f5-88ce57b33fce" +``` + +Acceptance: + +- GC selects only packages whose `retention_state.eligible_for_deletion` + is true and `active_hold_id` is null. +- It computes references by `(backend_id, content_address)` across all + non-deleted storage locations. +- It releases an eligible package's storage locations without deleting + bytes that are still referenced elsewhere. + +## D6.2 - Byte Deletion And Audit Events + +```task +id: ARTIFACT-STORE-WP-0006-T002 +status: done +priority: high +state_hub_task_id: "8f512753-c402-480a-8517-990fccf09295" +``` + +Acceptance: + +- When the eligible package set owns the final reference to a content + address, GC calls `DataPlane.delete_object`. +- GC emits replayable audit events for every released storage location, + including whether the physical object was deleted or retained due to + remaining references. +- Replay marks released storage locations as `deleted` and packages as + `garbage_collected` once every storage location for that package is + deleted. + +## D6.3 - Operator Command And Docs + +```task +id: ARTIFACT-STORE-WP-0006-T003 +status: done +priority: medium +state_hub_task_id: "a36dce56-f87b-431a-b875-fc567593ddd3" +``` + +Acceptance: + +- `artifactstore retention gc` runs one GC pass and prints a JSON + summary. +- `docs/OPERATOR.md` documents the safe sequence: + `artifactstore retention sweep` then `artifactstore retention gc`. +- The command is idempotent: running it again after a clean pass does + not delete or rewrite anything. + +## D6.4 - Verification Tests + +```task +id: ARTIFACT-STORE-WP-0006-T004 +status: done +priority: high +state_hub_task_id: "b2a2d94f-bc5a-47ca-b540-920d94bff06e" +``` + +Acceptance: + +- Tests cover unique-object deletion, shared-object reference retention, + hold-protected packages, idempotent reruns, replay, and CLI output. +- Full `pytest`, `ruff`, and `mypy` pass. + +## Verification + +- Focused tests: `tests/integration/test_garbage_collection.py` and + `tests/integration/test_cli_commands.py` passed. +- `ruff check .` passed. +- `mypy src tests` passed. + +## Success criteria + +- Expired, unheld packages can be reclaimed without losing bytes still + referenced by retained packages. +- The event log explains every logical release and physical delete. +- A replayed database reconstructs the same `deleted` storage-location + state and `garbage_collected` package status.