Add deterministic quality gate outcomes

This commit is contained in:
2026-05-15 15:37:00 +02:00
parent 7851eae42f
commit 83d5044ff4
11 changed files with 449 additions and 18 deletions

View File

@@ -15,3 +15,8 @@ repo-scoping list-quality-criteria --format markdown
```
The same registry is available from the API at `GET /quality-criteria`.
The first deterministic gate evaluator emits audit outcomes against this
registry. Candidate graph API responses and self-scoping assessment exports
include those outcomes as `quality_gate_outcomes`. Gate outcomes can block
automatic approval or flag a candidate for review, but they never approve
candidates.

View File

@@ -5,11 +5,21 @@ from repo_registry.acceptance.criteria import (
criteria_registry_markdown,
load_quality_criteria,
)
from repo_registry.acceptance.gates import (
blocking_quality_gate_outcomes,
evaluate_candidate_capability_quality,
evaluate_candidate_graph_quality,
quality_gate_outcome_dicts,
)
__all__ = [
"active_quality_criteria_version",
"blocking_quality_gate_outcomes",
"criteria_registry_dict",
"criteria_registry_json",
"criteria_registry_markdown",
"evaluate_candidate_capability_quality",
"evaluate_candidate_graph_quality",
"load_quality_criteria",
"quality_gate_outcome_dicts",
]

View File

@@ -0,0 +1,215 @@
from __future__ import annotations
from dataclasses import asdict, dataclass
from repo_registry.acceptance.criteria import (
QualityCriteriaRegistry,
QualityCriterion,
load_quality_criteria,
)
from repo_registry.core.models import (
CandidateCapability,
CandidateFeature,
CandidateGraph,
SourceReference,
)
# Exact capability name that _looks_like_provider_routing treats as provider routing.
PROVIDER_ROUTING_CAPABILITY = "Route LLM Requests Across Providers"
# Outcome values that blocking_quality_gate_outcomes keeps when filtering outcomes.
BLOCKING_OUTCOMES = {"downgraded", "rejected", "invalidated", "requires_review"}
@dataclass(frozen=True)
class QualityGateOutcome:
    """Immutable record of one quality criterion firing against one candidate element.

    Instances are created by ``_outcome`` and turned into plain dicts by
    ``quality_gate_outcome_dicts``.
    """

    # Provenance: the registry version and the criterion that fired.
    criteria_version: str
    criterion_id: str
    criterion_title: str
    severity: str

    # Action recorded for the element; copied from the criterion's
    # ``deterministic_action``.
    outcome: str

    # The candidate element this outcome is attached to.
    element_type: str
    element_id: int
    element_name: str

    # Human-readable explanation of why the criterion fired.
    reason: str
def evaluate_candidate_graph_quality(
    graph: CandidateGraph,
    registry: QualityCriteriaRegistry | None = None,
) -> list[QualityGateOutcome]:
    """Evaluate the quality gates for every capability in a candidate graph.

    Args:
        graph: Candidate graph whose abilities and nested capabilities are walked.
        registry: Criteria registry to evaluate against; when not supplied the
            result of ``load_quality_criteria()`` is used.

    Returns:
        All outcomes, in ability order and then capability order.
    """
    # Resolve the registry once so every capability is judged against the same one.
    resolved = registry or load_quality_criteria()
    return [
        gate_outcome
        for ability in graph.abilities
        for capability in ability.capabilities
        for gate_outcome in evaluate_candidate_capability_quality(capability, resolved)
    ]
def evaluate_candidate_capability_quality(
    capability: CandidateCapability,
    registry: QualityCriteriaRegistry | None = None,
) -> list[QualityGateOutcome]:
    """Run the deterministic quality gates against one candidate capability.

    Evidence gates (at most one branch fires):
      * no source refs at all           -> RREG-QC-004
      * only refs to SCOPE.md files     -> RREG-QC-005
      * only weak source refs           -> RREG-QC-001 and RREG-QC-006

    Independently of the evidence gates, a provider-routing capability gets
    RREG-QC-002, and each API/CLI feature nested beneath it gets RREG-QC-003.

    Args:
        capability: Candidate capability to evaluate.
        registry: Criteria registry to evaluate against; when not supplied the
            result of ``load_quality_criteria()`` is used.

    Returns:
        Outcomes in the order the gates fired; empty when no gate fired.

    Raises:
        KeyError: If a gate fires and the registry does not define its criterion id.
    """
    resolved = registry or load_quality_criteria()
    by_id = {criterion.id: criterion for criterion in resolved.criteria}
    found: list[QualityGateOutcome] = []

    def flag(criterion_id: str, element_type: str, element, reason: str) -> None:
        # Record one outcome for ``element`` under the given criterion.
        found.append(
            _outcome(
                resolved,
                by_id[criterion_id],
                element_type=element_type,
                element_id=element.id,
                element_name=element.name,
                reason=reason,
            )
        )

    # --- Evidence gates: mutually exclusive, strongest failure first. ---
    supporting_refs = _capability_refs(capability)
    if not supporting_refs:
        flag(
            "RREG-QC-004",
            "capability",
            capability,
            "Candidate capability has no source refs supporting the abstraction.",
        )
    elif _all_generated_scope_refs(supporting_refs):
        flag(
            "RREG-QC-005",
            "capability",
            capability,
            "Candidate is supported only by generated SCOPE.md evidence.",
        )
    elif _all_weak_source_refs(supporting_refs):
        flag(
            "RREG-QC-001",
            "capability",
            capability,
            "All supporting refs are weak source roles for capability truth.",
        )
        flag(
            "RREG-QC-006",
            "capability",
            capability,
            "Candidate is primarily supported by tests, fixtures, schemas, or examples.",
        )

    # --- Provider-routing gate on the capability itself. ---
    if _looks_like_provider_routing(capability):
        flag(
            "RREG-QC-002",
            "capability",
            capability,
            (
                "Provider-routing or LLM-integration vocabulary requires "
                "explicit product evidence before it can be native utility."
            ),
        )

    # --- Placement gate on each nested feature. ---
    for feature in capability.features:
        if not _feature_misplaced_under_provider_routing(capability, feature):
            continue
        flag(
            "RREG-QC-003",
            "feature",
            feature,
            (
                "API/CLI surface is nested below provider-routing or "
                "LLM-integration capability."
            ),
        )

    return found
def blocking_quality_gate_outcomes(
    outcomes: list[QualityGateOutcome],
) -> list[QualityGateOutcome]:
    """Return, in input order, the outcomes whose ``outcome`` is in BLOCKING_OUTCOMES."""
    blocking: list[QualityGateOutcome] = []
    for item in outcomes:
        if item.outcome in BLOCKING_OUTCOMES:
            blocking.append(item)
    return blocking
def quality_gate_outcome_dicts(
    outcomes: list[QualityGateOutcome],
) -> list[dict[str, object]]:
    """Convert each outcome into a plain dict via ``dataclasses.asdict``.

    Input order is preserved; an empty input yields an empty list.
    """
    return list(map(asdict, outcomes))
def _outcome(
    registry: QualityCriteriaRegistry,
    criterion: QualityCriterion,
    *,
    element_type: str,
    element_id: int,
    element_name: str,
    reason: str,
) -> QualityGateOutcome:
    """Build the outcome record for ``criterion`` firing on one element.

    The criteria version comes from ``registry``; id, title, severity and the
    recorded action (``deterministic_action``) come from ``criterion``; the
    element fields and reason are taken from the keyword arguments unchanged.
    """
    fields = {
        "criteria_version": registry.criteria_version,
        "criterion_id": criterion.id,
        "criterion_title": criterion.title,
        "severity": criterion.severity,
        "outcome": criterion.deterministic_action,
        "element_type": element_type,
        "element_id": element_id,
        "element_name": element_name,
        "reason": reason,
    }
    return QualityGateOutcome(**fields)
def _capability_refs(capability: CandidateCapability) -> list[SourceReference]:
    """Collect every source ref that supports a capability.

    The result is a new list holding the capability's own refs, then the refs
    of each feature, then the refs of each evidence item. Duplicates are kept.
    """
    collected = [*capability.source_refs]
    collected += [ref for feature in capability.features for ref in feature.source_refs]
    collected += [ref for item in capability.evidence for ref in item.source_refs]
    return collected
def _looks_like_provider_routing(capability: CandidateCapability) -> bool:
    """Tell whether a capability is provider routing, by exact name or by primary class."""
    if capability.name == PROVIDER_ROUTING_CAPABILITY:
        return True
    return capability.primary_class in {"llm-integration", "provider-routing"}
def _feature_misplaced_under_provider_routing(
    capability: CandidateCapability,
    feature: CandidateFeature,
) -> bool:
    """Tell whether an API/CLI feature sits under a provider-routing capability.

    The feature counts as an API/CLI surface when either its ``type`` or its
    ``primary_class`` upper-cases to ``API`` or ``CLI``.
    """
    surface_kinds = {"API", "CLI"}
    if _looks_like_provider_routing(capability):
        if feature.type.upper() in surface_kinds:
            return True
        return feature.primary_class.upper() in surface_kinds
    return False
def _all_generated_scope_refs(refs: list[SourceReference]) -> bool:
    """Tell whether every ref points at a file named exactly ``SCOPE.md``.

    The final ``/``-separated path segment is compared, rather than the path
    suffix, so unrelated files such as ``docs/TELESCOPE.md`` are not mistaken
    for generated scope evidence.

    Returns:
        False for an empty list; otherwise True only if all refs match.
    """
    if not refs:
        return False
    return all(ref.path.rsplit("/", 1)[-1] == "SCOPE.md" for ref in refs)
def _all_weak_source_refs(refs: list[SourceReference]) -> bool:
    """Tell whether the list is non-empty and every ref is a weak source ref."""
    if not refs:
        return False
    for ref in refs:
        if not _is_weak_source_ref(ref):
            return False
    return True
def _is_weak_source_ref(ref: SourceReference) -> bool:
    """Tell whether a ref is weak evidence, judged by its path or its kind.

    Both values are compared case-insensitively. A ref is weak when its path is
    under ``tests/`` or ``docs/schemas/``, contains ``/tests/`` or ``fixture``,
    or when its kind mentions ``schema`` or ``example`` or is one of the listed
    weak kinds.
    """
    lowered_path = ref.path.lower()
    lowered_kind = ref.kind.lower()

    # Path-based signals.
    if lowered_path.startswith(("tests/", "docs/schemas/")):
        return True
    if "/tests/" in lowered_path or "fixture" in lowered_path:
        return True

    # Kind-based signals.
    if "schema" in lowered_kind or "example" in lowered_kind:
        return True
    return lowered_kind in {"test", "fixture", "schema-example", "generated-scope"}

View File

@@ -4,6 +4,10 @@ from collections.abc import Sequence
from dataclasses import asdict, replace
from typing import Any
from repo_registry.acceptance import (
blocking_quality_gate_outcomes,
evaluate_candidate_capability_quality,
)
from repo_registry.core.models import (
AbilitySummary,
AnalysisRunDiff,
@@ -609,6 +613,13 @@ class RegistryService:
self,
capability: CandidateCapability,
) -> tuple[bool, str]:
gate_outcomes = evaluate_candidate_capability_quality(capability)
blocking_outcomes = blocking_quality_gate_outcomes(gate_outcomes)
if blocking_outcomes:
criteria = ", ".join(
sorted({outcome.criterion_id for outcome in blocking_outcomes})
)
return False, f"quality gates require review ({criteria})"
has_source_refs = bool(capability.source_refs) or any(
feature.source_refs for feature in capability.features
)

View File

@@ -9,7 +9,11 @@ from importlib import metadata
from pathlib import Path
from typing import Any
from repo_registry.acceptance import active_quality_criteria_version
from repo_registry.acceptance import (
active_quality_criteria_version,
evaluate_candidate_graph_quality,
quality_gate_outcome_dicts,
)
from repo_registry.core.models import (
Ability,
CandidateAbility,
@@ -56,6 +60,7 @@ def export_assessment_artifact(
facts = service.list_observed_facts(repository_id, analysis_run_id)
chunks = service.list_content_chunks(repository_id, analysis_run_id)
graph = service.candidate_graph(repository_id, analysis_run_id)
gate_outcomes = evaluate_candidate_graph_quality(graph)
ability_map = service.ability_map(repository_id)
decisions = service.list_review_decisions(repository_id, analysis_run_id)
engine_identity = _engine_identity(
@@ -107,7 +112,7 @@ def export_assessment_artifact(
},
"approved_map": _approved_map(ability_map),
"review_decisions": [_review_decision(decision) for decision in decisions],
"quality_gate_outcomes": [],
"quality_gate_outcomes": quality_gate_outcome_dicts(gate_outcomes),
"known_regression_patterns": regression_patterns,
"notes": [
"Generated by repo-scoping self-scoping assessment exporter.",

View File

@@ -12,7 +12,12 @@ from fastapi.responses import PlainTextResponse
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from repo_registry.acceptance import criteria_registry_dict, load_quality_criteria
from repo_registry.acceptance import (
criteria_registry_dict,
evaluate_candidate_graph_quality,
load_quality_criteria,
quality_gate_outcome_dicts,
)
from repo_registry.core.service import RegistryService
from repo_registry.llm_extraction import LLMCandidateExtractor, create_llm_connect_adapter
from repo_registry.repo_ingestion.git import GitIngestionService
@@ -121,6 +126,14 @@ def get_service(settings: Settings = Depends(get_settings)) -> RegistryService:
)
def candidate_graph_payload(graph) -> dict[str, object]:
    """Serialise a candidate graph and attach its quality-gate outcomes.

    The graph is converted with ``dataclasses.asdict`` and the evaluated gate
    outcomes are stored under the ``quality_gate_outcomes`` key.
    """
    body = asdict(graph)
    gate_outcomes = evaluate_candidate_graph_quality(graph)
    extra = {"quality_gate_outcomes": quality_gate_outcome_dicts(gate_outcomes)}
    return {**body, **extra}
API_DESCRIPTION = (
"Register repositories, analyze their observable implementation facts, "
"curate reviewable scope graphs, and search approved repository characteristics."
@@ -525,7 +538,9 @@ def get_candidate_graph(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(service.candidate_graph(repository_id, analysis_run_id))
return candidate_graph_payload(
service.candidate_graph(repository_id, analysis_run_id)
)
except NotFoundError as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
@@ -590,7 +605,7 @@ def reject_candidate_ability(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.reject_candidate_ability(
repository_id,
analysis_run_id,
@@ -616,7 +631,7 @@ def reject_candidate_capability(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.reject_candidate_capability(
repository_id,
analysis_run_id,
@@ -642,7 +657,7 @@ def reject_candidate_feature(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.reject_candidate_feature(
repository_id,
analysis_run_id,
@@ -668,7 +683,7 @@ def reject_candidate_evidence(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.reject_candidate_evidence(
repository_id,
analysis_run_id,
@@ -694,7 +709,7 @@ def edit_candidate_ability(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.edit_candidate_ability(
repository_id,
analysis_run_id,
@@ -720,7 +735,7 @@ def edit_candidate_capability(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.edit_candidate_capability(
repository_id,
analysis_run_id,
@@ -746,7 +761,7 @@ def relink_candidate_capability(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.relink_candidate_capability(
repository_id,
analysis_run_id,
@@ -772,7 +787,7 @@ def relink_candidate_feature(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.relink_candidate_feature(
repository_id,
analysis_run_id,
@@ -798,7 +813,7 @@ def relink_candidate_evidence(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.relink_candidate_evidence(
repository_id,
analysis_run_id,
@@ -824,7 +839,7 @@ def merge_candidate_ability(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.merge_candidate_ability(
repository_id,
analysis_run_id,
@@ -850,7 +865,7 @@ def merge_candidate_capability(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.merge_candidate_capability(
repository_id,
analysis_run_id,
@@ -876,7 +891,7 @@ def merge_candidate_feature(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.merge_candidate_feature(
repository_id,
analysis_run_id,
@@ -902,7 +917,7 @@ def merge_candidate_evidence(
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
return candidate_graph_payload(
service.merge_candidate_evidence(
repository_id,
analysis_run_id,

View File

@@ -638,10 +638,23 @@ class CandidateAbilityResponse(BaseModel):
capabilities: list[CandidateCapabilityResponse]
class QualityGateOutcomeResponse(BaseModel):
    """Response schema for one quality-gate outcome in a candidate graph response."""

    # Registry version and the criterion that fired.
    criteria_version: str
    criterion_id: str
    criterion_title: str
    severity: str

    # Action recorded for the element.
    outcome: str

    # The candidate element the outcome is attached to.
    element_type: str
    element_id: int
    element_name: str

    # Human-readable explanation of why the criterion fired.
    reason: str
class CandidateGraphResponse(BaseModel):
repository: RepositoryResponse
analysis_run: AnalysisRunResponse
abilities: list[CandidateAbilityResponse]
quality_gate_outcomes: list[QualityGateOutcomeResponse] = Field(default_factory=list)
model_config = {
"json_schema_extra": {
@@ -698,6 +711,7 @@ class CandidateGraphResponse(BaseModel):
],
}
],
"quality_gate_outcomes": [],
}
]
}

140
tests/test_quality_gates.py Normal file
View File

@@ -0,0 +1,140 @@
from repo_registry.acceptance import (
blocking_quality_gate_outcomes,
evaluate_candidate_capability_quality,
evaluate_candidate_graph_quality,
quality_gate_outcome_dicts,
)
from repo_registry.core.models import (
AnalysisRun,
CandidateAbility,
CandidateCapability,
CandidateFeature,
CandidateGraph,
Repository,
SourceReference,
)
from repo_registry.core.service import RegistryService
from repo_registry.repo_ingestion.git import GitIngestionService
from repo_registry.storage.sqlite import RegistryStore
def source_ref(path="src/app.py", kind="source"):
    """Build a SourceReference for tests; its name mirrors its path."""
    fields = {
        "fact_id": 1,
        "path": path,
        "kind": kind,
        "name": path,
        "line": 1,
    }
    return SourceReference(**fields)
def provider_routing_capability():
    """Build the provider-routing capability fixture with one nested API feature."""
    api_feature = CandidateFeature(
        id=20,
        name="HTTP API surface",
        type="API",
        location="src/app.py",
        confidence=0.8,
        status="candidate",
        source_refs=[source_ref("src/app.py")],
        confidence_label="high",
        primary_class="API",
    )
    return CandidateCapability(
        id=10,
        name="Route LLM Requests Across Providers",
        description="Routes provider requests.",
        inputs=[],
        outputs=[],
        confidence=0.9,
        status="candidate",
        source_refs=[source_ref("src/providers.py")],
        confidence_label="high",
        primary_class="llm-integration",
        attributes=["utility-owned"],
        features=[api_feature],
    )
def test_quality_gates_flag_known_provider_routing_failure():
    """The provider-routing fixture triggers QC-002 and QC-003 and is blocking."""
    results = evaluate_candidate_capability_quality(provider_routing_capability())
    flagged_ids = {result.criterion_id for result in results}

    assert flagged_ids >= {"RREG-QC-002", "RREG-QC-003"}
    # Gates may block or flag, but must never emit an approval.
    assert not any(result.outcome == "approve" for result in results)
    assert blocking_quality_gate_outcomes(results)
def test_quality_gates_flag_circular_scope_evidence():
    """A capability backed only by SCOPE.md is rejected under QC-005."""
    scope_only_ref = source_ref("SCOPE.md", "generated-scope")
    scope_backed = CandidateCapability(
        id=11,
        name="Map Repository Scope",
        description="Uses generated scope.",
        inputs=[],
        outputs=[],
        confidence=0.8,
        status="candidate",
        source_refs=[scope_only_ref],
        confidence_label="high",
        primary_class="scope-generation",
        attributes=["utility-owned"],
    )

    first = evaluate_candidate_capability_quality(scope_backed)[0]

    assert first.criterion_id == "RREG-QC-005"
    assert first.outcome == "rejected"
def test_quality_gate_outcomes_are_serializable_for_assessment_artifacts():
    """Graph-level outcomes convert to dicts that carry the criteria version."""
    repository = Repository(
        id=1,
        name="Repo",
        url=".",
        description=None,
        branch="main",
        status="indexed",
    )
    analysis_run = AnalysisRun(
        id=1,
        repository_id=1,
        snapshot_id=None,
        status="completed",
        started_at="2026-05-15T00:00:00Z",
        completed_at="2026-05-15T00:00:01Z",
        error_message=None,
        scanner_version="deterministic-v1",
    )
    ability = CandidateAbility(
        id=1,
        name="Support Repo",
        description="Support repo.",
        confidence=0.8,
        status="candidate",
        source_refs=[],
        capabilities=[provider_routing_capability()],
    )
    graph = CandidateGraph(
        repository=repository,
        analysis_run=analysis_run,
        abilities=[ability],
    )

    serialised = quality_gate_outcome_dicts(evaluate_candidate_graph_quality(graph))

    assert serialised
    assert serialised[0]["criteria_version"] == "repo-scoping-quality-criteria/v1"
def test_legacy_trusted_auto_approval_skips_quality_gate_blocked_capability(tmp_path):
    """Trusted auto-approval refuses a capability that has blocking gate outcomes."""
    registry_store = RegistryStore(tmp_path / "registry.sqlite3")
    registry_store.initialize()
    ingestion = GitIngestionService(tmp_path / "checkouts")
    service = RegistryService(registry_store, ingestion=ingestion)

    decision = service._trusted_auto_approve_capability_decision(
        provider_routing_capability()
    )
    approved, explanation = decision

    assert approved is False
    assert "quality gates require review" in explanation
    assert "RREG-QC-002" in explanation

View File

@@ -94,7 +94,9 @@ def test_export_assessment_artifact_flags_known_provider_regression(tmp_path):
)
regression_ids = {item["id"] for item in artifact["known_regression_patterns"]}
gate_ids = {item["criterion_id"] for item in artifact["quality_gate_outcomes"]}
assert "RREG-SELF-REG-001" in regression_ids
assert "RREG-QC-002" in gate_ids
assert any(
item["path"] == "providers.py"
for item in artifact["fact_summary"]["contamination_sources"]

View File

@@ -1020,6 +1020,10 @@ def test_api_analysis_run_loop(tmp_path):
assert candidate_response.status_code == 200
candidate_graph = candidate_response.json()
assert candidate_graph["abilities"][0]["name"] == "Support Frontend"
assert any(
outcome["criterion_id"] == "RREG-QC-004"
for outcome in candidate_graph["quality_gate_outcomes"]
)
candidate_ability_id = candidate_graph["abilities"][0]["id"]
candidate_capability_id = candidate_graph["abilities"][0]["capabilities"][0]["id"]

View File

@@ -111,7 +111,7 @@ version binding.
```task
id: RREG-WP-0014-T03
status: todo
status: done
priority: high
state_hub_task_id: "d599c084-a207-4910-9d0b-578d0c50f282"
```
@@ -129,6 +129,16 @@ Acceptance criteria:
- The known repo-scoping LLM-provider self-scan failure is flagged before
acceptance.
Implementation note 2026-05-15: added
`src/repo_registry/acceptance/gates.py` with deterministic quality-gate
outcomes tied to `repo-scoping-quality-criteria/v1`. Candidate graph API
responses and self-scoping assessment exports now include
`quality_gate_outcomes`. The legacy trusted auto-approval path now refuses
capabilities with blocking gate outcomes instead of approving them. Focused
tests cover provider-routing regression flags, circular generated-scope
evidence, serializable gate outcomes, candidate graph API exposure, and the
legacy auto-approval guard.
## T04: Replace Trusted Auto-Approval With Agentic Review
```task