Repo stats and features as aggregates

This commit is contained in:
2026-04-28 03:01:10 +02:00
parent c0a044fa0b
commit 2313e8675e
10 changed files with 258 additions and 24 deletions

View File

@@ -125,16 +125,7 @@ class CandidateGraphGenerator:
docs: list[ObservedFact],
chunks: list[ContentChunk],
) -> CandidateCapabilityDraft:
features = [
CandidateFeatureDraft(
name=self._feature_name(fact, chunks),
type=self._feature_type(fact),
location=fact.path,
confidence=0.65 if fact.value else 0.45,
source_refs=self._source_refs([fact]),
)
for fact in interfaces
]
features = self._interface_features(interfaces, chunks)
return CandidateCapabilityDraft(
name="Expose Repository Interface",
description=self._interface_description(chunks),
@@ -151,6 +142,83 @@ class CandidateGraphGenerator:
evidence=self._evidence(tests, examples, docs),
)
def _interface_features(
    self,
    interfaces: list[ObservedFact],
    chunks: list[ContentChunk],
) -> list[CandidateFeatureDraft]:
    """Build one feature draft per interface feature type.

    Facts are bucketed by the value ``self._feature_type`` returns for them.
    A bucket holding a single fact becomes a draft describing that fact
    alone (its own name, its path, confidence 0.65 when ``fact.value`` is
    truthy and 0.45 otherwise). A bucket holding several facts becomes one
    aggregate draft whose name, location and confidence come from the
    ``_grouped_*`` helpers and whose source refs cover every fact in it.

    Args:
        interfaces: Interface facts to turn into feature drafts.
        chunks: Content chunks forwarded to the naming helpers.

    Returns:
        The drafts, ordered by feature type. Empty when ``interfaces`` is
        empty.
    """
    grouped: dict[str, list[ObservedFact]] = {}
    for observed in interfaces:
        kind = self._feature_type(observed)
        if kind not in grouped:
            grouped[kind] = []
        grouped[kind].append(observed)

    drafts: list[CandidateFeatureDraft] = []
    # Keys are unique, so ordering by key alone fixes the output order.
    for kind in sorted(grouped):
        members = grouped[kind]
        if len(members) > 1:
            draft = CandidateFeatureDraft(
                name=self._grouped_interface_feature_name(kind, members, chunks),
                type=kind,
                location=self._grouped_location(members),
                confidence=self._grouped_interface_confidence(members),
                source_refs=self._source_refs(members),
            )
        else:
            # Buckets are only created on first append, so exactly one fact.
            only = members[0]
            draft = CandidateFeatureDraft(
                name=self._feature_name(only, chunks),
                type=kind,
                location=only.path,
                confidence=0.65 if only.value else 0.45,
                source_refs=self._source_refs([only]),
            )
        drafts.append(draft)
    return drafts
def _grouped_interface_feature_name(
    self,
    feature_type: str,
    facts: list[ObservedFact],
    chunks: list[ContentChunk],
) -> str:
    """Name an aggregate feature as ``"<surface label>: <summary>"``.

    Args:
        feature_type: Feature type shared by ``facts``. ``"API"`` and
            ``"CLI"`` get dedicated labels; anything else is labelled as a
            callable interface surface.
        facts: Facts folded into the aggregate feature.
        chunks: Content chunks forwarded to the summary helper.

    Returns:
        The surface label followed by the summary produced by
        ``self._grouped_interface_summary``.
    """
    overview = self._grouped_interface_summary(facts, chunks)
    surface_labels = {
        "API": "HTTP API surface",
        "CLI": "CLI command surface",
    }
    label = surface_labels.get(feature_type, "Callable interface surface")
    return f"{label}: {overview}"
def _grouped_interface_summary(
    self,
    facts: list[ObservedFact],
    chunks: list[ContentChunk],
) -> str:
    """Summarise a group of facts as a short, comma-separated name list.

    Each fact is named via ``self._feature_name``; empty names are dropped.
    At most the first three names are shown, followed by ``", +N more"``
    when further names exist. If no fact yields a name, the summary falls
    back to ``"<number of facts> entry points"``.

    Args:
        facts: Facts to summarise.
        chunks: Content chunks forwarded to ``self._feature_name``.

    Returns:
        The human-readable summary string.
    """
    labelled = [
        label
        for label in (self._feature_name(fact, chunks) for fact in facts)
        if label
    ]
    if not labelled:
        return f"{len(facts)} entry points"

    shown = ", ".join(labelled[:3])
    overflow = len(labelled) - 3
    if overflow > 0:
        return f"{shown}, +{overflow} more"
    return shown
def _grouped_location(self, facts: list[ObservedFact]) -> str:
    """Return a single location string for a group of facts.

    Args:
        facts: Facts whose ``path`` attributes are inspected; falsy paths
            are ignored.

    Returns:
        ``""`` when no fact has a path, the path itself when all facts share
        one path, and the literal ``"multiple files"`` otherwise.
    """
    distinct_paths = sorted({fact.path for fact in facts if fact.path})
    if len(distinct_paths) > 1:
        return "multiple files"
    return distinct_paths[0] if distinct_paths else ""
def _grouped_interface_confidence(self, facts: list[ObservedFact]) -> float:
    """Score an aggregate interface feature.

    Args:
        facts: Facts folded into the aggregate feature.

    Returns:
        ``0.7`` when every fact has a truthy ``value`` (including the case
        of an empty list), ``0.55`` as soon as one fact lacks a value.
    """
    every_fact_valued = all(fact.value for fact in facts)
    if every_fact_valued:
        return 0.7
    return 0.55
def _evidence(
self,
tests: list[ObservedFact],

View File

@@ -120,6 +120,7 @@ class RegistryService:
repository_id: int,
*,
source_path: str | None = None,
use_cached_checkout: bool = False,
access_username: str | None = None,
access_password: str | None = None,
) -> ScanSummary:
@@ -134,12 +135,20 @@ class RegistryService:
)
try:
if source_path is None:
checkout = self.ingestion.resolve(
repository.url,
branch=repository.branch,
access_username=access_username,
access_password=access_password,
)
if use_cached_checkout:
checkout = self.ingestion.cached_checkout(repository.url)
if checkout is None:
raise RuntimeError(
"cached checkout was requested, but no checkout exists "
"for this repository"
)
else:
checkout = self.ingestion.resolve(
repository.url,
branch=repository.branch,
access_username=access_username,
access_password=access_password,
)
scan_source = checkout.source_path
else:
scan_source = source_path

View File

@@ -57,6 +57,16 @@ class GitIngestionService:
)
return Checkout(source_path=checkout_path.resolve(), was_cloned=True)
def cached_checkout(self, url_or_path: str) -> Checkout | None:
    """Look up an already-available checkout without contacting a remote.

    Args:
        url_or_path: Repository reference; passed to ``self._local_path``
            and, if that yields nothing, to ``self._checkout_key``.

    Returns:
        A ``Checkout`` with ``was_cloned=False`` when ``self._local_path``
        returns a path; a ``Checkout`` with ``was_cloned=True`` when the
        keyed entry under ``self.checkout_root`` exists on disk; ``None``
        when neither applies.
    """
    local = self._local_path(url_or_path)
    if local is None:
        cached = self.checkout_root / self._checkout_key(url_or_path)
        if cached.exists():
            return Checkout(source_path=cached.resolve(), was_cloned=True)
        # Nothing has been checked out under this key yet.
        return None
    return Checkout(source_path=local.resolve(), was_cloned=False)
def _checkout_branch(
self,
checkout_path: Path,

View File

@@ -245,6 +245,7 @@ def create_analysis_run(
summary = service.analyze_repository(
repository_id,
source_path=payload.source_path,
use_cached_checkout=payload.use_cached_checkout,
access_username=payload.access_username,
access_password=payload.access_password,
)

View File

@@ -202,6 +202,7 @@ class EvidenceUpdate(BaseModel):
class AnalysisRunCreate(BaseModel):
source_path: str | None = None
use_cached_checkout: bool = False
access_username: str | None = None
access_password: str | None = Field(default=None, repr=False)
@@ -210,6 +211,7 @@ class AnalysisRunCreate(BaseModel):
"examples": [
{},
{"source_path": "/path/to/local/repository"},
{"use_cached_checkout": True},
{
"access_username": "git-user",
"access_password": "access-token",

View File

@@ -501,6 +501,7 @@ def repository_detail(
<h2>Run Analysis</h2>
<form class="stack" method="post" action="/ui/repos/{repository_id}/analysis-runs">
<label>Override source path <input name="source_path" placeholder="Optional local path"></label>
<label class="checkbox"><input type="checkbox" name="use_cached_checkout" value="1"> Analyze cached checkout without fetching upstream</label>
<label>Username <input name="access_username" autocomplete="username" placeholder="Optional for private HTTP(S) repos"></label>
<label>Password or access token <input name="access_password" type="password" autocomplete="current-password" placeholder="Used for this Git operation only"></label>
<div class="actions">
@@ -516,6 +517,7 @@ def repository_detail(
</section>
<section class="panel">
<h2>Approved Ability Map</h2>
{render_graph_counts(asdict(ability_map), facts_count=None)}
{render_ability_map(asdict(ability_map), repository_id)}
</section>
</div>
@@ -821,6 +823,7 @@ def delete_evidence_from_form(
def create_analysis_run_from_form(
repository_id: int,
source_path: str = Form(""),
use_cached_checkout: str | None = Form(None),
access_username: str = Form(""),
access_password: str = Form(""),
service: RegistryService = Depends(get_service),
@@ -828,6 +831,7 @@ def create_analysis_run_from_form(
summary = service.analyze_repository(
repository_id,
source_path=source_path or None,
use_cached_checkout=bool(use_cached_checkout),
access_username=access_username or None,
access_password=access_password or None,
)
@@ -869,6 +873,7 @@ def analysis_run_detail(
<section class="panel">
<div class="actions">
<h2 style="margin-right:auto">Candidate Graph</h2>
{render_graph_counts(asdict(candidate_graph), facts_count=len(facts))}
<form method="post" action="/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}/candidate-graph/approve">
<button type="submit">Approve</button>
</form>
@@ -876,7 +881,10 @@ def analysis_run_detail(
{render_candidate_graph(asdict(candidate_graph), repository_id, analysis_run_id)}
</section>
<section class="panel">
<h2>Observed Facts</h2>
<div class="actions">
<h2 style="margin-right:auto">Observed Facts</h2>
{render_count_pills(facts=len(facts))}
</div>
<table>
<thead><tr><th>Kind</th><th>Name</th><th>Path</th><th>Value</th></tr></thead>
<tbody>{fact_rows or '<tr><td colspan="4" class="muted">No observed facts.</td></tr>'}</tbody>
@@ -1565,6 +1573,41 @@ def split_capability_lines(value: str) -> list[str]:
return [line.strip() for line in normalized.splitlines() if line.strip()]
def render_graph_counts(graph: dict, facts_count: int | None = None) -> str:
    """Render count pills for the abilities, capabilities and features of a graph.

    Args:
        graph: Mapping with an optional ``"abilities"`` list; each ability
            may hold a ``"capabilities"`` list and each capability a
            ``"features"`` list. Missing keys count as empty.
        facts_count: When not ``None``, an extra ``facts`` pill with this
            number is appended after the other three.

    Returns:
        The markup produced by ``render_count_pills`` for the totals, in the
        order abilities, capabilities, features and (optionally) facts.
    """
    abilities = graph.get("abilities", [])
    totals: dict[str, int] = {
        "abilities": len(abilities),
        "capabilities": 0,
        "features": 0,
    }
    for ability in abilities:
        for capability in ability.get("capabilities", []):
            totals["capabilities"] += 1
            for _feature in capability.get("features", []):
                totals["features"] += 1

    if facts_count is not None:
        totals["facts"] = facts_count
    return render_count_pills(**totals)
def render_count_pills(**counts: int) -> str:
    """Render each named count as a ``<span class="pill">`` element.

    Args:
        **counts: Count per category, rendered in the order given. Only
            ``abilities``, ``capabilities``, ``features`` and ``facts`` are
            recognised.

    Returns:
        The concatenated pill markup, or ``""`` when no counts are given.

    Raises:
        KeyError: If a keyword other than the recognised categories is passed.
    """
    labels = {
        word: word
        for word in ("abilities", "capabilities", "features", "facts")
    }
    pills: list[str] = []
    for category, amount in counts.items():
        pills.append(f'<span class="pill">{amount} {labels[category]}</span>')
    return "".join(pills)
def render_candidate_graph(graph: dict, repository_id: int, analysis_run_id: int) -> str:
abilities = graph.get("abilities", [])
if not abilities: