# Source: repo-scoping/src/repo_registry/candidate_graph/generator.py
# (363 lines, 12 KiB, Python)

from __future__ import annotations
import re
from dataclasses import dataclass, field
from repo_registry.core.models import ContentChunk, ObservedFact, Repository, SourceReference
@dataclass(frozen=True)
class CandidateEvidenceDraft:
    """Immutable draft of one piece of evidence backing a capability.

    ``frozen=True`` blocks attribute reassignment only; the ``source_refs``
    list itself can still be mutated in place.

    Attributes:
        type: Evidence category. The generator in this module emits
            ``"test"``, ``"example"`` or ``"documentation"``.
        reference: Pointer to the evidence; the generator in this module
            stores the originating fact's path here.
        strength: Qualitative weight. The generator in this module emits
            ``"strong"`` or ``"medium"``.
        source_refs: References to the observed facts behind this evidence.
    """

    type: str
    reference: str
    strength: str
    source_refs: list[SourceReference]
@dataclass(frozen=True)
class CandidateFeatureDraft:
    """Immutable draft of a single concrete feature found in a repository.

    ``frozen=True`` blocks attribute reassignment only; the ``source_refs``
    list itself can still be mutated in place.

    Attributes:
        name: Human-readable feature label.
        type: Feature category. The generator in this module emits
            ``"CLI"``, ``"API"`` or ``"interface"``.
        location: Where the feature lives; the generator in this module
            stores the originating fact's path here.
        confidence: Score for how likely the feature is real.
        source_refs: References to the observed facts behind this feature.
    """

    name: str
    type: str
    location: str
    confidence: float
    source_refs: list[SourceReference]
@dataclass(frozen=True)
class CandidateCapabilityDraft:
    """Immutable draft of one capability proposed for human review.

    ``frozen=True`` blocks attribute reassignment only; the list-valued
    fields can still be mutated in place.

    Attributes:
        name: Short capability title.
        description: Longer explanation shown to the reviewer.
        inputs: Names of the inputs the capability consumes.
        outputs: Names of the outputs the capability produces.
        confidence: Score for how likely the capability is real.
        source_refs: References to the observed facts behind the capability.
        features: Concrete features grouped under this capability; a fresh
            empty list is created per instance when omitted.
        evidence: Supporting evidence drafts; a fresh empty list is created
            per instance when omitted.
    """

    name: str
    description: str
    inputs: list[str]
    outputs: list[str]
    confidence: float
    source_refs: list[SourceReference]
    features: list[CandidateFeatureDraft] = field(default_factory=list)
    evidence: list[CandidateEvidenceDraft] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateAbilityDraft:
    """Immutable draft of a top-level ability that groups capabilities.

    ``frozen=True`` blocks attribute reassignment only; the list-valued
    fields can still be mutated in place.

    Attributes:
        name: Short ability title.
        description: Longer explanation shown to the reviewer.
        confidence: Score for how likely the ability is real.
        source_refs: References to the observed facts behind the ability.
        capabilities: Capability drafts nested under this ability; a fresh
            empty list is created per instance when omitted.
    """

    name: str
    description: str
    confidence: float
    source_refs: list[SourceReference]
    capabilities: list[CandidateCapabilityDraft] = field(default_factory=list)
class CandidateGraphGenerator:
    """Build conservative review candidates from observed facts.

    The generator keeps no instance state: every method derives its result
    purely from the facts and content chunks passed to it.
    """

    # Decorators of the form ``@app.<verb>("path")`` / ``@router.<verb>("path")``.
    # Group 1 is the HTTP verb, group 3 the quoted path.
    _ROUTE_DECORATOR = re.compile(
        r"@(?:app|router)\.(get|post|put|patch|delete)\((['\"])(.*?)\2"
    )
    # First ``def`` or ``async def`` header in a block of source text.
    _FUNCTION_HEADER = re.compile(
        r"^\s*(?:async\s+)?def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\(",
        re.MULTILINE,
    )

    def generate(
        self,
        repository: Repository,
        facts: list[ObservedFact],
        chunks: list[ContentChunk] | None = None,
    ) -> list[CandidateAbilityDraft]:
        """Return candidate ability drafts for *repository*.

        Args:
            repository: Repository under review; only its ``name`` is read.
            facts: Observed facts, grouped internally by their ``kind``.
            chunks: Optional content chunks used to enrich descriptions and
                feature names. ``None`` is treated as "no chunks".

        Returns:
            An empty list when *facts* is empty; otherwise a one-element
            list holding a single ability with zero, one or two capabilities.
        """
        if not facts:
            return []
        chunks = chunks or []

        docs = self._facts(facts, "documentation")
        tests = self._facts(facts, "test")
        examples = self._facts(facts, "example")
        interfaces = self._facts(facts, "interface")
        manifests = self._facts(facts, "manifest")
        frameworks = self._facts(facts, "framework")
        languages = self._facts(facts, "language")

        capabilities: list[CandidateCapabilityDraft] = []
        if interfaces:
            capabilities.append(
                self._interface_capability(interfaces, tests, examples, docs, chunks)
            )
        if manifests or frameworks or languages:
            capabilities.append(
                CandidateCapabilityDraft(
                    name="Describe Repository Structure",
                    description=(
                        "Summarize detected languages, package manifests, and framework "
                        "hints as structural context for review."
                    ),
                    inputs=[],
                    outputs=["repository structure summary"],
                    confidence=self._structure_confidence(
                        manifests=manifests,
                        frameworks=frameworks,
                        languages=languages,
                        docs=docs,
                    ),
                    source_refs=self._source_refs(manifests + frameworks + languages),
                    evidence=self._evidence(tests, examples, docs),
                )
            )

        # The ability is built once, after the capabilities are known, because
        # the draft dataclass is frozen and cannot be filled in afterwards.
        return [
            CandidateAbilityDraft(
                name=f"Review {repository.name} Repository Usefulness",
                description=self._ability_description(chunks),
                confidence=self._ability_confidence(
                    docs=docs,
                    interfaces=interfaces,
                    tests=tests,
                    examples=examples,
                    frameworks=frameworks,
                    languages=languages,
                ),
                # Provenance preference: documentation first, then manifests,
                # then languages (the first non-empty group wins).
                source_refs=self._source_refs(docs or manifests or languages),
                capabilities=capabilities,
            )
        ]

    def _interface_capability(
        self,
        interfaces: list[ObservedFact],
        tests: list[ObservedFact],
        examples: list[ObservedFact],
        docs: list[ObservedFact],
        chunks: list[ContentChunk],
    ) -> CandidateCapabilityDraft:
        """Build the "Expose Repository Interface" capability.

        One feature draft is produced per interface fact; tests, examples and
        docs are attached as evidence and also feed the confidence score.
        """
        features = [
            CandidateFeatureDraft(
                name=self._feature_name(fact, chunks),
                type=self._feature_type(fact),
                location=fact.path,
                # A fact that carries a concrete value is trusted more than
                # one identified by name alone.
                confidence=0.65 if fact.value else 0.45,
                source_refs=self._source_refs([fact]),
            )
            for fact in interfaces
        ]
        return CandidateCapabilityDraft(
            name="Expose Repository Interface",
            description=self._interface_description(chunks),
            inputs=[],
            outputs=["callable interface"],
            confidence=self._interface_confidence(
                interfaces=interfaces,
                tests=tests,
                examples=examples,
                docs=docs,
            ),
            source_refs=self._source_refs(interfaces),
            features=features,
            evidence=self._evidence(tests, examples, docs),
        )

    def _evidence(
        self,
        tests: list[ObservedFact],
        examples: list[ObservedFact],
        docs: list[ObservedFact],
    ) -> list[CandidateEvidenceDraft]:
        """Turn test, example and documentation facts into evidence drafts.

        Output order is all tests, then all examples, then all docs. Tests and
        examples are graded ``"strong"``; documentation is graded ``"medium"``.
        """
        graded_groups = (
            ("test", "strong", tests),
            ("example", "strong", examples),
            ("documentation", "medium", docs),
        )
        return [
            CandidateEvidenceDraft(
                type=evidence_type,
                reference=fact.path,
                strength=strength,
                source_refs=self._source_refs([fact]),
            )
            for evidence_type, strength, group in graded_groups
            for fact in group
        ]

    def _feature_type(self, fact: ObservedFact) -> str:
        """Classify an interface fact as ``"CLI"``, ``"API"`` or ``"interface"``.

        A recognised route decorator is decisive and yields ``"API"``; this
        keeps the type consistent with ``_feature_name``, which labels such
        facts ``"<VERB> <path>"``. Otherwise the decision falls back to
        substring checks over the fact's name, path and value.

        NOTE(review): the substring checks are deliberately broad, e.g.
        ``"cli"`` also matches ``"client"`` and ``"api"`` matches ``"rapid"``.
        """
        if self._route_feature_name(fact.value):
            return "API"
        lower = f"{fact.name} {fact.path} {fact.value}".lower()
        if "cli" in lower or "command" in lower:
            return "CLI"
        if "api" in lower or "route" in lower or "@app." in lower or "@router." in lower:
            return "API"
        return "interface"

    def _feature_name(self, fact: ObservedFact, chunks: list[ContentChunk]) -> str:
        """Pick the most specific display name available for an interface fact.

        Preference order: ``"<VERB> <path>"`` for route decorators, then
        ``"CLI command <function>"`` when a function can be located for a CLI
        fact, then the fact's value, and finally the fact's name.
        """
        route_name = self._route_feature_name(fact.value)
        if route_name:
            return route_name
        if self._feature_type(fact) == "CLI":
            function_name = self._function_name_near_fact(fact, chunks)
            if function_name:
                return f"CLI command {function_name}"
        return fact.value or fact.name

    def _route_feature_name(self, value: str) -> str:
        """Return ``"<VERB> <path>"`` for a route decorator, else ``""``.

        Non-string input (for example ``None`` when a fact has no value) is
        treated as "no route" rather than raising from the regex engine.
        """
        if not isinstance(value, str):
            return ""
        match = self._ROUTE_DECORATOR.search(value)
        if match is None:
            return ""
        method = match.group(1).upper()
        path = match.group(3)
        return f"{method} {path}"

    def _function_name_near_fact(
        self,
        fact: ObservedFact,
        chunks: list[ContentChunk],
    ) -> str:
        """Find the name of the first function defined in a chunk covering *fact*.

        Only ``"interface"`` chunks from the fact's own path are considered.
        When the fact's metadata carries an integer ``"line"``, the chunk must
        also span that line. Both ``def`` and ``async def`` headers count.
        Returns ``""`` when nothing suitable is found.
        """
        line = fact.metadata.get("line")
        for chunk in chunks:
            if chunk.path != fact.path or chunk.kind != "interface":
                continue
            if isinstance(line, int) and not (chunk.start_line <= line <= chunk.end_line):
                continue
            match = self._FUNCTION_HEADER.search(chunk.text)
            if match is not None:
                return match.group(1)
        return ""

    def _ability_confidence(
        self,
        *,
        docs: list[ObservedFact],
        interfaces: list[ObservedFact],
        tests: list[ObservedFact],
        examples: list[ObservedFact],
        frameworks: list[ObservedFact],
        languages: list[ObservedFact],
    ) -> float:
        """Score the ability: base 0.25 plus a bonus per non-empty fact group."""
        return self._confidence(
            0.25,
            [
                (0.20, bool(docs)),
                (0.15, bool(interfaces)),
                (0.15, bool(tests)),
                (0.10, bool(examples)),
                (0.10, bool(frameworks)),
                (0.05, bool(languages)),
            ],
        )

    def _interface_confidence(
        self,
        *,
        interfaces: list[ObservedFact],
        tests: list[ObservedFact],
        examples: list[ObservedFact],
        docs: list[ObservedFact],
    ) -> float:
        """Score the interface capability: base 0.30 plus per-group bonuses.

        A small extra bonus applies when more than one interface fact exists.
        """
        return self._confidence(
            0.30,
            [
                (0.20, bool(interfaces)),
                (0.15, bool(tests)),
                (0.10, bool(examples)),
                (0.10, bool(docs)),
                (0.05, len(interfaces) > 1),
            ],
        )

    def _structure_confidence(
        self,
        *,
        manifests: list[ObservedFact],
        frameworks: list[ObservedFact],
        languages: list[ObservedFact],
        docs: list[ObservedFact],
    ) -> float:
        """Score the structure capability: base 0.25 plus per-group bonuses."""
        return self._confidence(
            0.25,
            [
                (0.20, bool(manifests)),
                (0.15, bool(frameworks)),
                (0.10, bool(languages)),
                (0.05, bool(docs)),
            ],
        )

    def _confidence(
        self,
        base: float,
        factors: list[tuple[float, bool]],
    ) -> float:
        """Add the weights of all applicable factors to *base*.

        The sum is rounded to two decimals and capped at 1.0.
        """
        score = base + sum(weight for weight, applies in factors if applies)
        return min(1.0, round(score, 2))

    def _ability_description(self, chunks: list[ContentChunk]) -> str:
        """Describe the ability, quoting a documentation summary when one exists."""
        doc_summary = self._document_summary(chunks)
        if doc_summary:
            return (
                "Candidate usefulness summary seeded from repository content: "
                f"{doc_summary} This is a review seed, not an asserted domain ability."
            )
        return (
            "Candidate usefulness summary generated from observed repository "
            "documentation, manifests, languages, and interfaces. This is a "
            "review seed, not an asserted domain ability."
        )

    def _interface_description(self, chunks: list[ContentChunk]) -> str:
        """Describe the interface capability, quoting source context if available."""
        interface_summary = self._interface_summary(chunks)
        if interface_summary:
            return (
                "Expose one or more likely user-facing API or CLI entry points. "
                f"Source context: {interface_summary} Review is required to name "
                "the concrete domain behavior."
            )
        return (
            "Expose one or more likely user-facing API or CLI entry points. "
            "Review is required to name the concrete domain behavior."
        )

    def _document_summary(self, chunks: list[ContentChunk]) -> str:
        """Summarize the first non-blank documentation chunk.

        The summary is ``"<heading>. <paragraph>"`` where the heading is the
        first line starting with ``#`` (markers stripped) and the paragraph is
        the first line that does not start with ``#``. If only one of the two
        exists it is returned alone. Returns ``""`` when no documentation
        chunk has any non-blank line.
        """
        for chunk in chunks:
            if chunk.kind != "documentation":
                continue
            lines = [line.strip() for line in chunk.text.splitlines() if line.strip()]
            if not lines:
                continue
            heading = next(
                (line.lstrip("#").strip() for line in lines if line.startswith("#")),
                "",
            )
            paragraph = next((line for line in lines if not line.startswith("#")), "")
            if heading and paragraph:
                return f"{heading}. {paragraph}"
            return heading or paragraph
        return ""

    def _interface_summary(self, chunks: list[ContentChunk]) -> str:
        """Join the first three non-blank lines of the first interface chunk.

        Returns ``""`` when no interface chunk has any non-blank line.
        """
        for chunk in chunks:
            if chunk.kind != "interface":
                continue
            lines = [line.strip() for line in chunk.text.splitlines() if line.strip()]
            if not lines:
                continue
            return " ".join(lines[:3])
        return ""

    def _facts(self, facts: list[ObservedFact], kind: str) -> list[ObservedFact]:
        """Return the facts whose ``kind`` equals *kind*, preserving order."""
        return [fact for fact in facts if fact.kind == kind]

    def _source_refs(self, facts: list[ObservedFact]) -> list[SourceReference]:
        """Build one ``SourceReference`` per fact, in the same order.

        The line number is taken from the fact's ``metadata["line"]`` and is
        ``None`` when that key is absent.
        """
        return [
            SourceReference(
                fact_id=fact.id,
                path=fact.path,
                kind=fact.kind,
                name=fact.name,
                line=fact.metadata.get("line"),
            )
            for fact in facts
        ]