generated from coulomb/repo-seed
Schema (src/artifactstore/db/schema.py): - events table (ADR-0002 source of truth): sequence BIGSERIAL PK, created_at, event_type, subject_kind, subject_id, actor, payload (CBOR bytes), payload_digest. Indexes on (subject_kind, subject_id) and (event_type, sequence). - artifact_packages, artifact_files, storage_locations, retention_state (materialised views over events). - retention_classes (seed table) and metadata_schemas (config table). - ADR-0001 columns present: digest_algorithm, digest_primary, digest_sha256, content_address. Blueprint tiering columns present: retrieval_tier (default 'hot'), restore_status. - Types portable: SQLAlchemy 2.0 Core with JSON().with_variant(JSONB, 'postgresql'), Uuid, LargeBinary, DateTime(timezone=True), Boolean false() default. Seed (src/artifactstore/db/seed.py): five v1 retention classes (transient, raw-evidence, summary-evidence, release-evidence, permanent-record) with default durations in seconds; permanent-record has no expiry. Alembic: - alembic.ini with sync sqlite URL default; path_separator=os to silence the 1.13 deprecation warning. - migrations/env.py: translates async URLs (+aiosqlite, +asyncpg) to sync counterparts at migrate-time so a single ARTIFACTSTORE_DATABASE_URL works for both runtime (async) and Alembic (sync). - migrations/script.py.mako template. - migrations/versions/20260516_0001_initial.py: metadata.create_all + bulk insert of retention class seeds. Make: - make migrate: alembic upgrade head (ensures var/ exists). - make migrate-fresh: drop local SQLite + re-run. Deps: psycopg[binary] added as optional `postgres` extra (PostgreSQL prod path; SQLite default for dev needs no extra). Tests: - tests/unit/test_db_schema.py: every expected table present; ADR-0001 and tiering columns present; seed has the five v1 classes; permanent-record has no default_duration; create_all + FK insert + Boolean default round-trip on in-memory SQLite. 
- tests/integration/test_migrations.py: alembic upgrade head against a tempfile SQLite produces all tables (+ alembic_version) and the seed rows. Gates: ruff clean, mypy --strict clean on 32 files, 38 tests pass. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
181 lines
5.4 KiB
Python
181 lines
5.4 KiB
Python
"""Schema definition tests (ARTIFACT-STORE-WP-0001-T002).

Verifies that the SQLAlchemy metadata exposes every table named in the
architecture blueprint, with the columns required by ADR-0001 / ADR-0002,
plus a working create_all + seed insert against an in-memory SQLite.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine, insert, select
|
|
from sqlalchemy.engine import Engine
|
|
|
|
from artifactstore.db.schema import (
|
|
artifact_files,
|
|
artifact_packages,
|
|
events,
|
|
metadata,
|
|
metadata_schemas,
|
|
retention_classes,
|
|
retention_state,
|
|
storage_locations,
|
|
)
|
|
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
|
|
|
|
# Names of the tables the schema metadata is required to expose. The tests
# check these as a subset, so additional tables in the schema are tolerated.
EXPECTED_TABLES = {
    "artifact_files",
    "artifact_packages",
    "events",
    "metadata_schemas",
    "retention_classes",
    "retention_state",
    "storage_locations",
}
|
|
|
|
|
|
def test_all_expected_tables_present() -> None:
    """Every name in ``EXPECTED_TABLES`` must be registered on ``metadata``."""
    missing = EXPECTED_TABLES - set(metadata.tables)
    assert not missing
|
|
|
|
|
|
def test_events_table_columns() -> None:
    """The ``events`` table must define each of the listed event-log columns."""
    required = {
        "sequence",
        "created_at",
        "event_type",
        "subject_kind",
        "subject_id",
        "actor",
        "payload",
        "payload_digest",
    }
    defined = {column.name for column in events.columns}
    # Subset check: extra columns on the table are allowed.
    assert not (required - defined)
|
|
|
|
|
|
def test_artifact_files_carries_adr_0001_columns() -> None:
    """``artifact_files`` must define the three digest columns checked here."""
    defined = {column.name for column in artifact_files.columns}
    for required in ("digest_algorithm", "digest_primary", "digest_sha256"):
        assert required in defined
|
|
|
|
|
|
def test_storage_locations_carries_content_address_and_tiering() -> None:
    """``storage_locations`` must define the content-address and tiering columns."""
    required = frozenset(("content_address", "retrieval_tier", "restore_status"))
    defined = {column.name for column in storage_locations.columns}
    assert required <= defined
|
|
|
|
|
|
def test_metadata_schemas_table_present() -> None:
    """``metadata_schemas`` must define at least ``id``, ``slug`` and ``json_schema``."""
    defined = {column.name for column in metadata_schemas.columns}
    assert all(name in defined for name in ("id", "slug", "json_schema"))
|
|
|
|
|
|
def test_retention_classes_seed_has_five_v1_entries() -> None:
    """The seed data must contain exactly the five v1 retention class ids."""
    expected_ids = frozenset(
        (
            "transient",
            "raw-evidence",
            "summary-evidence",
            "release-evidence",
            "permanent-record",
        )
    )
    seeded_ids = {seed["class_id"] for seed in RETENTION_CLASS_SEEDS}
    # Equality, not subset: an unexpected extra class id must fail the test too.
    assert seeded_ids == expected_ids
|
|
|
|
|
|
def test_retention_classes_permanent_record_has_no_default_duration() -> None:
    """The ``permanent-record`` seed must carry ``None`` as its default duration."""
    candidates = (
        seed for seed in RETENTION_CLASS_SEEDS if seed["class_id"] == "permanent-record"
    )
    # First match wins; a missing seed surfaces as StopIteration and fails the test.
    permanent = next(candidates)
    duration = permanent["default_duration_seconds"]
    assert duration is None
|
|
|
|
|
|
@pytest.fixture
def in_memory_engine(request: pytest.FixtureRequest) -> Engine:
    """Provide an in-memory SQLite engine with the full schema created.

    The fixture uses pytest's default function scope, so every test receives
    its own engine and therefore its own empty database.

    Args:
        request: pytest's built-in request fixture, used only to register
            the teardown callback.

    Returns:
        An engine on which ``metadata.create_all`` has already been run.
    """
    engine = create_engine("sqlite:///:memory:", future=True)
    # Register teardown before create_all so the connection pool is released
    # even if schema creation raises; previously the engine was never disposed.
    request.addfinalizer(engine.dispose)
    metadata.create_all(engine)
    return engine
|
|
|
|
|
|
def test_create_all_on_sqlite_produces_expected_tables(in_memory_engine: Engine) -> None:
    """``create_all`` must physically create every expected table in SQLite.

    The table names are reflected from the database itself. Asserting against
    ``metadata.tables`` would only re-check the Python-side definitions and
    would pass even if no table had been created.

    Args:
        in_memory_engine: engine on which ``metadata.create_all`` has run.
    """
    # Imported locally so this test does not depend on the module import list.
    from sqlalchemy import inspect

    created = set(inspect(in_memory_engine).get_table_names())
    assert EXPECTED_TABLES.issubset(created)
|
|
|
|
|
|
def test_seed_round_trip_through_sqlite(in_memory_engine: Engine) -> None:
    """Seed rows inserted into ``retention_classes`` must read back unchanged in count and id.

    Args:
        in_memory_engine: engine with the schema already created.
    """
    seed_rows = [dict(seed) for seed in RETENTION_CLASS_SEEDS]

    # begin() commits on exit, so the rows are visible to the read below.
    with in_memory_engine.begin() as txn:
        txn.execute(insert(retention_classes), seed_rows)

    with in_memory_engine.connect() as reader:
        fetched = conn_rows = reader.execute(select(retention_classes)).all()

    assert len(conn_rows) == len(RETENTION_CLASS_SEEDS)
    stored_ids = {row.class_id for row in fetched}
    seeded_ids = {seed["class_id"] for seed in RETENTION_CLASS_SEEDS}
    assert stored_ids == seeded_ids
|
|
|
|
|
|
def test_artifact_package_fk_to_retention_classes(in_memory_engine: Engine) -> None:
    """A package referencing a seeded retention class must insert and read back.

    NOTE(review): SQLite only enforces foreign keys when
    ``PRAGMA foreign_keys=ON`` is issued, and nothing visible in this module
    enables it. This test presumably proves only that a valid reference
    round-trips, not that an invalid one is rejected. Confirm.

    Args:
        in_memory_engine: engine with the schema already created.
    """
    package_id = uuid.uuid4()
    package_values = {
        "id": package_id,
        "name": "t",
        "producer": "t",
        "subject": "t",
        "retention_class": "raw-evidence",
        "metadata_schema_id": None,
        "metadata": {},
        "status": "created",
        "manifest_digest": None,
        "last_event_sequence": 1,
    }

    with in_memory_engine.begin() as txn:
        # Seed the referenced classes first so "raw-evidence" exists.
        txn.execute(insert(retention_classes), [dict(seed) for seed in RETENTION_CLASS_SEEDS])
        txn.execute(insert(artifact_packages).values(**package_values))

    lookup = select(artifact_packages).where(artifact_packages.c.id == package_id)
    with in_memory_engine.connect() as reader:
        found = reader.execute(lookup).all()

    assert len(found) == 1
    assert found[0].retention_class == "raw-evidence"
|
|
|
|
|
|
def test_retention_state_default_eligible_for_deletion_is_false(
    in_memory_engine: Engine,
) -> None:
    """``eligible_for_deletion`` must read back as ``False`` when the insert omits it.

    Args:
        in_memory_engine: engine with the schema already created.
    """
    package_id = uuid.uuid4()
    package_values = {
        "id": package_id,
        "name": "t",
        "producer": "t",
        "subject": "t",
        "retention_class": "raw-evidence",
        "metadata_schema_id": None,
        "metadata": {},
        "status": "created",
        "manifest_digest": None,
        "last_event_sequence": 1,
    }
    # eligible_for_deletion is deliberately left out so the column default applies.
    state_values = {
        "package_id": package_id,
        "current_expires_at": None,
        "effective_class": "raw-evidence",
        "active_hold_id": None,
    }

    with in_memory_engine.begin() as txn:
        txn.execute(insert(retention_classes), [dict(seed) for seed in RETENTION_CLASS_SEEDS])
        txn.execute(insert(artifact_packages).values(**package_values))
        txn.execute(insert(retention_state).values(**state_values))

    lookup = select(retention_state).where(retention_state.c.package_id == package_id)
    with in_memory_engine.connect() as reader:
        state = reader.execute(lookup).one()

    # Identity check: the value must come back as a real bool, not 0.
    assert state.eligible_for_deletion is False
|