# Source: repo-scoping/tests/test_storage_migrations.py
# (Python; listed in the file viewer as 181 lines, 6.0 KiB)

import sqlite3
from contextlib import closing

from repo_scoping.core.service import RegistryService
from repo_scoping.repo_ingestion.git import GitIngestionService
from repo_scoping.storage.sqlite import RegistryStore
def make_service(tmp_path):
    """Return a RegistryService wired to a freshly initialized store.

    The store is created at ``tmp_path / "registry.sqlite3"`` and initialized
    before the service is built; the GitIngestionService is handed
    ``tmp_path / "checkouts"``.
    """
    registry_store = RegistryStore(tmp_path / "registry.sqlite3")
    registry_store.initialize()
    git_ingestion = GitIngestionService(tmp_path / "checkouts")
    return RegistryService(registry_store, ingestion=git_ingestion)
def test_initialize_is_idempotent_and_applies_expected_columns(tmp_path):
    """Check that ``initialize()`` can run twice and yields the expected schema.

    After two consecutive ``initialize()`` calls on the same database file,
    the test inspects the schema through a plain ``sqlite3`` connection and
    asserts that the expected columns and tables are present.
    """
    database_path = tmp_path / "registry.sqlite3"
    store = RegistryStore(database_path)
    store.initialize()
    store.initialize()  # The second call must not raise.

    def column_names(connection, table):
        # PRAGMA table_info rows are (cid, name, type, ...); index 1 is the name.
        return {
            row[1] for row in connection.execute(f"PRAGMA table_info({table})")
        }

    # sqlite3's own context manager only commits/rolls back the transaction;
    # closing() is what actually releases the connection when the block exits.
    with closing(sqlite3.connect(database_path)) as connection:
        feature_columns = column_names(connection, "approved_features")
        ability_columns = column_names(connection, "approved_abilities")
        capability_columns = column_names(connection, "approved_capabilities")
        candidate_feature_columns = column_names(connection, "candidate_features")
        evidence_columns = column_names(connection, "approved_evidence")
        tables = {
            row[0]
            for row in connection.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            )
        }

    assert "source_refs" in feature_columns
    assert {"primary_class", "attributes"} <= ability_columns
    assert {"primary_class", "attributes"} <= capability_columns
    assert {"primary_class", "attributes"} <= feature_columns
    assert {"primary_class", "attributes"} <= candidate_feature_columns
    assert "source_refs" in evidence_columns
    assert {
        "target_kind",
        "target_id",
        "reference_kind",
        "reference_id",
    } <= evidence_columns
    assert "content_chunks" in tables
    assert "expectation_gaps" in tables
    assert "repository_scopes" in tables
def test_approved_registry_schema_allows_future_nullable_vocabulary_ref(tmp_path):
    """Check that the approved registry schema has no constraints tied to ``name``.

    After ``initialize()``, the test asserts that:

    * the CREATE TABLE SQL of the three ``approved_*`` tables contains no
      ``CHECK`` text;
    * none of those tables has a unique index whose only column is ``name``;
    * no foreign key on those tables or on ``review_decisions`` starts from or
      points to a ``name`` column;
    * ``review_decisions`` has ``repository_id``, ``analysis_run_id``,
      ``action`` and ``notes`` columns, and none of ``ability_name``,
      ``capability_name`` or ``feature_name``.

    NOTE(review): judging by the test name, this presumably keeps the schema
    open for a later nullable vocabulary reference -- confirm with the
    migration plan.
    """
    database_path = tmp_path / "registry.sqlite3"
    store = RegistryStore(database_path)
    store.initialize()
    approved_tables = {
        "approved_abilities",
        "approved_capabilities",
        "approved_features",
    }

    # sqlite3's own context manager only commits/rolls back the transaction;
    # closing() is what actually releases the connection when the block exits.
    with closing(sqlite3.connect(database_path)) as connection:
        table_sql = {}
        for table in approved_tables:
            row = connection.execute(
                "SELECT sql FROM sqlite_master WHERE type = 'table' AND name = ?",
                (table,),
            ).fetchone()
            # Fail with the table name rather than an opaque TypeError on None.
            assert row is not None, f"missing table: {table}"
            table_sql[table] = row[0]

        # PRAGMA index_list rows: (seq, name, unique, ...);
        # PRAGMA index_info rows: (seqno, cid, name).
        indexes = {
            table: [
                {
                    "name": row[1],
                    "unique": bool(row[2]),
                    "columns": [
                        column[2]
                        for column in connection.execute(
                            f"PRAGMA index_info({row[1]!r})"
                        )
                    ],
                }
                for row in connection.execute(f"PRAGMA index_list({table})")
            ]
            for table in approved_tables
        }

        # PRAGMA foreign_key_list rows: (id, seq, table, from, to, ...).
        foreign_keys = {
            table: [
                {
                    "from": row[3],
                    "to_table": row[2],
                    "to_column": row[4],
                }
                for row in connection.execute(f"PRAGMA foreign_key_list({table})")
            ]
            for table in approved_tables | {"review_decisions"}
        }

        review_columns = {
            row[1] for row in connection.execute("PRAGMA table_info(review_decisions)")
        }

    assert all("CHECK" not in sql.upper() for sql in table_sql.values())
    for table_indexes in indexes.values():
        assert all(
            not (index["unique"] and index["columns"] == ["name"])
            for index in table_indexes
        )
    for table_foreign_keys in foreign_keys.values():
        assert all(key["to_column"] != "name" for key in table_foreign_keys)
        assert all(key["from"] != "name" for key in table_foreign_keys)
    assert {"repository_id", "analysis_run_id", "action", "notes"} <= review_columns
    assert not {"ability_name", "capability_name", "feature_name"} & review_columns
def test_delete_repository_cascades_registry_and_review_rows(tmp_path):
    """Check that deleting a repository leaves every dependent table empty.

    The test registers one repository, attaches an ability, a capability, a
    feature, an evidence row, an analysis run, a review decision and an
    expectation gap, then deletes the repository and asserts a row count of
    zero in each dependent table.

    NOTE(review): nothing here visibly inserts into ``repository_scopes`` or
    ``content_chunks``; unless the service calls populate them internally,
    those two checks pass trivially -- confirm.
    """
    service = make_service(tmp_path)
    store = service.store

    repository = service.register_repository(
        name="Cascade",
        url="https://example.com/cascade.git",
        description="Cascade fixture.",
    )
    repository_id = repository.id

    # Build the approved hierarchy: ability -> capability -> feature/evidence.
    ability_id = service.add_ability(repository_id, name="Cascade Ability")
    capability_id = service.add_capability(
        repository_id, ability_id, name="Cascade Capability"
    )
    service.add_feature(
        repository_id, capability_id, name="Cascade Feature", type="API"
    )
    service.add_evidence(
        repository_id,
        capability_id,
        type="test",
        reference="tests/test_cascade.py",
    )

    # Rows that hang off an analysis run.
    run = store.create_analysis_run(repository_id)
    store.create_review_decision(
        repository_id,
        run.id,
        action="manual_test",
        notes="Cascade review decision.",
    )
    store.create_expectation_gap(
        repository_id,
        run.id,
        expected_type="capability",
        expected_name="Expected Cascade Capability",
        source="human",
    )

    service.delete_repository(repository_id)

    dependent_tables = (
        "approved_abilities",
        "repository_scopes",
        "approved_capabilities",
        "approved_features",
        "approved_evidence",
        "analysis_runs",
        "content_chunks",
        "expectation_gaps",
        "review_decisions",
    )
    with store.connect() as connection:
        for table in dependent_tables:
            cursor = connection.execute(f"SELECT COUNT(*) FROM {table}")
            count = cursor.fetchone()[0]
            assert count == 0, table