Files
repo-scoping/src/repo_registry/self_scoping/assessment.py

479 lines
17 KiB
Python

from __future__ import annotations
import json
import subprocess
from collections import Counter
from dataclasses import asdict
from datetime import UTC, datetime
from importlib import metadata
from pathlib import Path
from typing import Any
from repo_registry.acceptance import (
active_quality_criteria_version,
evaluate_candidate_graph_quality,
quality_gate_outcome_dicts,
)
from repo_registry.core.models import (
Ability,
CandidateAbility,
CandidateCapability,
CandidateEvidence,
CandidateFeature,
ContentChunk,
ObservedFact,
RepositoryAbilityMap,
ReviewDecision,
SourceReference,
)
from repo_registry.core.service import RegistryService
# Schema identifier written to the "schema_version" field of exported artifacts.
SCHEMA_VERSION = "self-scoping-assessment/v1"
# Capability name that _known_regression_patterns looks for in the generated tree.
KNOWN_PROVIDER_ROUTING_CAPABILITY = "Route LLM Requests Across Providers"
def export_assessment_artifact(
    service: RegistryService,
    repository_id: int,
    analysis_run_id: int,
    *,
    role: str = "challenger",
    outcome: str = "challenger",
    reviewer: str = "codex",
    summary: str | None = None,
    engine_root: str | Path | None = None,
) -> dict[str, Any]:
    """Build the self-scoping assessment artifact for one completed analysis run.

    Args:
        service: Registry service used to load the repository, the run, its
            snapshot, observed facts, content chunks, candidate graph,
            approved ability map and review decisions.
        repository_id: Identifier of the repository the run belongs to.
        analysis_run_id: Identifier of the analysis run to export.
        role: Assessment role; recorded in the artifact and used for the
            artifact id, the default summary and the comparison eligibility.
        outcome: Assessment outcome, recorded as given.
        reviewer: Reviewer name, recorded as given.
        summary: Summary text; any falsy value selects a generated summary.
        engine_root: Directory inspected with git to describe the engine;
            falls back to the current working directory when falsy.

    Returns:
        The artifact as a nested dictionary.

    Raises:
        ValueError: If the analysis run's status is not ``"completed"``.
    """
    repo = service.get_repository(repository_id)
    run = service.get_analysis_run(repository_id, analysis_run_id)
    if run.status != "completed":
        raise ValueError(
            f"analysis run {analysis_run_id} is {run.status}, not completed"
        )

    # A run without a snapshot id falls back to repository-level metadata below.
    snapshot = None
    if run.snapshot_id is not None:
        snapshot = service.store.get_snapshot(run.snapshot_id)

    observed_facts = service.list_observed_facts(repository_id, analysis_run_id)
    content_chunks = service.list_content_chunks(repository_id, analysis_run_id)
    candidate_graph = service.candidate_graph(repository_id, analysis_run_id)
    quality_outcomes = evaluate_candidate_graph_quality(candidate_graph)
    approved = service.ability_map(repository_id)
    review_decisions = service.list_review_decisions(repository_id, analysis_run_id)

    engine = _engine_identity(
        run.scanner_version,
        Path(engine_root or Path.cwd()),
    )
    patterns = _known_regression_patterns(candidate_graph.abilities, review_decisions)
    eligibility = _comparison_eligibility(role, engine["release_binding_status"])
    summary_text = summary or _summary(role, patterns)

    artifact_id = _artifact_id(repo.name, analysis_run_id, role)
    # Second-resolution UTC timestamp with a "Z" suffix instead of "+00:00".
    utc_now = datetime.now(UTC).replace(microsecond=0)
    created_at = utc_now.isoformat().replace("+00:00", "Z")

    if snapshot is None:
        target_repository = {
            "repo_slug": _slug(repo.name),
            "repository_id": repo.id,
            "source": repo.url,
            "target_commit": "unknown",
            "target_branch": repo.branch,
            "dirty_state": "unknown",
            "file_count": None,
        }
    else:
        target_repository = {
            "repo_slug": _slug(repo.name),
            "repository_id": repo.id,
            "source": snapshot.source_path,
            "target_commit": snapshot.commit_hash,
            "target_branch": snapshot.branch,
            "dirty_state": _dirty_state(Path(snapshot.source_path)),
            "file_count": snapshot.file_count,
        }

    execution = {
        "mode": _execution_mode(review_decisions),
        "analysis_run_id": run.id,
        "candidate_source": _candidate_source(review_decisions),
        "acceptance_mode": _acceptance_mode(review_decisions),
        "started_at": _timestamp(run.started_at),
        "completed_at": _timestamp(run.completed_at),
    }
    assessment = {
        "role": role,
        "outcome": outcome,
        "summary": summary_text,
        "reviewer": reviewer,
        "comparison_eligibility": eligibility,
        "rationale": _rationale(patterns, eligibility),
    }

    if eligibility == "not_comparable":
        identity_note = (
            "Artifact is not comparable as a preferred baseline until engine "
            "identity is complete."
        )
    else:
        identity_note = "Artifact has enough engine identity metadata for comparison."

    return {
        "schema_version": SCHEMA_VERSION,
        "artifact_id": artifact_id,
        "artifact_type": "assessment_run",
        "created_at": created_at,
        "target_repository": target_repository,
        "engine_identity": engine,
        "execution": execution,
        "assessment": assessment,
        "fact_summary": _fact_summary(observed_facts),
        "content_chunk_summary": _content_chunk_summary(content_chunks),
        "generated_tree": {
            "abilities": [_candidate_ability(item) for item in candidate_graph.abilities]
        },
        "approved_map": _approved_map(approved),
        "review_decisions": [_review_decision(item) for item in review_decisions],
        "quality_gate_outcomes": quality_gate_outcome_dicts(quality_outcomes),
        "known_regression_patterns": patterns,
        "notes": [
            "Generated by repo-scoping self-scoping assessment exporter.",
            identity_note,
        ],
    }
def _engine_identity(scanner_version: str, engine_root: Path) -> dict[str, Any]:
    """Describe the engine build from the git state found under *engine_root*.

    Args:
        scanner_version: Scanner version string to record as given.
        engine_root: Directory passed to git for commit, tag and dirty checks.

    Returns:
        A dictionary of engine identity fields. ``release_binding_status`` is
        ``"complete"`` when the HEAD commit could be read and ``"unbound"``
        otherwise; ``engine_release`` is the tag exactly matching HEAD, or
        ``None`` when git reports none.
    """
    head_commit = _git_value(engine_root, "rev-parse", "HEAD")
    worktree_state = _dirty_state(engine_root)
    exact_tag = _git_value(engine_root, "describe", "--tags", "--exact-match")

    # Binding depends only on whether a commit hash was captured.
    if head_commit:
        binding_status = "complete"
        binding_note = "Engine commit was captured from git."
    else:
        binding_status = "unbound"
        binding_note = "Engine commit could not be captured; artifact is not comparable."

    return {
        "repo_scoping_version": _package_version(),
        "engine_commit": head_commit,
        "engine_release": exact_tag,
        "engine_dirty_state": worktree_state,
        "scanner_version": scanner_version,
        "candidate_generator_version": "unversioned",
        "quality_criteria_version": active_quality_criteria_version(),
        "prompt_version": None,
        "release_binding_status": binding_status,
        "release_binding_note": binding_note,
    }
def _package_version() -> str:
try:
return metadata.version("repo-registry")
except metadata.PackageNotFoundError:
return "unknown"
def _git_value(root: Path, *args: str) -> str | None:
try:
result = subprocess.run(
["git", "-C", str(root), *args],
check=False,
capture_output=True,
text=True,
)
except OSError:
return None
value = result.stdout.strip()
return value if result.returncode == 0 and value else None
def _dirty_state(root: Path) -> str:
if not (root / ".git").exists():
return "unknown"
try:
result = subprocess.run(
["git", "-C", str(root), "status", "--short"],
check=False,
capture_output=True,
text=True,
)
except OSError:
return "unknown"
if result.returncode != 0:
return "unknown"
return "dirty" if result.stdout.strip() else "clean"
def _comparison_eligibility(role: str, release_binding_status: str) -> str:
if role == "negative_regression_seed":
return "eligible_as_negative_seed"
if release_binding_status == "complete":
return "eligible"
return "not_comparable"
def _summary(role: str, regression_patterns: list[dict[str, str]]) -> str:
if role == "negative_regression_seed":
return "Historical run captured as a negative self-scoping regression seed."
if regression_patterns:
return "Generated self-scoping assessment repeats known regression patterns."
return "Generated self-scoping assessment artifact for comparison."
def _rationale(
regression_patterns: list[dict[str, str]],
comparison_eligibility: str,
) -> list[str]:
rationale: list[str] = []
if comparison_eligibility == "not_comparable":
rationale.append("Engine identity is incomplete, so this cannot be a comparable baseline.")
for pattern in regression_patterns:
rationale.append(f"{pattern['id']}: {pattern['description']}")
return rationale
def _fact_summary(facts: list[ObservedFact]) -> dict[str, Any]:
    """Summarize observed facts for the artifact.

    Args:
        facts: Observed facts of the analysis run.

    Returns:
        A dictionary with ``"counts_by_kind"`` (fact count per kind, keys in
        sorted order) and ``"contamination_sources"`` (the result of
        ``_contamination_sources`` for the same facts).
    """
    kind_counts = Counter(fact.kind for fact in facts)
    ordered_counts = {kind: kind_counts[kind] for kind in sorted(kind_counts)}
    return {
        "counts_by_kind": ordered_counts,
        "contamination_sources": _contamination_sources(facts),
    }
def _contamination_sources(facts: list[ObservedFact]) -> list[dict[str, str]]:
provider_kinds = {
"llm_provider",
"credential_config",
"provider_registry",
"fallback_policy",
}
suspicious_segments = (
"test",
"tests/",
"fixtures",
"expectations",
"schemas.py",
"scanner.py",
"normalization.py",
"workplans/",
)
results: list[dict[str, str]] = []
seen: set[str] = set()
for fact in facts:
lower = fact.path.lower()
if fact.kind not in provider_kinds or not any(segment in lower for segment in suspicious_segments):
continue
if fact.path in seen:
continue
seen.add(fact.path)
results.append(
{
"path": fact.path,
"reason": (
"Provider-related fact came from scanner rules, tests, fixtures, "
"schemas, or workplan context and needs native-utility review."
),
}
)
return sorted(results, key=lambda item: item["path"])
def _content_chunk_summary(chunks: list[ContentChunk]) -> dict[str, Any]:
source_roles = Counter(
str(chunk.metadata.get("source_role", "") or "unknown") for chunk in chunks
)
return {
"total": len(chunks),
"counts_by_kind": dict(sorted(Counter(chunk.kind for chunk in chunks).items())),
"counts_by_source_role": dict(sorted(source_roles.items())),
"paths": sorted({chunk.path for chunk in chunks}),
}
def _candidate_ability(ability: CandidateAbility) -> dict[str, Any]:
return {
"name": ability.name,
"status": ability.status,
"primary_class": ability.primary_class,
"source_refs": [_source_ref(ref) for ref in ability.source_refs],
"capabilities": [
_candidate_capability(capability) for capability in ability.capabilities
],
}
def _candidate_capability(capability: CandidateCapability) -> dict[str, Any]:
return {
"name": capability.name,
"status": capability.status,
"primary_class": capability.primary_class,
"source_refs": [_source_ref(ref) for ref in capability.source_refs],
"features": [_candidate_feature(feature) for feature in capability.features],
"evidence": [_candidate_evidence(evidence) for evidence in capability.evidence],
}
def _candidate_feature(feature: CandidateFeature) -> dict[str, Any]:
return {
"name": feature.name,
"type": feature.type,
"status": feature.status,
"primary_class": feature.primary_class,
"location": feature.location,
"source_refs": [_source_ref(ref) for ref in feature.source_refs],
}
def _candidate_evidence(evidence: CandidateEvidence) -> dict[str, Any]:
return {
"type": evidence.type,
"reference": evidence.reference,
"strength": evidence.strength,
"status": evidence.status,
"source_refs": [_source_ref(ref) for ref in evidence.source_refs],
}
def _approved_map(ability_map: RepositoryAbilityMap) -> dict[str, Any]:
return {
"scope": asdict(ability_map.scope),
"abilities": [_approved_ability(ability) for ability in ability_map.abilities],
}
def _approved_ability(ability: Ability) -> dict[str, Any]:
return {
"name": ability.name,
"primary_class": ability.primary_class,
"capabilities": [
{
"name": capability.name,
"primary_class": capability.primary_class,
"features": [
{
"name": feature.name,
"type": feature.type,
"primary_class": feature.primary_class,
"location": feature.location,
"source_refs": [
_source_ref(ref) for ref in feature.source_refs
],
}
for feature in capability.features
],
"evidence": [asdict(evidence) for evidence in capability.evidence],
}
for capability in ability.capabilities
],
}
def _source_ref(ref: SourceReference) -> dict[str, Any]:
return asdict(ref)
def _review_decision(decision: ReviewDecision) -> dict[str, Any]:
    """Convert a review decision to a dictionary stamped with the criteria version.

    Args:
        decision: Review decision dataclass to convert.

    Returns:
        The ``dataclasses.asdict`` form of *decision* with
        ``"quality_criteria_version"`` set to the currently active quality
        criteria version, replacing any value already under that key.
    """
    return {
        **asdict(decision),
        "quality_criteria_version": active_quality_criteria_version(),
    }
def _known_regression_patterns(
abilities: list[CandidateAbility],
decisions: list[ReviewDecision],
) -> list[dict[str, str]]:
patterns: list[dict[str, str]] = []
llm_capabilities = [
capability
for ability in abilities
for capability in ability.capabilities
if capability.name == KNOWN_PROVIDER_ROUTING_CAPABILITY
]
if llm_capabilities:
patterns.append(
{
"id": "RREG-SELF-REG-001",
"title": "LLM provider vocabulary promoted as native capability",
"severity": "critical",
"description": (
"Generated tree contains Route LLM Requests Across Providers "
"as a repo-scoping capability."
),
"detection_hint": (
"Flag the provider-routing capability unless product intent "
"and public implementation explicitly support it."
),
}
)
if any(
feature.type in {"API", "CLI"}
for capability in llm_capabilities
for feature in capability.features
):
patterns.append(
{
"id": "RREG-SELF-REG-002",
"title": "Native API and CLI surfaces attached under false capability",
"severity": "high",
"description": (
"API or CLI surface features are nested below provider routing."
),
"detection_hint": (
"Flag API/CLI surface features whose parent capability is "
"llm-integration or provider-routing."
),
}
)
if any(decision.action == "trusted_auto_approve_candidate_graph" for decision in decisions):
patterns.append(
{
"id": "RREG-SELF-REG-003",
"title": "Deterministic trusted auto-approval accepted candidate truth",
"severity": "high",
"description": (
"Candidate characteristics were approved through trusted "
"auto-approval instead of human or agentic judgement."
),
"detection_hint": "Flag trusted_auto_approve_candidate_graph review decisions.",
}
)
return patterns
def _execution_mode(decisions: list[ReviewDecision]) -> str:
if any(decision.action.startswith("agentic_review") for decision in decisions):
return "agentic-review"
if any(decision.action == "trusted_auto_approve_candidate_graph" for decision in decisions):
return "trusted-auto-review"
if any(decision.action == "llm_extraction_used" for decision in decisions):
return "llm-assisted"
if any(decision.action.startswith("approve") for decision in decisions):
return "manual-review"
return "deterministic-only"
def _candidate_source(decisions: list[ReviewDecision]) -> str:
return "llm+deterministic" if any(
decision.action == "llm_extraction_used" for decision in decisions
) else "deterministic"
def _acceptance_mode(decisions: list[ReviewDecision]) -> str:
agentic_decision = next(
(decision for decision in decisions if decision.action.startswith("agentic_review")),
None,
)
if agentic_decision is not None:
return agentic_decision.action
if any(decision.action == "trusted_auto_approve_candidate_graph" for decision in decisions):
return "trusted_auto_approve_candidate_graph"
if any(decision.action == "approve_candidate_graph" for decision in decisions):
return "manual_candidate_graph_approval"
if any(decision.action == "approve_analysis_run_changes" for decision in decisions):
return "manual_change_approval"
return "pending_review"
def _timestamp(value: str | None) -> str | None:
if value is None:
return None
if "T" in value:
return value
return value.replace(" ", "T") + "Z"
def _artifact_id(repository_name: str, analysis_run_id: int, role: str) -> str:
    """Compose the artifact identifier for a run.

    Args:
        repository_name: Repository name; slugified before use.
        analysis_run_id: Identifier of the analysis run.
        role: Assessment role, inserted as given.

    Returns:
        ``"<slug>-<role>-run-<analysis_run_id>"``.
    """
    repo_slug = _slug(repository_name)
    return f"{repo_slug}-{role}-run-{analysis_run_id}"
def _slug(value: str) -> str:
return "-".join(
token for token in "".join(char.lower() if char.isalnum() else "-" for char in value).split("-") if token
)
def artifact_json(artifact: dict[str, Any]) -> str:
    """Serialize an artifact dictionary to JSON text.

    Args:
        artifact: Artifact dictionary to serialize.

    Returns:
        JSON with two-space indentation and sorted keys, terminated by a
        single newline.
    """
    rendered = json.dumps(artifact, sort_keys=True, indent=2)
    return f"{rendered}\n"