generated from coulomb/repo-seed
79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
import sqlite3
|
|
|
|
from repo_registry.core.service import RegistryService
|
|
from repo_registry.repo_ingestion.git import GitIngestionService
|
|
from repo_registry.storage.sqlite import RegistryStore
|
|
|
|
|
|
def make_service(tmp_path):
|
|
store = RegistryStore(tmp_path / "registry.sqlite3")
|
|
store.initialize()
|
|
return RegistryService(store, ingestion=GitIngestionService(tmp_path / "checkouts"))
|
|
|
|
|
|
def test_initialize_is_idempotent_and_applies_expected_columns(tmp_path):
|
|
database_path = tmp_path / "registry.sqlite3"
|
|
store = RegistryStore(database_path)
|
|
|
|
store.initialize()
|
|
store.initialize()
|
|
|
|
with sqlite3.connect(database_path) as connection:
|
|
feature_columns = {
|
|
row[1] for row in connection.execute("PRAGMA table_info(approved_features)")
|
|
}
|
|
evidence_columns = {
|
|
row[1] for row in connection.execute("PRAGMA table_info(approved_evidence)")
|
|
}
|
|
|
|
assert "source_refs" in feature_columns
|
|
assert "source_refs" in evidence_columns
|
|
|
|
|
|
def test_delete_repository_cascades_registry_and_review_rows(tmp_path):
|
|
service = make_service(tmp_path)
|
|
repository = service.register_repository(
|
|
name="Cascade",
|
|
url="https://example.com/cascade.git",
|
|
description="Cascade fixture.",
|
|
)
|
|
ability_id = service.add_ability(repository.id, name="Cascade Ability")
|
|
capability_id = service.add_capability(
|
|
repository.id,
|
|
ability_id,
|
|
name="Cascade Capability",
|
|
)
|
|
service.add_feature(
|
|
repository.id,
|
|
capability_id,
|
|
name="Cascade Feature",
|
|
type="API",
|
|
)
|
|
service.add_evidence(
|
|
repository.id,
|
|
capability_id,
|
|
type="test",
|
|
reference="tests/test_cascade.py",
|
|
)
|
|
run = service.store.create_analysis_run(repository.id)
|
|
service.store.create_review_decision(
|
|
repository.id,
|
|
run.id,
|
|
action="manual_test",
|
|
notes="Cascade review decision.",
|
|
)
|
|
|
|
service.delete_repository(repository.id)
|
|
|
|
with service.store.connect() as connection:
|
|
for table in (
|
|
"approved_abilities",
|
|
"approved_capabilities",
|
|
"approved_features",
|
|
"approved_evidence",
|
|
"analysis_runs",
|
|
"review_decisions",
|
|
):
|
|
count = connection.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
|
|
assert count == 0, table
|