import sqlite3 from repo_registry.core.service import RegistryService from repo_registry.repo_ingestion.git import GitIngestionService from repo_registry.storage.sqlite import RegistryStore def make_service(tmp_path): store = RegistryStore(tmp_path / "registry.sqlite3") store.initialize() return RegistryService(store, ingestion=GitIngestionService(tmp_path / "checkouts")) def test_initialize_is_idempotent_and_applies_expected_columns(tmp_path): database_path = tmp_path / "registry.sqlite3" store = RegistryStore(database_path) store.initialize() store.initialize() with sqlite3.connect(database_path) as connection: feature_columns = { row[1] for row in connection.execute("PRAGMA table_info(approved_features)") } evidence_columns = { row[1] for row in connection.execute("PRAGMA table_info(approved_evidence)") } tables = { row[0] for row in connection.execute( "SELECT name FROM sqlite_master WHERE type = 'table'" ) } assert "source_refs" in feature_columns assert "source_refs" in evidence_columns assert "content_chunks" in tables def test_delete_repository_cascades_registry_and_review_rows(tmp_path): service = make_service(tmp_path) repository = service.register_repository( name="Cascade", url="https://example.com/cascade.git", description="Cascade fixture.", ) ability_id = service.add_ability(repository.id, name="Cascade Ability") capability_id = service.add_capability( repository.id, ability_id, name="Cascade Capability", ) service.add_feature( repository.id, capability_id, name="Cascade Feature", type="API", ) service.add_evidence( repository.id, capability_id, type="test", reference="tests/test_cascade.py", ) run = service.store.create_analysis_run(repository.id) service.store.create_review_decision( repository.id, run.id, action="manual_test", notes="Cascade review decision.", ) service.delete_repository(repository.id) with service.store.connect() as connection: for table in ( "approved_abilities", "approved_capabilities", "approved_features", "approved_evidence", "analysis_runs", "content_chunks", "review_decisions", ): count = connection.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] assert count == 0, table