generated from coulomb/repo-seed
Implement NK-WP-0013 playbook capability contract
This commit is contained in:
28
tools/playbook-capability-contract/README.md
Normal file
28
tools/playbook-capability-contract/README.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# Playbook Capability Contract Validator
|
||||
|
||||
Executable checks for
|
||||
`canon/standards/playbook-capability-contract_v0.1.md`.
|
||||
|
||||
Runtime dependency: Python 3.11+ with `PyYAML`. Fixture tests also
|
||||
require `pytest`.
|
||||
|
||||
Validate a declaration:
|
||||
|
||||
```bash
|
||||
python3 tools/playbook-capability-contract/playbook_contract_validator.py \
|
||||
../railiance-infra/capabilities/playbooks/railiance-infra.bootstrap-host.yaml
|
||||
```
|
||||
|
||||
Validate and compose a sample scenario:
|
||||
|
||||
```bash
|
||||
python3 tools/playbook-capability-contract/playbook_contract_validator.py \
|
||||
../railiance-infra/capabilities/playbooks/railiance-infra.bootstrap-host.yaml \
|
||||
--scenario examples/playbook-capability-contract/scenario-s1-host-bootstrap.yaml
|
||||
```
|
||||
|
||||
Run tests:
|
||||
|
||||
```bash
|
||||
python3 -m pytest tools/playbook-capability-contract/tests
|
||||
```
|
||||
@@ -0,0 +1,557 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Validate NetKingdom Playbook Capability Contract v0.1 declarations.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
API_VERSION = "netkingdom.io/playbook-capability/v0.1"
|
||||
KIND = "PlaybookCapabilityDeclaration"
|
||||
CONTRACT_VERSION = "0.1"
|
||||
|
||||
CAPABILITIES = {
|
||||
"s1.os-baseline": "S1",
|
||||
"s1.secret-bootstrap": "S1",
|
||||
"s2.cluster-runtime": "S2",
|
||||
"s3.platform-services": "S3",
|
||||
"c0.bootstrap-identity": "C0",
|
||||
"c1.lightweight-sso": "C1",
|
||||
"c2a.light-2fa": "C2a",
|
||||
"c2b.token-authority": "C2b",
|
||||
"c3.runtime-secrets": "C3",
|
||||
"c4.fine-grained-authorization": "C4",
|
||||
"c5.enterprise-federation": "C5",
|
||||
"c6.self-optimizing-audit": "C6",
|
||||
}
|
||||
RESOURCE_KINDS = {
|
||||
"identities",
|
||||
"roles_scopes_policies",
|
||||
"secrets_credentials",
|
||||
"infrastructure_resources",
|
||||
}
|
||||
PARAM_TYPES = {"string", "integer", "number", "boolean", "array", "object"}
|
||||
SENSITIVITIES = {"public", "operational", "security_sensitive", "secret_reference"}
|
||||
TUNING_AUTHORITIES = {
|
||||
"playbook_default",
|
||||
"netkingdom_tunable",
|
||||
"platform_only",
|
||||
"tenant_tunable",
|
||||
"forbidden",
|
||||
}
|
||||
TRUST_STATES = {
|
||||
"bare_host_trust",
|
||||
"cluster_trust",
|
||||
"bootstrap_secret_trust",
|
||||
"bootstrap_identity_trust",
|
||||
"runtime_secret_trust",
|
||||
"runtime_identity_trust",
|
||||
"runtime_authorization_trust",
|
||||
"tenant_onboarding_trust",
|
||||
}
|
||||
SCENARIO_AUTHORITIES = {"platform", "netkingdom", "tenant"}
|
||||
|
||||
|
||||
@dataclass
|
||||
class Issue:
|
||||
level: str
|
||||
path: str
|
||||
message: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Declaration:
|
||||
path: Path
|
||||
data: dict[str, Any]
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return str(self.data.get("metadata", {}).get("id", ""))
|
||||
|
||||
@property
|
||||
def capabilities(self) -> set[str]:
|
||||
values = self.data.get("spec", {}).get("capabilities", [])
|
||||
if not isinstance(values, list):
|
||||
return set()
|
||||
return {str(item.get("id")) for item in values if isinstance(item, dict)}
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, dict[str, Any]]:
|
||||
values = self.data.get("spec", {}).get("parameters", [])
|
||||
if not isinstance(values, list):
|
||||
return {}
|
||||
return {
|
||||
str(item.get("name")): item
|
||||
for item in values
|
||||
if isinstance(item, dict) and item.get("name")
|
||||
}
|
||||
|
||||
|
||||
def issue(level: str, path: str, message: str) -> Issue:
|
||||
return Issue(level, path, message)
|
||||
|
||||
|
||||
def load_yaml(path: Path) -> dict[str, Any]:
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
data = yaml.safe_load(handle)
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("document must be a YAML object")
|
||||
return data
|
||||
|
||||
|
||||
def is_type(value: Any, declared_type: str) -> bool:
|
||||
if value is None:
|
||||
return True
|
||||
if declared_type == "string":
|
||||
return isinstance(value, str)
|
||||
if declared_type == "integer":
|
||||
return isinstance(value, int) and not isinstance(value, bool)
|
||||
if declared_type == "number":
|
||||
return (isinstance(value, int) or isinstance(value, float)) and not isinstance(value, bool)
|
||||
if declared_type == "boolean":
|
||||
return isinstance(value, bool)
|
||||
if declared_type == "array":
|
||||
return isinstance(value, list)
|
||||
if declared_type == "object":
|
||||
return isinstance(value, dict)
|
||||
return False
|
||||
|
||||
|
||||
def validate_constraints(value: Any, param: dict[str, Any], path: str) -> list[Issue]:
|
||||
issues: list[Issue] = []
|
||||
constraints = param.get("constraints", {})
|
||||
if constraints is None:
|
||||
return issues
|
||||
if not isinstance(constraints, dict):
|
||||
return [issue("ERROR", f"{path}.constraints", "constraints must be an object")]
|
||||
|
||||
if "enum" in constraints:
|
||||
enum = constraints["enum"]
|
||||
if not isinstance(enum, list) or not enum:
|
||||
issues.append(issue("ERROR", f"{path}.constraints.enum", "enum must be a non-empty list"))
|
||||
elif value is not None and value not in enum:
|
||||
issues.append(issue("ERROR", path, f"value {value!r} is outside enum {enum!r}"))
|
||||
|
||||
if value is None:
|
||||
return issues
|
||||
|
||||
for key, comparator in (("minimum", lambda got, want: got < want), ("maximum", lambda got, want: got > want)):
|
||||
if key in constraints:
|
||||
if not isinstance(constraints[key], (int, float)):
|
||||
issues.append(issue("ERROR", f"{path}.constraints.{key}", f"{key} must be numeric"))
|
||||
elif isinstance(value, (int, float)) and not isinstance(value, bool) and comparator(value, constraints[key]):
|
||||
issues.append(issue("ERROR", path, f"value {value!r} violates {key}={constraints[key]!r}"))
|
||||
|
||||
for key, comparator in (("min_items", lambda got, want: len(got) < want), ("max_items", lambda got, want: len(got) > want)):
|
||||
if key in constraints:
|
||||
if not isinstance(constraints[key], int):
|
||||
issues.append(issue("ERROR", f"{path}.constraints.{key}", f"{key} must be an integer"))
|
||||
elif isinstance(value, list) and comparator(value, constraints[key]):
|
||||
issues.append(issue("ERROR", path, f"value length violates {key}={constraints[key]!r}"))
|
||||
|
||||
if "pattern" in constraints:
|
||||
pattern = constraints["pattern"]
|
||||
if not isinstance(pattern, str):
|
||||
issues.append(issue("ERROR", f"{path}.constraints.pattern", "pattern must be a string"))
|
||||
elif isinstance(value, str) and re.search(pattern, value) is None:
|
||||
issues.append(issue("ERROR", path, f"value {value!r} does not match pattern {pattern!r}"))
|
||||
|
||||
return issues
|
||||
|
||||
|
||||
def validate_required_object(parent: dict[str, Any], path: str, required: list[str]) -> list[Issue]:
|
||||
issues: list[Issue] = []
|
||||
for key in required:
|
||||
if key not in parent:
|
||||
issues.append(issue("ERROR", f"{path}.{key}", "required field missing"))
|
||||
return issues
|
||||
|
||||
|
||||
def validate_metadata(data: dict[str, Any]) -> list[Issue]:
|
||||
metadata = data.get("metadata")
|
||||
if not isinstance(metadata, dict):
|
||||
return [issue("ERROR", "metadata", "metadata must be an object")]
|
||||
|
||||
issues = validate_required_object(
|
||||
metadata,
|
||||
"metadata",
|
||||
["id", "name", "owner", "repo", "domain", "contract_version"],
|
||||
)
|
||||
for key in ("id", "name", "owner", "repo", "domain"):
|
||||
if key in metadata and not isinstance(metadata[key], str):
|
||||
issues.append(issue("ERROR", f"metadata.{key}", "must be a string"))
|
||||
if metadata.get("contract_version") != CONTRACT_VERSION:
|
||||
issues.append(issue("ERROR", "metadata.contract_version", f"must be {CONTRACT_VERSION!r}"))
|
||||
return issues
|
||||
|
||||
|
||||
def validate_playbook(spec: dict[str, Any]) -> list[Issue]:
|
||||
playbook = spec.get("playbook")
|
||||
if not isinstance(playbook, dict):
|
||||
return [issue("ERROR", "spec.playbook", "playbook must be an object")]
|
||||
|
||||
issues = validate_required_object(playbook, "spec.playbook", ["path", "type", "invocation", "description"])
|
||||
for key in ("path", "type", "invocation", "description"):
|
||||
if key in playbook and not isinstance(playbook[key], str):
|
||||
issues.append(issue("ERROR", f"spec.playbook.{key}", "must be a string"))
|
||||
return issues
|
||||
|
||||
|
||||
def validate_capabilities(spec: dict[str, Any]) -> list[Issue]:
|
||||
values = spec.get("capabilities")
|
||||
if not isinstance(values, list) or not values:
|
||||
return [issue("ERROR", "spec.capabilities", "must be a non-empty list")]
|
||||
|
||||
issues: list[Issue] = []
|
||||
seen: set[str] = set()
|
||||
for idx, item in enumerate(values):
|
||||
path = f"spec.capabilities[{idx}]"
|
||||
if not isinstance(item, dict):
|
||||
issues.append(issue("ERROR", path, "capability must be an object"))
|
||||
continue
|
||||
issues.extend(validate_required_object(item, path, ["id", "tier", "resource_kinds", "description"]))
|
||||
cap_id = item.get("id")
|
||||
tier = item.get("tier")
|
||||
if cap_id in seen:
|
||||
issues.append(issue("ERROR", f"{path}.id", f"duplicate capability id {cap_id!r}"))
|
||||
seen.add(str(cap_id))
|
||||
expected_tier = CAPABILITIES.get(cap_id)
|
||||
if expected_tier is None:
|
||||
issues.append(issue("ERROR", f"{path}.id", f"unknown capability id {cap_id!r}"))
|
||||
elif tier != expected_tier:
|
||||
issues.append(issue("ERROR", f"{path}.tier", f"tier must be {expected_tier!r} for {cap_id!r}"))
|
||||
|
||||
resource_kinds = item.get("resource_kinds")
|
||||
if not isinstance(resource_kinds, list) or not resource_kinds:
|
||||
issues.append(issue("ERROR", f"{path}.resource_kinds", "must be a non-empty list"))
|
||||
else:
|
||||
unknown = sorted(set(str(kind) for kind in resource_kinds) - RESOURCE_KINDS)
|
||||
if unknown:
|
||||
issues.append(issue("ERROR", f"{path}.resource_kinds", f"unknown resource kinds: {unknown}"))
|
||||
return issues
|
||||
|
||||
|
||||
def validate_parameters(spec: dict[str, Any]) -> list[Issue]:
|
||||
values = spec.get("parameters")
|
||||
if not isinstance(values, list):
|
||||
return [issue("ERROR", "spec.parameters", "must be a list")]
|
||||
|
||||
issues: list[Issue] = []
|
||||
seen: set[str] = set()
|
||||
for idx, item in enumerate(values):
|
||||
path = f"spec.parameters[{idx}]"
|
||||
if not isinstance(item, dict):
|
||||
issues.append(issue("ERROR", path, "parameter must be an object"))
|
||||
continue
|
||||
issues.extend(
|
||||
validate_required_object(
|
||||
item,
|
||||
path,
|
||||
["name", "type", "required", "sensitivity", "tuning_authority", "description"],
|
||||
)
|
||||
)
|
||||
name = item.get("name")
|
||||
declared_type = item.get("type")
|
||||
if name in seen:
|
||||
issues.append(issue("ERROR", f"{path}.name", f"duplicate parameter {name!r}"))
|
||||
seen.add(str(name))
|
||||
|
||||
if declared_type not in PARAM_TYPES:
|
||||
issues.append(issue("ERROR", f"{path}.type", f"unknown parameter type {declared_type!r}"))
|
||||
if "required" in item and not isinstance(item["required"], bool):
|
||||
issues.append(issue("ERROR", f"{path}.required", "required must be boolean"))
|
||||
if item.get("sensitivity") not in SENSITIVITIES:
|
||||
issues.append(issue("ERROR", f"{path}.sensitivity", f"unknown sensitivity {item.get('sensitivity')!r}"))
|
||||
if item.get("tuning_authority") not in TUNING_AUTHORITIES:
|
||||
issues.append(issue("ERROR", f"{path}.tuning_authority", f"unknown tuning authority {item.get('tuning_authority')!r}"))
|
||||
if item.get("sensitivity") in {"security_sensitive", "secret_reference"} and item.get("tuning_authority") == "tenant_tunable":
|
||||
issues.append(issue("ERROR", path, "security-sensitive parameters cannot be tenant_tunable"))
|
||||
|
||||
if "default" in item and declared_type in PARAM_TYPES and not is_type(item["default"], declared_type):
|
||||
issues.append(issue("ERROR", f"{path}.default", f"default does not match type {declared_type!r}"))
|
||||
if declared_type in PARAM_TYPES:
|
||||
issues.extend(validate_constraints(item.get("default"), item, path))
|
||||
return issues
|
||||
|
||||
|
||||
def validate_responsibilities(spec: dict[str, Any]) -> list[Issue]:
|
||||
values = spec.get("responsibilities")
|
||||
if not isinstance(values, list) or not values:
|
||||
return [issue("ERROR", "spec.responsibilities", "must be a non-empty list")]
|
||||
|
||||
issues: list[Issue] = []
|
||||
for idx, item in enumerate(values):
|
||||
path = f"spec.responsibilities[{idx}]"
|
||||
if not isinstance(item, dict):
|
||||
issues.append(issue("ERROR", path, "responsibility must be an object"))
|
||||
continue
|
||||
issues.extend(
|
||||
validate_required_object(
|
||||
item,
|
||||
path,
|
||||
["resource_kind", "owner", "resources", "repo_owns", "netkingdom_orchestrates"],
|
||||
)
|
||||
)
|
||||
if item.get("resource_kind") not in RESOURCE_KINDS:
|
||||
issues.append(issue("ERROR", f"{path}.resource_kind", f"unknown resource kind {item.get('resource_kind')!r}"))
|
||||
resources = item.get("resources")
|
||||
if not isinstance(resources, list) or not resources:
|
||||
issues.append(issue("ERROR", f"{path}.resources", "resources must be a non-empty list"))
|
||||
for key in ("owner", "repo_owns", "netkingdom_orchestrates"):
|
||||
if key in item and not isinstance(item[key], str):
|
||||
issues.append(issue("ERROR", f"{path}.{key}", "must be a string"))
|
||||
return issues
|
||||
|
||||
|
||||
def validate_trust_states(spec: dict[str, Any]) -> list[Issue]:
|
||||
trust = spec.get("trust")
|
||||
if not isinstance(trust, dict):
|
||||
return [issue("ERROR", "spec.trust", "trust must be an object")]
|
||||
|
||||
issues: list[Issue] = []
|
||||
for section in ("requires", "satisfies"):
|
||||
values = trust.get(section, [])
|
||||
path = f"spec.trust.{section}"
|
||||
if not isinstance(values, list):
|
||||
issues.append(issue("ERROR", path, "must be a list"))
|
||||
continue
|
||||
for idx, item in enumerate(values):
|
||||
item_path = f"{path}[{idx}]"
|
||||
if not isinstance(item, dict):
|
||||
issues.append(issue("ERROR", item_path, "trust state entry must be an object"))
|
||||
continue
|
||||
if item.get("state") not in TRUST_STATES:
|
||||
issues.append(issue("ERROR", f"{item_path}.state", f"unknown trust state {item.get('state')!r}"))
|
||||
checks = item.get("readiness_checks", [])
|
||||
if not isinstance(checks, list):
|
||||
issues.append(issue("ERROR", f"{item_path}.readiness_checks", "must be a list"))
|
||||
continue
|
||||
if section == "satisfies" and not checks:
|
||||
issues.append(issue("ERROR", f"{item_path}.readiness_checks", "satisfied trust states require readiness checks"))
|
||||
for cidx, check in enumerate(checks):
|
||||
check_path = f"{item_path}.readiness_checks[{cidx}]"
|
||||
if not isinstance(check, dict):
|
||||
issues.append(issue("ERROR", check_path, "readiness check must be an object"))
|
||||
continue
|
||||
issues.extend(validate_required_object(check, check_path, ["id", "description", "evidence"]))
|
||||
return issues
|
||||
|
||||
|
||||
def validate_catalog(spec: dict[str, Any]) -> list[Issue]:
|
||||
catalog = spec.get("catalog")
|
||||
if not isinstance(catalog, dict):
|
||||
return [issue("ERROR", "spec.catalog", "catalog must be an object")]
|
||||
|
||||
issues = validate_required_object(catalog, "spec.catalog", ["publish", "maturity", "consumers"])
|
||||
if "consumers" in catalog and not isinstance(catalog["consumers"], list):
|
||||
issues.append(issue("ERROR", "spec.catalog.consumers", "consumers must be a list"))
|
||||
return issues
|
||||
|
||||
|
||||
def validate_declaration(declaration: Declaration) -> list[Issue]:
|
||||
data = declaration.data
|
||||
issues: list[Issue] = []
|
||||
issues.extend(validate_required_object(data, "$", ["apiVersion", "kind", "metadata", "spec"]))
|
||||
if data.get("apiVersion") != API_VERSION:
|
||||
issues.append(issue("ERROR", "apiVersion", f"must be {API_VERSION!r}"))
|
||||
if data.get("kind") != KIND:
|
||||
issues.append(issue("ERROR", "kind", f"must be {KIND!r}"))
|
||||
issues.extend(validate_metadata(data))
|
||||
|
||||
spec = data.get("spec")
|
||||
if not isinstance(spec, dict):
|
||||
return issues + [issue("ERROR", "spec", "spec must be an object")]
|
||||
|
||||
issues.extend(validate_required_object(spec, "spec", ["playbook", "capabilities", "parameters", "responsibilities", "trust", "catalog"]))
|
||||
issues.extend(validate_playbook(spec))
|
||||
issues.extend(validate_capabilities(spec))
|
||||
issues.extend(validate_parameters(spec))
|
||||
issues.extend(validate_responsibilities(spec))
|
||||
issues.extend(validate_trust_states(spec))
|
||||
issues.extend(validate_catalog(spec))
|
||||
return issues
|
||||
|
||||
|
||||
def effective_parameter_value(param: dict[str, Any], overrides: dict[str, Any], declaration_id: str) -> tuple[bool, Any]:
|
||||
name = str(param.get("name"))
|
||||
if name in overrides:
|
||||
return True, overrides[name]
|
||||
if "default" in param:
|
||||
return False, param["default"]
|
||||
return False, None
|
||||
|
||||
|
||||
def validate_override_allowed(param: dict[str, Any], value: Any, scenario_authority: str, path: str) -> list[Issue]:
|
||||
issues: list[Issue] = []
|
||||
authority = param.get("tuning_authority")
|
||||
sensitivity = param.get("sensitivity")
|
||||
name = param.get("name")
|
||||
declared_type = str(param.get("type"))
|
||||
|
||||
if authority in {"forbidden", "playbook_default"}:
|
||||
issues.append(issue("ERROR", path, f"parameter {name!r} cannot be overridden"))
|
||||
if scenario_authority == "tenant" and authority in {"platform_only", "forbidden", "playbook_default"}:
|
||||
issues.append(issue("ERROR", path, f"tenant authority cannot override {authority} parameter {name!r}"))
|
||||
if scenario_authority == "tenant" and sensitivity in {"security_sensitive", "secret_reference"}:
|
||||
issues.append(issue("ERROR", path, f"tenant authority cannot override {sensitivity} parameter {name!r}"))
|
||||
|
||||
if not is_type(value, declared_type):
|
||||
issues.append(issue("ERROR", path, f"override for {name!r} does not match type {declared_type!r}"))
|
||||
issues.extend(validate_constraints(value, param, path))
|
||||
return issues
|
||||
|
||||
|
||||
def compose_scenario(declarations: list[Declaration], scenario: dict[str, Any]) -> tuple[list[Issue], dict[str, Any]]:
|
||||
issues: list[Issue] = []
|
||||
authority = scenario.get("authority", "platform")
|
||||
if authority not in SCENARIO_AUTHORITIES:
|
||||
issues.append(issue("ERROR", "scenario.authority", f"unknown authority {authority!r}"))
|
||||
authority = "platform"
|
||||
|
||||
requires = scenario.get("requires", {})
|
||||
required_caps = requires.get("capabilities", []) if isinstance(requires, dict) else []
|
||||
if not isinstance(required_caps, list) or not required_caps:
|
||||
issues.append(issue("ERROR", "scenario.requires.capabilities", "scenario must require at least one capability"))
|
||||
required_caps = []
|
||||
|
||||
overrides = scenario.get("parameter_overrides", {})
|
||||
if overrides is None:
|
||||
overrides = {}
|
||||
if not isinstance(overrides, dict):
|
||||
issues.append(issue("ERROR", "scenario.parameter_overrides", "must be an object"))
|
||||
overrides = {}
|
||||
|
||||
selected: list[Declaration] = []
|
||||
for cap_id in required_caps:
|
||||
matches = [declaration for declaration in declarations if cap_id in declaration.capabilities]
|
||||
if not matches:
|
||||
issues.append(issue("ERROR", "scenario.requires.capabilities", f"no declaration provides {cap_id!r}"))
|
||||
continue
|
||||
selected.append(matches[0])
|
||||
|
||||
# Preserve order while deduplicating declarations selected for several capabilities.
|
||||
selected_by_id: dict[str, Declaration] = {}
|
||||
for declaration in selected:
|
||||
selected_by_id.setdefault(declaration.id, declaration)
|
||||
|
||||
composed = {
|
||||
"scenario": scenario.get("id", "scenario:unnamed"),
|
||||
"authority": authority,
|
||||
"selected_declarations": [],
|
||||
}
|
||||
|
||||
for declaration_id, declaration in selected_by_id.items():
|
||||
declaration_overrides = overrides.get(declaration_id, {})
|
||||
if not isinstance(declaration_overrides, dict):
|
||||
issues.append(issue("ERROR", f"scenario.parameter_overrides.{declaration_id}", "must be an object"))
|
||||
declaration_overrides = {}
|
||||
|
||||
params_out: dict[str, Any] = {}
|
||||
for name in declaration_overrides:
|
||||
if name not in declaration.parameters:
|
||||
issues.append(issue("ERROR", f"scenario.parameter_overrides.{declaration_id}.{name}", "unknown parameter override"))
|
||||
|
||||
for param in declaration.parameters.values():
|
||||
name = str(param.get("name"))
|
||||
overridden, value = effective_parameter_value(param, declaration_overrides, declaration_id)
|
||||
if param.get("required") is True and value is None:
|
||||
issues.append(issue("ERROR", f"scenario.parameter_overrides.{declaration_id}.{name}", "required parameter has no default or override"))
|
||||
if overridden:
|
||||
issues.extend(validate_override_allowed(param, value, str(authority), f"scenario.parameter_overrides.{declaration_id}.{name}"))
|
||||
params_out[name] = {
|
||||
"value": value,
|
||||
"source": "override" if overridden else "default",
|
||||
"sensitivity": param.get("sensitivity"),
|
||||
"tuning_authority": param.get("tuning_authority"),
|
||||
}
|
||||
|
||||
composed["selected_declarations"].append(
|
||||
{
|
||||
"id": declaration_id,
|
||||
"path": str(declaration.path),
|
||||
"capabilities": sorted(declaration.capabilities),
|
||||
"parameters": params_out,
|
||||
}
|
||||
)
|
||||
|
||||
return issues, composed
|
||||
|
||||
|
||||
def print_report(issues: list[Issue], composed: dict[str, Any] | None, json_output: bool) -> None:
|
||||
if json_output:
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"issues": [item.__dict__ for item in issues],
|
||||
"composition": composed,
|
||||
},
|
||||
indent=2,
|
||||
sort_keys=True,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
if not issues:
|
||||
print("PASS playbook capability contract conformance")
|
||||
for item in issues:
|
||||
print(f"{item.level:5} {item.path}: {item.message}")
|
||||
if composed is not None:
|
||||
print("")
|
||||
print("Composition")
|
||||
print(json.dumps(composed, indent=2, sort_keys=True))
|
||||
|
||||
|
||||
def load_declarations(paths: list[str]) -> tuple[list[Declaration], list[Issue]]:
|
||||
declarations: list[Declaration] = []
|
||||
issues: list[Issue] = []
|
||||
for raw_path in paths:
|
||||
path = Path(raw_path)
|
||||
try:
|
||||
data = load_yaml(path)
|
||||
except Exception as exc:
|
||||
issues.append(issue("ERROR", str(path), f"failed to load declaration: {exc}"))
|
||||
continue
|
||||
declaration = Declaration(path=path, data=data)
|
||||
issues.extend(validate_declaration(declaration))
|
||||
declarations.append(declaration)
|
||||
return declarations, issues
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="Validate NetKingdom Playbook Capability Contract declarations.")
|
||||
parser.add_argument("declarations", nargs="+", help="Declaration YAML files")
|
||||
parser.add_argument("--scenario", help="Optional scenario YAML to compose from declarations")
|
||||
parser.add_argument("--json", action="store_true", help="Print JSON report")
|
||||
return parser
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
args = build_parser().parse_args(argv)
|
||||
declarations, issues = load_declarations(args.declarations)
|
||||
composed = None
|
||||
|
||||
if args.scenario:
|
||||
try:
|
||||
scenario = load_yaml(Path(args.scenario))
|
||||
except Exception as exc:
|
||||
issues.append(issue("ERROR", args.scenario, f"failed to load scenario: {exc}"))
|
||||
scenario = {}
|
||||
scenario_issues, composed = compose_scenario(declarations, scenario)
|
||||
issues.extend(scenario_issues)
|
||||
|
||||
print_report(issues, composed, args.json)
|
||||
return 1 if any(item.level == "ERROR" for item in issues) else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -0,0 +1,198 @@
|
||||
import importlib.util
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
TOOL_PATH = Path(__file__).resolve().parents[1] / "playbook_contract_validator.py"
|
||||
SPEC = importlib.util.spec_from_file_location("playbook_contract_validator", TOOL_PATH)
|
||||
validator = importlib.util.module_from_spec(SPEC)
|
||||
assert SPEC.loader is not None
|
||||
sys.modules[SPEC.name] = validator
|
||||
SPEC.loader.exec_module(validator)
|
||||
|
||||
|
||||
def valid_declaration() -> dict:
|
||||
return {
|
||||
"apiVersion": validator.API_VERSION,
|
||||
"kind": validator.KIND,
|
||||
"metadata": {
|
||||
"id": "railiance-infra.bootstrap-host",
|
||||
"name": "Railiance S1 host bootstrap",
|
||||
"owner": "railiance-infra",
|
||||
"repo": "railiance-infra",
|
||||
"domain": "railiance",
|
||||
"contract_version": "0.1",
|
||||
},
|
||||
"spec": {
|
||||
"playbook": {
|
||||
"path": "ansible/playbooks/bootstrap.yaml",
|
||||
"type": "ansible",
|
||||
"invocation": "make converge",
|
||||
"description": "Converges the S1 host baseline.",
|
||||
},
|
||||
"capabilities": [
|
||||
{
|
||||
"id": "s1.os-baseline",
|
||||
"tier": "S1",
|
||||
"resource_kinds": ["infrastructure_resources", "secrets_credentials"],
|
||||
"description": "OS baseline and bootstrap secret handling.",
|
||||
}
|
||||
],
|
||||
"parameters": [
|
||||
{
|
||||
"name": "target_hosts",
|
||||
"type": "array",
|
||||
"required": True,
|
||||
"constraints": {"min_items": 1},
|
||||
"sensitivity": "operational",
|
||||
"tuning_authority": "netkingdom_tunable",
|
||||
"description": "Inventory hosts to converge.",
|
||||
},
|
||||
{
|
||||
"name": "swapfile_size_mb",
|
||||
"type": "integer",
|
||||
"required": False,
|
||||
"default": 4096,
|
||||
"constraints": {"minimum": 0, "maximum": 65536},
|
||||
"sensitivity": "operational",
|
||||
"tuning_authority": "netkingdom_tunable",
|
||||
"description": "Swap file size.",
|
||||
},
|
||||
{
|
||||
"name": "wireguard_enabled",
|
||||
"type": "boolean",
|
||||
"required": False,
|
||||
"default": False,
|
||||
"sensitivity": "security_sensitive",
|
||||
"tuning_authority": "platform_only",
|
||||
"description": "Enable WireGuard role.",
|
||||
},
|
||||
],
|
||||
"responsibilities": [
|
||||
{
|
||||
"resource_kind": "infrastructure_resources",
|
||||
"owner": "railiance-infra",
|
||||
"resources": ["server:target_hosts", "os-baseline"],
|
||||
"repo_owns": "Ansible convergence mechanics.",
|
||||
"netkingdom_orchestrates": "Whether S1 is selected for the scenario.",
|
||||
}
|
||||
],
|
||||
"trust": {
|
||||
"requires": [],
|
||||
"satisfies": [
|
||||
{
|
||||
"state": "bare_host_trust",
|
||||
"readiness_checks": [
|
||||
{
|
||||
"id": "os-baseline-converged",
|
||||
"description": "Ansible baseline converged.",
|
||||
"evidence": "bootstrap playbook completed",
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
},
|
||||
"catalog": {
|
||||
"publish": "capabilities/playbooks/railiance-infra.bootstrap-host.yaml",
|
||||
"maturity": "reference",
|
||||
"consumers": ["netkingdom-meta-orchestration"],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def declaration_from(data: dict, tmp_path: Path) -> validator.Declaration:
|
||||
path = tmp_path / "declaration.yaml"
|
||||
path.write_text(yaml.safe_dump(data, sort_keys=False), encoding="utf-8")
|
||||
return validator.Declaration(path=path, data=data)
|
||||
|
||||
|
||||
def error_messages(issues):
|
||||
return [item.message for item in issues if item.level == "ERROR"]
|
||||
|
||||
|
||||
def test_valid_declaration_passes(tmp_path):
|
||||
declaration = declaration_from(valid_declaration(), tmp_path)
|
||||
|
||||
issues = validator.validate_declaration(declaration)
|
||||
|
||||
assert error_messages(issues) == []
|
||||
|
||||
|
||||
def test_unknown_capability_fails(tmp_path):
|
||||
data = valid_declaration()
|
||||
data["spec"]["capabilities"][0]["id"] = "s9.magic"
|
||||
declaration = declaration_from(data, tmp_path)
|
||||
|
||||
issues = validator.validate_declaration(declaration)
|
||||
|
||||
assert any("unknown capability id" in msg for msg in error_messages(issues))
|
||||
|
||||
|
||||
def test_tenant_tunable_secret_reference_fails(tmp_path):
|
||||
data = valid_declaration()
|
||||
data["spec"]["parameters"][2]["sensitivity"] = "secret_reference"
|
||||
data["spec"]["parameters"][2]["tuning_authority"] = "tenant_tunable"
|
||||
declaration = declaration_from(data, tmp_path)
|
||||
|
||||
issues = validator.validate_declaration(declaration)
|
||||
|
||||
assert any("security-sensitive parameters cannot be tenant_tunable" in msg for msg in error_messages(issues))
|
||||
|
||||
|
||||
def test_scenario_composition_selects_and_overrides(tmp_path):
|
||||
declaration = declaration_from(valid_declaration(), tmp_path)
|
||||
scenario = {
|
||||
"id": "scenario:s1-host-bootstrap-reference",
|
||||
"authority": "platform",
|
||||
"requires": {"capabilities": ["s1.os-baseline"]},
|
||||
"parameter_overrides": {
|
||||
"railiance-infra.bootstrap-host": {
|
||||
"target_hosts": ["railiance01"],
|
||||
"swapfile_size_mb": 8192,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
issues, composed = validator.compose_scenario([declaration], scenario)
|
||||
|
||||
assert error_messages(issues) == []
|
||||
selected = composed["selected_declarations"][0]
|
||||
assert selected["id"] == "railiance-infra.bootstrap-host"
|
||||
assert selected["parameters"]["target_hosts"]["source"] == "override"
|
||||
assert selected["parameters"]["swapfile_size_mb"]["value"] == 8192
|
||||
|
||||
|
||||
def test_tenant_authority_cannot_override_platform_only(tmp_path):
|
||||
declaration = declaration_from(valid_declaration(), tmp_path)
|
||||
scenario = {
|
||||
"id": "scenario:bad-tenant-override",
|
||||
"authority": "tenant",
|
||||
"requires": {"capabilities": ["s1.os-baseline"]},
|
||||
"parameter_overrides": {
|
||||
"railiance-infra.bootstrap-host": {
|
||||
"target_hosts": ["tenant-host"],
|
||||
"wireguard_enabled": True,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
issues, _ = validator.compose_scenario([declaration], scenario)
|
||||
|
||||
assert any("tenant authority cannot override platform_only" in msg for msg in error_messages(issues))
|
||||
|
||||
|
||||
def test_required_parameter_without_override_fails(tmp_path):
|
||||
declaration = declaration_from(valid_declaration(), tmp_path)
|
||||
scenario = {
|
||||
"id": "scenario:missing-required-parameter",
|
||||
"authority": "platform",
|
||||
"requires": {"capabilities": ["s1.os-baseline"]},
|
||||
"parameter_overrides": {},
|
||||
}
|
||||
|
||||
issues, _ = validator.compose_scenario([declaration], scenario)
|
||||
|
||||
assert any("required parameter has no default or override" in msg for msg in error_messages(issues))
|
||||
Reference in New Issue
Block a user