Files
repo-scoping/src/repo_registry/core/service.py
2026-04-26 12:45:49 +02:00

1004 lines
34 KiB
Python

from __future__ import annotations
from collections.abc import Sequence
from dataclasses import asdict
from repo_registry.core.models import (
AbilitySummary,
AnalysisRun,
CapabilitySummary,
CandidateGraph,
ContentChunk,
ObservedFact,
Repository,
RepositoryAbilityMap,
ReviewDecision,
ScanSummary,
SearchResult,
)
from repo_registry.candidate_graph.generator import CandidateGraphGenerator
from repo_registry.content_indexing.extractor import ContentExtractor
from repo_registry.llm_extraction.extractor import LLMCandidateExtractor
from repo_registry.llm_extraction.mapper import LLMExtractionMapper
from repo_registry.repo_ingestion.git import GitIngestionService
from repo_registry.repo_ingestion.metadata import RepositoryMetadataExtractor
from repo_registry.repo_scanning.scanner import DeterministicScanner
from repo_registry.storage.sqlite import RegistryStore
class RegistryService:
"""Application service for the manual registry MVP."""
def __init__(
    self,
    store: RegistryStore,
    ingestion: GitIngestionService | None = None,
    llm_extractor: LLMCandidateExtractor | None = None,
) -> None:
    """Bind the service to its store and build its collaborators.

    Args:
        store: Persistence backend used by every operation.
        ingestion: Repository checkout service; a new
            ``GitIngestionService`` is created when this is falsy.
        llm_extractor: Optional LLM-backed extractor. When ``None`` the
            deterministic candidate generator is used on its own.
    """
    self.store = store
    self.scanner = DeterministicScanner()
    # Allow the caller to inject an ingestion service (e.g. a stub).
    self.ingestion = ingestion if ingestion else GitIngestionService()
    self.metadata_extractor = RepositoryMetadataExtractor()
    self.candidate_generator = CandidateGraphGenerator()
    self.content_extractor = ContentExtractor()
    # Stored as given; ``_generate_candidates`` checks it for ``None``.
    self.llm_extractor = llm_extractor
    self.llm_mapper = LLMExtractionMapper()
def register_repository(
    self,
    *,
    url: str,
    name: str | None = None,
    description: str | None = None,
    branch: str = "main",
) -> Repository:
    """Create a repository record, inferring missing metadata.

    When ``name`` or ``description`` is ``None`` the repository is resolved
    through the ingestion service and its metadata extracted; the extracted
    values fill in whichever of the two is falsy. Without metadata the name
    falls back to ``"repository"`` and the description to ``None``.

    Returns:
        Whatever ``store.create_repository`` returns for the new record.
    """
    metadata = None
    if name is None or description is None:
        checkout = self.ingestion.resolve(url, branch=branch)
        metadata = self.metadata_extractor.extract(checkout.source_path, url)

    resolved_name = name
    if not resolved_name:
        resolved_name = "repository" if metadata is None else metadata.name

    resolved_description = description
    if not resolved_description:
        resolved_description = None if metadata is None else metadata.description

    return self.store.create_repository(
        name=resolved_name,
        url=url,
        description=resolved_description,
        branch=branch,
    )
def list_repositories(self) -> list[Repository]:
    """Return the repositories reported by the store."""
    repositories = self.store.list_repositories()
    return repositories
def get_repository(self, repository_id: int) -> Repository:
    """Look up one repository in the store by its id."""
    repository = self.store.get_repository(repository_id)
    return repository
def update_repository(
    self,
    repository_id: int,
    *,
    name: str | None = None,
    description: str | None = None,
    branch: str | None = None,
) -> Repository:
    """Forward repository field changes to the store.

    All three fields are always passed through, including ``None`` values;
    how ``None`` is interpreted is decided by ``store.update_repository``.
    """
    changes = {"name": name, "description": description, "branch": branch}
    return self.store.update_repository(repository_id, **changes)
def delete_repository(self, repository_id: int) -> None:
    """Ask the store to delete the repository; nothing is returned."""
    store = self.store
    store.delete_repository(repository_id)
def analyze_repository(
    self,
    repository_id: int,
    *,
    source_path: str | None = None,
) -> ScanSummary:
    """Scan a repository, index its content and draft a candidate graph.

    Args:
        repository_id: Repository to analyze.
        source_path: Directory to scan. When ``None`` the repository URL and
            branch are resolved through the ingestion service instead.

    Returns:
        A ``ScanSummary`` for the completed run. If resolving or scanning
        raises, the run is marked failed and a summary with no snapshot and
        no facts is returned instead of propagating the error.
    """
    store = self.store
    repository = store.get_repository(repository_id)
    run = store.create_analysis_run(repository_id)
    store.update_repository_status(repository_id, "analyzing")

    try:
        if source_path is not None:
            scan_source = source_path
        else:
            scan_source = self.ingestion.resolve(
                repository.url,
                branch=repository.branch,
            ).source_path
        scan_result = self.scanner.scan(scan_source)
    except Exception as exc:
        # Deliberate catch-all: the failure is recorded on the run itself.
        failed_run = store.fail_analysis_run(repository_id, run.id, str(exc))
        return ScanSummary(analysis_run=failed_run, snapshot=None, facts=[])

    completed_run = store.complete_analysis_run(repository_id, run.id, scan_result)
    run_id = completed_run.id
    snapshot_id = completed_run.snapshot_id
    snapshot = None if snapshot_id is None else store.get_snapshot(snapshot_id)

    # Persist content chunks, then read them back so candidate generation
    # works with the stored form.
    facts = store.list_observed_facts(repository_id, run_id)
    extracted_chunks = self.content_extractor.extract(scan_result.source_path, facts)
    store.replace_content_chunks(repository_id, run_id, snapshot_id, extracted_chunks)
    stored_chunks = store.list_content_chunks(repository_id, run_id)

    try:
        candidates, candidate_source = self._generate_candidates(
            repository,
            facts,
            stored_chunks,
        )
    except Exception as exc:
        # NOTE: any error from _generate_candidates (including its own
        # deterministic fallback) is logged under this action name before
        # the deterministic generator is tried here.
        store.create_review_decision(
            repository_id,
            run_id,
            action="llm_extraction_failed",
            notes=str(exc),
        )
        candidates = self.candidate_generator.generate(repository, facts, stored_chunks)
        candidate_source = "deterministic"

    store.replace_candidate_graph(repository_id, run_id, candidates)
    if candidate_source == "llm":
        store.create_review_decision(
            repository_id,
            run_id,
            action="llm_extraction_used",
            notes=f"Generated {len(candidates)} candidate ability draft(s).",
        )

    return ScanSummary(analysis_run=completed_run, snapshot=snapshot, facts=facts)
def _generate_candidates(
    self,
    repository: Repository,
    facts: list[ObservedFact],
    chunks: list[ContentChunk],
):
    """Produce candidates and report which generator produced them.

    Returns:
        A ``(candidates, source)`` pair. ``source`` is ``"llm"`` when an LLM
        extractor is configured and returned a truthy extraction (mapped via
        ``llm_mapper``); otherwise ``"deterministic"`` with the output of
        ``candidate_generator``.
    """
    extractor = self.llm_extractor
    if extractor is not None:
        drafts = extractor.extract(repository, chunks)
        if drafts:
            return self.llm_mapper.map(drafts, facts, chunks), "llm"

    # No extractor configured, or it produced nothing usable.
    fallback = self.candidate_generator.generate(repository, facts, chunks)
    return fallback, "deterministic"
def list_analysis_runs(self, repository_id: int) -> list[AnalysisRun]:
    """Return the analysis runs the store holds for a repository."""
    runs = self.store.list_analysis_runs(repository_id)
    return runs
def get_analysis_run(self, repository_id: int, analysis_run_id: int) -> AnalysisRun:
    """Fetch a single analysis run of a repository from the store."""
    run = self.store.get_analysis_run(repository_id, analysis_run_id)
    return run
def list_abilities(self) -> list[AbilitySummary]:
    """Return the ability summaries reported by the store."""
    summaries = self.store.list_abilities()
    return summaries
def list_capabilities(self) -> list[CapabilitySummary]:
    """Return the capability summaries reported by the store."""
    summaries = self.store.list_capabilities()
    return summaries
def list_review_decisions(
    self,
    repository_id: int,
    analysis_run_id: int | None = None,
) -> list[ReviewDecision]:
    """Return review decisions for a repository.

    ``analysis_run_id`` is forwarded as-is (``None`` included); any
    filtering by run is done by the store.
    """
    decisions = self.store.list_review_decisions(repository_id, analysis_run_id)
    return decisions
def list_observed_facts(
    self,
    repository_id: int,
    analysis_run_id: int | None = None,
) -> list[ObservedFact]:
    """Return observed facts for a repository.

    ``analysis_run_id`` is forwarded as-is (``None`` included); any
    filtering by run is done by the store.
    """
    facts = self.store.list_observed_facts(repository_id, analysis_run_id)
    return facts
def list_content_chunks(
    self,
    repository_id: int,
    analysis_run_id: int | None = None,
) -> list[ContentChunk]:
    """Return stored content chunks for a repository.

    ``analysis_run_id`` is forwarded as-is (``None`` included); any
    filtering by run is done by the store.
    """
    chunks = self.store.list_content_chunks(repository_id, analysis_run_id)
    return chunks
def candidate_graph(self, repository_id: int, analysis_run_id: int) -> CandidateGraph:
    """Fetch the candidate graph stored for one analysis run."""
    graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
    return graph
# Promote the still-pending part of a run's candidate graph into approved
# abilities, capabilities, features and evidence, then return the
# repository's ability map.
#
# Only items whose status is "candidate" are promoted; anything with another
# status is skipped at every level.
# `notes` is stored on the review decision recorded for this approval.
def approve_candidate_graph(
self,
repository_id: int,
analysis_run_id: int,
*,
notes: str = "",
) -> RepositoryAbilityMap:
graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
# Abilities that are still awaiting review.
pending_abilities = [
ability for ability in graph.abilities if ability.status == "candidate"
]
for ability in pending_abilities:
# Each create_* call returns the id used to parent the next level.
approved_ability_id = self.store.create_ability(
repository_id,
name=ability.name,
description=ability.description,
confidence=ability.confidence,
)
for capability in ability.capabilities:
if capability.status != "candidate":
continue
approved_capability_id = self.store.create_capability(
repository_id,
approved_ability_id,
name=capability.name,
description=capability.description,
inputs=capability.inputs,
outputs=capability.outputs,
confidence=capability.confidence,
)
# Features and evidence both attach to the capability created above.
for feature in capability.features:
if feature.status != "candidate":
continue
self.store.create_feature(
repository_id,
approved_capability_id,
name=feature.name,
type=feature.type,
location=feature.location,
confidence=feature.confidence,
source_refs=feature.source_refs,
)
for evidence in capability.evidence:
if evidence.status != "candidate":
continue
self.store.create_evidence(
repository_id,
approved_capability_id,
type=evidence.type,
reference=evidence.reference,
strength=evidence.strength,
source_refs=evidence.source_refs,
)
# The graph is only flagged "approved" when something was pending.
if pending_abilities:
self.store.mark_candidate_graph_status(
repository_id,
analysis_run_id,
"approved",
)
# NOTE(review): indentation is not visible in this listing; the review
# decision and the "indexed" status update presumably run unconditionally
# (outside the `if pending_abilities` block) - confirm against the file.
self.store.create_review_decision(
repository_id,
analysis_run_id,
action="approve_candidate_graph",
notes=notes,
)
self.store.update_repository_status(repository_id, "indexed")
return self.store.get_ability_map(repository_id)
def reject_candidate_ability(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_ability_id: int,
    *,
    notes: str = "",
) -> CandidateGraph:
    """Reject a candidate ability and return the run's candidate graph.

    After the store rejects the candidate, a review decision with action
    ``"reject_candidate_ability"`` is recorded with ``notes`` and the
    repository status is set to ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.reject_candidate_ability(*run_key, candidate_ability_id)
    store.create_review_decision(*run_key, action="reject_candidate_ability", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def reject_candidate_capability(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_capability_id: int,
    *,
    notes: str = "",
) -> CandidateGraph:
    """Reject a candidate capability and return the run's candidate graph.

    After the store rejects the candidate, a review decision with action
    ``"reject_candidate_capability"`` is recorded with ``notes`` and the
    repository status is set to ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.reject_candidate_capability(*run_key, candidate_capability_id)
    store.create_review_decision(*run_key, action="reject_candidate_capability", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def reject_candidate_feature(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_feature_id: int,
    *,
    notes: str = "",
) -> CandidateGraph:
    """Reject a candidate feature and return the run's candidate graph.

    After the store rejects the candidate, a review decision with action
    ``"reject_candidate_feature"`` is recorded with ``notes`` and the
    repository status is set to ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.reject_candidate_feature(*run_key, candidate_feature_id)
    store.create_review_decision(*run_key, action="reject_candidate_feature", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def reject_candidate_evidence(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_evidence_id: int,
    *,
    notes: str = "",
) -> CandidateGraph:
    """Reject a candidate evidence item and return the run's candidate graph.

    After the store rejects the candidate, a review decision with action
    ``"reject_candidate_evidence"`` is recorded with ``notes`` and the
    repository status is set to ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.reject_candidate_evidence(*run_key, candidate_evidence_id)
    store.create_review_decision(*run_key, action="reject_candidate_evidence", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def edit_candidate_ability(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_ability_id: int,
    *,
    name: str,
    description: str,
    confidence: float,
    notes: str = "",
) -> CandidateGraph:
    """Overwrite a candidate ability's fields and return the candidate graph.

    The edit is followed by a review decision with action
    ``"edit_candidate_ability"`` and a repository status of ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.update_candidate_ability(
        *run_key,
        candidate_ability_id,
        name=name,
        description=description,
        confidence=confidence,
    )
    store.create_review_decision(*run_key, action="edit_candidate_ability", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def edit_candidate_capability(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_capability_id: int,
    *,
    name: str,
    description: str,
    confidence: float,
    notes: str = "",
) -> CandidateGraph:
    """Overwrite a candidate capability's fields and return the candidate graph.

    The edit is followed by a review decision with action
    ``"edit_candidate_capability"`` and a repository status of
    ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.update_candidate_capability(
        *run_key,
        candidate_capability_id,
        name=name,
        description=description,
        confidence=confidence,
    )
    store.create_review_decision(*run_key, action="edit_candidate_capability", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def relink_candidate_capability(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_capability_id: int,
    *,
    target_ability_id: int,
    notes: str = "",
) -> CandidateGraph:
    """Move a candidate capability to another ability; return the graph.

    The relink is followed by a review decision with action
    ``"relink_candidate_capability"`` and a repository status of
    ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.relink_candidate_capability(*run_key, candidate_capability_id, target_ability_id)
    store.create_review_decision(*run_key, action="relink_candidate_capability", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def relink_candidate_feature(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_feature_id: int,
    *,
    target_capability_id: int,
    notes: str = "",
) -> CandidateGraph:
    """Move a candidate feature to another capability; return the graph.

    The relink is followed by a review decision with action
    ``"relink_candidate_feature"`` and a repository status of
    ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.relink_candidate_feature(*run_key, candidate_feature_id, target_capability_id)
    store.create_review_decision(*run_key, action="relink_candidate_feature", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def relink_candidate_evidence(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_evidence_id: int,
    *,
    target_capability_id: int,
    notes: str = "",
) -> CandidateGraph:
    """Move candidate evidence to another capability; return the graph.

    The relink is followed by a review decision with action
    ``"relink_candidate_evidence"`` and a repository status of
    ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.relink_candidate_evidence(*run_key, candidate_evidence_id, target_capability_id)
    store.create_review_decision(*run_key, action="relink_candidate_evidence", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def merge_candidate_ability(
    self,
    repository_id: int,
    analysis_run_id: int,
    source_ability_id: int,
    *,
    target_ability_id: int,
    notes: str = "",
) -> CandidateGraph:
    """Merge one candidate ability into another; return the graph.

    The merge itself is performed by the store. It is followed by a review
    decision with action ``"merge_candidate_ability"`` and a repository
    status of ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.merge_candidate_ability(*run_key, source_ability_id, target_ability_id)
    store.create_review_decision(*run_key, action="merge_candidate_ability", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def merge_candidate_capability(
    self,
    repository_id: int,
    analysis_run_id: int,
    source_capability_id: int,
    *,
    target_capability_id: int,
    notes: str = "",
) -> CandidateGraph:
    """Merge one candidate capability into another; return the graph.

    The merge itself is performed by the store. It is followed by a review
    decision with action ``"merge_candidate_capability"`` and a repository
    status of ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.merge_candidate_capability(*run_key, source_capability_id, target_capability_id)
    store.create_review_decision(*run_key, action="merge_candidate_capability", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def merge_candidate_feature(
    self,
    repository_id: int,
    analysis_run_id: int,
    source_feature_id: int,
    *,
    target_feature_id: int,
    notes: str = "",
) -> CandidateGraph:
    """Merge one candidate feature into another; return the graph.

    The merge itself is performed by the store. It is followed by a review
    decision with action ``"merge_candidate_feature"`` and a repository
    status of ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.merge_candidate_feature(*run_key, source_feature_id, target_feature_id)
    store.create_review_decision(*run_key, action="merge_candidate_feature", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def merge_candidate_evidence(
    self,
    repository_id: int,
    analysis_run_id: int,
    source_evidence_id: int,
    *,
    target_evidence_id: int,
    notes: str = "",
) -> CandidateGraph:
    """Merge one candidate evidence item into another; return the graph.

    The merge itself is performed by the store. It is followed by a review
    decision with action ``"merge_candidate_evidence"`` and a repository
    status of ``"reviewing"``.
    """
    store = self.store
    run_key = (repository_id, analysis_run_id)
    store.merge_candidate_evidence(*run_key, source_evidence_id, target_evidence_id)
    store.create_review_decision(*run_key, action="merge_candidate_evidence", notes=notes)
    store.update_repository_status(repository_id, "reviewing")
    return store.get_candidate_graph(*run_key)
def add_ability(
    self,
    repository_id: int,
    *,
    name: str,
    description: str = "",
    confidence: float = 1.0,
) -> int:
    """Create an ability on a repository and return the store's result.

    The repository is looked up first and the result discarded; presumably
    this makes unknown ids fail early - confirm against the store.
    """
    store = self.store
    store.get_repository(repository_id)
    fields = {"name": name, "description": description, "confidence": confidence}
    return store.create_ability(repository_id, **fields)
def update_ability(
    self,
    repository_id: int,
    ability_id: int,
    *,
    name: str | None = None,
    description: str | None = None,
    confidence: float | None = None,
) -> RepositoryAbilityMap:
    """Forward ability field changes to the store; return the ability map.

    All fields are passed through, ``None`` included; what ``None`` means
    is decided by ``store.update_ability``.
    """
    store = self.store
    changes = {"name": name, "description": description, "confidence": confidence}
    store.update_ability(repository_id, ability_id, **changes)
    return store.get_ability_map(repository_id)
def delete_ability(
    self,
    repository_id: int,
    ability_id: int,
) -> RepositoryAbilityMap:
    """Delete an ability and return the repository's ability map."""
    store = self.store
    store.delete_ability(repository_id, ability_id)
    return store.get_ability_map(repository_id)
def add_capability(
    self,
    repository_id: int,
    ability_id: int,
    *,
    name: str,
    description: str = "",
    inputs: Sequence[str] = (),
    outputs: Sequence[str] = (),
    confidence: float = 1.0,
) -> int:
    """Create a capability under an ability and return the store's result.

    ``store.ensure_ability`` is called first for the repository/ability
    pair. ``inputs`` and ``outputs`` are copied into new lists before being
    handed to the store.
    """
    store = self.store
    store.ensure_ability(repository_id, ability_id)
    input_names = list(inputs)
    output_names = list(outputs)
    return store.create_capability(
        repository_id,
        ability_id,
        name=name,
        description=description,
        inputs=input_names,
        outputs=output_names,
        confidence=confidence,
    )
def update_capability(
    self,
    repository_id: int,
    capability_id: int,
    *,
    name: str | None = None,
    description: str | None = None,
    inputs: Sequence[str] | None = None,
    outputs: Sequence[str] | None = None,
    confidence: float | None = None,
) -> RepositoryAbilityMap:
    """Forward capability field changes to the store; return the ability map.

    ``inputs`` / ``outputs`` are copied into lists when given and passed as
    ``None`` otherwise; every other field is forwarded unchanged.
    """
    store = self.store
    input_names = None if inputs is None else list(inputs)
    output_names = None if outputs is None else list(outputs)
    store.update_capability(
        repository_id,
        capability_id,
        name=name,
        description=description,
        inputs=input_names,
        outputs=output_names,
        confidence=confidence,
    )
    return store.get_ability_map(repository_id)
def delete_capability(
    self,
    repository_id: int,
    capability_id: int,
) -> RepositoryAbilityMap:
    """Delete a capability and return the repository's ability map."""
    store = self.store
    store.delete_capability(repository_id, capability_id)
    return store.get_ability_map(repository_id)
def add_feature(
    self,
    repository_id: int,
    capability_id: int,
    *,
    name: str,
    type: str,
    location: str = "",
    confidence: float = 1.0,
) -> int:
    """Create a feature under a capability and return the store's result.

    ``store.ensure_capability`` is called first for the
    repository/capability pair.
    """
    store = self.store
    store.ensure_capability(repository_id, capability_id)
    fields = {
        "name": name,
        "type": type,
        "location": location,
        "confidence": confidence,
    }
    return store.create_feature(repository_id, capability_id, **fields)
def update_feature(
    self,
    repository_id: int,
    feature_id: int,
    *,
    name: str | None = None,
    type: str | None = None,
    location: str | None = None,
    confidence: float | None = None,
) -> RepositoryAbilityMap:
    """Forward feature field changes to the store; return the ability map.

    All fields are passed through, ``None`` included; what ``None`` means
    is decided by ``store.update_feature``.
    """
    store = self.store
    changes = {
        "name": name,
        "type": type,
        "location": location,
        "confidence": confidence,
    }
    store.update_feature(repository_id, feature_id, **changes)
    return store.get_ability_map(repository_id)
def delete_feature(
    self,
    repository_id: int,
    feature_id: int,
) -> RepositoryAbilityMap:
    """Delete a feature and return the repository's ability map."""
    store = self.store
    store.delete_feature(repository_id, feature_id)
    return store.get_ability_map(repository_id)
def add_evidence(
    self,
    repository_id: int,
    capability_id: int,
    *,
    type: str,
    reference: str,
    strength: str = "medium",
) -> int:
    """Attach evidence to a capability and return the store's result.

    ``store.ensure_capability`` is called first for the
    repository/capability pair.
    """
    store = self.store
    store.ensure_capability(repository_id, capability_id)
    fields = {"type": type, "reference": reference, "strength": strength}
    return store.create_evidence(repository_id, capability_id, **fields)
def update_evidence(
    self,
    repository_id: int,
    evidence_id: int,
    *,
    type: str | None = None,
    reference: str | None = None,
    strength: str | None = None,
) -> RepositoryAbilityMap:
    """Forward evidence field changes to the store; return the ability map.

    All fields are passed through, ``None`` included; what ``None`` means
    is decided by ``store.update_evidence``.
    """
    store = self.store
    changes = {"type": type, "reference": reference, "strength": strength}
    store.update_evidence(repository_id, evidence_id, **changes)
    return store.get_ability_map(repository_id)
def delete_evidence(
    self,
    repository_id: int,
    evidence_id: int,
) -> RepositoryAbilityMap:
    """Delete an evidence item and return the repository's ability map."""
    store = self.store
    store.delete_evidence(repository_id, evidence_id)
    return store.get_ability_map(repository_id)
def ability_map(self, repository_id: int) -> RepositoryAbilityMap:
    """Return the ability map the store holds for a repository."""
    current_map = self.store.get_ability_map(repository_id)
    return current_map
def compare_repositories(self, repository_ids: Sequence[int]) -> dict[str, object]:
    """Compare the ability maps of several repositories.

    Abilities and capabilities are grouped by lower-cased name. The result
    has three keys:

    * ``"repositories"`` - ``asdict`` of each repository, in input order.
    * ``"abilities"`` - one entry per ability group, sorted by ``"name"``
      (the spelling first encountered), each listing the repositories that
      provide it together with their capabilities.
    * ``"unique_capabilities"`` - the first entry of every capability group
      that belongs to exactly one repository id, sorted by repository name
      and capability name.
    """
    ability_maps = [self.store.get_ability_map(rid) for rid in repository_ids]

    by_ability: dict[str, dict[str, object]] = {}
    by_capability: dict[str, list[dict[str, object]]] = {}
    for current in ability_maps:
        repo = current.repository
        for ability in current.abilities:
            # The first spelling seen becomes the group's display name.
            group = by_ability.setdefault(
                ability.name.lower(),
                {"name": ability.name, "repositories": []},
            )
            capability_rows = []
            for capability in ability.capabilities:
                capability_rows.append(
                    {
                        "name": capability.name,
                        "confidence": capability.confidence,
                        "confidence_label": capability.confidence_label,
                        "evidence_count": len(capability.evidence),
                    }
                )
            group["repositories"].append(
                {
                    "repository_id": repo.id,
                    "repository_name": repo.name,
                    "confidence": ability.confidence,
                    "confidence_label": ability.confidence_label,
                    "capabilities": capability_rows,
                }
            )
            for capability in ability.capabilities:
                owners = by_capability.setdefault(capability.name.lower(), [])
                owners.append(
                    {
                        "repository_id": repo.id,
                        "repository_name": repo.name,
                        "ability_name": ability.name,
                        "capability_name": capability.name,
                    }
                )

    exclusive = []
    for entries in by_capability.values():
        owner_ids = {entry["repository_id"] for entry in entries}
        if len(owner_ids) == 1:
            exclusive.append(entries[0])

    return {
        "repositories": [asdict(current.repository) for current in ability_maps],
        "abilities": sorted(by_ability.values(), key=lambda group: group["name"]),
        "unique_capabilities": sorted(
            exclusive,
            key=lambda entry: (entry["repository_name"], entry["capability_name"]),
        ),
    }
# Report which desired capabilities exist in the selected repositories and
# which are missing, weakly evidenced, or provided by more than one repository.
#
# desired_ability: only echoed back in the result; it is not used to filter.
# desired_capabilities: names to look for; blank entries are dropped and
#   matching is done on the lower-cased name.
# repository_ids: repositories to inspect; None means every repository in
#   the store.
# Returns a dict with the keys "desired_ability", "matched_capabilities",
# "missing_capabilities", "weakly_evidenced_capabilities" and
# "duplicate_capabilities".
def detect_capability_gaps(
self,
*,
desired_ability: str,
desired_capabilities: Sequence[str],
repository_ids: Sequence[int] | None = None,
) -> dict[str, object]:
repositories = (
[self.store.get_repository(repository_id) for repository_id in repository_ids]
if repository_ids is not None
else self.store.list_repositories()
)
maps = [self.store.get_ability_map(repository.id) for repository in repositories]
# Strip surrounding whitespace and drop names that end up empty.
desired = [capability.strip() for capability in desired_capabilities if capability.strip()]
# Lower-cased desired name -> matches found so far.
capability_matches: dict[str, list[dict[str, object]]] = {name.lower(): [] for name in desired}
# Lower-cased capability name -> names of repositories that provide it.
duplicate_index: dict[str, set[str]] = {}
weak: list[dict[str, object]] = []
for ability_map in maps:
repository = ability_map.repository
for ability in ability_map.abilities:
for capability in ability.capabilities:
key = capability.name.lower()
duplicate_index.setdefault(key, set()).add(repository.name)
if key in capability_matches:
capability_matches[key].append(
{
"repository_id": repository.id,
"repository_name": repository.name,
"capability": capability,
}
)
# NOTE(review): indentation is not visible in this listing, so it is unclear
# whether the weak-evidence check below applies only to desired capabilities
# (nested under the `if key in capability_matches` above) or to every
# capability - confirm against the file.
strengths = {evidence.strength for evidence in capability.evidence}
# A capability counts as weakly evidenced when none of its evidence is "strong".
if "strong" not in strengths:
weak.append(
{
"capability": capability.name,
"repository_id": repository.id,
"repository_name": repository.name,
"evidence_count": len(capability.evidence),
"strongest_evidence": self._strongest_evidence(strengths),
"confidence": capability.confidence,
"confidence_label": capability.confidence_label,
}
)
# Desired names (as given, after stripping) with at least one match.
matched = [
{
"capability": name,
"repositories": [
match["repository_name"]
for match in capability_matches[name.lower()]
],
}
for name in desired
if capability_matches[name.lower()]
]
missing = [name for name in desired if not capability_matches[name.lower()]]
# Desired capabilities provided by more than one repository name. Here
# "capability" is the lower-cased key, and the comprehension's own
# `repositories` variable shadows the local of the same name defined above.
duplicates = [
{
"capability": capability,
"repositories": sorted(repositories),
}
for capability, repositories in duplicate_index.items()
if len(repositories) > 1 and capability in capability_matches
]
return {
"desired_ability": desired_ability,
"matched_capabilities": matched,
"missing_capabilities": missing,
"weakly_evidenced_capabilities": weak,
"duplicate_capabilities": duplicates,
}
def export_registry_entry(self, repository_id: int) -> str:
    """Render a repository's ability map as a YAML document.

    The document has a ``repository`` mapping followed by an ``abilities``
    sequence; each ability nests its ``capabilities``, and each capability
    nests its ``features`` and ``evidence``. Every nesting level is indented
    two further spaces so that keys belonging to a sequence item line up
    under that item's first key - without this the output is not a valid
    nested YAML structure.

    String values are rendered through ``_yaml_scalar`` / ``_yaml_list``;
    confidences are interpolated as-is.

    Returns:
        The YAML text, terminated by a newline.
    """
    ability_map = self.store.get_ability_map(repository_id)
    repository = ability_map.repository
    scalar = self._yaml_scalar
    lines = [
        "repository:",
        f"  name: {scalar(repository.name)}",
        f"  url: {scalar(repository.url)}",
        f"  branch: {scalar(repository.branch)}",
        f"  status: {scalar(repository.status)}",
        "abilities:",
    ]
    for ability in ability_map.abilities:
        lines.extend(
            [
                f"  - name: {scalar(ability.name)}",
                f"    description: {scalar(ability.description)}",
                f"    confidence: {ability.confidence}",
                f"    confidence_label: {scalar(ability.confidence_label)}",
                "    capabilities:",
            ]
        )
        for capability in ability.capabilities:
            lines.extend(
                [
                    f"      - name: {scalar(capability.name)}",
                    f"        description: {scalar(capability.description)}",
                    f"        confidence: {capability.confidence}",
                    f"        confidence_label: {scalar(capability.confidence_label)}",
                    f"        inputs: {self._yaml_list(capability.inputs)}",
                    f"        outputs: {self._yaml_list(capability.outputs)}",
                    "        features:",
                ]
            )
            for feature in capability.features:
                lines.extend(
                    [
                        f"          - name: {scalar(feature.name)}",
                        f"            type: {scalar(feature.type)}",
                        f"            location: {scalar(feature.location)}",
                        f"            confidence: {feature.confidence}",
                        f"            confidence_label: {scalar(feature.confidence_label)}",
                    ]
                )
            # Evidence is a sibling of "features" inside the capability.
            lines.append("        evidence:")
            for evidence in capability.evidence:
                lines.extend(
                    [
                        f"          - type: {scalar(evidence.type)}",
                        f"            reference: {scalar(evidence.reference)}",
                        f"            strength: {scalar(evidence.strength)}",
                    ]
                )
    return "\n".join(lines) + "\n"
def _strongest_evidence(self, strengths: set[str]) -> str | None:
    """Return the best strength present in ``strengths``.

    Strengths are ranked ``"strong"``, ``"medium"``, ``"weak"``; ``None`` is
    returned when none of those three is present.
    """
    ranked = ("strong", "medium", "weak")
    return next((level for level in ranked if level in strengths), None)
def _yaml_list(self, values: Sequence[str]) -> str:
    """Render ``values`` as an inline YAML list of quoted scalars."""
    rendered = [self._yaml_scalar(item) for item in values]
    return "[" + ", ".join(rendered) + "]"
def _yaml_scalar(self, value: object) -> str:
    """Render ``value`` as a double-quoted YAML scalar.

    ``None`` becomes the empty string; anything else goes through ``str``.
    Backslashes and double quotes are escaped, and line breaks, tabs and
    other control characters are written as YAML escape sequences, so a
    multi-line or control-character value stays on one line of the
    exported document instead of corrupting it.
    """
    text = "" if value is None else str(value)
    named_escapes = {
        "\\": "\\\\",
        '"': '\\"',
        "\n": "\\n",
        "\r": "\\r",
        "\t": "\\t",
    }
    pieces: list[str] = []
    for char in text:
        code = ord(char)
        if char in named_escapes:
            pieces.append(named_escapes[char])
        elif code < 0x20 or 0x7F <= code <= 0x9F:
            # Remaining C0/C1 controls (incl. NEL) are not printable in YAML.
            pieces.append(f"\\x{code:02x}")
        elif code in (0x2028, 0x2029):
            # Unicode line/paragraph separators act as line breaks for
            # some YAML parsers.
            pieces.append(f"\\u{code:04x}")
        else:
            pieces.append(char)
    return '"' + "".join(pieces) + '"'
def search(
    self,
    query: str,
    *,
    status: str | None = None,
    language: str | None = None,
    framework: str | None = None,
    ability: str | None = None,
    capability: str | None = None,
) -> list[SearchResult]:
    """Run a registry search through the store.

    Every filter is forwarded by keyword, ``None`` included; how an unset
    filter is treated is decided by ``store.search``.
    """
    filters = {
        "status": status,
        "language": language,
        "framework": framework,
        "ability": ability,
        "capability": capability,
    }
    return self.store.search(query, **filters)