import sqlite3 from repo_registry.core.service import RegistryService from repo_registry.repo_ingestion.git import GitIngestionService from repo_registry.storage.sqlite import RegistryStore def make_service(tmp_path): store = RegistryStore(tmp_path / "registry.sqlite3") store.initialize() return RegistryService(store, ingestion=GitIngestionService(tmp_path / "checkouts")) def test_initialize_is_idempotent_and_applies_expected_columns(tmp_path): database_path = tmp_path / "registry.sqlite3" store = RegistryStore(database_path) store.initialize() store.initialize() with sqlite3.connect(database_path) as connection: feature_columns = { row[1] for row in connection.execute("PRAGMA table_info(approved_features)") } ability_columns = { row[1] for row in connection.execute("PRAGMA table_info(approved_abilities)") } capability_columns = { row[1] for row in connection.execute("PRAGMA table_info(approved_capabilities)") } candidate_feature_columns = { row[1] for row in connection.execute("PRAGMA table_info(candidate_features)") } evidence_columns = { row[1] for row in connection.execute("PRAGMA table_info(approved_evidence)") } tables = { row[0] for row in connection.execute( "SELECT name FROM sqlite_master WHERE type = 'table'" ) } assert "source_refs" in feature_columns assert {"primary_class", "attributes"} <= ability_columns assert {"primary_class", "attributes"} <= capability_columns assert {"primary_class", "attributes"} <= feature_columns assert {"primary_class", "attributes"} <= candidate_feature_columns assert "source_refs" in evidence_columns assert { "target_kind", "target_id", "reference_kind", "reference_id", } <= evidence_columns assert "content_chunks" in tables assert "expectation_gaps" in tables assert "repository_scopes" in tables def test_approved_registry_schema_allows_future_nullable_vocabulary_ref(tmp_path): database_path = tmp_path / "registry.sqlite3" store = RegistryStore(database_path) store.initialize() approved_tables = { "approved_abilities", "approved_capabilities", "approved_features", } with sqlite3.connect(database_path) as connection: table_sql = { table: connection.execute( "SELECT sql FROM sqlite_master WHERE type = 'table' AND name = ?", (table,), ).fetchone()[0] for table in approved_tables } indexes = { table: [ { "name": row[1], "unique": bool(row[2]), "columns": [ column[2] for column in connection.execute( f"PRAGMA index_info({row[1]!r})" ) ], } for row in connection.execute(f"PRAGMA index_list({table})") ] for table in approved_tables } foreign_keys = { table: [ { "from": row[3], "to_table": row[2], "to_column": row[4], } for row in connection.execute(f"PRAGMA foreign_key_list({table})") ] for table in approved_tables | {"review_decisions"} } review_columns = { row[1] for row in connection.execute("PRAGMA table_info(review_decisions)") } assert all("CHECK" not in sql.upper() for sql in table_sql.values()) for table_indexes in indexes.values(): assert all( not (index["unique"] and index["columns"] == ["name"]) for index in table_indexes ) for table_foreign_keys in foreign_keys.values(): assert all(key["to_column"] != "name" for key in table_foreign_keys) assert all(key["from"] != "name" for key in table_foreign_keys) assert {"repository_id", "analysis_run_id", "action", "notes"} <= review_columns assert not {"ability_name", "capability_name", "feature_name"} & review_columns def test_delete_repository_cascades_registry_and_review_rows(tmp_path): service = make_service(tmp_path) repository = service.register_repository( name="Cascade", url="https://example.com/cascade.git", description="Cascade fixture.", ) ability_id = service.add_ability(repository.id, name="Cascade Ability") capability_id = service.add_capability( repository.id, ability_id, name="Cascade Capability", ) service.add_feature( repository.id, capability_id, name="Cascade Feature", type="API", ) service.add_evidence( repository.id, capability_id, type="test", reference="tests/test_cascade.py", ) run = service.store.create_analysis_run(repository.id) service.store.create_review_decision( repository.id, run.id, action="manual_test", notes="Cascade review decision.", ) service.store.create_expectation_gap( repository.id, run.id, expected_type="capability", expected_name="Expected Cascade Capability", source="human", ) service.delete_repository(repository.id) with service.store.connect() as connection: for table in ( "approved_abilities", "repository_scopes", "approved_capabilities", "approved_features", "approved_evidence", "analysis_runs", "content_chunks", "expectation_gaps", "review_decisions", ): count = connection.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] assert count == 0, table