preflight gating

This commit is contained in:
2026-05-07 15:52:13 +02:00
parent 4f8d8a1f52
commit 18299b03aa
7 changed files with 157 additions and 5 deletions

View File

@@ -27,10 +27,7 @@ def run_assessment(
run_dir = output_dir or root / "runs" / run_id
created_at = _now()
evidence = [
_evidence_for_step(root, run_dir, run_id, plan, step)
for step in plan["ordered_steps"]
]
evidence = _execute_steps(root, run_dir, run_id, plan)
for item in evidence:
assert_valid(item, "evidence-item")
@@ -83,6 +80,63 @@ def run_assessment(
}
def _execute_steps(
root: Path,
run_dir: Path,
run_id: str,
plan: dict[str, Any],
) -> list[dict[str, Any]]:
evidence: list[dict[str, Any]] = []
preflight_blocks: dict[str, dict[str, Any]] = {}
for step in plan["ordered_steps"]:
extension_id = step["extension_id"]
if step["kind"] == "check_group" and extension_id in preflight_blocks:
item = _blocked_by_preflight_evidence(run_id, plan, step, preflight_blocks[extension_id])
else:
item = _evidence_for_step(root, run_dir, run_id, plan, step)
evidence.append(item)
if step["kind"] == "preflight" and _blocks_downstream(item):
preflight_blocks[extension_id] = item
return evidence
def _blocked_by_preflight_evidence(
run_id: str,
plan: dict[str, Any],
step: dict[str, Any],
preflight: dict[str, Any],
) -> dict[str, Any]:
now = _now()
runner_ref = step.get("runner_ref")
return {
"id": f"evidence:{step['id']}",
"run_id": run_id,
"extension_id": step["extension_id"],
"check_id": step["id"],
"subject_ref": plan["target_profile_snapshot"]["id"],
"result": "blocked",
"observations": [
"Check group was not executed because extension preflight did not pass."
],
"facts": {
"step_kind": step["kind"],
"runner_ref": runner_ref,
"blocked_reason": "preflight_failed",
"preflight_evidence_ref": preflight["id"],
"preflight_result": preflight["result"],
},
"requirement_refs": _requirement_refs(plan, step),
"artifact_refs": [],
"started_at": now,
"completed_at": now,
}
def _blocks_downstream(evidence: dict[str, Any]) -> bool:
return evidence["result"] in {"fail", "blocked", "infrastructure_error"}
def _evidence_for_step(
root: Path,
run_dir: Path,
@@ -169,6 +223,7 @@ def _expected_for_item(item: dict[str, Any]) -> bool:
return blocked_reason in {
"missing_command",
"missing_dependency",
"preflight_failed",
"tck_invocation_not_configured",
}
@@ -179,6 +234,8 @@ def _remediation_for_item(item: dict[str, Any]) -> str:
blocked_reason = item.get("facts", {}).get("blocked_reason")
if blocked_reason == "missing_dependency":
return "Install the missing runner dependencies and rerun the assessment."
if blocked_reason == "preflight_failed":
return "Fix the preflight failure and rerun downstream checks."
if blocked_reason == "tck_invocation_not_configured":
return "Configure the final harness invocation, group mapping, and raw artifact capture."
return "Implement or configure the declared extension runner."