generated from coulomb/repo-seed
159 lines
5.1 KiB
Python
159 lines
5.1 KiB
Python
import sqlite3
|
|
|
|
from repo_registry.core.service import RegistryService
|
|
from repo_registry.repo_ingestion.git import GitIngestionService
|
|
from repo_registry.storage.sqlite import RegistryStore
|
|
|
|
|
|
def make_service(tmp_path):
    """Build a RegistryService wired to storage located under ``tmp_path``.

    The SQLite store is created at ``tmp_path / "registry.sqlite3"`` and
    initialized before use; the git ingestion service is pointed at
    ``tmp_path / "checkouts"``.
    """
    registry_store = RegistryStore(tmp_path / "registry.sqlite3")
    registry_store.initialize()

    git_ingestion = GitIngestionService(tmp_path / "checkouts")
    return RegistryService(registry_store, ingestion=git_ingestion)
|
|
|
|
|
|
def test_initialize_is_idempotent_and_applies_expected_columns(tmp_path):
    """Check that ``initialize()`` can run twice and leaves the expected schema.

    After two consecutive ``initialize()`` calls the database must expose a
    ``source_refs`` column on both ``approved_features`` and
    ``approved_evidence``, and must contain the ``content_chunks`` and
    ``expectation_gaps`` tables.
    """
    database_path = tmp_path / "registry.sqlite3"
    store = RegistryStore(database_path)

    store.initialize()
    store.initialize()  # the second call must not raise

    # Using a sqlite3 connection as a context manager only commits or rolls
    # back; it does not close the handle. Close it explicitly so the test does
    # not leak an open connection to the temporary database file.
    connection = sqlite3.connect(database_path)
    try:
        # PRAGMA table_info rows are (cid, name, type, ...); index 1 is the name.
        feature_columns = {
            row[1] for row in connection.execute("PRAGMA table_info(approved_features)")
        }
        evidence_columns = {
            row[1] for row in connection.execute("PRAGMA table_info(approved_evidence)")
        }
        tables = {
            row[0]
            for row in connection.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            )
        }
    finally:
        connection.close()

    assert "source_refs" in feature_columns
    assert "source_refs" in evidence_columns
    assert "content_chunks" in tables
    assert "expectation_gaps" in tables
|
|
|
|
|
|
def test_approved_registry_schema_allows_future_nullable_vocabulary_ref(tmp_path):
    """Check that the approved-registry schema is not tied to ``name`` columns.

    The test inspects the initialized database and asserts that:

    * no approved table's ``CREATE TABLE`` statement contains the text ``CHECK``,
    * no approved table has a unique index whose only column is ``name``,
    * no foreign key on the approved tables or on ``review_decisions`` refers
      to, or originates from, a column called ``name``,
    * ``review_decisions`` has the ``repository_id``, ``analysis_run_id``,
      ``action`` and ``notes`` columns and none of the per-entity
      ``ability_name`` / ``capability_name`` / ``feature_name`` columns.

    NOTE(review): judging by the test name, these constraints are presumably
    what keeps a nullable vocabulary reference addable later -- confirm
    against the schema design notes.
    """
    database_path = tmp_path / "registry.sqlite3"
    store = RegistryStore(database_path)
    store.initialize()

    approved_tables = {
        "approved_abilities",
        "approved_capabilities",
        "approved_features",
    }

    # Using a sqlite3 connection as a context manager only commits or rolls
    # back; it does not close the handle. Close it explicitly so the test does
    # not leak an open connection to the temporary database file.
    connection = sqlite3.connect(database_path)
    try:
        table_sql = {}
        for table in sorted(approved_tables):
            sql_row = connection.execute(
                "SELECT sql FROM sqlite_master WHERE type = 'table' AND name = ?",
                (table,),
            ).fetchone()
            # Fail with the table name rather than an opaque TypeError from
            # subscripting None when the table does not exist.
            assert sql_row is not None, f"table {table} is missing from the schema"
            table_sql[table] = sql_row[0]

        # PRAGMA index_list rows are (seq, name, unique, ...);
        # PRAGMA index_info rows are (seqno, cid, name).
        indexes = {
            table: [
                {
                    "name": row[1],
                    "unique": bool(row[2]),
                    "columns": [
                        column[2]
                        for column in connection.execute(
                            # repr() wraps the index name in quotes so it is
                            # passed as a quoted pragma argument.
                            f"PRAGMA index_info({row[1]!r})"
                        )
                    ],
                }
                for row in connection.execute(f"PRAGMA index_list({table})")
            ]
            for table in approved_tables
        }

        # PRAGMA foreign_key_list rows are (id, seq, table, from, to, ...).
        foreign_keys = {
            table: [
                {
                    "from": row[3],
                    "to_table": row[2],
                    "to_column": row[4],
                }
                for row in connection.execute(f"PRAGMA foreign_key_list({table})")
            ]
            for table in approved_tables | {"review_decisions"}
        }

        # PRAGMA table_info rows are (cid, name, type, ...).
        review_columns = {
            row[1] for row in connection.execute("PRAGMA table_info(review_decisions)")
        }
    finally:
        connection.close()

    for table, sql in table_sql.items():
        assert "CHECK" not in sql.upper(), table

    for table, table_indexes in indexes.items():
        assert all(
            not (index["unique"] and index["columns"] == ["name"])
            for index in table_indexes
        ), table

    for table, table_foreign_keys in foreign_keys.items():
        assert all(key["to_column"] != "name" for key in table_foreign_keys), table
        assert all(key["from"] != "name" for key in table_foreign_keys), table

    assert {"repository_id", "analysis_run_id", "action", "notes"} <= review_columns
    assert not {"ability_name", "capability_name", "feature_name"} & review_columns
|
|
|
|
|
|
def test_delete_repository_cascades_registry_and_review_rows(tmp_path):
    """Check that deleting a repository leaves its dependent tables empty.

    The test registers one repository, attaches an ability, a capability, a
    feature, a piece of evidence, an analysis run, a review decision and an
    expectation gap to it, deletes the repository, and then asserts that each
    listed dependent table contains zero rows.
    """
    service = make_service(tmp_path)

    # Arrange: one repository with one row of each dependent kind.
    repository = service.register_repository(
        name="Cascade",
        url="https://example.com/cascade.git",
        description="Cascade fixture.",
    )
    repository_id = repository.id

    ability_id = service.add_ability(repository_id, name="Cascade Ability")
    capability_id = service.add_capability(
        repository_id,
        ability_id,
        name="Cascade Capability",
    )
    service.add_feature(
        repository_id,
        capability_id,
        name="Cascade Feature",
        type="API",
    )
    service.add_evidence(
        repository_id,
        capability_id,
        type="test",
        reference="tests/test_cascade.py",
    )

    run = service.store.create_analysis_run(repository_id)
    run_id = run.id
    service.store.create_review_decision(
        repository_id,
        run_id,
        action="manual_test",
        notes="Cascade review decision.",
    )
    service.store.create_expectation_gap(
        repository_id,
        run_id,
        expected_type="capability",
        expected_name="Expected Cascade Capability",
        source="human",
    )

    # NOTE(review): nothing in this test visibly inserts into content_chunks,
    # so its zero count may hold regardless of cascading -- confirm.
    dependent_tables = (
        "approved_abilities",
        "approved_capabilities",
        "approved_features",
        "approved_evidence",
        "analysis_runs",
        "content_chunks",
        "expectation_gaps",
        "review_decisions",
    )

    # Act.
    service.delete_repository(repository_id)

    # Assert: every dependent table is empty; the table name is the failure message.
    with service.store.connect() as connection:
        for table in dependent_tables:
            cursor = connection.execute(f"SELECT COUNT(*) FROM {table}")
            remaining = cursor.fetchone()[0]
            assert remaining == 0, table
|