generated from coulomb/repo-seed
251 lines
8.6 KiB
Python
251 lines
8.6 KiB
Python
"""Retention lifecycle integration tests for ARTIFACT-STORE-WP-0003."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import AsyncIterator
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import cbor2
|
|
import pytest
|
|
import pytest_asyncio
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import insert
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
|
|
|
|
from artifactstore.api.http import create_app
|
|
from artifactstore.config import Settings
|
|
from artifactstore.dataplane import InProcessDataPlane
|
|
from artifactstore.db.schema import metadata, retention_classes
|
|
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
|
|
from artifactstore.events import RegistryViewWriter
|
|
from artifactstore.registry import Registry, RetentionStateError
|
|
from artifactstore.retention import RetentionPolicy
|
|
from artifactstore.storage import LocalBackend
|
|
|
|
AUTH = {"Authorization": "Bearer test-token"}
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def engine(tmp_path: Path) -> AsyncIterator[AsyncEngine]:
|
|
db_path = tmp_path / "retention.db"
|
|
eng = create_async_engine(f"sqlite+aiosqlite:///{db_path}")
|
|
async with eng.begin() as conn:
|
|
await conn.run_sync(metadata.create_all)
|
|
for seed in RETENTION_CLASS_SEEDS:
|
|
await conn.execute(insert(retention_classes).values(**seed))
|
|
yield eng
|
|
await eng.dispose()
|
|
|
|
|
|
@pytest.fixture
|
|
def registry(engine: AsyncEngine, tmp_path: Path) -> Registry:
|
|
backend = LocalBackend(tmp_path / "store", backend_id="local")
|
|
dataplane = InProcessDataPlane(backend, tmp_dir=tmp_path / "dp-tmp")
|
|
return Registry(
|
|
engine,
|
|
dataplane,
|
|
RegistryViewWriter(),
|
|
RetentionPolicy({"transient": 0}),
|
|
)
|
|
|
|
|
|
def _http_settings(tmp_path: Path, retention_config_path: Path | None = None) -> Settings:
|
|
db_path = tmp_path / "retention-http.db"
|
|
storage_root = tmp_path / "storage"
|
|
storage_root.mkdir(parents=True, exist_ok=True)
|
|
from sqlalchemy import create_engine
|
|
|
|
sync_engine = create_engine(f"sqlite:///{db_path}", future=True)
|
|
metadata.create_all(sync_engine)
|
|
with sync_engine.begin() as conn:
|
|
conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
|
|
sync_engine.dispose()
|
|
return Settings(
|
|
database_url=f"sqlite+aiosqlite:///{db_path}",
|
|
storage_local_root=str(storage_root),
|
|
log_level="INFO",
|
|
auth_tokens="test-token",
|
|
retention_config_path=str(retention_config_path or ""),
|
|
)
|
|
|
|
|
|
def _create_package(
|
|
client: TestClient,
|
|
*,
|
|
retention_class: str = "raw-evidence",
|
|
) -> dict[str, object]:
|
|
resp = client.post(
|
|
"/packages",
|
|
headers=AUTH,
|
|
json={
|
|
"name": "retention",
|
|
"producer": "tests",
|
|
"subject": "retention-subject",
|
|
"retention_class": retention_class,
|
|
"metadata": {},
|
|
},
|
|
)
|
|
assert resp.status_code == 201, resp.text
|
|
return dict(resp.json())
|
|
|
|
|
|
async def test_default_retention_and_permanent_record(registry: Registry) -> None:
|
|
transient_id = await registry.create_package(
|
|
name="short",
|
|
producer="tests",
|
|
subject="transient",
|
|
retention_class="transient",
|
|
actor="ops",
|
|
)
|
|
transient_state = await registry.get_retention_state(transient_id)
|
|
assert transient_state.current_expires_at is not None
|
|
assert transient_state.eligible_for_deletion is False
|
|
|
|
permanent_id = await registry.create_package(
|
|
name="forever",
|
|
producer="tests",
|
|
subject="permanent",
|
|
retention_class="permanent-record",
|
|
actor="ops",
|
|
)
|
|
permanent_state = await registry.get_retention_state(permanent_id)
|
|
assert permanent_state.current_expires_at is None
|
|
assert permanent_state.eligible_for_deletion is False
|
|
|
|
history = await registry.retention_history(transient_id)
|
|
assert [event.event_type for event in history] == ["v1.retention.default_applied"]
|
|
assert cbor2.loads(history[0].payload)["default_duration_seconds"] == 0
|
|
|
|
|
|
async def test_retention_extension_requires_later_expiry(registry: Registry) -> None:
|
|
package_id = await registry.create_package(
|
|
name="extend",
|
|
producer="tests",
|
|
subject="extension",
|
|
retention_class="transient",
|
|
actor="ops",
|
|
)
|
|
current = await registry.get_retention_state(package_id)
|
|
assert current.current_expires_at is not None
|
|
|
|
with pytest.raises(RetentionStateError, match="strictly later"):
|
|
await registry.extend_retention(
|
|
package_id,
|
|
new_expires_at=current.current_expires_at,
|
|
reason="not later",
|
|
actor="ops",
|
|
)
|
|
|
|
new_expiry = current.current_expires_at + timedelta(days=1)
|
|
extended = await registry.extend_retention(
|
|
package_id,
|
|
new_expires_at=new_expiry,
|
|
reason="needed for quarterly review",
|
|
actor="ops",
|
|
)
|
|
assert extended.current_expires_at == new_expiry
|
|
history = await registry.retention_history(package_id)
|
|
assert [event.event_type for event in history] == [
|
|
"v1.retention.default_applied",
|
|
"v1.retention.extended",
|
|
]
|
|
|
|
|
|
async def test_hold_release_and_sweeper_eligibility_transition(registry: Registry) -> None:
|
|
package_id = await registry.create_package(
|
|
name="held",
|
|
producer="tests",
|
|
subject="hold-release",
|
|
retention_class="transient",
|
|
actor="ops",
|
|
)
|
|
initial = await registry.get_retention_state(package_id)
|
|
assert initial.current_expires_at is not None
|
|
after_expiry = initial.current_expires_at + timedelta(seconds=5)
|
|
|
|
hold_id = await registry.apply_retention_hold(
|
|
package_id,
|
|
reason="quarterly hold",
|
|
actor="ops",
|
|
)
|
|
held = await registry.get_retention_state(package_id)
|
|
assert held.active_hold_id == hold_id
|
|
|
|
assert await registry.sweep_deletion_eligibility(now=after_expiry) == []
|
|
still_held = await registry.get_retention_state(package_id)
|
|
assert still_held.eligible_for_deletion is False
|
|
|
|
released = await registry.release_retention_hold(
|
|
package_id,
|
|
hold_id,
|
|
reason="hold complete",
|
|
actor="ops",
|
|
now=after_expiry,
|
|
)
|
|
assert released.active_hold_id is None
|
|
assert released.eligible_for_deletion is True
|
|
|
|
assert await registry.sweep_deletion_eligibility(now=after_expiry) == []
|
|
history = await registry.retention_history(package_id)
|
|
assert [event.event_type for event in history] == [
|
|
"v1.retention.default_applied",
|
|
"v1.retention.hold_applied",
|
|
"v1.retention.hold_released",
|
|
"v1.retention.deletion_eligible",
|
|
]
|
|
|
|
|
|
def test_http_retention_controls_and_history_formats(tmp_path: Path) -> None:
|
|
app = create_app(_http_settings(tmp_path))
|
|
with TestClient(app) as client:
|
|
package = _create_package(client)
|
|
package_id = str(package["id"])
|
|
current_expires_at = datetime.fromisoformat(str(package["expires_at"]))
|
|
new_expiry = current_expires_at + timedelta(days=7)
|
|
|
|
extended = client.post(
|
|
f"/packages/{package_id}/retention/extensions",
|
|
headers=AUTH,
|
|
json={
|
|
"new_expires_at": new_expiry.isoformat(),
|
|
"reason": "retain for release signoff",
|
|
},
|
|
)
|
|
assert extended.status_code == 200, extended.text
|
|
assert extended.json()["current_expires_at"] == new_expiry.isoformat()
|
|
|
|
hold = client.post(
|
|
f"/packages/{package_id}/retention/holds",
|
|
headers=AUTH,
|
|
json={"reason": "external audit"},
|
|
)
|
|
assert hold.status_code == 201, hold.text
|
|
hold_id = hold.json()["hold_id"]
|
|
|
|
released = client.post(
|
|
f"/packages/{package_id}/retention/holds/{hold_id}/release",
|
|
headers=AUTH,
|
|
json={"reason": "audit complete"},
|
|
)
|
|
assert released.status_code == 200, released.text
|
|
assert released.json()["active_hold_id"] is None
|
|
|
|
history_json = client.get(f"/packages/{package_id}/retention/history", headers=AUTH)
|
|
assert history_json.status_code == 200
|
|
assert [event["event_type"] for event in history_json.json()["events"]] == [
|
|
"v1.retention.default_applied",
|
|
"v1.retention.extended",
|
|
"v1.retention.hold_applied",
|
|
"v1.retention.hold_released",
|
|
]
|
|
|
|
history_cbor = client.get(
|
|
f"/packages/{package_id}/retention/history",
|
|
headers={**AUTH, "Accept": "application/cbor"},
|
|
)
|
|
assert history_cbor.status_code == 200
|
|
assert cbor2.loads(history_cbor.content)["events"][1]["event_type"] == (
|
|
"v1.retention.extended"
|
|
)
|