optional semantic retrieval

This commit is contained in:
2026-04-26 16:05:27 +02:00
parent 7c3cd2ab63
commit 1bac1832f0
11 changed files with 453 additions and 3 deletions

View File

@@ -245,6 +245,9 @@ class SearchResult:
capability_name: str | None = None
evidence_level: str | None = None
source_reference: str | None = None
text_score: float = 0.0
vector_score: float = 0.0
hybrid_score: float = 0.0
@dataclass(frozen=True)

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import asdict
from dataclasses import asdict, replace
from repo_registry.core.models import (
AbilitySummary,
@@ -30,6 +30,7 @@ from repo_registry.llm_extraction.mapper import LLMExtractionMapper
from repo_registry.repo_ingestion.git import GitIngestionService
from repo_registry.repo_ingestion.metadata import RepositoryMetadataExtractor
from repo_registry.repo_scanning.scanner import DeterministicScanner
from repo_registry.semantic import EmbeddingProvider, cosine_similarity
from repo_registry.storage.sqlite import RegistryStore
@@ -41,6 +42,7 @@ class RegistryService:
store: RegistryStore,
ingestion: GitIngestionService | None = None,
llm_extractor: LLMCandidateExtractor | None = None,
embedding_provider: EmbeddingProvider | None = None,
) -> None:
self.store = store
self.scanner = DeterministicScanner()
@@ -50,6 +52,7 @@ class RegistryService:
self.content_extractor = ContentExtractor()
self.llm_extractor = llm_extractor
self.llm_mapper = LLMExtractionMapper()
self.embedding_provider = embedding_provider
def register_repository(
self,
@@ -1319,7 +1322,7 @@ class RegistryService:
ability: str | None = None,
capability: str | None = None,
) -> list[SearchResult]:
return self.store.search(
text_results = self.store.search(
query,
status=status,
language=language,
@@ -1327,3 +1330,241 @@ class RegistryService:
ability=ability,
capability=capability,
)
if self.embedding_provider is None:
return text_results
return self._hybrid_search(
query,
text_results,
status=status,
language=language,
framework=framework,
ability=ability,
capability=capability,
)
def _hybrid_search(
    self,
    query: str,
    text_results: list[SearchResult],
    *,
    status: str | None,
    language: str | None,
    framework: str | None,
    ability: str | None,
    capability: str | None,
) -> list[SearchResult]:
    """Blend text-search hits with embedding-ranked candidates.

    Args:
        query: Search string; it is embedded once with the configured provider.
        text_results: Hits already produced by the text search.
        status, language, framework, ability, capability: Optional filters,
            forwarded unchanged to ``_semantic_candidates``.

    Returns:
        One entry per distinct ``_search_result_key``, ordered by descending
        hybrid score, then descending vector score, then descending
        confidence, then repository name, match type and match name.
        When no embedding provider is configured the text hits are returned
        unchanged.
    """
    provider = self.embedding_provider
    if provider is None:
        # Semantic retrieval is optional: without a provider there is nothing
        # to blend, so fall back to the plain text ranking instead of failing
        # on ``None.embed``.
        return text_results

    # Candidates whose similarity falls below this floor are ignored.
    min_vector_score = 0.18
    # Relative contribution of each signal to the blended score.
    text_weight = 0.55
    vector_weight = 0.35
    confidence_weight = 0.10

    query_vector = provider.embed(query)
    candidates = self._semantic_candidates(
        status=status,
        language=language,
        framework=framework,
        ability=ability,
        capability=capability,
    )

    # Seed the ranking with the text hits: each gets a text score of at
    # least 1.0 and a hybrid score of at least its own confidence.
    by_key = {
        self._search_result_key(result): replace(
            result,
            text_score=max(result.text_score, 1.0),
            hybrid_score=max(result.hybrid_score, result.confidence),
        )
        for result in text_results
    }

    for text, result in candidates:
        # Negative similarities are clamped to zero before thresholding.
        vector_score = max(
            0.0,
            cosine_similarity(query_vector, provider.embed(text)),
        )
        if vector_score < min_vector_score:
            continue

        key = self._search_result_key(result)
        existing = by_key.get(key)
        # Prefer the entry already ranked under this key so its fields
        # (for example the text hit's matched_field) are preserved.
        base = existing or result
        text_score = existing.text_score if existing is not None else 0.0
        hybrid_score = (
            text_weight * text_score
            + vector_weight * vector_score
            + confidence_weight * result.confidence
        )
        # ``replace`` leaves every key field untouched, so the merged entry
        # belongs under the same key that was just looked up.
        by_key[key] = replace(
            base,
            vector_score=max(vector_score, base.vector_score),
            text_score=text_score,
            hybrid_score=max(hybrid_score, base.hybrid_score),
            matched_field=base.matched_field or "semantic",
        )

    return sorted(
        by_key.values(),
        key=lambda result: (
            -result.hybrid_score,
            -result.vector_score,
            -result.confidence,
            result.repository_name.lower(),
            result.match_type,
            result.match_name.lower(),
        ),
    )
def _semantic_candidates(
    self,
    *,
    status: str | None,
    language: str | None,
    framework: str | None,
    ability: str | None,
    capability: str | None,
) -> list[tuple[str, SearchResult]]:
    """Collect ``(text, result)`` pairs eligible for semantic ranking.

    Every repository returned by the store is checked against the optional
    filters. Repositories that pass contribute the candidates built from
    their ability map followed by the candidates built from their content
    chunks.

    Args:
        status: When set, only repositories with exactly this status pass.
        language: When set, checked against the repository's observed
            ``"language"`` facts.
        framework: When set, checked against the repository's observed
            ``"framework"`` facts.
        ability: Optional ability filter applied to the ability map.
        capability: Optional capability filter applied to the ability map.

    Returns:
        The candidate pairs in repository order.
    """
    # Observed facts only matter for the language/framework filters; when
    # neither is set both checks pass trivially, so skip the store lookup.
    needs_facts = bool(language or framework)

    candidates: list[tuple[str, SearchResult]] = []
    for repository in self.store.list_repositories():
        if status and repository.status != status:
            continue

        if needs_facts:
            facts = self.store.list_observed_facts(repository.id)
            if not self._repository_matches_observed_filter(facts, "language", language):
                continue
            if not self._repository_matches_observed_filter(facts, "framework", framework):
                continue

        ability_map = self.store.get_ability_map(repository.id)
        if not self._ability_map_matches_filter(
            ability_map,
            ability=ability,
            capability=capability,
        ):
            continue

        candidates.extend(self._approved_entry_candidates(ability_map))
        candidates.extend(self._content_chunk_candidates(repository, ability_map))
    return candidates
def _approved_entry_candidates(
    self,
    ability_map: RepositoryAbilityMap,
) -> list[tuple[str, SearchResult]]:
    """Turn an ability map into ``(text, result)`` pairs for semantic ranking.

    Each ability yields one pair whose text is its name and description,
    immediately followed by one pair per capability whose text joins the
    ability name with the capability's name, description, inputs and outputs.
    Every result is tagged with ``matched_field="semantic"``.
    """
    repo = ability_map.repository
    pairs: list[tuple[str, SearchResult]] = []

    for entry in ability_map.abilities:
        ability_hit = SearchResult(
            repository_id=repo.id,
            repository_name=repo.name,
            match_type="ability",
            match_name=entry.name,
            confidence=entry.confidence,
            confidence_label=entry.confidence_label,
            match_description=entry.description,
            matched_field="semantic",
            ability_id=entry.id,
            ability_name=entry.name,
        )
        pairs.append((f"{entry.name} {entry.description}", ability_hit))

        for item in entry.capabilities:
            # The parent ability name is part of the text that gets embedded.
            parts = (
                entry.name,
                item.name,
                item.description,
                " ".join(item.inputs),
                " ".join(item.outputs),
            )
            capability_hit = SearchResult(
                repository_id=repo.id,
                repository_name=repo.name,
                match_type="capability",
                match_name=item.name,
                confidence=item.confidence,
                confidence_label=item.confidence_label,
                match_description=item.description,
                matched_field="semantic",
                ability_id=entry.id,
                ability_name=entry.name,
                capability_id=item.id,
                capability_name=item.name,
            )
            pairs.append((" ".join(parts), capability_hit))

    return pairs
def _content_chunk_candidates(
    self,
    repository: Repository,
    ability_map: RepositoryAbilityMap,
) -> list[tuple[str, SearchResult]]:
    """Build ``(text, result)`` pairs from a repository's stored content chunks.

    Returns an empty list, without querying the store, when the ability map
    has no abilities. Otherwise every chunk becomes a ``content_chunk`` result
    with a fixed confidence of 0.5 ("medium"), named after its path and line
    range, and described by the first 240 characters of its text.
    """
    if not ability_map.abilities:
        return []

    return [
        (
            chunk.text,
            SearchResult(
                repository_id=repository.id,
                repository_name=repository.name,
                match_type="content_chunk",
                match_name=f"{chunk.path}:{chunk.start_line}-{chunk.end_line}",
                confidence=0.5,
                confidence_label="medium",
                match_description=chunk.text[:240],
                matched_field="semantic",
                source_reference=f"{chunk.path}:{chunk.start_line}",
            ),
        )
        for chunk in self.store.list_content_chunks(repository.id)
    ]
def _repository_matches_observed_filter(
    self,
    facts: Sequence[ObservedFact],
    kind: str,
    expected: str | None,
) -> bool:
    """Report whether any observed fact of ``kind`` satisfies the filter.

    A missing or empty ``expected`` always passes. Otherwise the filter
    passes when ``expected`` occurs, case-insensitively, as a substring of
    the name of at least one fact whose kind equals ``kind``.
    """
    if not expected:
        return True

    needle = expected.lower()
    for fact in facts:
        if fact.kind != kind:
            continue
        if needle in fact.name.lower():
            return True
    return False
def _ability_map_matches_filter(
    self,
    ability_map: RepositoryAbilityMap,
    *,
    ability: str | None,
    capability: str | None,
) -> bool:
    """Report whether the ability map satisfies the ability/capability filters.

    With neither filter set the map always passes. Otherwise it passes when
    a single ability satisfies both of the following: the ability filter (if
    any) is a case-insensitive substring of its name or description, and the
    capability filter (if any) is a case-insensitive substring of the name or
    description of one of that ability's capabilities.
    """
    if not (ability or capability):
        return True

    wanted_ability = ability.lower() if ability else None
    wanted_capability = capability.lower() if capability else None

    def ability_ok(entry) -> bool:
        if wanted_ability is None:
            return True
        return (
            wanted_ability in entry.name.lower()
            or wanted_ability in entry.description.lower()
        )

    def capability_ok(entry) -> bool:
        if wanted_capability is None:
            return True
        return any(
            wanted_capability in item.name.lower()
            or wanted_capability in item.description.lower()
            for item in entry.capabilities
        )

    return any(
        ability_ok(entry) and capability_ok(entry)
        for entry in ability_map.abilities
    )
def _search_result_key(self, result: SearchResult) -> tuple[object, ...]:
    """Return the tuple that identifies a search result when merging rankings.

    Two results share a key when they agree on repository id, match type,
    ability id, capability id, match name and source reference. Scores and
    descriptions are not part of the key.
    """
    identity_fields = (
        "repository_id",
        "match_type",
        "ability_id",
        "capability_id",
        "match_name",
        "source_reference",
    )
    return tuple(getattr(result, field) for field in identity_fields)