search and inspection polish

This commit is contained in:
2026-04-25 23:59:38 +02:00
parent cc0eef21be
commit b8627c0e1d
5 changed files with 252 additions and 22 deletions

View File

@@ -1276,43 +1276,201 @@ class RegistryStore:
return RepositoryAbilityMap(repository=repository, abilities=abilities)
def search(self, query: str) -> list[SearchResult]:
needle = f"%{query.strip()}%"
if needle == "%%":
term = query.strip()
needle = f"%{term}%"
if not term:
return []
with self.connect() as connection:
rows = connection.execute(
repository_rows = connection.execute(
"""
SELECT r.id AS repository_id, r.name AS repository_name,
'repository' AS match_type, r.name AS match_name,
1.0 AS confidence
r.description
FROM repositories r
WHERE r.name LIKE ? OR COALESCE(r.description, '') LIKE ?
UNION ALL
SELECT r.id, r.name, 'ability', a.name, a.confidence
""",
(needle, needle),
).fetchall()
ability_rows = connection.execute(
"""
SELECT r.id AS repository_id, r.name AS repository_name,
a.id AS ability_id, a.name AS ability_name,
a.description AS ability_description, a.confidence
FROM approved_abilities a
JOIN repositories r ON r.id = a.repository_id
WHERE a.name LIKE ? OR a.description LIKE ?
UNION ALL
SELECT r.id, r.name, 'capability', c.name, c.confidence
""",
(needle, needle),
).fetchall()
capability_rows = connection.execute(
"""
SELECT r.id AS repository_id, r.name AS repository_name,
a.id AS ability_id, a.name AS ability_name,
c.id AS capability_id, c.name AS capability_name,
c.description AS capability_description, c.confidence
FROM approved_capabilities c
JOIN approved_abilities a ON a.id = c.ability_id
JOIN repositories r ON r.id = c.repository_id
WHERE c.name LIKE ? OR c.description LIKE ?
ORDER BY confidence DESC, repository_name ASC, match_name ASC
""",
(needle, needle, needle, needle, needle, needle),
(needle, needle),
).fetchall()
feature_rows = connection.execute(
"""
SELECT r.id AS repository_id, r.name AS repository_name,
a.id AS ability_id, a.name AS ability_name,
c.id AS capability_id, c.name AS capability_name,
f.name AS feature_name, f.type AS feature_type,
f.location, f.confidence
FROM approved_features f
JOIN approved_capabilities c ON c.id = f.capability_id
JOIN approved_abilities a ON a.id = c.ability_id
JOIN repositories r ON r.id = f.repository_id
WHERE f.name LIKE ? OR f.type LIKE ? OR f.location LIKE ?
""",
(needle, needle, needle),
).fetchall()
evidence_rows = connection.execute(
"""
SELECT r.id AS repository_id, r.name AS repository_name,
a.id AS ability_id, a.name AS ability_name,
c.id AS capability_id, c.name AS capability_name,
e.type AS evidence_type, e.reference, e.strength
FROM approved_evidence e
JOIN approved_capabilities c ON c.id = e.capability_id
JOIN approved_abilities a ON a.id = c.ability_id
JOIN repositories r ON r.id = e.repository_id
WHERE e.type LIKE ? OR e.reference LIKE ? OR e.strength LIKE ?
""",
(needle, needle, needle),
).fetchall()
return [
SearchResult(
repository_id=row["repository_id"],
repository_name=row["repository_name"],
match_type=row["match_type"],
match_name=row["match_name"],
confidence=row["confidence"],
results: list[SearchResult] = []
for row in repository_rows:
matched_field = (
"name" if self._matches(row["repository_name"], term) else "description"
)
for row in rows
]
results.append(
SearchResult(
repository_id=row["repository_id"],
repository_name=row["repository_name"],
match_type="repository",
match_name=row["repository_name"],
match_description=row["description"] or "",
matched_field=matched_field,
confidence=1.0,
)
)
for row in ability_rows:
matched_field = (
"name" if self._matches(row["ability_name"], term) else "description"
)
results.append(
SearchResult(
repository_id=row["repository_id"],
repository_name=row["repository_name"],
match_type="ability",
match_name=row["ability_name"],
match_description=row["ability_description"],
matched_field=matched_field,
confidence=row["confidence"],
ability_id=row["ability_id"],
ability_name=row["ability_name"],
)
)
for row in capability_rows:
matched_field = (
"name"
if self._matches(row["capability_name"], term)
else "description"
)
results.append(
SearchResult(
repository_id=row["repository_id"],
repository_name=row["repository_name"],
match_type="capability",
match_name=row["capability_name"],
match_description=row["capability_description"],
matched_field=matched_field,
confidence=row["confidence"],
ability_id=row["ability_id"],
ability_name=row["ability_name"],
capability_id=row["capability_id"],
capability_name=row["capability_name"],
)
)
for row in feature_rows:
matched_field = self._first_matched_field(
term,
{
"name": row["feature_name"],
"type": row["feature_type"],
"location": row["location"],
},
)
results.append(
SearchResult(
repository_id=row["repository_id"],
repository_name=row["repository_name"],
match_type="feature",
match_name=row["feature_name"],
match_description=row["feature_type"],
matched_field=matched_field,
confidence=row["confidence"],
ability_id=row["ability_id"],
ability_name=row["ability_name"],
capability_id=row["capability_id"],
capability_name=row["capability_name"],
source_reference=row["location"],
)
)
for row in evidence_rows:
matched_field = self._first_matched_field(
term,
{
"type": row["evidence_type"],
"reference": row["reference"],
"strength": row["strength"],
},
)
results.append(
SearchResult(
repository_id=row["repository_id"],
repository_name=row["repository_name"],
match_type="evidence",
match_name=row["reference"],
match_description=row["evidence_type"],
matched_field=matched_field,
confidence=self._evidence_confidence(row["strength"]),
ability_id=row["ability_id"],
ability_name=row["ability_name"],
capability_id=row["capability_id"],
capability_name=row["capability_name"],
evidence_level=row["strength"],
source_reference=row["reference"],
)
)
return sorted(
results,
key=lambda result: (
-result.confidence,
result.repository_name.lower(),
result.match_type,
result.match_name.lower(),
),
)
def _matches(self, value: str | None, term: str) -> bool:
return term.lower() in (value or "").lower()
def _first_matched_field(self, term: str, values: dict[str, str | None]) -> str:
for field, value in values.items():
if self._matches(value, term):
return field
return ""
def _evidence_confidence(self, strength: str) -> float:
return {"strong": 0.9, "medium": 0.6, "weak": 0.3}.get(strength, 0.5)
def _insert_facts(
self,