generated from coulomb/repo-seed
Add extension profile schema validation
This commit is contained in:
@@ -155,12 +155,18 @@ def cmd_extensions_validate(args: argparse.Namespace) -> dict[str, Any]:
|
||||
|
||||
|
||||
def cmd_validate_target(args: argparse.Namespace) -> dict[str, Any]:
|
||||
profile = validate_target_profile(args.path)
|
||||
profile = validate_target_profile(
|
||||
args.path,
|
||||
discover_extensions(args.root, args.extension_dir),
|
||||
)
|
||||
return {"status": "valid", "target_profile": profile["id"]}
|
||||
|
||||
|
||||
def cmd_validate_assessment(args: argparse.Namespace) -> dict[str, Any]:
|
||||
profile = validate_assessment_profile(args.path)
|
||||
profile = validate_assessment_profile(
|
||||
args.path,
|
||||
discover_extensions(args.root, args.extension_dir),
|
||||
)
|
||||
return {"status": "valid", "assessment_profile": profile["id"]}
|
||||
|
||||
|
||||
|
||||
@@ -6,21 +6,32 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from guide_board.discovery import discover_extensions
|
||||
from guide_board.discovery import Extension, discover_extensions
|
||||
from guide_board.errors import ValidationError
|
||||
from guide_board.io import load_json
|
||||
from guide_board.schema import assert_valid
|
||||
from guide_board.schema import assert_valid, validate_document
|
||||
|
||||
|
||||
def validate_target_profile(path: Path) -> dict[str, Any]:
|
||||
def validate_target_profile(
|
||||
path: Path,
|
||||
extensions: list[Extension] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
document = load_json(path)
|
||||
assert_valid(document, "target-profile")
|
||||
if extensions:
|
||||
_validate_extension_profile_schemas(document, "target", extensions)
|
||||
return document
|
||||
|
||||
|
||||
def validate_assessment_profile(path: Path) -> dict[str, Any]:
|
||||
def validate_assessment_profile(
|
||||
path: Path,
|
||||
extensions: list[Extension] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
document = load_json(path)
|
||||
assert_valid(document, "assessment-profile")
|
||||
if extensions:
|
||||
selected_extensions = _selected_assessment_extensions(document, extensions)
|
||||
_validate_extension_profile_schemas(document, "assessment", selected_extensions)
|
||||
return document
|
||||
|
||||
|
||||
@@ -41,6 +52,10 @@ def build_run_plan(
|
||||
missing = [extension_id for extension_id in selected_extensions if extension_id not in extensions]
|
||||
if missing:
|
||||
raise ValidationError(f"assessment references unknown extension(s): {', '.join(missing)}")
|
||||
selected_extension_records = [extensions[extension_id] for extension_id in selected_extensions]
|
||||
|
||||
_validate_extension_profile_schemas(target, "target", selected_extension_records)
|
||||
_validate_extension_profile_schemas(assessment, "assessment", selected_extension_records)
|
||||
|
||||
if assessment["target_profile_ref"] != target["id"]:
|
||||
raise ValidationError(
|
||||
@@ -119,6 +134,80 @@ def _credential_refs(target: dict[str, Any]) -> list[str]:
|
||||
return []
|
||||
|
||||
|
||||
def _selected_assessment_extensions(
|
||||
assessment: dict[str, Any],
|
||||
extensions: list[Extension],
|
||||
) -> list[Extension]:
|
||||
by_id = {extension.id: extension for extension in extensions}
|
||||
selected_ids = assessment.get("extension_refs", [])
|
||||
selected_extensions = []
|
||||
missing = []
|
||||
for extension_id in selected_ids:
|
||||
if extension_id in by_id:
|
||||
selected_extensions.append(by_id[extension_id])
|
||||
else:
|
||||
missing.append(extension_id)
|
||||
if missing:
|
||||
raise ValidationError(f"assessment references unknown extension(s): {', '.join(missing)}")
|
||||
return selected_extensions
|
||||
|
||||
|
||||
def _validate_extension_profile_schemas(
|
||||
profile: dict[str, Any],
|
||||
profile_kind: str,
|
||||
extensions: list[Extension],
|
||||
) -> None:
|
||||
for extension in extensions:
|
||||
for descriptor in _profile_schema_descriptors(extension, profile_kind, profile):
|
||||
schema = _load_extension_profile_schema(extension, descriptor)
|
||||
errors = validate_document(profile, schema)
|
||||
if errors:
|
||||
formatted = "\n".join(f"- {error}" for error in errors)
|
||||
raise ValidationError(
|
||||
f"{extension.id}:{descriptor['id']} profile schema validation failed:\n"
|
||||
f"{formatted}"
|
||||
)
|
||||
|
||||
|
||||
def _profile_schema_descriptors(
|
||||
extension: Extension,
|
||||
profile_kind: str,
|
||||
profile: dict[str, Any],
|
||||
) -> list[dict[str, Any]]:
|
||||
descriptors = []
|
||||
for raw_descriptor in extension.manifest.get("profile_schemas", []):
|
||||
if not isinstance(raw_descriptor, dict):
|
||||
continue
|
||||
if raw_descriptor.get("profile_kind") != profile_kind:
|
||||
continue
|
||||
subject_type = raw_descriptor.get("subject_type")
|
||||
if profile_kind == "target" and subject_type and subject_type != profile.get("subject_type"):
|
||||
continue
|
||||
descriptors.append(raw_descriptor)
|
||||
return descriptors
|
||||
|
||||
|
||||
def _load_extension_profile_schema(
|
||||
extension: Extension,
|
||||
descriptor: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
raw_path = descriptor["path"]
|
||||
schema_path = (extension.path / raw_path).resolve()
|
||||
extension_root = extension.path.resolve()
|
||||
try:
|
||||
schema_path.relative_to(extension_root)
|
||||
except ValueError as exc:
|
||||
raise ValidationError(
|
||||
f"{extension.id}:{descriptor['id']} profile schema path escapes extension root: "
|
||||
f"{raw_path!r}"
|
||||
) from exc
|
||||
if not schema_path.is_file():
|
||||
raise ValidationError(
|
||||
f"{extension.id}:{descriptor['id']} profile schema not found: {raw_path!r}"
|
||||
)
|
||||
return load_json(schema_path)
|
||||
|
||||
|
||||
def _extension_path_ref(root: Path, path: Path) -> str:
|
||||
try:
|
||||
return str(path.resolve().relative_to(root.resolve()))
|
||||
|
||||
Reference in New Issue
Block a user