search filters and inspection polish

This commit is contained in:
2026-04-26 00:04:19 +02:00
parent b8627c0e1d
commit 8d1e1ff583
6 changed files with 209 additions and 22 deletions

View File

@@ -1275,35 +1275,55 @@ class RegistryStore:
]
return RepositoryAbilityMap(repository=repository, abilities=abilities)
def search(self, query: str) -> list[SearchResult]:
def search(
self,
query: str,
*,
status: str | None = None,
language: str | None = None,
framework: str | None = None,
) -> list[SearchResult]:
term = query.strip()
needle = f"%{term}%"
if not term:
return []
with self.connect() as connection:
repository_ids = self._search_filter_repository_ids(
connection,
status=status,
language=language,
framework=framework,
)
if repository_ids is not None and not repository_ids:
return []
repository_filter, repository_params = self._repository_filter_sql(
repository_ids,
)
repository_rows = connection.execute(
"""
f"""
SELECT r.id AS repository_id, r.name AS repository_name,
r.description
FROM repositories r
WHERE r.name LIKE ? OR COALESCE(r.description, '') LIKE ?
WHERE (r.name LIKE ? OR COALESCE(r.description, '') LIKE ?)
{repository_filter}
""",
(needle, needle),
(needle, needle, *repository_params),
).fetchall()
ability_rows = connection.execute(
"""
f"""
SELECT r.id AS repository_id, r.name AS repository_name,
a.id AS ability_id, a.name AS ability_name,
a.description AS ability_description, a.confidence
FROM approved_abilities a
JOIN repositories r ON r.id = a.repository_id
WHERE a.name LIKE ? OR a.description LIKE ?
WHERE (a.name LIKE ? OR a.description LIKE ?)
{repository_filter}
""",
(needle, needle),
(needle, needle, *repository_params),
).fetchall()
capability_rows = connection.execute(
"""
f"""
SELECT r.id AS repository_id, r.name AS repository_name,
a.id AS ability_id, a.name AS ability_name,
c.id AS capability_id, c.name AS capability_name,
@@ -1311,12 +1331,13 @@ class RegistryStore:
FROM approved_capabilities c
JOIN approved_abilities a ON a.id = c.ability_id
JOIN repositories r ON r.id = c.repository_id
WHERE c.name LIKE ? OR c.description LIKE ?
WHERE (c.name LIKE ? OR c.description LIKE ?)
{repository_filter}
""",
(needle, needle),
(needle, needle, *repository_params),
).fetchall()
feature_rows = connection.execute(
"""
f"""
SELECT r.id AS repository_id, r.name AS repository_name,
a.id AS ability_id, a.name AS ability_name,
c.id AS capability_id, c.name AS capability_name,
@@ -1326,12 +1347,13 @@ class RegistryStore:
JOIN approved_capabilities c ON c.id = f.capability_id
JOIN approved_abilities a ON a.id = c.ability_id
JOIN repositories r ON r.id = f.repository_id
WHERE f.name LIKE ? OR f.type LIKE ? OR f.location LIKE ?
WHERE (f.name LIKE ? OR f.type LIKE ? OR f.location LIKE ?)
{repository_filter}
""",
(needle, needle, needle),
(needle, needle, needle, *repository_params),
).fetchall()
evidence_rows = connection.execute(
"""
f"""
SELECT r.id AS repository_id, r.name AS repository_name,
a.id AS ability_id, a.name AS ability_name,
c.id AS capability_id, c.name AS capability_name,
@@ -1340,9 +1362,10 @@ class RegistryStore:
JOIN approved_capabilities c ON c.id = e.capability_id
JOIN approved_abilities a ON a.id = c.ability_id
JOIN repositories r ON r.id = e.repository_id
WHERE e.type LIKE ? OR e.reference LIKE ? OR e.strength LIKE ?
WHERE (e.type LIKE ? OR e.reference LIKE ? OR e.strength LIKE ?)
{repository_filter}
""",
(needle, needle, needle),
(needle, needle, needle, *repository_params),
).fetchall()
results: list[SearchResult] = []
@@ -1472,6 +1495,55 @@ class RegistryStore:
def _evidence_confidence(self, strength: str) -> float:
return {"strong": 0.9, "medium": 0.6, "weak": 0.3}.get(strength, 0.5)
def _search_filter_repository_ids(
self,
connection: sqlite3.Connection,
*,
status: str | None,
language: str | None,
framework: str | None,
) -> list[int] | None:
filters: list[set[int]] = []
if status:
rows = connection.execute(
"SELECT id FROM repositories WHERE status = ?",
(status,),
).fetchall()
filters.append({row["id"] for row in rows})
if language:
rows = connection.execute(
"""
SELECT DISTINCT repository_id
FROM observed_facts
WHERE kind = 'language' AND name LIKE ?
""",
(language,),
).fetchall()
filters.append({row["repository_id"] for row in rows})
if framework:
rows = connection.execute(
"""
SELECT DISTINCT repository_id
FROM observed_facts
WHERE kind = 'framework' AND name LIKE ?
""",
(framework,),
).fetchall()
filters.append({row["repository_id"] for row in rows})
if not filters:
return None
repository_ids = set.intersection(*filters)
return sorted(repository_ids)
def _repository_filter_sql(
self,
repository_ids: list[int] | None,
) -> tuple[str, tuple[int, ...]]:
if repository_ids is None:
return "", ()
placeholders = ", ".join("?" for _ in repository_ids)
return f"AND r.id IN ({placeholders})", tuple(repository_ids)
def _insert_facts(
self,
connection: sqlite3.Connection,