from __future__ import annotations

from collections.abc import Sequence
from dataclasses import asdict

from repo_registry.core.models import (
    AbilitySummary,
    AnalysisRunDiff,
    AnalysisRunDiffItem,
    AnalysisRunDiffSection,
    AnalysisRun,
    CapabilitySummary,
    CandidateAbility,
    CandidateCapability,
    CandidateEvidence,
    CandidateFeature,
    CandidateGraph,
    ContentChunk,
    ObservedFact,
    Repository,
    RepositoryAbilityMap,
    ReviewDecision,
    ScanSummary,
    SearchResult,
)
from repo_registry.candidate_graph.generator import CandidateGraphGenerator
from repo_registry.content_indexing.extractor import ContentExtractor
from repo_registry.llm_extraction.extractor import LLMCandidateExtractor
from repo_registry.llm_extraction.mapper import LLMExtractionMapper
from repo_registry.repo_ingestion.git import GitIngestionService
from repo_registry.repo_ingestion.metadata import RepositoryMetadataExtractor
from repo_registry.repo_scanning.scanner import DeterministicScanner
from repo_registry.storage.sqlite import RegistryStore

# Escapes applied inside YAML double-quoted scalars. Line breaks and tabs must
# be escaped too, otherwise a multi-line value breaks out of the quoted scalar.
_YAML_ESCAPES = str.maketrans(
    {
        "\\": "\\\\",
        '"': '\\"',
        "\n": "\\n",
        "\r": "\\r",
        "\t": "\\t",
    }
)


class RegistryService:
    """Application service for the manual registry MVP.

    Coordinates the registry store with the ingestion, scanning, content
    extraction and candidate-generation collaborators created in ``__init__``.
    """

    def __init__(
        self,
        store: RegistryStore,
        ingestion: GitIngestionService | None = None,
        llm_extractor: LLMCandidateExtractor | None = None,
    ) -> None:
        """Wire the service to its store and collaborators.

        Args:
            store: Persistence backend used by every operation.
            ingestion: Checkout resolver; a default ``GitIngestionService`` is
                created when omitted.
            llm_extractor: Optional LLM extractor. When ``None`` candidate
                generation always uses the deterministic generator.
        """
        self.store = store
        self.scanner = DeterministicScanner()
        self.ingestion = ingestion or GitIngestionService()
        self.metadata_extractor = RepositoryMetadataExtractor()
        self.candidate_generator = CandidateGraphGenerator()
        self.content_extractor = ContentExtractor()
        self.llm_extractor = llm_extractor
        self.llm_mapper = LLMExtractionMapper()

    def register_repository(
        self,
        *,
        url: str,
        name: str | None = None,
        description: str | None = None,
        branch: str = "main",
    ) -> Repository:
        """Create a repository record, filling gaps from extracted metadata.

        The repository is only resolved through the ingestion service when
        ``name`` or ``description`` is ``None``. Falsy values fall back to the
        extracted metadata when it is available; the name finally falls back
        to the literal ``"repository"``.
        """
        metadata = None
        if name is None or description is None:
            checkout = self.ingestion.resolve(url, branch=branch)
            metadata = self.metadata_extractor.extract(checkout.source_path, url)

        fallback_name = metadata.name if metadata is not None else "repository"
        fallback_description = metadata.description if metadata is not None else None
        return self.store.create_repository(
            name=name or fallback_name,
            url=url,
            description=description or fallback_description,
            branch=branch,
        )

    def list_repositories(self) -> list[Repository]:
        """Return the repositories reported by the store."""
        return self.store.list_repositories()

    def get_repository(self, repository_id: int) -> Repository:
        """Return the stored repository with ``repository_id``."""
        return self.store.get_repository(repository_id)

    def update_repository(
        self,
        repository_id: int,
        *,
        name: str | None = None,
        description: str | None = None,
        branch: str | None = None,
    ) -> Repository:
        """Forward the given field updates to the store and return its result."""
        return self.store.update_repository(
            repository_id,
            name=name,
            description=description,
            branch=branch,
        )

    def delete_repository(self, repository_id: int) -> None:
        """Delete the repository through the store."""
        self.store.delete_repository(repository_id)

    def analyze_repository(
        self,
        repository_id: int,
        *,
        source_path: str | None = None,
    ) -> ScanSummary:
        """Run a scan, store facts and chunks, and build the candidate graph.

        Args:
            repository_id: Repository to analyze.
            source_path: Path to scan directly. When ``None`` the repository
                URL and branch are resolved through the ingestion service.

        Returns:
            A ``ScanSummary`` for the completed run. If resolving or scanning
            raises, the run is marked failed through the store and a summary
            with no snapshot and no facts is returned instead of raising.
        """
        repository = self.store.get_repository(repository_id)
        run = self.store.create_analysis_run(repository_id)
        self.store.update_repository_status(repository_id, "analyzing")

        try:
            if source_path is None:
                checkout = self.ingestion.resolve(repository.url, branch=repository.branch)
                scan_source = checkout.source_path
            else:
                scan_source = source_path
            scan_result = self.scanner.scan(scan_source)
        except Exception as exc:
            # Deliberate boundary: a failed scan is recorded, not propagated.
            failed_run = self.store.fail_analysis_run(repository_id, run.id, str(exc))
            return ScanSummary(analysis_run=failed_run, snapshot=None, facts=[])

        completed_run = self.store.complete_analysis_run(
            repository_id,
            run.id,
            scan_result,
        )
        snapshot = (
            self.store.get_snapshot(completed_run.snapshot_id)
            if completed_run.snapshot_id is not None
            else None
        )
        facts = self.store.list_observed_facts(repository_id, completed_run.id)
        chunks = self.content_extractor.extract(scan_result.source_path, facts)
        self.store.replace_content_chunks(
            repository_id,
            completed_run.id,
            completed_run.snapshot_id,
            chunks,
        )
        # Candidates are generated from the chunks as read back from the store.
        stored_chunks = self.store.list_content_chunks(repository_id, completed_run.id)

        try:
            candidates, candidate_source = self._generate_candidates(
                repository,
                facts,
                stored_chunks,
            )
        except Exception as exc:
            # Best effort: record the failure and fall back to the
            # deterministic generator so the run still yields candidates.
            self.store.create_review_decision(
                repository_id,
                completed_run.id,
                action="llm_extraction_failed",
                notes=str(exc),
            )
            candidates = self.candidate_generator.generate(
                repository,
                facts,
                stored_chunks,
            )
            candidate_source = "deterministic"

        self.store.replace_candidate_graph(repository_id, completed_run.id, candidates)
        if candidate_source == "llm":
            self.store.create_review_decision(
                repository_id,
                completed_run.id,
                action="llm_extraction_used",
                notes=f"Generated {len(candidates)} candidate ability draft(s).",
            )
        return ScanSummary(
            analysis_run=completed_run,
            snapshot=snapshot,
            facts=facts,
        )

    def _generate_candidates(
        self,
        repository: Repository,
        facts: list[ObservedFact],
        chunks: list[ContentChunk],
    ):
        """Return ``(candidates, source)`` where source is ``"llm"`` or ``"deterministic"``.

        The LLM path is used only when an extractor is configured and it
        returns a truthy result; otherwise the deterministic generator runs.
        """
        if self.llm_extractor is not None:
            extracted = self.llm_extractor.extract(repository, chunks)
            if extracted:
                return self.llm_mapper.map(extracted, facts, chunks), "llm"
        return self.candidate_generator.generate(repository, facts, chunks), "deterministic"

    def list_analysis_runs(self, repository_id: int) -> list[AnalysisRun]:
        """Return the analysis runs the store holds for the repository."""
        return self.store.list_analysis_runs(repository_id)

    def get_analysis_run(self, repository_id: int, analysis_run_id: int) -> AnalysisRun:
        """Return one analysis run of the repository from the store."""
        return self.store.get_analysis_run(repository_id, analysis_run_id)

    def list_abilities(self) -> list[AbilitySummary]:
        """Return the ability summaries reported by the store."""
        return self.store.list_abilities()

    def list_capabilities(self) -> list[CapabilitySummary]:
        """Return the capability summaries reported by the store."""
        return self.store.list_capabilities()

    def list_review_decisions(
        self,
        repository_id: int,
        analysis_run_id: int | None = None,
    ) -> list[ReviewDecision]:
        """Return review decisions; ``analysis_run_id`` is passed through to the store."""
        return self.store.list_review_decisions(repository_id, analysis_run_id)

    def list_observed_facts(
        self,
        repository_id: int,
        analysis_run_id: int | None = None,
    ) -> list[ObservedFact]:
        """Return observed facts; ``analysis_run_id`` is passed through to the store."""
        return self.store.list_observed_facts(repository_id, analysis_run_id)

    def list_content_chunks(
        self,
        repository_id: int,
        analysis_run_id: int | None = None,
    ) -> list[ContentChunk]:
        """Return content chunks; ``analysis_run_id`` is passed through to the store."""
        return self.store.list_content_chunks(repository_id, analysis_run_id)

    def candidate_graph(self, repository_id: int, analysis_run_id: int) -> CandidateGraph:
        """Return the candidate graph stored for the given analysis run."""
        return self.store.get_candidate_graph(repository_id, analysis_run_id)

    def approve_candidate_graph(
        self,
        repository_id: int,
        analysis_run_id: int,
        *,
        notes: str = "",
    ) -> RepositoryAbilityMap:
        """Copy every still-pending candidate into the approved ability map.

        Only entries whose status is ``"candidate"`` are copied, at every
        level (ability, capability, feature, evidence). A review decision is
        recorded and the repository status is set to ``"indexed"``.

        Returns:
            The repository's ability map after approval.
        """
        graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
        pending_abilities = [
            ability for ability in graph.abilities if ability.status == "candidate"
        ]
        for ability in pending_abilities:
            approved_ability_id = self.store.create_ability(
                repository_id,
                name=ability.name,
                description=ability.description,
                confidence=ability.confidence,
            )
            for capability in ability.capabilities:
                if capability.status != "candidate":
                    continue
                approved_capability_id = self.store.create_capability(
                    repository_id,
                    approved_ability_id,
                    name=capability.name,
                    description=capability.description,
                    inputs=capability.inputs,
                    outputs=capability.outputs,
                    confidence=capability.confidence,
                )
                for feature in capability.features:
                    if feature.status != "candidate":
                        continue
                    self.store.create_feature(
                        repository_id,
                        approved_capability_id,
                        name=feature.name,
                        type=feature.type,
                        location=feature.location,
                        confidence=feature.confidence,
                        source_refs=feature.source_refs,
                    )
                for evidence in capability.evidence:
                    if evidence.status != "candidate":
                        continue
                    self.store.create_evidence(
                        repository_id,
                        approved_capability_id,
                        type=evidence.type,
                        reference=evidence.reference,
                        strength=evidence.strength,
                        source_refs=evidence.source_refs,
                    )

        # NOTE(review): only the status mark is assumed to be conditional on
        # having pending abilities; the decision and the repository status
        # update are assumed to always run -- confirm against history.
        if pending_abilities:
            self.store.mark_candidate_graph_status(
                repository_id,
                analysis_run_id,
                "approved",
            )
        self.store.create_review_decision(
            repository_id,
            analysis_run_id,
            action="approve_candidate_graph",
            notes=notes,
        )
        self.store.update_repository_status(repository_id, "indexed")
        return self.store.get_ability_map(repository_id)

    def diff_analysis_runs(
        self,
        repository_id: int,
        base_analysis_run_id: int,
        target_analysis_run_id: int,
    ) -> AnalysisRunDiff:
        """Diff two analysis runs of one repository.

        Facts, chunks and candidates are compared base-run against target-run.
        ``approved_entries`` compares the currently approved ability map with
        the target run's candidates.
        """
        repository = self.store.get_repository(repository_id)
        base_run = self.store.get_analysis_run(repository_id, base_analysis_run_id)
        target_run = self.store.get_analysis_run(repository_id, target_analysis_run_id)
        base_graph = self.store.get_candidate_graph(repository_id, base_analysis_run_id)
        target_graph = self.store.get_candidate_graph(repository_id, target_analysis_run_id)
        approved_map = self.store.get_ability_map(repository_id)

        facts = self._diff_items(
            self._fact_index(
                self.store.list_observed_facts(repository_id, base_analysis_run_id)
            ),
            self._fact_index(
                self.store.list_observed_facts(repository_id, target_analysis_run_id)
            ),
        )
        chunks = self._diff_items(
            self._chunk_index(
                self.store.list_content_chunks(repository_id, base_analysis_run_id)
            ),
            self._chunk_index(
                self.store.list_content_chunks(repository_id, target_analysis_run_id)
            ),
        )
        candidates = self._diff_items(
            self._candidate_index(base_graph.abilities),
            self._candidate_index(target_graph.abilities),
        )
        # Approved payloads carry no "status" key while candidate payloads do.
        # Comparing them verbatim would report every shared ability and
        # capability as changed, so the review status is ignored here.
        approved_entries = self._diff_items(
            self._approved_index(approved_map.abilities),
            self._candidate_index(target_graph.abilities),
            ignore_keys=("status",),
        )
        return AnalysisRunDiff(
            repository=repository,
            base_run=base_run,
            target_run=target_run,
            facts=facts,
            chunks=chunks,
            candidates=candidates,
            approved_entries=approved_entries,
        )

    def approve_analysis_run_changes(
        self,
        repository_id: int,
        analysis_run_id: int,
        *,
        notes: str = "",
    ) -> RepositoryAbilityMap:
        """Replace the approved entries with the run's candidate graph.

        Marks the candidate graph ``"approved"``, records a review decision,
        sets the repository status to ``"indexed"`` and returns the resulting
        ability map.
        """
        graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
        self.store.replace_approved_from_candidate_graph(repository_id, graph)
        self.store.mark_candidate_graph_status(repository_id, analysis_run_id, "approved")
        self.store.create_review_decision(
            repository_id,
            analysis_run_id,
            action="approve_analysis_run_changes",
            notes=notes,
        )
        self.store.update_repository_status(repository_id, "indexed")
        return self.store.get_ability_map(repository_id)

    def _record_candidate_review(
        self,
        repository_id: int,
        analysis_run_id: int,
        *,
        action: str,
        notes: str,
    ) -> CandidateGraph:
        """Record a review decision, flag the repository as reviewing, return the graph.

        Shared tail of every reject/edit/relink/merge operation below.
        """
        self.store.create_review_decision(
            repository_id,
            analysis_run_id,
            action=action,
            notes=notes,
        )
        self.store.update_repository_status(repository_id, "reviewing")
        return self.store.get_candidate_graph(repository_id, analysis_run_id)

    def reject_candidate_ability(
        self,
        repository_id: int,
        analysis_run_id: int,
        candidate_ability_id: int,
        *,
        notes: str = "",
    ) -> CandidateGraph:
        """Reject a candidate ability and return the updated candidate graph."""
        self.store.reject_candidate_ability(
            repository_id,
            analysis_run_id,
            candidate_ability_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="reject_candidate_ability",
            notes=notes,
        )

    def reject_candidate_capability(
        self,
        repository_id: int,
        analysis_run_id: int,
        candidate_capability_id: int,
        *,
        notes: str = "",
    ) -> CandidateGraph:
        """Reject a candidate capability and return the updated candidate graph."""
        self.store.reject_candidate_capability(
            repository_id,
            analysis_run_id,
            candidate_capability_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="reject_candidate_capability",
            notes=notes,
        )

    def reject_candidate_feature(
        self,
        repository_id: int,
        analysis_run_id: int,
        candidate_feature_id: int,
        *,
        notes: str = "",
    ) -> CandidateGraph:
        """Reject a candidate feature and return the updated candidate graph."""
        self.store.reject_candidate_feature(
            repository_id,
            analysis_run_id,
            candidate_feature_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="reject_candidate_feature",
            notes=notes,
        )

    def reject_candidate_evidence(
        self,
        repository_id: int,
        analysis_run_id: int,
        candidate_evidence_id: int,
        *,
        notes: str = "",
    ) -> CandidateGraph:
        """Reject a candidate evidence item and return the updated candidate graph."""
        self.store.reject_candidate_evidence(
            repository_id,
            analysis_run_id,
            candidate_evidence_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="reject_candidate_evidence",
            notes=notes,
        )

    def edit_candidate_ability(
        self,
        repository_id: int,
        analysis_run_id: int,
        candidate_ability_id: int,
        *,
        name: str,
        description: str,
        confidence: float,
        notes: str = "",
    ) -> CandidateGraph:
        """Update a candidate ability's fields and return the updated candidate graph."""
        self.store.update_candidate_ability(
            repository_id,
            analysis_run_id,
            candidate_ability_id,
            name=name,
            description=description,
            confidence=confidence,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="edit_candidate_ability",
            notes=notes,
        )

    def edit_candidate_capability(
        self,
        repository_id: int,
        analysis_run_id: int,
        candidate_capability_id: int,
        *,
        name: str,
        description: str,
        confidence: float,
        notes: str = "",
    ) -> CandidateGraph:
        """Update a candidate capability's fields and return the updated candidate graph."""
        self.store.update_candidate_capability(
            repository_id,
            analysis_run_id,
            candidate_capability_id,
            name=name,
            description=description,
            confidence=confidence,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="edit_candidate_capability",
            notes=notes,
        )

    def relink_candidate_capability(
        self,
        repository_id: int,
        analysis_run_id: int,
        candidate_capability_id: int,
        *,
        target_ability_id: int,
        notes: str = "",
    ) -> CandidateGraph:
        """Relink a candidate capability to ``target_ability_id``; return the updated graph."""
        self.store.relink_candidate_capability(
            repository_id,
            analysis_run_id,
            candidate_capability_id,
            target_ability_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="relink_candidate_capability",
            notes=notes,
        )

    def relink_candidate_feature(
        self,
        repository_id: int,
        analysis_run_id: int,
        candidate_feature_id: int,
        *,
        target_capability_id: int,
        notes: str = "",
    ) -> CandidateGraph:
        """Relink a candidate feature to ``target_capability_id``; return the updated graph."""
        self.store.relink_candidate_feature(
            repository_id,
            analysis_run_id,
            candidate_feature_id,
            target_capability_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="relink_candidate_feature",
            notes=notes,
        )

    def relink_candidate_evidence(
        self,
        repository_id: int,
        analysis_run_id: int,
        candidate_evidence_id: int,
        *,
        target_capability_id: int,
        notes: str = "",
    ) -> CandidateGraph:
        """Relink candidate evidence to ``target_capability_id``; return the updated graph."""
        self.store.relink_candidate_evidence(
            repository_id,
            analysis_run_id,
            candidate_evidence_id,
            target_capability_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="relink_candidate_evidence",
            notes=notes,
        )

    def merge_candidate_ability(
        self,
        repository_id: int,
        analysis_run_id: int,
        source_ability_id: int,
        *,
        target_ability_id: int,
        notes: str = "",
    ) -> CandidateGraph:
        """Merge one candidate ability into another; return the updated graph."""
        self.store.merge_candidate_ability(
            repository_id,
            analysis_run_id,
            source_ability_id,
            target_ability_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="merge_candidate_ability",
            notes=notes,
        )

    def merge_candidate_capability(
        self,
        repository_id: int,
        analysis_run_id: int,
        source_capability_id: int,
        *,
        target_capability_id: int,
        notes: str = "",
    ) -> CandidateGraph:
        """Merge one candidate capability into another; return the updated graph."""
        self.store.merge_candidate_capability(
            repository_id,
            analysis_run_id,
            source_capability_id,
            target_capability_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="merge_candidate_capability",
            notes=notes,
        )

    def merge_candidate_feature(
        self,
        repository_id: int,
        analysis_run_id: int,
        source_feature_id: int,
        *,
        target_feature_id: int,
        notes: str = "",
    ) -> CandidateGraph:
        """Merge one candidate feature into another; return the updated graph."""
        self.store.merge_candidate_feature(
            repository_id,
            analysis_run_id,
            source_feature_id,
            target_feature_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="merge_candidate_feature",
            notes=notes,
        )

    def merge_candidate_evidence(
        self,
        repository_id: int,
        analysis_run_id: int,
        source_evidence_id: int,
        *,
        target_evidence_id: int,
        notes: str = "",
    ) -> CandidateGraph:
        """Merge one candidate evidence item into another; return the updated graph."""
        self.store.merge_candidate_evidence(
            repository_id,
            analysis_run_id,
            source_evidence_id,
            target_evidence_id,
        )
        return self._record_candidate_review(
            repository_id,
            analysis_run_id,
            action="merge_candidate_evidence",
            notes=notes,
        )

    def add_ability(
        self,
        repository_id: int,
        *,
        name: str,
        description: str = "",
        confidence: float = 1.0,
    ) -> int:
        """Create an approved ability and return the value the store returns for it."""
        # Result discarded: presumably an existence check that raises for an
        # unknown repository -- confirm against the store implementation.
        self.store.get_repository(repository_id)
        return self.store.create_ability(
            repository_id,
            name=name,
            description=description,
            confidence=confidence,
        )

    def update_ability(
        self,
        repository_id: int,
        ability_id: int,
        *,
        name: str | None = None,
        description: str | None = None,
        confidence: float | None = None,
    ) -> RepositoryAbilityMap:
        """Forward ability updates to the store and return the refreshed ability map."""
        self.store.update_ability(
            repository_id,
            ability_id,
            name=name,
            description=description,
            confidence=confidence,
        )
        return self.store.get_ability_map(repository_id)

    def delete_ability(
        self,
        repository_id: int,
        ability_id: int,
    ) -> RepositoryAbilityMap:
        """Delete an ability and return the refreshed ability map."""
        self.store.delete_ability(repository_id, ability_id)
        return self.store.get_ability_map(repository_id)

    def add_capability(
        self,
        repository_id: int,
        ability_id: int,
        *,
        name: str,
        description: str = "",
        inputs: Sequence[str] = (),
        outputs: Sequence[str] = (),
        confidence: float = 1.0,
    ) -> int:
        """Create a capability under an ability after ``ensure_ability`` succeeds.

        ``inputs`` and ``outputs`` are copied into lists before being stored.
        """
        self.store.ensure_ability(repository_id, ability_id)
        return self.store.create_capability(
            repository_id,
            ability_id,
            name=name,
            description=description,
            inputs=list(inputs),
            outputs=list(outputs),
            confidence=confidence,
        )

    def update_capability(
        self,
        repository_id: int,
        capability_id: int,
        *,
        name: str | None = None,
        description: str | None = None,
        inputs: Sequence[str] | None = None,
        outputs: Sequence[str] | None = None,
        confidence: float | None = None,
    ) -> RepositoryAbilityMap:
        """Forward capability updates to the store and return the refreshed ability map.

        ``inputs``/``outputs`` are copied into lists when given and passed as
        ``None`` otherwise.
        """
        self.store.update_capability(
            repository_id,
            capability_id,
            name=name,
            description=description,
            inputs=list(inputs) if inputs is not None else None,
            outputs=list(outputs) if outputs is not None else None,
            confidence=confidence,
        )
        return self.store.get_ability_map(repository_id)

    def delete_capability(
        self,
        repository_id: int,
        capability_id: int,
    ) -> RepositoryAbilityMap:
        """Delete a capability and return the refreshed ability map."""
        self.store.delete_capability(repository_id, capability_id)
        return self.store.get_ability_map(repository_id)

    def add_feature(
        self,
        repository_id: int,
        capability_id: int,
        *,
        name: str,
        type: str,
        location: str = "",
        confidence: float = 1.0,
    ) -> int:
        """Create a feature under a capability after ``ensure_capability`` succeeds."""
        self.store.ensure_capability(repository_id, capability_id)
        return self.store.create_feature(
            repository_id,
            capability_id,
            name=name,
            type=type,
            location=location,
            confidence=confidence,
        )

    def update_feature(
        self,
        repository_id: int,
        feature_id: int,
        *,
        name: str | None = None,
        type: str | None = None,
        location: str | None = None,
        confidence: float | None = None,
    ) -> RepositoryAbilityMap:
        """Forward feature updates to the store and return the refreshed ability map."""
        self.store.update_feature(
            repository_id,
            feature_id,
            name=name,
            type=type,
            location=location,
            confidence=confidence,
        )
        return self.store.get_ability_map(repository_id)

    def delete_feature(
        self,
        repository_id: int,
        feature_id: int,
    ) -> RepositoryAbilityMap:
        """Delete a feature and return the refreshed ability map."""
        self.store.delete_feature(repository_id, feature_id)
        return self.store.get_ability_map(repository_id)

    def add_evidence(
        self,
        repository_id: int,
        capability_id: int,
        *,
        type: str,
        reference: str,
        strength: str = "medium",
    ) -> int:
        """Create evidence under a capability after ``ensure_capability`` succeeds."""
        self.store.ensure_capability(repository_id, capability_id)
        return self.store.create_evidence(
            repository_id,
            capability_id,
            type=type,
            reference=reference,
            strength=strength,
        )

    def update_evidence(
        self,
        repository_id: int,
        evidence_id: int,
        *,
        type: str | None = None,
        reference: str | None = None,
        strength: str | None = None,
    ) -> RepositoryAbilityMap:
        """Forward evidence updates to the store and return the refreshed ability map."""
        self.store.update_evidence(
            repository_id,
            evidence_id,
            type=type,
            reference=reference,
            strength=strength,
        )
        return self.store.get_ability_map(repository_id)

    def delete_evidence(
        self,
        repository_id: int,
        evidence_id: int,
    ) -> RepositoryAbilityMap:
        """Delete an evidence item and return the refreshed ability map."""
        self.store.delete_evidence(repository_id, evidence_id)
        return self.store.get_ability_map(repository_id)

    def ability_map(self, repository_id: int) -> RepositoryAbilityMap:
        """Return the approved ability map of the repository."""
        return self.store.get_ability_map(repository_id)

    def compare_repositories(self, repository_ids: Sequence[int]) -> dict[str, object]:
        """Group abilities across repositories and list single-repository capabilities.

        Abilities and capabilities are grouped by lower-cased name.

        Returns:
            A dict with ``"repositories"`` (each repository as a plain dict),
            ``"abilities"`` (one entry per ability name, sorted by name, with
            the repositories providing it) and ``"unique_capabilities"``
            (capabilities whose name occurs in exactly one repository, sorted
            by repository name then capability name).
        """
        maps = [self.store.get_ability_map(repository_id) for repository_id in repository_ids]
        ability_groups: dict[str, list[dict[str, object]]] = {}
        capability_groups: dict[str, list[dict[str, object]]] = {}

        for ability_map in maps:
            repository = ability_map.repository
            for ability in ability_map.abilities:
                ability_groups.setdefault(ability.name.lower(), []).append(
                    {
                        "repository_id": repository.id,
                        "repository_name": repository.name,
                        "confidence": ability.confidence,
                        "confidence_label": ability.confidence_label,
                        "capabilities": [
                            {
                                "name": capability.name,
                                "confidence": capability.confidence,
                                "confidence_label": capability.confidence_label,
                                "evidence_count": len(capability.evidence),
                            }
                            for capability in ability.capabilities
                        ],
                        # Original casing, used for the group's display name
                        # and stripped from the per-repository entries below.
                        "_name": ability.name,
                    }
                )
                for capability in ability.capabilities:
                    capability_groups.setdefault(capability.name.lower(), []).append(
                        {
                            "repository_id": repository.id,
                            "repository_name": repository.name,
                            "ability_name": ability.name,
                            "capability_name": capability.name,
                        }
                    )

        abilities = [
            {
                # The first occurrence decides the displayed casing.
                "name": entries[0]["_name"],
                "repositories": [
                    {key: value for key, value in entry.items() if key != "_name"}
                    for entry in entries
                ],
            }
            for entries in ability_groups.values()
        ]
        unique_capabilities = [
            entries[0]
            for entries in capability_groups.values()
            if len({entry["repository_id"] for entry in entries}) == 1
        ]
        return {
            "repositories": [asdict(ability_map.repository) for ability_map in maps],
            "abilities": sorted(abilities, key=lambda item: item["name"]),
            "unique_capabilities": sorted(
                unique_capabilities,
                key=lambda item: (item["repository_name"], item["capability_name"]),
            ),
        }

    def detect_capability_gaps(
        self,
        *,
        desired_ability: str,
        desired_capabilities: Sequence[str],
        repository_ids: Sequence[int] | None = None,
    ) -> dict[str, object]:
        """Report which desired capabilities are matched, missing, weak or duplicated.

        Capability names are matched case-insensitively against the approved
        ability maps. Blank desired names are dropped.

        Args:
            desired_ability: Echoed back in the result; not used for matching.
            desired_capabilities: Capability names to look for.
            repository_ids: Repositories to inspect; all repositories when ``None``.

        Returns:
            A dict with ``"desired_ability"``, ``"matched_capabilities"``,
            ``"missing_capabilities"``, ``"weakly_evidenced_capabilities"``
            (no evidence of strength ``"strong"``) and
            ``"duplicate_capabilities"`` (desired names found in more than one
            repository name).
        """
        repositories = (
            [self.store.get_repository(repository_id) for repository_id in repository_ids]
            if repository_ids is not None
            else self.store.list_repositories()
        )
        maps = [self.store.get_ability_map(repository.id) for repository in repositories]
        desired = [capability.strip() for capability in desired_capabilities if capability.strip()]
        capability_matches: dict[str, list[dict[str, object]]] = {
            name.lower(): [] for name in desired
        }
        duplicate_index: dict[str, set[str]] = {}
        weak: list[dict[str, object]] = []

        for ability_map in maps:
            repository = ability_map.repository
            for ability in ability_map.abilities:
                for capability in ability.capabilities:
                    key = capability.name.lower()
                    duplicate_index.setdefault(key, set()).add(repository.name)
                    if key not in capability_matches:
                        continue
                    capability_matches[key].append(
                        {
                            "repository_id": repository.id,
                            "repository_name": repository.name,
                            "capability": capability,
                        }
                    )
                    # NOTE(review): the weak-evidence check is assumed to apply
                    # only to desired capabilities, mirroring the filter on
                    # duplicates below -- confirm against history.
                    strengths = {evidence.strength for evidence in capability.evidence}
                    if "strong" not in strengths:
                        weak.append(
                            {
                                "capability": capability.name,
                                "repository_id": repository.id,
                                "repository_name": repository.name,
                                "evidence_count": len(capability.evidence),
                                "strongest_evidence": self._strongest_evidence(strengths),
                                "confidence": capability.confidence,
                                "confidence_label": capability.confidence_label,
                            }
                        )

        matched = [
            {
                "capability": name,
                "repositories": [
                    match["repository_name"] for match in capability_matches[name.lower()]
                ],
            }
            for name in desired
            if capability_matches[name.lower()]
        ]
        missing = [name for name in desired if not capability_matches[name.lower()]]
        duplicates = [
            {
                "capability": capability,
                "repositories": sorted(repository_names),
            }
            for capability, repository_names in duplicate_index.items()
            if len(repository_names) > 1 and capability in capability_matches
        ]
        return {
            "desired_ability": desired_ability,
            "matched_capabilities": matched,
            "missing_capabilities": missing,
            "weakly_evidenced_capabilities": weak,
            "duplicate_capabilities": duplicates,
        }

    def export_registry_entry(self, repository_id: int) -> str:
        """Render the repository's approved ability map as a YAML document.

        Every string value is emitted as a double-quoted scalar and each
        nesting level is indented two further spaces so the structure
        (abilities > capabilities > features/evidence) survives parsing.
        The returned text ends with a newline.
        """
        ability_map = self.store.get_ability_map(repository_id)
        repository = ability_map.repository
        lines = [
            "repository:",
            f"  name: {self._yaml_scalar(repository.name)}",
            f"  url: {self._yaml_scalar(repository.url)}",
            f"  branch: {self._yaml_scalar(repository.branch)}",
            f"  status: {self._yaml_scalar(repository.status)}",
            "abilities:",
        ]
        for ability in ability_map.abilities:
            lines.extend(
                [
                    f"  - name: {self._yaml_scalar(ability.name)}",
                    f"    description: {self._yaml_scalar(ability.description)}",
                    f"    confidence: {ability.confidence}",
                    f"    confidence_label: {self._yaml_scalar(ability.confidence_label)}",
                    "    capabilities:",
                ]
            )
            for capability in ability.capabilities:
                lines.extend(
                    [
                        f"      - name: {self._yaml_scalar(capability.name)}",
                        f"        description: {self._yaml_scalar(capability.description)}",
                        f"        confidence: {capability.confidence}",
                        "        confidence_label: "
                        f"{self._yaml_scalar(capability.confidence_label)}",
                        f"        inputs: {self._yaml_list(capability.inputs)}",
                        f"        outputs: {self._yaml_list(capability.outputs)}",
                        "        features:",
                    ]
                )
                for feature in capability.features:
                    lines.extend(
                        [
                            f"          - name: {self._yaml_scalar(feature.name)}",
                            f"            type: {self._yaml_scalar(feature.type)}",
                            f"            location: {self._yaml_scalar(feature.location)}",
                            f"            confidence: {feature.confidence}",
                            "            confidence_label: "
                            f"{self._yaml_scalar(feature.confidence_label)}",
                        ]
                    )
                lines.append("        evidence:")
                for evidence in capability.evidence:
                    lines.extend(
                        [
                            f"          - type: {self._yaml_scalar(evidence.type)}",
                            f"            reference: {self._yaml_scalar(evidence.reference)}",
                            f"            strength: {self._yaml_scalar(evidence.strength)}",
                        ]
                    )
        return "\n".join(lines) + "\n"

    def _strongest_evidence(self, strengths: set[str]) -> str | None:
        """Return the highest of ``"strong"``/``"medium"``/``"weak"`` present, else ``None``."""
        for strength in ("strong", "medium", "weak"):
            if strength in strengths:
                return strength
        return None

    def _diff_items(
        self,
        base: dict[str, dict[str, object]],
        target: dict[str, dict[str, object]],
        *,
        ignore_keys: Sequence[str] = (),
    ) -> AnalysisRunDiffSection:
        """Classify keyed payloads as added, removed, changed or weakened.

        Args:
            base: Payloads of the older side, keyed by entry key.
            target: Payloads of the newer side, keyed by entry key.
            ignore_keys: Payload keys left out of the equality check for
                entries present on both sides. Reported payloads are never
                altered; ignored keys only stop a difference being reported.

        Returns:
            A section whose lists are each ordered by entry key.
        """
        ignored = frozenset(ignore_keys)

        def comparable(payload: dict[str, object]) -> dict[str, object]:
            if not ignored:
                return payload
            return {key: value for key, value in payload.items() if key not in ignored}

        added = [
            AnalysisRunDiffItem(
                change_type="added",
                item_type=str(target[key]["item_type"]),
                key=key,
                target=target[key],
            )
            for key in sorted(target.keys() - base.keys())
        ]
        removed = [
            AnalysisRunDiffItem(
                change_type="removed",
                item_type=str(base[key]["item_type"]),
                key=key,
                base=base[key],
            )
            for key in sorted(base.keys() - target.keys())
        ]
        changed: list[AnalysisRunDiffItem] = []
        weakened: list[AnalysisRunDiffItem] = []
        for key in sorted(base.keys() & target.keys()):
            if comparable(base[key]) == comparable(target[key]):
                continue
            is_weaker = self._is_weakened(base[key], target[key])
            item = AnalysisRunDiffItem(
                change_type="weakened" if is_weaker else "changed",
                item_type=str(target[key]["item_type"]),
                key=key,
                base=base[key],
                target=target[key],
            )
            (weakened if is_weaker else changed).append(item)
        return AnalysisRunDiffSection(
            added=added,
            removed=removed,
            changed=changed,
            weakened=weakened,
        )

    def _is_weakened(
        self,
        base: dict[str, object],
        target: dict[str, object],
    ) -> bool:
        """Return True when confidence dropped or evidence strength ranks lower.

        Confidence is compared only when both values are numeric; strength
        only when both are strings. Unknown strength labels rank below
        ``"weak"``.
        """
        base_confidence = base.get("confidence")
        target_confidence = target.get("confidence")
        if (
            isinstance(base_confidence, int | float)
            and isinstance(target_confidence, int | float)
            and target_confidence < base_confidence
        ):
            return True
        base_strength = base.get("strength")
        target_strength = target.get("strength")
        strength_order = {"weak": 1, "medium": 2, "strong": 3}
        return (
            isinstance(base_strength, str)
            and isinstance(target_strength, str)
            and strength_order.get(target_strength, 0) < strength_order.get(base_strength, 0)
        )

    def _fact_index(self, facts: Sequence[ObservedFact]) -> dict[str, dict[str, object]]:
        """Index facts by ``fact:<kind>:<path>:<name>``; later duplicates win."""
        return {
            f"fact:{fact.kind}:{fact.path}:{fact.name}": {
                "item_type": "fact",
                "kind": fact.kind,
                "path": fact.path,
                "name": fact.name,
                "value": fact.value,
                "metadata": fact.metadata,
            }
            for fact in facts
        }

    def _chunk_index(
        self,
        chunks: Sequence[ContentChunk],
    ) -> dict[str, dict[str, object]]:
        """Index chunks by ``chunk:<kind>:<path>:<start_line>:<end_line>``."""
        return {
            f"chunk:{chunk.kind}:{chunk.path}:{chunk.start_line}:{chunk.end_line}": {
                "item_type": "chunk",
                "kind": chunk.kind,
                "path": chunk.path,
                "start_line": chunk.start_line,
                "end_line": chunk.end_line,
                "text": chunk.text,
            }
            for chunk in chunks
        }

    def _candidate_index(
        self,
        abilities: Sequence[CandidateAbility],
    ) -> dict[str, dict[str, object]]:
        """Flatten candidate abilities into keyed payloads, including review status.

        Abilities and capabilities carry a ``"status"`` entry; feature and
        evidence payloads do not.
        """
        index: dict[str, dict[str, object]] = {}
        for ability in abilities:
            index[self._entry_key("ability", ability.name)] = {
                "item_type": "ability",
                "name": ability.name,
                "description": ability.description,
                "confidence": ability.confidence,
                "status": ability.status,
            }
            for capability in ability.capabilities:
                capability_key = self._entry_key(
                    "capability",
                    ability.name,
                    capability.name,
                )
                index[capability_key] = {
                    "item_type": "capability",
                    "ability_name": ability.name,
                    "name": capability.name,
                    "description": capability.description,
                    "inputs": capability.inputs,
                    "outputs": capability.outputs,
                    "confidence": capability.confidence,
                    "status": capability.status,
                }
                self._index_candidate_leaves(index, ability, capability)
        return index

    def _index_candidate_leaves(
        self,
        index: dict[str, dict[str, object]],
        ability: CandidateAbility,
        capability: CandidateCapability,
    ) -> None:
        """Add the capability's feature and evidence payloads to ``index`` in place.

        Only ``name``/``features``/``evidence`` and the leaf attributes read
        by the payload builders are accessed, so approved entries exposing
        the same attributes are indexed through this method as well.
        """
        for feature in capability.features:
            key = self._entry_key(
                "feature",
                ability.name,
                capability.name,
                feature.name,
                feature.type,
                feature.location,
            )
            index[key] = self._feature_payload(
                feature,
                ability_name=ability.name,
                capability_name=capability.name,
            )
        for evidence in capability.evidence:
            key = self._entry_key(
                "evidence",
                ability.name,
                capability.name,
                evidence.type,
                evidence.reference,
            )
            index[key] = self._evidence_payload(
                evidence,
                ability_name=ability.name,
                capability_name=capability.name,
            )

    def _approved_index(self, abilities) -> dict[str, dict[str, object]]:
        """Flatten approved abilities into keyed payloads (no ``"status"`` entry).

        Keys match those produced by ``_candidate_index`` so both indexes can
        be diffed against each other.
        """
        index: dict[str, dict[str, object]] = {}
        for ability in abilities:
            index[self._entry_key("ability", ability.name)] = {
                "item_type": "ability",
                "name": ability.name,
                "description": ability.description,
                "confidence": ability.confidence,
            }
            for capability in ability.capabilities:
                capability_key = self._entry_key(
                    "capability",
                    ability.name,
                    capability.name,
                )
                index[capability_key] = {
                    "item_type": "capability",
                    "ability_name": ability.name,
                    "name": capability.name,
                    "description": capability.description,
                    "inputs": capability.inputs,
                    "outputs": capability.outputs,
                    "confidence": capability.confidence,
                }
                # Leaves are keyed and rendered exactly like candidate leaves.
                self._index_candidate_leaves(index, ability, capability)
        return index

    def _feature_payload(
        self,
        feature: CandidateFeature,
        *,
        ability_name: str,
        capability_name: str,
    ) -> dict[str, object]:
        """Build the diff payload for a feature under the named ability/capability."""
        return {
            "item_type": "feature",
            "ability_name": ability_name,
            "capability_name": capability_name,
            "name": feature.name,
            "type": feature.type,
            "location": feature.location,
            "confidence": feature.confidence,
        }

    def _evidence_payload(
        self,
        evidence: CandidateEvidence,
        *,
        ability_name: str,
        capability_name: str,
    ) -> dict[str, object]:
        """Build the diff payload for an evidence item under the named ability/capability."""
        return {
            "item_type": "evidence",
            "ability_name": ability_name,
            "capability_name": capability_name,
            "type": evidence.type,
            "reference": evidence.reference,
            "strength": evidence.strength,
        }

    def _entry_key(self, *parts: str) -> str:
        """Join stripped, lower-cased parts with ``":"`` into a diff key."""
        return ":".join(part.strip().lower() for part in parts)

    def _yaml_list(self, values: Sequence[str]) -> str:
        """Render values as a YAML flow sequence of double-quoted scalars."""
        return "[" + ", ".join(self._yaml_scalar(value) for value in values) + "]"

    def _yaml_scalar(self, value: object) -> str:
        """Render ``value`` as a YAML double-quoted scalar.

        ``None`` becomes the empty string; anything else goes through
        ``str``. Backslashes, double quotes, newlines, carriage returns and
        tabs are escaped so the result always stays on one line.
        """
        text = "" if value is None else str(value)
        return f'"{text.translate(_YAML_ESCAPES)}"'

    def search(
        self,
        query: str,
        *,
        status: str | None = None,
        language: str | None = None,
        framework: str | None = None,
        ability: str | None = None,
        capability: str | None = None,
    ) -> list[SearchResult]:
        """Run a registry search; all filters are passed through to the store."""
        return self.store.search(
            query,
            status=status,
            language=language,
            framework=framework,
            ability=ability,
            capability=capability,
        )