Files
guide-board/src/guide_board/planning.py

131 lines
4.6 KiB
Python

"""Assessment planning."""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from guide_board.discovery import discover_extensions
from guide_board.errors import ValidationError
from guide_board.io import load_json
from guide_board.schema import assert_valid
def validate_target_profile(path: Path) -> dict[str, Any]:
document = load_json(path)
assert_valid(document, "target-profile")
return document
def validate_assessment_profile(path: Path) -> dict[str, Any]:
document = load_json(path)
assert_valid(document, "assessment-profile")
return document
def build_run_plan(
root: Path,
target_path: Path,
assessment_path: Path,
extension_dirs: list[Path] | None = None,
) -> dict[str, Any]:
target = validate_target_profile(target_path)
assessment = validate_assessment_profile(assessment_path)
extensions = {
extension.id: extension
for extension in discover_extensions(root, extension_dirs)
}
selected_extensions = assessment["extension_refs"]
missing = [extension_id for extension_id in selected_extensions if extension_id not in extensions]
if missing:
raise ValidationError(f"assessment references unknown extension(s): {', '.join(missing)}")
if assessment["target_profile_ref"] != target["id"]:
raise ValidationError(
"assessment target_profile_ref "
f"{assessment['target_profile_ref']!r} does not match target profile {target['id']!r}"
)
ordered_steps: list[dict[str, Any]] = []
for extension_id in selected_extensions:
extension = extensions[extension_id]
selected_groups = assessment["selected_check_groups"].get(extension_id, [])
available_groups = {group["id"]: group for group in extension.manifest["check_groups"]}
unknown_groups = [group_id for group_id in selected_groups if group_id not in available_groups]
if unknown_groups:
raise ValidationError(
f"{extension_id}: unknown check group(s): {', '.join(unknown_groups)}"
)
ordered_steps.append(
{
"id": f"preflight:{extension_id}",
"extension_id": extension_id,
"kind": "preflight",
"check_groups": selected_groups,
"runner_ref": extension.manifest.get("preflight_runner"),
}
)
for group_id in selected_groups:
group = available_groups[group_id]
ordered_steps.append(
{
"id": f"check-group:{extension_id}:{group_id}",
"extension_id": extension_id,
"kind": "check_group",
"check_group": group_id,
"runner_ref": group.get("runner_ref"),
"requirement_refs": group.get("requirement_refs", []),
}
)
plan = {
"id": f"plan-{_timestamp()}",
"assessment_profile_snapshot": assessment,
"target_profile_snapshot": target,
"extension_snapshots": [
{
"id": extension_id,
"version": extensions[extension_id].manifest["version"],
"path": _extension_path_ref(root, extensions[extension_id].path),
"source": extensions[extension_id].source,
}
for extension_id in selected_extensions
],
"source_lock": {
"framework_refs": assessment["framework_refs"],
"extension_refs": selected_extensions,
},
"profile_paths": {
"target_profile_path": str(target_path.resolve()),
"assessment_profile_path": str(assessment_path.resolve()),
"assessment_profile_dir": str(assessment_path.resolve().parent),
},
"ordered_steps": ordered_steps,
"credential_refs": _credential_refs(target),
"artifact_policy": assessment["output_policy"],
"runtime_policy": assessment.get("runtime_policy", {}),
}
assert_valid(plan, "run-plan")
return plan
def _credential_refs(target: dict[str, Any]) -> list[str]:
credential_ref = target.get("credentials_ref")
if isinstance(credential_ref, str) and credential_ref:
return [credential_ref]
return []
def _extension_path_ref(root: Path, path: Path) -> str:
try:
return str(path.resolve().relative_to(root.resolve()))
except ValueError:
return str(path.resolve())
def _timestamp() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")