Milestone 3: candidate graph generation

This commit is contained in:
2026-04-25 22:42:13 +02:00
parent ef41a9974a
commit 519b7726e7
11 changed files with 634 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Candidate ability graph generation."""

View File

@@ -0,0 +1,194 @@
from __future__ import annotations
from dataclasses import dataclass, field
from repo_registry.core.models import ObservedFact, Repository, SourceReference
@dataclass(frozen=True)
class CandidateEvidenceDraft:
    """Unpersisted evidence item attached to a candidate capability.

    Drafts carry no database id; they are what the generator hands to the
    store. ``frozen=True`` only blocks attribute rebinding; the
    ``source_refs`` list itself is still mutable.
    """

    # Evidence category; the generator in this module emits "test",
    # "example" or "documentation".
    type: str
    # What the evidence points at; the generator uses the fact's path.
    reference: str
    # Strength label; the generator emits "strong" or "medium".
    strength: str
    # Observed facts this evidence item was derived from.
    source_refs: list[SourceReference]
@dataclass(frozen=True)
class CandidateFeatureDraft:
    """Unpersisted feature (one concrete entry point) under a capability.

    ``frozen=True`` only blocks attribute rebinding; the ``source_refs``
    list itself is still mutable.
    """

    # Display name; the generator uses the fact's value, else its name.
    name: str
    # Feature classification; the generator emits "CLI", "API" or "interface".
    type: str
    # Where the feature was observed; the generator uses the fact's path.
    location: str
    # Heuristic confidence score assigned by the generator.
    confidence: float
    # Observed facts this feature was derived from.
    source_refs: list[SourceReference]
@dataclass(frozen=True)
class CandidateCapabilityDraft:
    """Unpersisted capability candidate with its features and evidence.

    ``frozen=True`` only blocks attribute rebinding; the list fields are
    still mutable in place.
    """

    # Short capability title.
    name: str
    # Human-readable explanation shown to reviewers.
    description: str
    # Names of inputs the capability consumes (the generator leaves this empty).
    inputs: list[str]
    # Names of outputs the capability produces.
    outputs: list[str]
    # Heuristic confidence score assigned by the generator.
    confidence: float
    # Observed facts this capability was derived from.
    source_refs: list[SourceReference]
    # Concrete entry points that realise the capability.
    features: list[CandidateFeatureDraft] = field(default_factory=list)
    # Supporting tests, examples and documentation.
    evidence: list[CandidateEvidenceDraft] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateAbilityDraft:
    """Unpersisted top-level ability candidate; root of a draft graph.

    ``frozen=True`` only blocks attribute rebinding; the list fields are
    still mutable in place.
    """

    # Short ability title.
    name: str
    # Human-readable explanation shown to reviewers.
    description: str
    # Heuristic confidence score assigned by the generator.
    confidence: float
    # Observed facts this ability was derived from.
    source_refs: list[SourceReference]
    # Capability candidates grouped under this ability.
    capabilities: list[CandidateCapabilityDraft] = field(default_factory=list)
class CandidateGraphGenerator:
    """Build conservative review candidates from observed facts.

    Facts are grouped by ``kind`` and turned into at most one candidate
    ability per repository, carrying up to two capabilities:

    * "Expose Repository Interface" when interface facts exist, and
    * "Describe Repository Structure" when manifest, framework or language
      facts exist.

    Everything produced here is a seed for human review, not an asserted
    domain ability.
    """

    def generate(
        self,
        repository: Repository,
        facts: list[ObservedFact],
    ) -> list[CandidateAbilityDraft]:
        """Return candidate ability drafts for ``repository``.

        Args:
            repository: Repository under review; only ``name`` is read.
            facts: Observed facts of one analysis run.

        Returns:
            ``[]`` when ``facts`` is empty, otherwise a one-element list
            holding the review-seed ability and its capabilities.
        """
        if not facts:
            return []

        docs = self._facts(facts, "documentation")
        tests = self._facts(facts, "test")
        examples = self._facts(facts, "example")
        interfaces = self._facts(facts, "interface")
        manifests = self._facts(facts, "manifest")
        frameworks = self._facts(facts, "framework")
        languages = self._facts(facts, "language")

        capabilities: list[CandidateCapabilityDraft] = []
        if interfaces:
            capabilities.append(
                self._interface_capability(interfaces, tests, examples, docs)
            )
        if manifests or frameworks or languages:
            capabilities.append(
                CandidateCapabilityDraft(
                    name="Describe Repository Structure",
                    description=(
                        "Summarize detected languages, package manifests, and framework "
                        "hints as structural context for review."
                    ),
                    inputs=[],
                    outputs=["repository structure summary"],
                    confidence=0.6,
                    source_refs=self._source_refs(manifests + frameworks + languages),
                    evidence=self._evidence(tests, examples, docs),
                )
            )

        # The first non-empty group provides the ability's provenance.
        # Interfaces are the last fallback so that an ability whose
        # description mentions interfaces never ends up without any
        # source reference when only interface facts were observed.
        ability_sources = docs or manifests or languages or interfaces

        # The dataclass is frozen, so build it once with its final children.
        return [
            CandidateAbilityDraft(
                name=f"Review {repository.name} Repository Usefulness",
                description=(
                    "Candidate usefulness summary generated from observed repository "
                    "documentation, manifests, languages, and interfaces. This is a "
                    "review seed, not an asserted domain ability."
                ),
                # Documentation-backed summaries are trusted slightly more.
                confidence=0.55 if docs else 0.35,
                source_refs=self._source_refs(ability_sources),
                capabilities=capabilities,
            )
        ]

    def _interface_capability(
        self,
        interfaces: list[ObservedFact],
        tests: list[ObservedFact],
        examples: list[ObservedFact],
        docs: list[ObservedFact],
    ) -> CandidateCapabilityDraft:
        """Build the "Expose Repository Interface" capability.

        Each interface fact becomes one feature; tests, examples and docs
        are attached as evidence.
        """
        features = [
            CandidateFeatureDraft(
                name=fact.value or fact.name,
                type=self._feature_type(fact),
                location=fact.path,
                # A fact with a concrete value is a stronger signal.
                confidence=0.65 if fact.value else 0.45,
                source_refs=self._source_refs([fact]),
            )
            for fact in interfaces
        ]
        return CandidateCapabilityDraft(
            name="Expose Repository Interface",
            description=(
                "Expose one or more likely user-facing API or CLI entry points. "
                "Review is required to name the concrete domain behavior."
            ),
            inputs=[],
            outputs=["callable interface"],
            confidence=0.65,
            source_refs=self._source_refs(interfaces),
            features=features,
            evidence=self._evidence(tests, examples, docs),
        )

    def _evidence(
        self,
        tests: list[ObservedFact],
        examples: list[ObservedFact],
        docs: list[ObservedFact],
    ) -> list[CandidateEvidenceDraft]:
        """Turn test, example and documentation facts into evidence drafts.

        Output order is tests, then examples, then documentation. Tests and
        examples are graded "strong", documentation "medium".
        """
        graded = (
            ("test", tests, "strong"),
            ("example", examples, "strong"),
            ("documentation", docs, "medium"),
        )
        return [
            CandidateEvidenceDraft(
                type=evidence_type,
                reference=fact.path,
                strength=strength,
                source_refs=self._source_refs([fact]),
            )
            for evidence_type, group, strength in graded
            for fact in group
        ]

    def _feature_type(self, fact: ObservedFact) -> str:
        """Classify an interface fact as "CLI", "API" or "interface".

        "cli" and "api" must appear as standalone words (after splitting on
        non-letters and camelCase boundaries), so paths such as
        ``client/api.py`` or ``rapid.py`` are not misclassified by a raw
        substring hit. "command", "route", "@app." and "@router." are still
        matched as substrings. CLI wins when both kinds of hint are present.
        """
        # Local import keeps this block self-contained.
        import re

        # Skip missing parts so a ``None`` value is not searched as "None".
        text = " ".join(
            str(part) for part in (fact.name, fact.path, fact.value) if part
        )
        lowered = text.lower()
        # Break "FastAPI" / "APIRouter" / "runCli" into separate words
        # before lowercasing throws the case information away.
        spaced = re.sub(
            r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", " ", text
        )
        words = set(re.findall(r"[a-z]+", spaced.lower()))

        if "cli" in words or "command" in lowered:
            return "CLI"
        if (
            "api" in words
            or "route" in lowered
            or "@app." in lowered
            or "@router." in lowered
        ):
            return "API"
        return "interface"

    def _facts(self, facts: list[ObservedFact], kind: str) -> list[ObservedFact]:
        """Return the facts whose ``kind`` equals ``kind``, in input order."""
        return [fact for fact in facts if fact.kind == kind]

    def _source_refs(self, facts: list[ObservedFact]) -> list[SourceReference]:
        """Create one ``SourceReference`` per fact (id, path, kind, name)."""
        return [
            SourceReference(
                fact_id=fact.id,
                path=fact.path,
                kind=fact.kind,
                name=fact.name,
            )
            for fact in facts
        ]

View File

@@ -56,6 +56,67 @@ class ScanSummary:
facts: list[ObservedFact]
@dataclass(frozen=True)
class SourceReference:
    """Pointer from a candidate node back to the observed fact behind it."""

    # Id of the referenced observed fact; may be None.
    fact_id: int | None
    # Path recorded on the referenced fact.
    path: str
    # Kind of the referenced fact.
    kind: str
    # Name of the referenced fact.
    name: str
@dataclass(frozen=True)
class CandidateEvidence:
    """Stored evidence item belonging to a candidate capability."""

    # Identifier of the stored evidence record.
    id: int
    # Evidence category.
    type: str
    # What the evidence points at.
    reference: str
    # Strength label of the evidence.
    strength: str
    # Review status of this candidate node.
    status: str
    # Observed facts this evidence item was derived from.
    source_refs: list[SourceReference]
@dataclass(frozen=True)
class CandidateFeature:
    """Stored feature belonging to a candidate capability."""

    # Identifier of the stored feature record.
    id: int
    # Display name of the feature.
    name: str
    # Feature classification label.
    type: str
    # Where the feature was observed.
    location: str
    # Confidence score of the candidate.
    confidence: float
    # Review status of this candidate node.
    status: str
    # Observed facts this feature was derived from.
    source_refs: list[SourceReference]
@dataclass(frozen=True)
class CandidateCapability:
    """Stored capability candidate with its features and evidence."""

    # Identifier of the stored capability record.
    id: int
    # Short capability title.
    name: str
    # Human-readable explanation of the capability.
    description: str
    # Names of inputs the capability consumes.
    inputs: list[str]
    # Names of outputs the capability produces.
    outputs: list[str]
    # Confidence score of the candidate.
    confidence: float
    # Review status of this candidate node.
    status: str
    # Observed facts this capability was derived from.
    source_refs: list[SourceReference]
    # Features grouped under this capability.
    features: list[CandidateFeature] = field(default_factory=list)
    # Evidence grouped under this capability.
    evidence: list[CandidateEvidence] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateAbility:
    """Stored top-level ability candidate with its capabilities."""

    # Identifier of the stored ability record.
    id: int
    # Short ability title.
    name: str
    # Human-readable explanation of the ability.
    description: str
    # Confidence score of the candidate.
    confidence: float
    # Review status of this candidate node.
    status: str
    # Observed facts this ability was derived from.
    source_refs: list[SourceReference]
    # Capabilities grouped under this ability.
    capabilities: list[CandidateCapability] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateGraph:
    """Candidate abilities of one analysis run, with their owning context."""

    # Repository the candidates belong to.
    repository: Repository
    # Analysis run the candidates were generated for.
    analysis_run: AnalysisRun
    # Root ability candidates, each carrying its nested capabilities.
    abilities: list[CandidateAbility]
@dataclass(frozen=True)
class Evidence:
id: int

View File

@@ -4,12 +4,14 @@ from collections.abc import Sequence
from repo_registry.core.models import (
AnalysisRun,
CandidateGraph,
ObservedFact,
Repository,
RepositoryAbilityMap,
ScanSummary,
SearchResult,
)
from repo_registry.candidate_graph.generator import CandidateGraphGenerator
from repo_registry.repo_ingestion.git import GitIngestionService
from repo_registry.repo_scanning.scanner import DeterministicScanner
from repo_registry.storage.sqlite import RegistryStore
@@ -26,6 +28,7 @@ class RegistryService:
self.store = store
self.scanner = DeterministicScanner()
self.ingestion = ingestion or GitIngestionService()
self.candidate_generator = CandidateGraphGenerator()
def register_repository(
self,
@@ -79,6 +82,8 @@ class RegistryService:
else None
)
facts = self.store.list_observed_facts(repository_id, completed_run.id)
candidates = self.candidate_generator.generate(repository, facts)
self.store.replace_candidate_graph(repository_id, completed_run.id, candidates)
return ScanSummary(
analysis_run=completed_run,
snapshot=snapshot,
@@ -95,6 +100,9 @@ class RegistryService:
) -> list[ObservedFact]:
return self.store.list_observed_facts(repository_id, analysis_run_id)
def candidate_graph(self, repository_id: int, analysis_run_id: int) -> CandidateGraph:
    """Return the stored candidate graph of one analysis run.

    Pure delegation to the store's ``get_candidate_graph``; any exception
    it raises propagates unchanged.
    """
    graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
    return graph
def add_ability(
self,
repository_id: int,

View File

@@ -7,6 +7,11 @@ from pathlib import Path
from repo_registry.core.models import (
Ability,
AnalysisRun,
CandidateAbility,
CandidateCapability,
CandidateEvidence,
CandidateFeature,
CandidateGraph,
Capability,
Evidence,
Feature,
@@ -15,7 +20,9 @@ from repo_registry.core.models import (
RepositoryAbilityMap,
RepositorySnapshot,
SearchResult,
SourceReference,
)
from repo_registry.candidate_graph.generator import CandidateAbilityDraft
from repo_registry.repo_scanning.scanner import FactCandidate, ScanResult
@@ -158,6 +165,202 @@ class RegistryStore:
)
return self.get_analysis_run(repository_id, analysis_run_id)
def replace_candidate_graph(
    self,
    repository_id: int,
    analysis_run_id: int,
    abilities: list[CandidateAbilityDraft],
) -> None:
    """Replace every stored candidate of an analysis run with ``abilities``.

    Existing ability, capability, feature and evidence rows of the run are
    removed, then the draft graph is inserted top-down so that children can
    reference the freshly assigned parent ids. All statements run inside a
    single ``with self.connect()`` block.

    Args:
        repository_id: Repository the candidates belong to.
        analysis_run_id: Analysis run whose candidates are replaced.
        abilities: Draft graph to persist; an empty list just clears the run.
    """
    with self.connect() as connection:
        # Delete children explicitly, leaf tables first. Relying on
        # ON DELETE CASCADE alone would orphan child rows whenever foreign
        # key enforcement is off for the connection, and a reused parent
        # rowid could then adopt those stale children.
        # Table names are fixed literals, never user input.
        for table in (
            "candidate_evidence",
            "candidate_features",
            "candidate_capabilities",
            "candidate_abilities",
        ):
            connection.execute(
                f"DELETE FROM {table} WHERE analysis_run_id = ?",
                (analysis_run_id,),
            )

        for ability in abilities:
            ability_cursor = connection.execute(
                """
                INSERT INTO candidate_abilities
                (repository_id, analysis_run_id, name, description, confidence, source_refs)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    repository_id,
                    analysis_run_id,
                    ability.name,
                    ability.description,
                    ability.confidence,
                    self._source_refs_to_json(ability.source_refs),
                ),
            )
            ability_id = int(ability_cursor.lastrowid)

            for capability in ability.capabilities:
                capability_cursor = connection.execute(
                    """
                    INSERT INTO candidate_capabilities
                    (repository_id, analysis_run_id, ability_id, name, description,
                    inputs, outputs, confidence, source_refs)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        repository_id,
                        analysis_run_id,
                        ability_id,
                        capability.name,
                        capability.description,
                        json.dumps(capability.inputs),
                        json.dumps(capability.outputs),
                        capability.confidence,
                        self._source_refs_to_json(capability.source_refs),
                    ),
                )
                capability_id = int(capability_cursor.lastrowid)

                # Leaf rows need no generated id, so insert them in batches.
                connection.executemany(
                    """
                    INSERT INTO candidate_features
                    (repository_id, analysis_run_id, capability_id, name, type,
                    location, confidence, source_refs)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    [
                        (
                            repository_id,
                            analysis_run_id,
                            capability_id,
                            feature.name,
                            feature.type,
                            feature.location,
                            feature.confidence,
                            self._source_refs_to_json(feature.source_refs),
                        )
                        for feature in capability.features
                    ],
                )
                connection.executemany(
                    """
                    INSERT INTO candidate_evidence
                    (repository_id, analysis_run_id, capability_id, type,
                    reference, strength, source_refs)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    [
                        (
                            repository_id,
                            analysis_run_id,
                            capability_id,
                            evidence.type,
                            evidence.reference,
                            evidence.strength,
                            self._source_refs_to_json(evidence.source_refs),
                        )
                        for evidence in capability.evidence
                    ],
                )
def get_candidate_graph(
    self,
    repository_id: int,
    analysis_run_id: int,
) -> CandidateGraph:
    """Load the candidate graph stored for one analysis run.

    The repository and analysis run are looked up first (whatever those
    lookups raise propagates). The four candidate tables are then read with
    one query each, ordered by id, and assembled bottom-up: features and
    evidence are grouped per capability, capabilities per ability.
    """
    repository = self.get_repository(repository_id)
    analysis_run = self.get_analysis_run(repository_id, analysis_run_id)
    scope = (repository_id, analysis_run_id)

    with self.connect() as connection:
        ability_rows = connection.execute(
            """
            SELECT id, name, description, confidence, status, source_refs
            FROM candidate_abilities
            WHERE repository_id = ? AND analysis_run_id = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()
        capability_rows = connection.execute(
            """
            SELECT id, ability_id, name, description, inputs, outputs,
            confidence, status, source_refs
            FROM candidate_capabilities
            WHERE repository_id = ? AND analysis_run_id = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()
        feature_rows = connection.execute(
            """
            SELECT id, capability_id, name, type, location, confidence,
            status, source_refs
            FROM candidate_features
            WHERE repository_id = ? AND analysis_run_id = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()
        evidence_rows = connection.execute(
            """
            SELECT id, capability_id, type, reference, strength, status, source_refs
            FROM candidate_evidence
            WHERE repository_id = ? AND analysis_run_id = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()

    # Leaves first: bucket features by their owning capability.
    features_by_capability: dict[int, list[CandidateFeature]] = {}
    for feature_row in feature_rows:
        bucket = features_by_capability.setdefault(feature_row["capability_id"], [])
        bucket.append(
            CandidateFeature(
                id=feature_row["id"],
                name=feature_row["name"],
                type=feature_row["type"],
                location=feature_row["location"],
                confidence=feature_row["confidence"],
                status=feature_row["status"],
                source_refs=self._source_refs_from_json(feature_row["source_refs"]),
            )
        )

    # Same for evidence.
    evidence_by_capability: dict[int, list[CandidateEvidence]] = {}
    for evidence_row in evidence_rows:
        bucket = evidence_by_capability.setdefault(evidence_row["capability_id"], [])
        bucket.append(
            CandidateEvidence(
                id=evidence_row["id"],
                type=evidence_row["type"],
                reference=evidence_row["reference"],
                strength=evidence_row["strength"],
                status=evidence_row["status"],
                source_refs=self._source_refs_from_json(evidence_row["source_refs"]),
            )
        )

    # Capabilities pick up their leaves and are bucketed by ability.
    capabilities_by_ability: dict[int, list[CandidateCapability]] = {}
    for capability_row in capability_rows:
        capability_id = capability_row["id"]
        bucket = capabilities_by_ability.setdefault(capability_row["ability_id"], [])
        bucket.append(
            CandidateCapability(
                id=capability_id,
                name=capability_row["name"],
                description=capability_row["description"],
                inputs=json.loads(capability_row["inputs"]),
                outputs=json.loads(capability_row["outputs"]),
                confidence=capability_row["confidence"],
                status=capability_row["status"],
                source_refs=self._source_refs_from_json(capability_row["source_refs"]),
                features=features_by_capability.get(capability_id, []),
                evidence=evidence_by_capability.get(capability_id, []),
            )
        )

    abilities: list[CandidateAbility] = []
    for ability_row in ability_rows:
        abilities.append(
            CandidateAbility(
                id=ability_row["id"],
                name=ability_row["name"],
                description=ability_row["description"],
                confidence=ability_row["confidence"],
                status=ability_row["status"],
                source_refs=self._source_refs_from_json(ability_row["source_refs"]),
                capabilities=capabilities_by_ability.get(ability_row["id"], []),
            )
        )

    return CandidateGraph(
        repository=repository,
        analysis_run=analysis_run,
        abilities=abilities,
    )
def fail_analysis_run(
self,
repository_id: int,
@@ -538,6 +741,30 @@ class RegistryStore:
],
)
def _source_refs_to_json(self, source_refs: list[SourceReference]) -> str:
return json.dumps(
[
{
"fact_id": source_ref.fact_id,
"path": source_ref.path,
"kind": source_ref.kind,
"name": source_ref.name,
}
for source_ref in source_refs
]
)
def _source_refs_from_json(self, value: str) -> list[SourceReference]:
return [
SourceReference(
fact_id=item.get("fact_id"),
path=item.get("path", ""),
kind=item.get("kind", ""),
name=item.get("name", ""),
)
for item in json.loads(value)
]
@staticmethod
def _repository_from_row(row: sqlite3.Row) -> Repository:
return Repository(

View File

@@ -149,6 +149,18 @@ def list_observed_facts(
raise HTTPException(status_code=404, detail=str(exc)) from exc
@app.get("/repos/{repository_id}/analysis-runs/{analysis_run_id}/candidate-graph")
def get_candidate_graph(
    repository_id: int,
    analysis_run_id: int,
    service: RegistryService = Depends(get_service),
) -> dict[str, object]:
    """Return the candidate graph of an analysis run as a plain dict.

    A ``NotFoundError`` from the service is translated into an HTTP 404
    whose detail is the error message.
    """
    try:
        graph = service.candidate_graph(repository_id, analysis_run_id)
    except NotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    else:
        return asdict(graph)
@app.post("/repos/{repository_id}/abilities", status_code=201)
def create_ability(
repository_id: int,