Complete extension SDK maturity

This commit is contained in:
2026-05-15 15:34:55 +02:00
parent 67f2fc5346
commit 6758b3992c
19 changed files with 680 additions and 14 deletions

View File

@@ -10,6 +10,7 @@ from typing import Any
from guide_board.artifacts import build_artifact_manifest
from guide_board.io import write_json
from guide_board.mapping import build_mapping_records, summarize_mappings
from guide_board.normalizers import normalize_step_result
from guide_board.planning import build_run_plan
from guide_board.policy import apply_policy
from guide_board.retention import build_retention_summary
@@ -153,6 +154,7 @@ def _evidence_for_step(
now = _now()
runner_ref = step.get("runner_ref")
runner_result = run_step(root, run_dir, run_id, plan, step)
runner_result = normalize_step_result(root, run_dir, run_id, plan, step, runner_result)
return {
"id": f"evidence:{step['id']}",
@@ -167,17 +169,44 @@ def _evidence_for_step(
"runner_ref": runner_ref,
**runner_result["facts"],
},
"requirement_refs": _requirement_refs(plan, step),
"requirement_refs": _requirement_refs(plan, step, runner_result),
"artifact_refs": runner_result["artifact_refs"],
"started_at": now,
"completed_at": now,
}
def _requirement_refs(plan: dict[str, Any], step: dict[str, Any]) -> list[str]:
def _requirement_refs(
plan: dict[str, Any],
step: dict[str, Any],
runner_result: dict[str, Any] | None = None,
) -> list[str]:
refs = []
if step["kind"] != "check_group":
return _runner_requirement_refs(runner_result)
refs.extend(step.get("requirement_refs", []))
refs.extend(_runner_requirement_refs(runner_result))
return _dedupe(refs)
def _runner_requirement_refs(runner_result: dict[str, Any] | None) -> list[str]:
if not runner_result:
return []
return list(step.get("requirement_refs", []))
refs = runner_result.get("requirement_refs", [])
if not isinstance(refs, list):
return []
return [ref for ref in refs if isinstance(ref, str)]
def _dedupe(values: list[str]) -> list[str]:
seen = set()
deduped = []
for value in values:
if value in seen:
continue
seen.add(value)
deduped.append(value)
return deduped
def _findings_for_evidence(run_id: str, evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:

View File

@@ -0,0 +1,224 @@
"""Normalizer plug-in bridge for extension-provided runner output."""
from __future__ import annotations
import importlib.util
from pathlib import Path
from types import ModuleType
from typing import Any
from guide_board.errors import ValidationError
from guide_board.io import load_json
def normalize_step_result(
root: Path,
run_dir: Path,
run_id: str,
plan: dict[str, Any],
step: dict[str, Any],
runner_result: dict[str, Any],
) -> dict[str, Any]:
"""Apply matching extension normalizers to a runner result."""
extension = _extension_snapshot(plan, step["extension_id"])
extension_path = _snapshot_path(root, extension)
manifest = load_json(extension_path / "extension.json")
result = _coerce_result(runner_result)
applied: list[str] = []
for normalizer in _matching_normalizers(manifest, step):
normalized = _run_normalizer(
root,
run_dir,
run_id,
plan,
step,
extension_path,
normalizer,
result,
)
if _is_normalizer_error(normalized):
return normalized
result = _merge_result(result, normalized)
applied.append(normalizer["id"])
if applied:
facts = dict(result.get("facts", {}))
facts["normalizer_refs"] = applied
result["facts"] = facts
return result
def _matching_normalizers(
manifest: dict[str, Any],
step: dict[str, Any],
) -> list[dict[str, Any]]:
matching = []
runner_ref = step.get("runner_ref")
for normalizer in manifest.get("normalizers", []):
if not isinstance(normalizer, dict):
continue
normalizer_runner_ref = normalizer.get("runner_ref")
if normalizer_runner_ref and normalizer_runner_ref != runner_ref:
continue
matching.append(normalizer)
return matching
def _run_normalizer(
root: Path,
run_dir: Path,
run_id: str,
plan: dict[str, Any],
step: dict[str, Any],
extension_path: Path,
normalizer: dict[str, Any],
runner_result: dict[str, Any],
) -> dict[str, Any]:
if normalizer["kind"] != "python_module":
raise ValidationError(
f"{normalizer['id']}: unsupported normalizer kind {normalizer['kind']!r}"
)
module_path = normalizer.get("module_path")
callable_name = normalizer.get("callable")
if not module_path or not callable_name:
raise ValidationError(
f"{normalizer['id']}: python_module normalizers need module_path and callable"
)
module_file = (extension_path / module_path).resolve()
try:
module_file.relative_to(extension_path.resolve())
except ValueError as exc:
raise ValidationError(
f"{normalizer['id']}: module_path must stay inside the extension directory"
) from exc
module = _load_module(module_file, normalizer["id"])
normalizer_callable = getattr(module, callable_name, None)
if not callable(normalizer_callable):
raise ValidationError(f"{normalizer['id']}: callable {callable_name!r} was not found")
context = {
"root": str(root),
"run_dir": str(run_dir),
"run_id": run_id,
"plan": plan,
"step": step,
"target_profile": plan["target_profile_snapshot"],
"assessment_profile": plan["assessment_profile_snapshot"],
"extension_path": str(extension_path),
"normalizer": normalizer,
"runner_result": runner_result,
}
try:
result = normalizer_callable(context)
except Exception as exc: # noqa: BLE001 - extension failures become evidence.
return {
"result": "infrastructure_error",
"observations": [
f"Normalizer {normalizer['id']!r} failed before producing evidence: {exc}"
],
"facts": {
"normalizer_ref": normalizer["id"],
"normalizer_kind": normalizer["kind"],
"error_type": type(exc).__name__,
},
"artifact_refs": runner_result.get("artifact_refs", []),
"requirement_refs": runner_result.get("requirement_refs", []),
}
if not isinstance(result, dict):
raise ValidationError(f"{normalizer['id']}: normalizer must return an object")
return result
def _merge_result(
base: dict[str, Any],
update: dict[str, Any],
) -> dict[str, Any]:
merged = dict(base)
if "result" in update:
merged["result"] = update["result"]
if "observations" in update:
merged["observations"] = _string_list(base.get("observations", []))
merged["observations"].extend(_string_list(update.get("observations", [])))
if "facts" in update:
facts = dict(base.get("facts", {}))
update_facts = update.get("facts", {})
if isinstance(update_facts, dict):
facts.update(update_facts)
merged["facts"] = facts
if "artifact_refs" in update:
merged["artifact_refs"] = _dedupe(
_string_list(base.get("artifact_refs", []))
+ _string_list(update.get("artifact_refs", []))
)
if "requirement_refs" in update:
merged["requirement_refs"] = _dedupe(
_string_list(base.get("requirement_refs", []))
+ _string_list(update.get("requirement_refs", []))
)
return _coerce_result(merged)
def _coerce_result(value: dict[str, Any]) -> dict[str, Any]:
facts = value.get("facts", {})
if not isinstance(facts, dict):
facts = {}
return {
"result": value.get("result", "unknown"),
"observations": _string_list(value.get("observations", [])),
"facts": facts,
"artifact_refs": _string_list(value.get("artifact_refs", [])),
"requirement_refs": _string_list(value.get("requirement_refs", [])),
}
def _is_normalizer_error(result: dict[str, Any]) -> bool:
return (
result.get("result") == "infrastructure_error"
and "normalizer_ref" in result.get("facts", {})
)
def _string_list(value: Any) -> list[str]:
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, str)]
def _dedupe(values: list[str]) -> list[str]:
seen = set()
deduped = []
for value in values:
if value in seen:
continue
seen.add(value)
deduped.append(value)
return deduped
def _load_module(path: Path, normalizer_id: str) -> ModuleType:
if not path.exists():
raise ValidationError(f"{normalizer_id}: module not found: {path}")
module_name = f"_guide_board_normalizer_{normalizer_id.replace('-', '_')}"
spec = importlib.util.spec_from_file_location(module_name, path)
if spec is None or spec.loader is None:
raise ValidationError(f"{normalizer_id}: unable to load module from {path}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def _extension_snapshot(plan: dict[str, Any], extension_id: str) -> dict[str, Any]:
for extension in plan["extension_snapshots"]:
if extension["id"] == extension_id:
return extension
raise ValidationError(f"step references unknown extension {extension_id!r}")
def _snapshot_path(root: Path, extension: dict[str, Any]) -> Path:
path = Path(extension["path"])
return path if path.is_absolute() else root / path