generated from coulomb/repo-seed
256 lines
8.5 KiB
Python
256 lines
8.5 KiB
Python
from __future__ import annotations
|
|
|
|
from collections.abc import Sequence
|
|
|
|
from repo_registry.core.models import (
|
|
AnalysisRun,
|
|
CandidateGraph,
|
|
ObservedFact,
|
|
Repository,
|
|
RepositoryAbilityMap,
|
|
ScanSummary,
|
|
SearchResult,
|
|
)
|
|
from repo_registry.candidate_graph.generator import CandidateGraphGenerator
|
|
from repo_registry.repo_ingestion.git import GitIngestionService
|
|
from repo_registry.repo_ingestion.metadata import RepositoryMetadataExtractor
|
|
from repo_registry.repo_scanning.scanner import DeterministicScanner
|
|
from repo_registry.storage.sqlite import RegistryStore
|
|
|
|
|
|
class RegistryService:
    """Application service for the manual registry MVP.

    Ties the registry store together with the ingestion, metadata-extraction,
    scanning and candidate-generation collaborators. Most methods are thin
    pass-throughs to the store; the orchestration lives in
    ``register_repository``, ``analyze_repository`` and
    ``approve_candidate_graph``.
    """

    def __init__(
        self,
        store: RegistryStore,
        ingestion: GitIngestionService | None = None,
        *,
        scanner: DeterministicScanner | None = None,
        metadata_extractor: RepositoryMetadataExtractor | None = None,
        candidate_generator: CandidateGraphGenerator | None = None,
    ) -> None:
        """Wire the service to its store and collaborators.

        Args:
            store: Persistence backend used by every operation.
            ingestion: Service that resolves a repository URL to a checkout;
                a default ``GitIngestionService`` is created when omitted.
            scanner: Scanner used by ``analyze_repository``; defaults to a
                new ``DeterministicScanner``.
            metadata_extractor: Extractor used by ``register_repository``;
                defaults to a new ``RepositoryMetadataExtractor``.
            candidate_generator: Generator used by ``analyze_repository``;
                defaults to a new ``CandidateGraphGenerator``.
        """
        self.store = store
        # Compare against None explicitly so that a supplied collaborator
        # which happens to be falsy is not silently replaced by the default.
        self.scanner = scanner if scanner is not None else DeterministicScanner()
        self.ingestion = (
            ingestion if ingestion is not None else GitIngestionService()
        )
        self.metadata_extractor = (
            metadata_extractor
            if metadata_extractor is not None
            else RepositoryMetadataExtractor()
        )
        self.candidate_generator = (
            candidate_generator
            if candidate_generator is not None
            else CandidateGraphGenerator()
        )

    def register_repository(
        self,
        *,
        url: str,
        name: str | None = None,
        description: str | None = None,
        branch: str = "main",
    ) -> Repository:
        """Create a repository record, filling gaps from extracted metadata.

        When ``name`` or ``description`` is ``None`` the URL is resolved
        through the ingestion service and metadata is extracted from the
        resulting checkout. Explicitly supplied non-empty values always win
        over extracted ones.

        Args:
            url: Location of the repository.
            name: Display name; taken from metadata when missing.
            description: Description; taken from metadata when missing.
            branch: Branch to resolve and record.

        Returns:
            The repository returned by the store.
        """
        metadata = None
        if name is None or description is None:
            checkout = self.ingestion.resolve(url, branch=branch)
            metadata = self.metadata_extractor.extract(checkout.source_path, url)

        # Without metadata (both values were supplied) an empty name falls
        # back to the literal "repository" and an empty description to None.
        fallback_name = metadata.name if metadata is not None else "repository"
        fallback_description = (
            metadata.description if metadata is not None else None
        )
        return self.store.create_repository(
            name=name or fallback_name,
            url=url,
            description=description or fallback_description,
            branch=branch,
        )

    def list_repositories(self) -> list[Repository]:
        """Return the repositories listed by the store."""
        return self.store.list_repositories()

    def get_repository(self, repository_id: int) -> Repository:
        """Return the repository with ``repository_id`` from the store."""
        return self.store.get_repository(repository_id)

    def analyze_repository(
        self,
        repository_id: int,
        *,
        source_path: str | None = None,
    ) -> ScanSummary:
        """Scan a repository and store the resulting facts and candidates.

        A new analysis run is created and the repository is marked
        ``"analyzing"``. If resolving the source or scanning raises, the run
        is recorded as failed with the exception text and a summary without
        snapshot or facts is returned instead of propagating the error.

        Args:
            repository_id: Repository to analyze.
            source_path: Path to scan directly; when ``None`` the repository
                URL and branch are resolved through the ingestion service.

        Returns:
            Summary holding the finished (or failed) run, its snapshot if the
            run has one, and the observed facts of the run.
        """
        repository = self.store.get_repository(repository_id)
        run = self.store.create_analysis_run(repository_id)
        self.store.update_repository_status(repository_id, "analyzing")

        try:
            if source_path is None:
                checkout = self.ingestion.resolve(
                    repository.url,
                    branch=repository.branch,
                )
                scan_source = checkout.source_path
            else:
                scan_source = source_path
            scan_result = self.scanner.scan(scan_source)
        except Exception as exc:
            # Service boundary: ingestion/scan failures become a failed run
            # record rather than an exception for the caller.
            failed_run = self.store.fail_analysis_run(
                repository_id,
                run.id,
                str(exc),
            )
            return ScanSummary(analysis_run=failed_run, snapshot=None, facts=[])

        completed_run = self.store.complete_analysis_run(
            repository_id,
            run.id,
            scan_result,
        )
        snapshot = None
        if completed_run.snapshot_id is not None:
            snapshot = self.store.get_snapshot(completed_run.snapshot_id)

        facts = self.store.list_observed_facts(repository_id, completed_run.id)
        candidates = self.candidate_generator.generate(repository, facts)
        self.store.replace_candidate_graph(
            repository_id,
            completed_run.id,
            candidates,
        )
        return ScanSummary(
            analysis_run=completed_run,
            snapshot=snapshot,
            facts=facts,
        )

    def list_analysis_runs(self, repository_id: int) -> list[AnalysisRun]:
        """Return the analysis runs the store holds for the repository."""
        return self.store.list_analysis_runs(repository_id)

    def list_observed_facts(
        self,
        repository_id: int,
        analysis_run_id: int | None = None,
    ) -> list[ObservedFact]:
        """Return observed facts for the repository.

        ``analysis_run_id`` is forwarded to the store unchanged, including
        when it is ``None``.
        """
        return self.store.list_observed_facts(repository_id, analysis_run_id)

    def candidate_graph(
        self,
        repository_id: int,
        analysis_run_id: int,
    ) -> CandidateGraph:
        """Return the candidate graph stored for the given analysis run."""
        return self.store.get_candidate_graph(repository_id, analysis_run_id)

    def approve_candidate_graph(
        self,
        repository_id: int,
        analysis_run_id: int,
        *,
        notes: str = "",
    ) -> RepositoryAbilityMap:
        """Promote pending candidate abilities into the ability map.

        Only abilities whose status is ``"candidate"`` are copied. The
        candidate graph is marked ``"approved"`` only when at least one
        ability was pending; the review decision and the ``"indexed"``
        repository status are recorded in every case.

        Args:
            repository_id: Repository that owns the candidate graph.
            analysis_run_id: Analysis run the graph belongs to.
            notes: Free-form text stored with the review decision.

        Returns:
            The repository's ability map as returned by the store afterwards.
        """
        graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
        pending_abilities = [
            ability for ability in graph.abilities if ability.status == "candidate"
        ]
        for ability in pending_abilities:
            self._promote_ability(repository_id, ability)

        if pending_abilities:
            self.store.mark_candidate_graph_status(
                repository_id,
                analysis_run_id,
                "approved",
            )
        self.store.create_review_decision(
            repository_id,
            analysis_run_id,
            action="approve_candidate_graph",
            notes=notes,
        )
        self.store.update_repository_status(repository_id, "indexed")
        return self.store.get_ability_map(repository_id)

    def _promote_ability(self, repository_id: int, ability) -> None:
        """Copy one candidate ability, with its capabilities, into the store.

        For each capability the features are written first, then the
        evidence, each linked to the identifier the store returned for the
        newly created capability.
        """
        ability_id = self.store.create_ability(
            repository_id,
            name=ability.name,
            description=ability.description,
            confidence=ability.confidence,
        )
        for capability in ability.capabilities:
            capability_id = self.store.create_capability(
                repository_id,
                ability_id,
                name=capability.name,
                description=capability.description,
                inputs=capability.inputs,
                outputs=capability.outputs,
                confidence=capability.confidence,
            )
            for feature in capability.features:
                self.store.create_feature(
                    repository_id,
                    capability_id,
                    name=feature.name,
                    type=feature.type,
                    location=feature.location,
                    confidence=feature.confidence,
                )
            for evidence in capability.evidence:
                self.store.create_evidence(
                    repository_id,
                    capability_id,
                    type=evidence.type,
                    reference=evidence.reference,
                    strength=evidence.strength,
                )

    def add_ability(
        self,
        repository_id: int,
        *,
        name: str,
        description: str = "",
        confidence: float = 1.0,
    ) -> int:
        """Create an ability for a repository and return the store's result.

        The repository is fetched first and its value discarded, so the
        lookup runs before anything is created.
        """
        self.store.get_repository(repository_id)
        return self.store.create_ability(
            repository_id,
            name=name,
            description=description,
            confidence=confidence,
        )

    def add_capability(
        self,
        repository_id: int,
        ability_id: int,
        *,
        name: str,
        description: str = "",
        inputs: Sequence[str] = (),
        outputs: Sequence[str] = (),
        confidence: float = 1.0,
    ) -> int:
        """Create a capability under an ability and return the store's result.

        ``store.ensure_ability`` is called before the capability is created.
        ``inputs`` and ``outputs`` are copied into new lists before being
        handed to the store.
        """
        self.store.ensure_ability(repository_id, ability_id)
        return self.store.create_capability(
            repository_id,
            ability_id,
            name=name,
            description=description,
            inputs=list(inputs),
            outputs=list(outputs),
            confidence=confidence,
        )

    def add_feature(
        self,
        repository_id: int,
        capability_id: int,
        *,
        name: str,
        type: str,
        location: str = "",
        confidence: float = 1.0,
    ) -> int:
        """Create a feature under a capability and return the store's result.

        ``store.ensure_capability`` is called before the feature is created.
        """
        self.store.ensure_capability(repository_id, capability_id)
        return self.store.create_feature(
            repository_id,
            capability_id,
            name=name,
            type=type,
            location=location,
            confidence=confidence,
        )

    def add_evidence(
        self,
        repository_id: int,
        capability_id: int,
        *,
        type: str,
        reference: str,
        strength: str = "medium",
    ) -> int:
        """Attach evidence to a capability and return the store's result.

        ``store.ensure_capability`` is called before the evidence is created.
        """
        self.store.ensure_capability(repository_id, capability_id)
        return self.store.create_evidence(
            repository_id,
            capability_id,
            type=type,
            reference=reference,
            strength=strength,
        )

    def ability_map(self, repository_id: int) -> RepositoryAbilityMap:
        """Return the ability map the store holds for the repository."""
        return self.store.get_ability_map(repository_id)

    def search(self, query: str) -> list[SearchResult]:
        """Return the store's search results for ``query``."""
        return self.store.search(query)
|