# File: artifact-store/tests/integration/test_retention_lifecycle.py
#
# 251 lines
# 8.6 KiB
# Python

"""Retention lifecycle integration tests for ARTIFACT-STORE-WP-0003."""
from __future__ import annotations
from collections.abc import AsyncIterator
from datetime import datetime, timedelta
from pathlib import Path
import cbor2
import pytest
import pytest_asyncio
from fastapi.testclient import TestClient
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from artifactstore.api.http import create_app
from artifactstore.config import Settings
from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.schema import metadata, retention_classes
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
from artifactstore.events import RegistryViewWriter
from artifactstore.registry import Registry, RetentionStateError
from artifactstore.retention import RetentionPolicy
from artifactstore.storage import LocalBackend
# Authorization header attached to the HTTP requests issued in this module; the
# bearer token matches the ``auth_tokens`` value passed to ``Settings`` in
# ``_http_settings``.
AUTH = {"Authorization": "Bearer test-token"}
@pytest_asyncio.fixture
async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
    """Yield an async SQLite engine whose schema is created and retention classes seeded.

    The database file is ``retention.db`` under ``tmp_path``. The engine is
    disposed after the ``yield``, i.e. during fixture teardown.
    """
    database_file = tmp_path / "retention.db"
    async_engine = create_async_engine(f"sqlite+aiosqlite:///{database_file}")

    async with async_engine.begin() as connection:
        # ``metadata.create_all`` is a synchronous API, hence ``run_sync``.
        await connection.run_sync(metadata.create_all)
        # Seed rows are inserted one statement at a time, in the same
        # transaction as the schema creation.
        for seed_row in RETENTION_CLASS_SEEDS:
            seed_statement = insert(retention_classes).values(**seed_row)
            await connection.execute(seed_statement)

    yield async_engine
    await async_engine.dispose()
@pytest.fixture
def registry(engine: AsyncEngine, tmp_path: Path) -> Registry:
    """Build a ``Registry`` on top of the ``engine`` fixture and a local backend.

    Storage goes to ``tmp_path / "store"`` and the data plane uses
    ``tmp_path / "dp-tmp"`` as its temporary directory.
    """
    store_root = tmp_path / "store"
    scratch_dir = tmp_path / "dp-tmp"

    local_backend = LocalBackend(store_root, backend_id="local")
    plane = InProcessDataPlane(local_backend, tmp_dir=scratch_dir)
    view_writer = RegistryViewWriter()
    # NOTE(review): presumably maps the "transient" class to a zero-second
    # default duration; the tests in this module assert
    # ``default_duration_seconds == 0`` for transient packages.
    policy = RetentionPolicy({"transient": 0})

    return Registry(engine, plane, view_writer, policy)
def _http_settings(tmp_path: Path, retention_config_path: Path | None = None) -> Settings:
    """Prepare a seeded SQLite database and storage directory, and return matching settings.

    The schema is created and the retention-class seed rows are inserted with
    a short-lived synchronous engine, so this helper can be called from plain
    (non-async) tests. The returned ``Settings`` point the application at the
    same database file through the ``sqlite+aiosqlite`` URL.

    Args:
        tmp_path: Directory that receives ``retention-http.db`` and the
            ``storage`` directory.
        retention_config_path: Optional path forwarded to ``Settings`` as a
            string; ``None`` is passed through as an empty string.

    Returns:
        ``Settings`` configured with the database URL, the storage root, log
        level ``"INFO"`` and the ``"test-token"`` auth token.
    """
    from sqlalchemy import create_engine

    db_path = tmp_path / "retention-http.db"
    storage_root = tmp_path / "storage"
    storage_root.mkdir(parents=True, exist_ok=True)

    sync_engine = create_engine(f"sqlite:///{db_path}", future=True)
    try:
        metadata.create_all(sync_engine)
        with sync_engine.begin() as conn:
            conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
    finally:
        # Always release the pooled SQLite connection, even if schema creation
        # or seeding fails, so the database file is not left open.
        sync_engine.dispose()

    return Settings(
        database_url=f"sqlite+aiosqlite:///{db_path}",
        storage_local_root=str(storage_root),
        log_level="INFO",
        auth_tokens="test-token",
        retention_config_path=str(retention_config_path or ""),
    )
def _create_package(
    client: TestClient,
    *,
    retention_class: str = "raw-evidence",
) -> dict[str, object]:
    """Create a package through ``POST /packages`` and return the response body.

    Args:
        client: Test client bound to the application under test.
        retention_class: Retention class sent in the request body.

    Returns:
        The decoded JSON response, copied into a plain ``dict``.

    Raises:
        AssertionError: If the endpoint does not answer with status 201.
    """
    request_body = {
        "name": "retention",
        "producer": "tests",
        "subject": "retention-subject",
        "retention_class": retention_class,
        "metadata": {},
    }
    response = client.post("/packages", headers=AUTH, json=request_body)
    # Include the response text so a failure shows the server's error message.
    assert response.status_code == 201, response.text
    return dict(response.json())
async def test_default_retention_and_permanent_record(registry: Registry) -> None:
    """A transient package gets a default expiry; a permanent record gets none."""
    # Transient package: an expiry is set, but it is not yet flagged for deletion.
    short_lived_id = await registry.create_package(
        name="short",
        producer="tests",
        subject="transient",
        retention_class="transient",
        actor="ops",
    )
    short_lived = await registry.get_retention_state(short_lived_id)
    assert short_lived.current_expires_at is not None
    assert short_lived.eligible_for_deletion is False

    # Permanent record: no expiry at all, and never flagged for deletion here.
    forever_id = await registry.create_package(
        name="forever",
        producer="tests",
        subject="permanent",
        retention_class="permanent-record",
        actor="ops",
    )
    forever = await registry.get_retention_state(forever_id)
    assert forever.current_expires_at is None
    assert forever.eligible_for_deletion is False

    # The transient package's history holds exactly the default-applied event,
    # whose CBOR payload records the configured zero-second duration.
    events = await registry.retention_history(short_lived_id)
    event_types = [entry.event_type for entry in events]
    assert event_types == ["v1.retention.default_applied"]
    first_payload = cbor2.loads(events[0].payload)
    assert first_payload["default_duration_seconds"] == 0
async def test_retention_extension_requires_later_expiry(registry: Registry) -> None:
    """Extending retention is rejected unless the new expiry is later than the current one."""
    pkg_id = await registry.create_package(
        name="extend",
        producer="tests",
        subject="extension",
        retention_class="transient",
        actor="ops",
    )
    before = await registry.get_retention_state(pkg_id)
    original_expiry = before.current_expires_at
    assert original_expiry is not None

    # Re-using the current expiry is not "later" and must be refused.
    with pytest.raises(RetentionStateError, match="strictly later"):
        await registry.extend_retention(
            pkg_id,
            new_expires_at=original_expiry,
            reason="not later",
            actor="ops",
        )

    # A later expiry is accepted and reflected in the returned state.
    later_expiry = original_expiry + timedelta(days=1)
    after = await registry.extend_retention(
        pkg_id,
        new_expires_at=later_expiry,
        reason="needed for quarterly review",
        actor="ops",
    )
    assert after.current_expires_at == later_expiry

    # Only the successful extension shows up in the history.
    events = await registry.retention_history(pkg_id)
    expected_types = [
        "v1.retention.default_applied",
        "v1.retention.extended",
    ]
    assert [entry.event_type for entry in events] == expected_types
async def test_hold_release_and_sweeper_eligibility_transition(registry: Registry) -> None:
    """A hold blocks deletion eligibility past expiry; releasing it makes the package eligible."""
    pkg_id = await registry.create_package(
        name="held",
        producer="tests",
        subject="hold-release",
        retention_class="transient",
        actor="ops",
    )
    starting_state = await registry.get_retention_state(pkg_id)
    assert starting_state.current_expires_at is not None
    # A fixed instant shortly after the package's expiry, used as "now" below.
    past_expiry = starting_state.current_expires_at + timedelta(seconds=5)

    hold_id = await registry.apply_retention_hold(
        pkg_id,
        reason="quarterly hold",
        actor="ops",
    )
    state_with_hold = await registry.get_retention_state(pkg_id)
    assert state_with_hold.active_hold_id == hold_id

    # While the hold is active, a sweep past expiry reports nothing and the
    # package stays ineligible.
    swept_while_held = await registry.sweep_deletion_eligibility(now=past_expiry)
    assert swept_while_held == []
    state_after_sweep = await registry.get_retention_state(pkg_id)
    assert state_after_sweep.eligible_for_deletion is False

    # Releasing the hold past expiry clears it and flags the package as eligible.
    state_after_release = await registry.release_retention_hold(
        pkg_id,
        hold_id,
        reason="hold complete",
        actor="ops",
        now=past_expiry,
    )
    assert state_after_release.active_hold_id is None
    assert state_after_release.eligible_for_deletion is True

    # A follow-up sweep at the same instant reports nothing further.
    swept_after_release = await registry.sweep_deletion_eligibility(now=past_expiry)
    assert swept_after_release == []

    events = await registry.retention_history(pkg_id)
    expected_types = [
        "v1.retention.default_applied",
        "v1.retention.hold_applied",
        "v1.retention.hold_released",
        "v1.retention.deletion_eligible",
    ]
    assert [entry.event_type for entry in events] == expected_types
def test_http_retention_controls_and_history_formats(tmp_path: Path) -> None:
    """Exercise extension, hold and release over HTTP, then read history as JSON and CBOR."""
    application = create_app(_http_settings(tmp_path))
    with TestClient(application) as client:
        created = _create_package(client)
        package_id = str(created["id"])

        # Extend retention by a week relative to the expiry returned at creation.
        original_expiry = datetime.fromisoformat(str(created["expires_at"]))
        new_expiry_iso = (original_expiry + timedelta(days=7)).isoformat()
        extensions_url = f"/packages/{package_id}/retention/extensions"
        extension_resp = client.post(
            extensions_url,
            headers=AUTH,
            json={
                "new_expires_at": new_expiry_iso,
                "reason": "retain for release signoff",
            },
        )
        assert extension_resp.status_code == 200, extension_resp.text
        assert extension_resp.json()["current_expires_at"] == new_expiry_iso

        # Apply a hold, then release it again.
        holds_url = f"/packages/{package_id}/retention/holds"
        hold_resp = client.post(
            holds_url,
            headers=AUTH,
            json={"reason": "external audit"},
        )
        assert hold_resp.status_code == 201, hold_resp.text
        hold_id = hold_resp.json()["hold_id"]

        release_url = f"/packages/{package_id}/retention/holds/{hold_id}/release"
        release_resp = client.post(
            release_url,
            headers=AUTH,
            json={"reason": "audit complete"},
        )
        assert release_resp.status_code == 200, release_resp.text
        assert release_resp.json()["active_hold_id"] is None

        # Default (JSON) representation lists all four events in order.
        json_resp = client.get(f"/packages/{package_id}/retention/history", headers=AUTH)
        assert json_resp.status_code == 200
        json_event_types = [item["event_type"] for item in json_resp.json()["events"]]
        assert json_event_types == [
            "v1.retention.default_applied",
            "v1.retention.extended",
            "v1.retention.hold_applied",
            "v1.retention.hold_released",
        ]

        # The same endpoint answers with CBOR when asked via the Accept header.
        cbor_headers = {**AUTH, "Accept": "application/cbor"}
        cbor_resp = client.get(
            f"/packages/{package_id}/retention/history",
            headers=cbor_headers,
        )
        assert cbor_resp.status_code == 200
        decoded_history = cbor2.loads(cbor_resp.content)
        assert decoded_history["events"][1]["event_type"] == (
            "v1.retention.extended"
        )