Add reference-counted garbage collection

This commit is contained in:
2026-05-17 00:26:19 +02:00
parent 1dae855700
commit a60d24f814
7 changed files with 631 additions and 3 deletions

View File

@@ -99,8 +99,20 @@ default_duration_seconds = 220752000
```
Run `artifactstore retention sweep` from cron or another scheduler to mark
expired, unheld packages eligible for deletion. This work only records
eligibility; it never deletes bytes.
expired, unheld packages eligible for deletion. Then run
`artifactstore retention gc` to release the eligible packages' storage
locations and delete physical objects whose final reference has been
released:
```sh
artifactstore retention sweep
artifactstore retention gc
```
GC is reference-counted by `(backend_id, content_address)`: shared bytes stay in
the backend until every non-deleted storage location has been released. Each
released location emits a `v1.storage.location_deleted` event. A package becomes
`garbage_collected` only after all of its storage locations are released.
## Database backends
@@ -205,6 +217,7 @@ digest, emits `v1.storage.location_verified`, and marks failed locations as
| `artifactstore push <dir>` | Push a directory through the HTTP API and finalize the package. |
| `artifactstore manifest <package_id>` | Fetch the JSON manifest projection through the HTTP API. |
| `artifactstore retention sweep` | Run one deletion-eligibility sweep against the configured DB. |
| `artifactstore retention gc` | Run one reference-counted garbage-collection pass. |
| `artifactstore storage verify --backend <id>` | Re-read stored objects for a backend and record verification events. |
| `artifactstore guide-board ingest <run-dir>` | Ingest one guide-board run directory as an artifact package. |

View File

@@ -186,6 +186,34 @@ def retention_sweep() -> None:
typer.echo(json.dumps({"marked_package_ids": marked, "marked_count": len(marked)}, indent=2))
@retention_app.command("gc")
def retention_gc() -> None:
"""Run one garbage-collection pass for deletion-eligible packages."""
settings = get_settings()
results = asyncio.run(_garbage_collect_async(settings))
object_keys = {
(r["backend_id"], r["content_address"])
for r in results
if r["object_delete_attempted"]
}
deleted_object_keys = {
(r["backend_id"], r["content_address"])
for r in results
if r["object_delete_attempted"] and r["object_deleted"]
}
typer.echo(
json.dumps(
{
"released_location_count": len(results),
"delete_attempted_object_count": len(object_keys),
"deleted_object_count": len(deleted_object_keys),
"results": results,
},
indent=2,
)
)
@storage_app.command("verify")
def storage_verify(
backend: str | None = typer.Option(
@@ -285,6 +313,30 @@ async def _retention_sweep_async(settings: Settings) -> list[str]:
return [str(package_id) for package_id in marked]
async def _garbage_collect_async(settings: Settings) -> list[dict[str, Any]]:
from artifactstore.app import build_registry
registry: Registry = build_registry(settings)
try:
results = await registry.collect_garbage()
finally:
await registry.dispose()
return [
{
"storage_location_id": str(result.storage_location_id),
"file_id": str(result.file_id),
"package_id": str(result.package_id),
"backend_id": result.backend_id,
"content_address": result.content_address,
"object_delete_attempted": result.object_delete_attempted,
"object_deleted": result.object_deleted,
"ref_count_before": result.ref_count_before,
"ref_count_after": result.ref_count_after,
}
for result in results
]
async def _storage_verify_async(
settings: Settings,
*,

View File

@@ -23,7 +23,7 @@ from datetime import UTC, datetime
from uuid import UUID
import cbor2
from sqlalchemy import delete, insert, update
from sqlalchemy import delete, insert, select, update
from sqlalchemy.ext.asyncio import AsyncConnection
from artifactstore.db.schema import (
@@ -267,6 +267,47 @@ async def _apply_storage_location_verified(
)
async def _apply_storage_location_deleted(
connection: AsyncConnection,
event: Event,
) -> None:
if event.subject_id is None:
raise ValueError("v1.storage.location_deleted event must have subject_id")
payload = cbor2.loads(event.payload)
await connection.execute(
update(storage_locations)
.where(storage_locations.c.id == UUID(payload["storage_location_id"]))
.values(
status="deleted",
restore_status="object_deleted"
if payload.get("object_deleted")
else "reference_released",
)
)
remaining = (
await connection.execute(
select(storage_locations.c.id)
.join(
artifact_files,
artifact_files.c.id == storage_locations.c.artifact_file_id,
)
.where(
artifact_files.c.package_id == event.subject_id,
storage_locations.c.status != "deleted",
)
.limit(1)
)
).first()
package_values: dict[str, object] = {"last_event_sequence": event.sequence}
if remaining is None:
package_values["status"] = "garbage_collected"
await connection.execute(
update(artifact_packages)
.where(artifact_packages.c.id == event.subject_id)
.values(**package_values)
)
def _parse_iso(value: str | None) -> datetime | None:
if value is None:
return None
@@ -286,4 +327,5 @@ _HANDLERS = {
"v1.retention.hold_released": _apply_retention_hold_released,
"v1.retention.deletion_eligible": _apply_retention_deletion_eligible,
"v1.storage.location_verified": _apply_storage_location_verified,
"v1.storage.location_deleted": _apply_storage_location_deleted,
}

View File

@@ -70,6 +70,7 @@ __all__ = [
"DuplicateRelativePathError",
"FileNotFoundError",
"FileRecord",
"GarbageCollectionRecord",
"IllegalPackageStateError",
"MetadataSchemaRecord",
"PackageNotFoundError",
@@ -183,6 +184,21 @@ class StorageVerificationRecord:
mismatch: str | None
@dataclass(frozen=True, slots=True)
class GarbageCollectionRecord:
"""Result of releasing one storage location during garbage collection."""
storage_location_id: UUID
file_id: UUID
package_id: UUID
backend_id: str
content_address: str
object_delete_attempted: bool
object_deleted: bool
ref_count_before: int
ref_count_after: int
_RETENTION_EVENT_TYPES = (
"v1.retention.default_applied",
"v1.retention.extended",
@@ -785,6 +801,7 @@ class Registry:
stmt = select(storage_locations)
if backend_id is not None:
stmt = stmt.where(storage_locations.c.backend_id == backend_id)
stmt = stmt.where(storage_locations.c.status != "deleted")
async with self._engine.connect() as conn:
rows = (await conn.execute(stmt.order_by(storage_locations.c.id))).all()
@@ -842,6 +859,109 @@ class Registry:
)
return results
async def collect_garbage(
self,
*,
actor: str = "garbage-collector",
) -> list[GarbageCollectionRecord]:
"""Release deletion-eligible storage locations and delete unreferenced bytes."""
stmt = (
select(
artifact_files.c.package_id,
artifact_files.c.id.label("file_id"),
storage_locations.c.id.label("storage_location_id"),
storage_locations.c.backend_id,
storage_locations.c.content_address,
)
.join(
artifact_files,
artifact_files.c.id == storage_locations.c.artifact_file_id,
)
.join(
retention_state,
retention_state.c.package_id == artifact_files.c.package_id,
)
.where(
retention_state.c.eligible_for_deletion.is_(True),
retention_state.c.active_hold_id.is_(None),
storage_locations.c.status != "deleted",
)
.order_by(
artifact_files.c.package_id,
artifact_files.c.relative_path,
storage_locations.c.id,
)
)
async with self._engine.connect() as conn:
rows = (await conn.execute(stmt)).all()
groups: dict[tuple[str, str], list[Any]] = {}
for row in rows:
groups.setdefault((row.backend_id, row.content_address), []).append(row)
results: list[GarbageCollectionRecord] = []
for (backend_id, content_address), group_rows in groups.items():
async with self._engine.connect() as conn:
active_refs = (
await conn.execute(
select(storage_locations.c.id).where(
storage_locations.c.backend_id == backend_id,
storage_locations.c.content_address == content_address,
storage_locations.c.status != "deleted",
)
)
).all()
ref_count_before = len(active_refs)
ref_count_after = max(ref_count_before - len(group_rows), 0)
object_delete_attempted = ref_count_after == 0
object_deleted = False
if object_delete_attempted:
deletion = await self._dataplane.delete_object(
ContentAddress(content_address),
backend_id=backend_id,
)
object_deleted = deletion.deleted
for row in group_rows:
payload = cbor2.dumps(
{
"storage_location_id": str(row.storage_location_id),
"file_id": str(row.file_id),
"package_id": str(row.package_id),
"backend_id": backend_id,
"content_address": content_address,
"object_delete_attempted": object_delete_attempted,
"object_deleted": object_deleted,
"ref_count_before": ref_count_before,
"ref_count_after": ref_count_after,
},
canonical=True,
)
event = make_event(
event_type="v1.storage.location_deleted",
subject_kind="package",
subject_id=row.package_id,
actor=actor,
payload=payload,
)
async with self._engine.begin() as conn:
written = await write(conn, event)
await self._view_writer.apply(conn, written)
results.append(
GarbageCollectionRecord(
storage_location_id=row.storage_location_id,
file_id=row.file_id,
package_id=row.package_id,
backend_id=backend_id,
content_address=content_address,
object_delete_attempted=object_delete_attempted,
object_deleted=object_deleted,
ref_count_before=ref_count_before,
ref_count_after=ref_count_after,
)
)
return results
async def get_manifest_bytes(self, package_id: UUID, *, format: str = "cbor") -> bytes:
"""Return the finalised manifest. ``format`` is ``cbor`` (canonical
CBOR, the wire form) or ``json`` (the JCS projection)."""
@@ -874,6 +994,8 @@ class Registry:
) -> AsyncIterator[bytes]:
"""Return an async byte iterator for the bytes of a stored file."""
record = await self.get_file_metadata(file_id)
if record.storage_status == "deleted":
raise FileNotFoundError(f"file has been garbage collected: {file_id}")
ca = ContentAddress(record.content_address)
return await self._dataplane.serve_object(
ca,

View File

@@ -254,6 +254,75 @@ def test_cli_retention_sweep_marks_expired_package(
assert payload == {"marked_package_ids": [package_id], "marked_count": 1}
def test_cli_retention_gc_collects_eligible_package(
runner: CliRunner,
env_db: Path,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
metadata.create_all(sync_engine)
with sync_engine.begin() as conn:
conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
sync_engine.dispose()
retention_config = tmp_path / "retention.toml"
retention_config.write_text(
'[retention_classes.transient]\ndefault_duration_seconds = 0\n',
encoding="utf-8",
)
monkeypatch.setenv("ARTIFACTSTORE_RETENTION_CONFIG_PATH", str(retention_config))
async def create_expired_file() -> None:
from collections.abc import AsyncIterator
from datetime import UTC, datetime, timedelta
from artifactstore.app import build_registry
from artifactstore.config import get_settings
async def stream() -> AsyncIterator[bytes]:
yield b"collect-me"
registry = build_registry(get_settings())
try:
package_id = await registry.create_package(
name="collect",
producer="tests",
subject="cli-gc",
retention_class="transient",
actor="ops",
)
await registry.ingest_file(
package_id,
relative_path="collect.bin",
media_type="application/octet-stream",
stream=stream(),
actor="ops",
)
await registry.finalize_package(package_id, actor="ops")
await registry.sweep_deletion_eligibility(
now=datetime.now(UTC) + timedelta(seconds=1)
)
finally:
await registry.dispose()
asyncio.run(create_expired_file())
result = runner.invoke(cli_app, ["retention", "gc"])
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload["released_location_count"] == 1
assert payload["delete_attempted_object_count"] == 1
assert payload["deleted_object_count"] == 1
assert payload["results"][0]["object_deleted"] is True
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
with sync_engine.connect() as conn:
status = conn.execute(select(storage_locations.c.status)).scalar_one()
sync_engine.dispose()
assert status == "deleted"
def test_cli_storage_verify_marks_local_location_verified(
runner: CliRunner,
env_db: Path,

View File

@@ -0,0 +1,206 @@
"""Garbage collection integration tests (ARTIFACT-STORE-WP-0006)."""
from __future__ import annotations
from collections.abc import AsyncIterator
from datetime import UTC, datetime, timedelta
from pathlib import Path
from uuid import UUID
import pytest
import pytest_asyncio
from sqlalchemy import insert, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.schema import (
metadata,
retention_classes,
storage_locations,
)
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
from artifactstore.events import RegistryViewWriter, replay
from artifactstore.registry import FileNotFoundError, Registry
from artifactstore.retention import RetentionPolicy
from artifactstore.storage import LocalBackend
@pytest_asyncio.fixture
async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
db_path = tmp_path / "gc.db"
eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
async with eng.begin() as conn:
await conn.run_sync(metadata.create_all)
for seed in RETENTION_CLASS_SEEDS:
await conn.execute(insert(retention_classes).values(**seed))
yield eng
await eng.dispose()
@pytest.fixture
def view_writer() -> RegistryViewWriter:
return RegistryViewWriter()
@pytest.fixture
def registry(
engine: AsyncEngine,
tmp_path: Path,
view_writer: RegistryViewWriter,
) -> Registry:
backend = LocalBackend(tmp_path / "storage", backend_id="local")
dataplane = InProcessDataPlane(backend, tmp_dir=tmp_path / "dp-tmp")
return Registry(
engine,
dataplane,
view_writer,
RetentionPolicy({"transient": 0}),
)
async def _stream(data: bytes) -> AsyncIterator[bytes]:
yield data
async def _consume(it: AsyncIterator[bytes]) -> bytes:
out = bytearray()
async for chunk in it:
out.extend(chunk)
return bytes(out)
async def _package_with_file(
registry: Registry,
*,
name: str,
retention_class: str,
data: bytes,
) -> tuple[UUID, UUID]:
package_id = await registry.create_package(
name=name,
producer="tests",
subject=name,
retention_class=retention_class,
actor="ops",
)
file_id = await registry.ingest_file(
package_id,
relative_path="payload.bin",
media_type="application/octet-stream",
stream=_stream(data),
actor="ops",
)
await registry.finalize_package(package_id, actor="ops")
return package_id, file_id
async def _location_statuses(engine: AsyncEngine) -> dict[UUID, str]:
async with engine.connect() as conn:
rows = (
await conn.execute(
select(
storage_locations.c.artifact_file_id,
storage_locations.c.status,
)
)
).all()
return {row.artifact_file_id: row.status for row in rows}
async def test_gc_deletes_unique_expired_object_and_replays(
registry: Registry,
engine: AsyncEngine,
view_writer: RegistryViewWriter,
) -> None:
package_id, file_id = await _package_with_file(
registry,
name="unique",
retention_class="transient",
data=b"unique bytes",
)
marked = await registry.sweep_deletion_eligibility(
now=datetime.now(UTC) + timedelta(seconds=1)
)
assert marked == [package_id]
results = await registry.collect_garbage()
assert len(results) == 1
assert results[0].file_id == file_id
assert results[0].object_delete_attempted is True
assert results[0].object_deleted is True
assert results[0].ref_count_before == 1
assert results[0].ref_count_after == 0
with pytest.raises(FileNotFoundError):
await registry.get_file(file_id)
assert await registry.collect_garbage() == []
pre = await _location_statuses(engine)
await replay(engine, view_writer, reset=True)
post = await _location_statuses(engine)
assert pre == post == {file_id: "deleted"}
package = await registry.get_package(package_id)
assert package.status == "garbage_collected"
async def test_gc_releases_shared_reference_without_deleting_retained_bytes(
registry: Registry,
engine: AsyncEngine,
) -> None:
data = b"shared bytes"
expired_package, expired_file = await _package_with_file(
registry,
name="expired",
retention_class="transient",
data=data,
)
retained_package, retained_file = await _package_with_file(
registry,
name="retained",
retention_class="raw-evidence",
data=data,
)
marked = await registry.sweep_deletion_eligibility(
now=datetime.now(UTC) + timedelta(seconds=1)
)
assert marked == [expired_package]
results = await registry.collect_garbage()
assert len(results) == 1
assert results[0].package_id == expired_package
assert results[0].object_delete_attempted is False
assert results[0].object_deleted is False
assert results[0].ref_count_before == 2
assert results[0].ref_count_after == 1
statuses = await _location_statuses(engine)
assert statuses[expired_file] == "deleted"
assert statuses[retained_file] == "recorded"
assert (await registry.get_package(expired_package)).status == "garbage_collected"
assert (await registry.get_package(retained_package)).status == "finalized"
with pytest.raises(FileNotFoundError):
await registry.get_file(expired_file)
retained_stream = await registry.get_file(retained_file)
assert await _consume(retained_stream) == data
async def test_gc_respects_active_hold(registry: Registry, engine: AsyncEngine) -> None:
package_id, file_id = await _package_with_file(
registry,
name="held",
retention_class="transient",
data=b"held bytes",
)
await registry.apply_retention_hold(package_id, reason="legal hold", actor="ops")
marked = await registry.sweep_deletion_eligibility(
now=datetime.now(UTC) + timedelta(seconds=1)
)
assert marked == []
assert await registry.collect_garbage() == []
assert await _location_statuses(engine) == {file_id: "recorded"}
stream = await registry.get_file(file_id)
assert await _consume(stream) == b"held bytes"

View File

@@ -0,0 +1,124 @@
---
id: ARTIFACT-STORE-WP-0006
type: workplan
title: "Garbage Collection And Reference Counting"
repo: artifact-store
domain: stack
status: done
owner: codex
topic_slug: stack
planning_priority: high
planning_order: 6
created: "2026-05-16"
updated: "2026-05-16"
state_hub_workstream_id: "ccef72e9-a160-45c0-9952-c64be7c8cfa4"
---
# ARTIFACT-STORE-WP-0006: Garbage Collection And Reference Counting
## Purpose
Turn WP-0003 deletion eligibility into actual byte reclamation while
preserving auditability and global content-addressed deduplication. GC
must never delete bytes still referenced by a non-deleted storage
location.
## Constraints
- ADR-0001: content-addressed storage with global deduplication.
- ADR-0002: event log is the source of truth; materialised views are
replayable.
- ADR-0004: byte deletion goes through the data plane, not through
registry-specific backend code.
- WP-0003 deletion eligibility and retention holds are the policy gate.
## Prerequisites
- WP-0001 through WP-0003 done.
- WP-0004 backend SPI delete exists for all configured backends.
## D6.1 - Reference-Counted GC Planner
```task
id: ARTIFACT-STORE-WP-0006-T001
status: done
priority: high
state_hub_task_id: "438ed392-0f07-46cb-a6f5-88ce57b33fce"
```
Acceptance:
- GC selects only packages whose `retention_state.eligible_for_deletion`
is true and `active_hold_id` is null.
- It computes references by `(backend_id, content_address)` across all
non-deleted storage locations.
- It releases an eligible package's storage locations without deleting
bytes that are still referenced elsewhere.
## D6.2 - Byte Deletion And Audit Events
```task
id: ARTIFACT-STORE-WP-0006-T002
status: done
priority: high
state_hub_task_id: "8f512753-c402-480a-8517-990fccf09295"
```
Acceptance:
- When the eligible package set owns the final reference to a content
address, GC calls `DataPlane.delete_object`.
- GC emits replayable audit events for every released storage location,
including whether the physical object was deleted or retained due to
remaining references.
- Replay marks released storage locations as `deleted` and packages as
`garbage_collected` once every storage location for that package is
deleted.
## D6.3 - Operator Command And Docs
```task
id: ARTIFACT-STORE-WP-0006-T003
status: done
priority: medium
state_hub_task_id: "a36dce56-f87b-431a-b875-fc567593ddd3"
```
Acceptance:
- `artifactstore retention gc` runs one GC pass and prints a JSON
summary.
- `docs/OPERATOR.md` documents the safe sequence:
`artifactstore retention sweep` then `artifactstore retention gc`.
- The command is idempotent: running it again after a clean pass does
not delete or rewrite anything.
## D6.4 - Verification Tests
```task
id: ARTIFACT-STORE-WP-0006-T004
status: done
priority: high
state_hub_task_id: "b2a2d94f-bc5a-47ca-b540-920d94bff06e"
```
Acceptance:
- Tests cover unique-object deletion, shared-object reference retention,
hold-protected packages, idempotent reruns, replay, and CLI output.
- Full `pytest`, `ruff`, and `mypy` pass.
## Verification
- Focused tests: `tests/integration/test_garbage_collection.py` and
`tests/integration/test_cli_commands.py` passed.
- `ruff check .` passed.
- `mypy src tests` passed.
## Success criteria
- Expired, unheld packages can be reclaimed without losing bytes still
referenced by retained packages.
- The event log explains every logical release and physical delete.
- A replayed database reconstructs the same `deleted` storage-location
state and `garbage_collected` package status.