#!/usr/bin/env python3 """ Validate NetKingdom Playbook Capability Contract v0.1 declarations. """ from __future__ import annotations import argparse import json import re import sys from dataclasses import dataclass from pathlib import Path from typing import Any import yaml API_VERSION = "netkingdom.io/playbook-capability/v0.1" KIND = "PlaybookCapabilityDeclaration" CONTRACT_VERSION = "0.1" CAPABILITIES = { "s1.os-baseline": "S1", "s1.secret-bootstrap": "S1", "s2.cluster-runtime": "S2", "s3.platform-services": "S3", "c0.bootstrap-identity": "C0", "c1.lightweight-sso": "C1", "c2a.light-2fa": "C2a", "c2b.token-authority": "C2b", "c3.runtime-secrets": "C3", "c4.fine-grained-authorization": "C4", "c5.enterprise-federation": "C5", "c6.self-optimizing-audit": "C6", } RESOURCE_KINDS = { "identities", "roles_scopes_policies", "secrets_credentials", "infrastructure_resources", } PARAM_TYPES = {"string", "integer", "number", "boolean", "array", "object"} SENSITIVITIES = {"public", "operational", "security_sensitive", "secret_reference"} TUNING_AUTHORITIES = { "playbook_default", "netkingdom_tunable", "platform_only", "tenant_tunable", "forbidden", } TRUST_STATES = { "bare_host_trust", "cluster_trust", "bootstrap_secret_trust", "bootstrap_identity_trust", "runtime_secret_trust", "runtime_identity_trust", "runtime_authorization_trust", "tenant_onboarding_trust", } SCENARIO_AUTHORITIES = {"platform", "netkingdom", "tenant"} @dataclass class Issue: level: str path: str message: str @dataclass class Declaration: path: Path data: dict[str, Any] @property def id(self) -> str: return str(self.data.get("metadata", {}).get("id", "")) @property def capabilities(self) -> set[str]: values = self.data.get("spec", {}).get("capabilities", []) if not isinstance(values, list): return set() return {str(item.get("id")) for item in values if isinstance(item, dict)} @property def parameters(self) -> dict[str, dict[str, Any]]: values = self.data.get("spec", {}).get("parameters", []) if not isinstance(values, list): return {} return { str(item.get("name")): item for item in values if isinstance(item, dict) and item.get("name") } def issue(level: str, path: str, message: str) -> Issue: return Issue(level, path, message) def load_yaml(path: Path) -> dict[str, Any]: with path.open("r", encoding="utf-8") as handle: data = yaml.safe_load(handle) if not isinstance(data, dict): raise ValueError("document must be a YAML object") return data def is_type(value: Any, declared_type: str) -> bool: if value is None: return True if declared_type == "string": return isinstance(value, str) if declared_type == "integer": return isinstance(value, int) and not isinstance(value, bool) if declared_type == "number": return (isinstance(value, int) or isinstance(value, float)) and not isinstance(value, bool) if declared_type == "boolean": return isinstance(value, bool) if declared_type == "array": return isinstance(value, list) if declared_type == "object": return isinstance(value, dict) return False def validate_constraints(value: Any, param: dict[str, Any], path: str) -> list[Issue]: issues: list[Issue] = [] constraints = param.get("constraints", {}) if constraints is None: return issues if not isinstance(constraints, dict): return [issue("ERROR", f"{path}.constraints", "constraints must be an object")] if "enum" in constraints: enum = constraints["enum"] if not isinstance(enum, list) or not enum: issues.append(issue("ERROR", f"{path}.constraints.enum", "enum must be a non-empty list")) elif value is not None and value not in enum: issues.append(issue("ERROR", path, f"value {value!r} is outside enum {enum!r}")) if value is None: return issues for key, comparator in (("minimum", lambda got, want: got < want), ("maximum", lambda got, want: got > want)): if key in constraints: if not isinstance(constraints[key], (int, float)): issues.append(issue("ERROR", f"{path}.constraints.{key}", f"{key} must be numeric")) elif isinstance(value, (int, float)) and not isinstance(value, bool) and comparator(value, constraints[key]): issues.append(issue("ERROR", path, f"value {value!r} violates {key}={constraints[key]!r}")) for key, comparator in (("min_items", lambda got, want: len(got) < want), ("max_items", lambda got, want: len(got) > want)): if key in constraints: if not isinstance(constraints[key], int): issues.append(issue("ERROR", f"{path}.constraints.{key}", f"{key} must be an integer")) elif isinstance(value, list) and comparator(value, constraints[key]): issues.append(issue("ERROR", path, f"value length violates {key}={constraints[key]!r}")) if "pattern" in constraints: pattern = constraints["pattern"] if not isinstance(pattern, str): issues.append(issue("ERROR", f"{path}.constraints.pattern", "pattern must be a string")) elif isinstance(value, str) and re.search(pattern, value) is None: issues.append(issue("ERROR", path, f"value {value!r} does not match pattern {pattern!r}")) return issues def validate_required_object(parent: dict[str, Any], path: str, required: list[str]) -> list[Issue]: issues: list[Issue] = [] for key in required: if key not in parent: issues.append(issue("ERROR", f"{path}.{key}", "required field missing")) return issues def validate_metadata(data: dict[str, Any]) -> list[Issue]: metadata = data.get("metadata") if not isinstance(metadata, dict): return [issue("ERROR", "metadata", "metadata must be an object")] issues = validate_required_object( metadata, "metadata", ["id", "name", "owner", "repo", "domain", "contract_version"], ) for key in ("id", "name", "owner", "repo", "domain"): if key in metadata and not isinstance(metadata[key], str): issues.append(issue("ERROR", f"metadata.{key}", "must be a string")) if metadata.get("contract_version") != CONTRACT_VERSION: issues.append(issue("ERROR", "metadata.contract_version", f"must be {CONTRACT_VERSION!r}")) return issues def validate_playbook(spec: dict[str, Any]) -> list[Issue]: playbook = spec.get("playbook") if not isinstance(playbook, dict): return [issue("ERROR", "spec.playbook", "playbook must be an object")] issues = validate_required_object(playbook, "spec.playbook", ["path", "type", "invocation", "description"]) for key in ("path", "type", "invocation", "description"): if key in playbook and not isinstance(playbook[key], str): issues.append(issue("ERROR", f"spec.playbook.{key}", "must be a string")) return issues def validate_capabilities(spec: dict[str, Any]) -> list[Issue]: values = spec.get("capabilities") if not isinstance(values, list) or not values: return [issue("ERROR", "spec.capabilities", "must be a non-empty list")] issues: list[Issue] = [] seen: set[str] = set() for idx, item in enumerate(values): path = f"spec.capabilities[{idx}]" if not isinstance(item, dict): issues.append(issue("ERROR", path, "capability must be an object")) continue issues.extend(validate_required_object(item, path, ["id", "tier", "resource_kinds", "description"])) cap_id = item.get("id") tier = item.get("tier") if cap_id in seen: issues.append(issue("ERROR", f"{path}.id", f"duplicate capability id {cap_id!r}")) seen.add(str(cap_id)) expected_tier = CAPABILITIES.get(cap_id) if expected_tier is None: issues.append(issue("ERROR", f"{path}.id", f"unknown capability id {cap_id!r}")) elif tier != expected_tier: issues.append(issue("ERROR", f"{path}.tier", f"tier must be {expected_tier!r} for {cap_id!r}")) resource_kinds = item.get("resource_kinds") if not isinstance(resource_kinds, list) or not resource_kinds: issues.append(issue("ERROR", f"{path}.resource_kinds", "must be a non-empty list")) else: unknown = sorted(set(str(kind) for kind in resource_kinds) - RESOURCE_KINDS) if unknown: issues.append(issue("ERROR", f"{path}.resource_kinds", f"unknown resource kinds: {unknown}")) return issues def validate_parameters(spec: dict[str, Any]) -> list[Issue]: values = spec.get("parameters") if not isinstance(values, list): return [issue("ERROR", "spec.parameters", "must be a list")] issues: list[Issue] = [] seen: set[str] = set() for idx, item in enumerate(values): path = f"spec.parameters[{idx}]" if not isinstance(item, dict): issues.append(issue("ERROR", path, "parameter must be an object")) continue issues.extend( validate_required_object( item, path, ["name", "type", "required", "sensitivity", "tuning_authority", "description"], ) ) name = item.get("name") declared_type = item.get("type") if name in seen: issues.append(issue("ERROR", f"{path}.name", f"duplicate parameter {name!r}")) seen.add(str(name)) if declared_type not in PARAM_TYPES: issues.append(issue("ERROR", f"{path}.type", f"unknown parameter type {declared_type!r}")) if "required" in item and not isinstance(item["required"], bool): issues.append(issue("ERROR", f"{path}.required", "required must be boolean")) if item.get("sensitivity") not in SENSITIVITIES: issues.append(issue("ERROR", f"{path}.sensitivity", f"unknown sensitivity {item.get('sensitivity')!r}")) if item.get("tuning_authority") not in TUNING_AUTHORITIES: issues.append(issue("ERROR", f"{path}.tuning_authority", f"unknown tuning authority {item.get('tuning_authority')!r}")) if item.get("sensitivity") in {"security_sensitive", "secret_reference"} and item.get("tuning_authority") == "tenant_tunable": issues.append(issue("ERROR", path, "security-sensitive parameters cannot be tenant_tunable")) if "default" in item and declared_type in PARAM_TYPES and not is_type(item["default"], declared_type): issues.append(issue("ERROR", f"{path}.default", f"default does not match type {declared_type!r}")) if declared_type in PARAM_TYPES: issues.extend(validate_constraints(item.get("default"), item, path)) return issues def validate_responsibilities(spec: dict[str, Any]) -> list[Issue]: values = spec.get("responsibilities") if not isinstance(values, list) or not values: return [issue("ERROR", "spec.responsibilities", "must be a non-empty list")] issues: list[Issue] = [] for idx, item in enumerate(values): path = f"spec.responsibilities[{idx}]" if not isinstance(item, dict): issues.append(issue("ERROR", path, "responsibility must be an object")) continue issues.extend( validate_required_object( item, path, ["resource_kind", "owner", "resources", "repo_owns", "netkingdom_orchestrates"], ) ) if item.get("resource_kind") not in RESOURCE_KINDS: issues.append(issue("ERROR", f"{path}.resource_kind", f"unknown resource kind {item.get('resource_kind')!r}")) resources = item.get("resources") if not isinstance(resources, list) or not resources: issues.append(issue("ERROR", f"{path}.resources", "resources must be a non-empty list")) for key in ("owner", "repo_owns", "netkingdom_orchestrates"): if key in item and not isinstance(item[key], str): issues.append(issue("ERROR", f"{path}.{key}", "must be a string")) return issues def validate_trust_states(spec: dict[str, Any]) -> list[Issue]: trust = spec.get("trust") if not isinstance(trust, dict): return [issue("ERROR", "spec.trust", "trust must be an object")] issues: list[Issue] = [] for section in ("requires", "satisfies"): values = trust.get(section, []) path = f"spec.trust.{section}" if not isinstance(values, list): issues.append(issue("ERROR", path, "must be a list")) continue for idx, item in enumerate(values): item_path = f"{path}[{idx}]" if not isinstance(item, dict): issues.append(issue("ERROR", item_path, "trust state entry must be an object")) continue if item.get("state") not in TRUST_STATES: issues.append(issue("ERROR", f"{item_path}.state", f"unknown trust state {item.get('state')!r}")) checks = item.get("readiness_checks", []) if not isinstance(checks, list): issues.append(issue("ERROR", f"{item_path}.readiness_checks", "must be a list")) continue if section == "satisfies" and not checks: issues.append(issue("ERROR", f"{item_path}.readiness_checks", "satisfied trust states require readiness checks")) for cidx, check in enumerate(checks): check_path = f"{item_path}.readiness_checks[{cidx}]" if not isinstance(check, dict): issues.append(issue("ERROR", check_path, "readiness check must be an object")) continue issues.extend(validate_required_object(check, check_path, ["id", "description", "evidence"])) return issues def validate_catalog(spec: dict[str, Any]) -> list[Issue]: catalog = spec.get("catalog") if not isinstance(catalog, dict): return [issue("ERROR", "spec.catalog", "catalog must be an object")] issues = validate_required_object(catalog, "spec.catalog", ["publish", "maturity", "consumers"]) if "consumers" in catalog and not isinstance(catalog["consumers"], list): issues.append(issue("ERROR", "spec.catalog.consumers", "consumers must be a list")) return issues def validate_declaration(declaration: Declaration) -> list[Issue]: data = declaration.data issues: list[Issue] = [] issues.extend(validate_required_object(data, "$", ["apiVersion", "kind", "metadata", "spec"])) if data.get("apiVersion") != API_VERSION: issues.append(issue("ERROR", "apiVersion", f"must be {API_VERSION!r}")) if data.get("kind") != KIND: issues.append(issue("ERROR", "kind", f"must be {KIND!r}")) issues.extend(validate_metadata(data)) spec = data.get("spec") if not isinstance(spec, dict): return issues + [issue("ERROR", "spec", "spec must be an object")] issues.extend(validate_required_object(spec, "spec", ["playbook", "capabilities", "parameters", "responsibilities", "trust", "catalog"])) issues.extend(validate_playbook(spec)) issues.extend(validate_capabilities(spec)) issues.extend(validate_parameters(spec)) issues.extend(validate_responsibilities(spec)) issues.extend(validate_trust_states(spec)) issues.extend(validate_catalog(spec)) return issues def effective_parameter_value(param: dict[str, Any], overrides: dict[str, Any], declaration_id: str) -> tuple[bool, Any]: name = str(param.get("name")) if name in overrides: return True, overrides[name] if "default" in param: return False, param["default"] return False, None def validate_override_allowed(param: dict[str, Any], value: Any, scenario_authority: str, path: str) -> list[Issue]: issues: list[Issue] = [] authority = param.get("tuning_authority") sensitivity = param.get("sensitivity") name = param.get("name") declared_type = str(param.get("type")) if authority in {"forbidden", "playbook_default"}: issues.append(issue("ERROR", path, f"parameter {name!r} cannot be overridden")) if scenario_authority == "tenant" and authority in {"platform_only", "forbidden", "playbook_default"}: issues.append(issue("ERROR", path, f"tenant authority cannot override {authority} parameter {name!r}")) if scenario_authority == "tenant" and sensitivity in {"security_sensitive", "secret_reference"}: issues.append(issue("ERROR", path, f"tenant authority cannot override {sensitivity} parameter {name!r}")) if not is_type(value, declared_type): issues.append(issue("ERROR", path, f"override for {name!r} does not match type {declared_type!r}")) issues.extend(validate_constraints(value, param, path)) return issues def compose_scenario(declarations: list[Declaration], scenario: dict[str, Any]) -> tuple[list[Issue], dict[str, Any]]: issues: list[Issue] = [] authority = scenario.get("authority", "platform") if authority not in SCENARIO_AUTHORITIES: issues.append(issue("ERROR", "scenario.authority", f"unknown authority {authority!r}")) authority = "platform" requires = scenario.get("requires", {}) required_caps = requires.get("capabilities", []) if isinstance(requires, dict) else [] if not isinstance(required_caps, list) or not required_caps: issues.append(issue("ERROR", "scenario.requires.capabilities", "scenario must require at least one capability")) required_caps = [] overrides = scenario.get("parameter_overrides", {}) if overrides is None: overrides = {} if not isinstance(overrides, dict): issues.append(issue("ERROR", "scenario.parameter_overrides", "must be an object")) overrides = {} selected: list[Declaration] = [] for cap_id in required_caps: matches = [declaration for declaration in declarations if cap_id in declaration.capabilities] if not matches: issues.append(issue("ERROR", "scenario.requires.capabilities", f"no declaration provides {cap_id!r}")) continue selected.append(matches[0]) # Preserve order while deduplicating declarations selected for several capabilities. selected_by_id: dict[str, Declaration] = {} for declaration in selected: selected_by_id.setdefault(declaration.id, declaration) composed = { "scenario": scenario.get("id", "scenario:unnamed"), "authority": authority, "selected_declarations": [], } for declaration_id, declaration in selected_by_id.items(): declaration_overrides = overrides.get(declaration_id, {}) if not isinstance(declaration_overrides, dict): issues.append(issue("ERROR", f"scenario.parameter_overrides.{declaration_id}", "must be an object")) declaration_overrides = {} params_out: dict[str, Any] = {} for name in declaration_overrides: if name not in declaration.parameters: issues.append(issue("ERROR", f"scenario.parameter_overrides.{declaration_id}.{name}", "unknown parameter override")) for param in declaration.parameters.values(): name = str(param.get("name")) overridden, value = effective_parameter_value(param, declaration_overrides, declaration_id) if param.get("required") is True and value is None: issues.append(issue("ERROR", f"scenario.parameter_overrides.{declaration_id}.{name}", "required parameter has no default or override")) if overridden: issues.extend(validate_override_allowed(param, value, str(authority), f"scenario.parameter_overrides.{declaration_id}.{name}")) params_out[name] = { "value": value, "source": "override" if overridden else "default", "sensitivity": param.get("sensitivity"), "tuning_authority": param.get("tuning_authority"), } composed["selected_declarations"].append( { "id": declaration_id, "path": str(declaration.path), "capabilities": sorted(declaration.capabilities), "parameters": params_out, } ) return issues, composed def print_report(issues: list[Issue], composed: dict[str, Any] | None, json_output: bool) -> None: if json_output: print( json.dumps( { "issues": [item.__dict__ for item in issues], "composition": composed, }, indent=2, sort_keys=True, ) ) return if not issues: print("PASS playbook capability contract conformance") for item in issues: print(f"{item.level:5} {item.path}: {item.message}") if composed is not None: print("") print("Composition") print(json.dumps(composed, indent=2, sort_keys=True)) def load_declarations(paths: list[str]) -> tuple[list[Declaration], list[Issue]]: declarations: list[Declaration] = [] issues: list[Issue] = [] for raw_path in paths: path = Path(raw_path) try: data = load_yaml(path) except Exception as exc: issues.append(issue("ERROR", str(path), f"failed to load declaration: {exc}")) continue declaration = Declaration(path=path, data=data) issues.extend(validate_declaration(declaration)) declarations.append(declaration) return declarations, issues def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Validate NetKingdom Playbook Capability Contract declarations.") parser.add_argument("declarations", nargs="+", help="Declaration YAML files") parser.add_argument("--scenario", help="Optional scenario YAML to compose from declarations") parser.add_argument("--json", action="store_true", help="Print JSON report") return parser def main(argv: list[str] | None = None) -> int: args = build_parser().parse_args(argv) declarations, issues = load_declarations(args.declarations) composed = None if args.scenario: try: scenario = load_yaml(Path(args.scenario)) except Exception as exc: issues.append(issue("ERROR", args.scenario, f"failed to load scenario: {exc}")) scenario = {} scenario_issues, composed = compose_scenario(declarations, scenario) issues.extend(scenario_issues) print_report(issues, composed, args.json) return 1 if any(item.level == "ERROR" for item in issues) else 0 if __name__ == "__main__": sys.exit(main())