Status drift warnings

This commit is contained in:
2026-05-02 11:54:07 +02:00
parent 06ed12b2c7
commit 010d23bc38
10 changed files with 538 additions and 8 deletions

View File

@@ -129,6 +129,18 @@ class ScanSummary:
facts: list[ObservedFact]
@dataclass(frozen=True)
class CharacteristicRebuildResult:
repository: Repository
analysis_run: AnalysisRun
dry_run: bool
confirmed: bool
cleared_approved: bool
previous_counts: dict[str, int]
previous_ids: dict[str, list[int]]
candidate_counts: dict[str, int]
@dataclass(frozen=True)
class SourceReference:
fact_id: int | None

View File

@@ -15,6 +15,7 @@ from repo_registry.core.models import (
CandidateEvidence,
CandidateFeature,
CandidateGraph,
CharacteristicRebuildResult,
ContentChunk,
ExpectationGap,
ObservedFact,
@@ -228,14 +229,13 @@ class RegistryService:
notes=f"Generated {len(candidates)} candidate ability draft(s).",
)
if trusted_auto_approve:
self.approve_candidate_graph(
self.trusted_auto_approve_candidate_graph(
repository_id,
completed_run.id,
notes=(
"Trusted auto-populate mode approved candidate graph "
"Trusted auto-populate mode reviewed candidate graph "
f"after {candidate_source} candidate generation."
),
action="trusted_auto_approve_candidate_graph",
)
log_operation(
"analysis_completed",
@@ -335,6 +335,79 @@ class RegistryService:
def candidate_graph(self, repository_id: int, analysis_run_id: int) -> CandidateGraph:
return self.store.get_candidate_graph(repository_id, analysis_run_id)
def rebuild_characteristics_from_scratch(
self,
repository_id: int,
*,
dry_run: bool = True,
confirm: bool = False,
source_path: str | None = None,
use_cached_checkout: bool = False,
use_llm_assistance: bool = True,
access_username: str | None = None,
access_password: str | None = None,
) -> CharacteristicRebuildResult:
if not dry_run and not confirm:
raise ValueError("confirmed rebuild requires confirm=True")
repository = self.store.get_repository(repository_id)
previous_counts = self._approved_counts(repository_id)
previous_ids = self._approved_ids(repository_id)
summary = self.analyze_repository(
repository_id,
source_path=source_path,
use_cached_checkout=use_cached_checkout,
use_llm_assistance=use_llm_assistance,
trusted_auto_approve=False,
access_username=access_username,
access_password=access_password,
)
if summary.analysis_run.status != "completed":
return CharacteristicRebuildResult(
repository=repository,
analysis_run=summary.analysis_run,
dry_run=dry_run,
confirmed=confirm,
cleared_approved=False,
previous_counts=previous_counts,
previous_ids=previous_ids,
candidate_counts={},
)
graph = self.store.get_candidate_graph(repository_id, summary.analysis_run.id)
candidate_counts = self._candidate_counts(graph)
cleared = False
if not dry_run:
self.store.clear_approved_characteristics(repository_id)
self.store.update_repository_status(repository_id, "analyzed")
cleared = True
action = (
"rebuild_characteristics_from_scratch"
if cleared
else "dry_run_rebuild_characteristics_from_scratch"
)
self.store.create_review_decision(
repository_id,
summary.analysis_run.id,
action=action,
notes=(
f"Previous approved counts: {previous_counts}. "
f"Previous approved IDs: {previous_ids}. "
f"New candidate counts: {candidate_counts}."
),
)
return CharacteristicRebuildResult(
repository=repository,
analysis_run=summary.analysis_run,
dry_run=dry_run,
confirmed=confirm,
cleared_approved=cleared,
previous_counts=previous_counts,
previous_ids=previous_ids,
candidate_counts=candidate_counts,
)
def approve_candidate_graph(
self,
repository_id: int,
@@ -406,6 +479,160 @@ class RegistryService:
self.store.update_repository_status(repository_id, "indexed")
return self.store.get_ability_map(repository_id)
def trusted_auto_approve_candidate_graph(
self,
repository_id: int,
analysis_run_id: int,
*,
notes: str = "",
) -> RepositoryAbilityMap:
graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
approved_count = 0
skipped_count = 0
for ability in graph.abilities:
if ability.status != "candidate":
continue
candidate_capabilities = [
capability
for capability in ability.capabilities
if capability.status == "candidate"
]
safe_capabilities = [
capability
for capability in candidate_capabilities
if self._trusted_auto_approve_capability_safe(capability)
]
skipped_count += len(candidate_capabilities) - len(safe_capabilities)
if not safe_capabilities:
continue
approved_ability_id = self._ensure_approved_ability(repository_id, ability)
for capability in safe_capabilities:
self._create_approved_capability_subtree(
repository_id,
approved_ability_id,
capability,
)
self.store.mark_candidate_capability_status(
repository_id,
analysis_run_id,
capability.id,
"approved",
)
approved_count += 1
if len(safe_capabilities) == len(candidate_capabilities):
self.store.mark_candidate_ability_status(
repository_id,
analysis_run_id,
ability.id,
"approved",
)
if approved_count:
self.store.update_repository_status(repository_id, "indexed")
self.store.create_review_decision(
repository_id,
analysis_run_id,
action="trusted_auto_approve_candidate_graph",
notes=(
f"{notes} Auto-approved {approved_count} safe candidate "
f"capability(s); left {skipped_count} for review."
).strip(),
)
return self.store.get_ability_map(repository_id)
def _trusted_auto_approve_capability_safe(
self,
capability: CandidateCapability,
) -> bool:
has_source_refs = bool(capability.source_refs) or any(
feature.source_refs for feature in capability.features
)
if not has_source_refs:
return False
if capability.primary_class == "repository-structure":
return False
if capability.primary_class == "llm-integration":
return bool(
{"utility-owned", "utility-facade", "utility-adapter"}
& set(capability.attributes)
)
if capability.primary_class in {"interface", "API", "CLI", "callable", "api", "cli"}:
return capability.confidence >= 0.55
if capability.features:
return capability.confidence >= 0.55
return capability.confidence >= 0.75
def _approved_counts(self, repository_id: int) -> dict[str, int]:
ability_map = self.store.get_ability_map(repository_id)
capabilities = [
capability
for ability in ability_map.abilities
for capability in ability.capabilities
]
features = [
feature
for capability in capabilities
for feature in capability.features
]
evidence = [
item
for capability in capabilities
for item in capability.evidence
]
return {
"abilities": len(ability_map.abilities),
"capabilities": len(capabilities),
"features": len(features),
"evidence": len(evidence),
}
def _approved_ids(self, repository_id: int) -> dict[str, list[int]]:
ability_map = self.store.get_ability_map(repository_id)
capabilities = [
capability
for ability in ability_map.abilities
for capability in ability.capabilities
]
features = [
feature
for capability in capabilities
for feature in capability.features
]
evidence = [
item
for capability in capabilities
for item in capability.evidence
]
return {
"abilities": [ability.id for ability in ability_map.abilities],
"capabilities": [capability.id for capability in capabilities],
"features": [feature.id for feature in features],
"evidence": [item.id for item in evidence],
}
def _candidate_counts(self, graph: CandidateGraph) -> dict[str, int]:
capabilities = [
capability
for ability in graph.abilities
for capability in ability.capabilities
]
features = [
feature
for capability in capabilities
for feature in capability.features
]
evidence = [
item
for capability in capabilities
for item in capability.evidence
]
return {
"abilities": len(graph.abilities),
"capabilities": len(capabilities),
"features": len(features),
"evidence": len(evidence),
}
def accept_candidate_ability(
self,
repository_id: int,