"""Baseline assessment execution.""" from __future__ import annotations from collections import Counter from datetime import datetime, timezone from pathlib import Path from typing import Any from guide_board.artifacts import build_artifact_manifest from guide_board.io import write_json from guide_board.mapping import build_mapping_records, summarize_mappings from guide_board.planning import build_run_plan from guide_board.policy import apply_policy from guide_board.retention import build_retention_summary from guide_board.runners import run_step from guide_board.schema import assert_valid def run_assessment( root: Path, target_path: Path, assessment_path: Path, output_dir: Path | None = None, extension_dirs: list[Path] | None = None, ) -> dict[str, Any]: plan = build_run_plan(root, target_path, assessment_path, extension_dirs) run_id = f"run-{_timestamp()}" run_dir = output_dir or root / "runs" / run_id created_at = _now() evidence = _execute_steps(root, run_dir, run_id, plan) for item in evidence: assert_valid(item, "evidence-item") findings = _findings_for_evidence(run_id, evidence) findings, policy_summary, applied_waivers = apply_policy(root, plan, findings) for finding in findings: assert_valid(finding, "finding") artifact_manifest = build_artifact_manifest(run_dir, run_id, evidence) mapping_records = build_mapping_records(root, run_id, plan, evidence) mapping_summary = summarize_mappings(mapping_records) assessment_package = _assessment_package( run_id, plan, evidence, findings, artifact_manifest, mapping_summary, policy_summary, applied_waivers, created_at, ) assert_valid(assessment_package, "assessment-package") run_metadata = { "id": run_id, "status": _run_status(evidence), "created_at": created_at, "plan_id": plan["id"], "target_profile_ref": plan["target_profile_snapshot"]["id"], "assessment_profile_ref": plan["assessment_profile_snapshot"]["id"], } retention_summary = build_retention_summary(run_metadata, plan, assessment_package) 
assert_valid(retention_summary, "retention-summary") _write_run_directory( run_dir, run_metadata, plan, evidence, findings, mapping_records, assessment_package, retention_summary, ) return { "status": run_metadata["status"], "run_id": run_id, "run_dir": str(run_dir), "assessment_package": str(run_dir / "reports" / "assessment-package.json"), "report": str(run_dir / "reports" / "report.md"), "retention_summary": str(run_dir / "retention-summary.json"), } def _execute_steps( root: Path, run_dir: Path, run_id: str, plan: dict[str, Any], ) -> list[dict[str, Any]]: evidence: list[dict[str, Any]] = [] preflight_blocks: dict[str, dict[str, Any]] = {} for step in plan["ordered_steps"]: extension_id = step["extension_id"] if step["kind"] == "check_group" and extension_id in preflight_blocks: item = _blocked_by_preflight_evidence(run_id, plan, step, preflight_blocks[extension_id]) else: item = _evidence_for_step(root, run_dir, run_id, plan, step) evidence.append(item) if step["kind"] == "preflight" and _blocks_downstream(item): preflight_blocks[extension_id] = item return evidence def _blocked_by_preflight_evidence( run_id: str, plan: dict[str, Any], step: dict[str, Any], preflight: dict[str, Any], ) -> dict[str, Any]: now = _now() runner_ref = step.get("runner_ref") return { "id": f"evidence:{step['id']}", "run_id": run_id, "extension_id": step["extension_id"], "check_id": step["id"], "subject_ref": plan["target_profile_snapshot"]["id"], "result": "blocked", "observations": [ "Check group was not executed because extension preflight did not pass." 
], "facts": { "step_kind": step["kind"], "runner_ref": runner_ref, "blocked_reason": "preflight_failed", "preflight_evidence_ref": preflight["id"], "preflight_result": preflight["result"], }, "requirement_refs": _requirement_refs(plan, step), "artifact_refs": [], "started_at": now, "completed_at": now, } def _blocks_downstream(evidence: dict[str, Any]) -> bool: return evidence["result"] in {"fail", "blocked", "infrastructure_error"} def _evidence_for_step( root: Path, run_dir: Path, run_id: str, plan: dict[str, Any], step: dict[str, Any], ) -> dict[str, Any]: now = _now() runner_ref = step.get("runner_ref") runner_result = run_step(root, run_dir, run_id, plan, step) return { "id": f"evidence:{step['id']}", "run_id": run_id, "extension_id": step["extension_id"], "check_id": step["id"], "subject_ref": plan["target_profile_snapshot"]["id"], "result": runner_result["result"], "observations": runner_result["observations"], "facts": { "step_kind": step["kind"], "runner_ref": runner_ref, **runner_result["facts"], }, "requirement_refs": _requirement_refs(plan, step), "artifact_refs": runner_result["artifact_refs"], "started_at": now, "completed_at": now, } def _requirement_refs(plan: dict[str, Any], step: dict[str, Any]) -> list[str]: if step["kind"] != "check_group": return [] return list(step.get("requirement_refs", [])) def _findings_for_evidence(run_id: str, evidence: list[dict[str, Any]]) -> list[dict[str, Any]]: findings: list[dict[str, Any]] = [] for item in evidence: if item["result"] not in {"blocked", "fail", "infrastructure_error"}: continue findings.append( { "id": f"finding:{item['check_id']}", "run_id": run_id, "check_id": item["check_id"], "status": item["result"], "severity": _severity_for_item(item), "classification": _classification_for_item(item), "requirement_refs": item["requirement_refs"], "evidence_refs": [item["id"]], "expected": _expected_for_item(item), "waiver_ref": None, "policy_ref": None, "remediation": _remediation_for_item(item), } ) return 
findings def _classification_for_item(item: dict[str, Any]) -> str: result = item["result"] if result == "blocked": blocked_reason = item.get("facts", {}).get("blocked_reason") if isinstance(blocked_reason, str): return blocked_reason return "runner_not_implemented" if result == "fail": return "check_failed" return "infrastructure_error" def _severity_for_item(item: dict[str, Any]) -> str: if item["result"] == "blocked": return "info" return "medium" def _expected_for_item(item: dict[str, Any]) -> bool: if item["result"] != "blocked": return False blocked_reason = item.get("facts", {}).get("blocked_reason") return blocked_reason in { "missing_command", "missing_dependency", "preflight_failed", "tck_invocation_not_configured", } def _remediation_for_item(item: dict[str, Any]) -> str: result = item["result"] if result == "blocked": blocked_reason = item.get("facts", {}).get("blocked_reason") if blocked_reason == "missing_dependency": return "Install the missing runner dependencies and rerun the assessment." if blocked_reason == "preflight_failed": return "Fix the preflight failure and rerun downstream checks." if blocked_reason == "tck_invocation_not_configured": return "Configure the final harness invocation, group mapping, and raw artifact capture." return "Implement or configure the declared extension runner." if result == "infrastructure_error": return "Fix the target, network, credentials, or harness runtime and rerun the assessment." return "Review the failed check and target implementation." 
def _assessment_package(
    run_id: str,
    plan: dict[str, Any],
    evidence: list[dict[str, Any]],
    findings: list[dict[str, Any]],
    artifact_manifest: list[dict[str, Any]],
    mapping_summary: dict[str, Any],
    policy_summary: dict[str, Any],
    applied_waivers: list[dict[str, Any]],
    created_at: str,
) -> dict[str, Any]:
    """Assemble the assessment package dict for a run.

    ``summary`` maps each evidence result to the number of evidence items
    carrying it, in order of first appearance. Target, extensions, and source
    lock are taken from the plan; frameworks are listed by id from the source
    lock's ``framework_refs``.
    """
    # Tally results by hand; insertion order follows first occurrence.
    result_counts: dict[str, int] = {}
    for entry in evidence:
        outcome = entry["result"]
        result_counts[outcome] = result_counts.get(outcome, 0) + 1

    target_snapshot = plan["target_profile_snapshot"]
    source_lock = plan["source_lock"]
    frameworks = [{"id": ref} for ref in source_lock["framework_refs"]]
    extensions = plan["extension_snapshots"]
    evidence_ids = [entry["id"] for entry in evidence]

    return {
        "id": f"assessment-package:{run_id}",
        "run_id": run_id,
        "target": target_snapshot,
        "frameworks": frameworks,
        "extensions": extensions,
        "source_lock": source_lock,
        "summary": result_counts,
        "mapping_summary": mapping_summary,
        "policy_summary": policy_summary,
        "findings": findings,
        "evidence_refs": evidence_ids,
        "artifact_manifest": artifact_manifest,
        "waivers": applied_waivers,
        "certification_boundary": "Guide Board produces preparation evidence only and does not issue certifications or audit assurance.",
        "created_at": created_at,
    }


def _write_run_directory(
    run_dir: Path,
    run_metadata: dict[str, Any],
    plan: dict[str, Any],
    evidence: list[dict[str, Any]],
    findings: list[dict[str, Any]],
    mapping_records: list[dict[str, Any]],
    assessment_package: dict[str, Any],
    retention_summary: dict[str, Any],
) -> None:
    """Write every run output below ``run_dir``.

    JSON documents go through ``write_json``: run metadata, retention
    summary, plan, source lock and profile snapshots at the top level, the
    normalized evidence/findings/mappings under ``normalized/``, and the
    assessment package under ``reports/``. The markdown report is written
    last as UTF-8 text to ``reports/report.md``.
    """
    normalized_dir = run_dir / "normalized"
    reports_dir = run_dir / "reports"

    # Top-level run documents and the snapshots taken from the plan.
    write_json(run_dir / "run.json", run_metadata)
    write_json(run_dir / "retention-summary.json", retention_summary)
    write_json(run_dir / "plan.json", plan)
    write_json(run_dir / "sources.lock.json", plan["source_lock"])
    write_json(run_dir / "target-profile.snapshot.json", plan["target_profile_snapshot"])
    write_json(
        run_dir / "assessment-profile.snapshot.json",
        plan["assessment_profile_snapshot"],
    )

    # Each normalized file wraps its records under a key named like the file.
    normalized_documents = (
        ("evidence", evidence),
        ("findings", findings),
        ("mappings", mapping_records),
    )
    for stem, records in normalized_documents:
        write_json(normalized_dir / f"{stem}.json", {stem: records})

    write_json(reports_dir / "assessment-package.json", assessment_package)

    reports_dir.mkdir(parents=True, exist_ok=True)
    report_path = reports_dir / "report.md"
    report_path.write_text(
        _markdown_report(run_metadata, assessment_package),
        encoding="utf-8",
    )


def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> str:
    """Render the markdown report for a run.

    The report has a title and status header followed by the Summary,
    Mappings, Policy, and Boundary sections, and ends with a newline.
    """
    status_rows = [
        f"- {status}: {count}" for status, count in sorted(package["summary"].items())
    ]
    summary_block = "\n".join(status_rows) if status_rows else "- no evidence produced"
    mapping_block = _mapping_summary_lines(package)
    policy_block = _policy_summary_lines(package)

    lines = [
        f"# Guide Board Assessment Report: {run_metadata['id']}",
        "",
        f"Status: {run_metadata['status']}",
        f"Target: {run_metadata['target_profile_ref']}",
        f"Assessment: {run_metadata['assessment_profile_ref']}",
        "",
    ]
    sections = (
        ("Summary", summary_block),
        ("Mappings", mapping_block),
        ("Policy", policy_block),
        ("Boundary", package["certification_boundary"]),
    )
    # Every section is: heading, blank line, body, blank line.
    for title, body in sections:
        lines.extend([f"## {title}", "", body, ""])
    return "\n".join(lines)


def _mapping_summary_lines(package: dict[str, Any]) -> str:
    """Return one bullet per mapping target with its result counts sorted by status.

    Yields a single placeholder bullet when the package has no mapping targets.
    """
    mapped_targets = package.get("mapping_summary", {}).get("targets", [])
    if mapped_targets:
        bullets = []
        for mapped in mapped_targets:
            counts = sorted(mapped.get("results", {}).items())
            rendered = ", ".join(f"{status}: {count}" for status, count in counts)
            bullets.append(f"- {mapped['label']} ({mapped['target_id']}): {rendered}")
        return "\n".join(bullets)
    return "- no mapped evidence"


def _policy_summary_lines(package: dict[str, Any]) -> str:
    """Return three bullets with the policy counters, each defaulting to 0."""
    policy = package.get("policy_summary", {})
    expectations = f"- applied expectations: {policy.get('applied_expectations', 0)}"
    waivers = f"- applied waivers: {policy.get('applied_waivers', 0)}"
    unexpected = f"- unexpected findings: {policy.get('unexpected_findings', 0)}"
    return "\n".join((expectations, waivers, unexpected))


def _run_status(evidence: list[dict[str, Any]]) -> str:
    """Derive the overall run status from the evidence results.

    Precedence is fail, then infrastructure_error, then blocked; a run with
    none of those results is ``completed``.
    """
    precedence = (
        ("fail", "failed"),
        ("infrastructure_error", "infrastructure_error"),
        ("blocked", "blocked"),
    )
    for result, status in precedence:
        for entry in evidence:
            if entry["result"] == result:
                return status
    return "completed"


def _now() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()


def _timestamp() -> str:
    """Return the current UTC time in compact ``YYYYMMDDTHHMMSSZ`` form."""
    current = datetime.now(tz=timezone.utc)
    return current.strftime("%Y%m%dT%H%M%SZ")