Files
info-tech-canon/src/info_tech_canon/profiles.py
2026-05-23 04:26:28 +02:00

403 lines
13 KiB
Python

from __future__ import annotations
from dataclasses import asdict
from pathlib import Path
from typing import Any
import yaml
from .bench import export_mermaid, relationship_summary
# Artifact kinds that `_check_required_artifact_kinds` expects to find among a
# profile's payloads; each kind with no matching payload is reported as a
# `missing_required_profile_kind` error.
REQUIRED_SMALL_SAAS_KINDS = {
    "control", "dataset", "deployment", "evidence",
    "incident", "policy", "service", "system",
    "task", "team", "tenant", "user",
}
def inspect_profile(context: Any, profile: str) -> dict[str, Any]:
    """Summarize a profile: its definition, file location and artifact records.

    Args:
        context: Object passed through to `load_profile_definition`,
            `profile_artifact_records` and `profile_path`.
        profile: Name of the profile to inspect.

    Returns:
        A dict with ``ok`` (always ``True``), ``profile`` (the loaded
        definition), ``path`` (the definition file path as a string),
        ``artifact_count`` and ``artifacts`` (each record's ``to_dict()``).
    """
    definition = load_profile_definition(context, profile)
    matching = profile_artifact_records(context, profile)
    location = str(profile_path(context, profile))
    serialized = [record.to_dict() for record in matching]
    return dict(
        ok=True,
        profile=definition,
        path=location,
        artifact_count=len(matching),
        artifacts=serialized,
    )
def validate_profile(context: Any, profile: str) -> dict[str, Any]:
    """Run every profile check and collect the findings into one report.

    The profile definition, its artifact records and their YAML payloads are
    loaded first; each ``_check_*`` helper then appends error dicts to a
    shared list.

    Args:
        context: Object passed through to the loading helpers.
        profile: Name of the profile to validate.

    Returns:
        A dict with ``ok`` (``True`` when no errors were collected),
        ``profile``, ``errors``, ``warnings`` and ``details`` (record count,
        payload count and per-kind payload counts).
    """
    definition = load_profile_definition(context, profile)
    records = profile_artifact_records(context, profile)
    payloads = load_profile_artifacts(context, records)

    problems: list[dict[str, Any]] = []
    # Nothing in this function adds to the warnings list; it is returned empty.
    notices: list[dict[str, Any]] = []

    _check_profile_definition(profile, definition, records, problems)
    for profile_scoped_check in (
        _check_required_artifact_kinds,
        _check_artifact_payloads,
    ):
        profile_scoped_check(profile, payloads, problems)
    for payload_check in (
        _check_service_ownership,
        _check_tenant_namespace_separation,
        _check_user_management_and_access,
        _check_governance_evidence,
    ):
        payload_check(payloads, problems)

    report: dict[str, Any] = {"ok": not problems, "profile": profile}
    report["errors"] = problems
    report["warnings"] = notices
    report["details"] = {
        "artifact_count": len(records),
        "payload_count": len(payloads),
        "kinds": _kind_counts(payloads),
    }
    return report
def profile_graph(
    context: Any,
    profile: str,
    *,
    output_format: str = "json",
) -> dict[str, Any]:
    """Build the relationship graph for a profile's artifacts.

    The graph covers the profile's own artifact records plus every artifact
    they point at through a relationship whose ``target`` is a string.

    Args:
        context: Object exposing ``infospace.artifacts``; also passed to
            `profile_artifact_records`.
        profile: Name of the profile.
        output_format: ``"json"`` (default) or ``"mermaid"``.

    Returns:
        A dict with ``ok``, ``profile``, ``format`` and ``graph``. For
        ``"mermaid"`` the graph is whatever `export_mermaid` returns; for
        ``"json"`` it is a dict of node/edge counts, nodes, edges and
        relationship types taken from the summary.

    Raises:
        ValueError: If ``output_format`` is neither ``"json"`` nor
            ``"mermaid"``. The check happens after the summary is computed.
    """
    records = profile_artifact_records(context, profile)

    wanted_ids = {record.id for record in records}
    targets = (
        link.get("target")
        for record in records
        for link in record.relationships
    )
    # Only string targets can name another artifact; anything else is ignored.
    wanted_ids.update(target for target in targets if isinstance(target, str))

    selected = [
        candidate
        for candidate in context.infospace.artifacts
        if candidate.id in wanted_ids
    ]
    summary = relationship_summary(selected)

    rendered: Any
    if output_format == "mermaid":
        label = "mermaid"
        rendered = export_mermaid(summary)
    elif output_format == "json":
        label = "json"
        rendered = {
            "node_count": summary.node_count,
            "edge_count": summary.edge_count,
            "nodes": summary.nodes,
            "edges": [asdict(edge) for edge in summary.edges],
            "relationship_types": summary.relationship_types,
        }
    else:
        raise ValueError(f"Unsupported graph format: {output_format}")
    return {"ok": True, "profile": profile, "format": label, "graph": rendered}
def profile_path(context: Any, profile: str) -> Path:
    """Return the location of a profile's ``profile.yaml``.

    The path is ``<context.infospace_root>/profiles/<profile>/profile.yaml``.
    """
    profile_dir = context.infospace_root / "profiles" / profile
    return profile_dir / "profile.yaml"
def load_profile_definition(context: Any, profile: str) -> dict[str, Any]:
    """Read and parse a profile's ``profile.yaml``.

    Args:
        context: Object passed to `profile_path` to locate the file.
        profile: Name of the profile.

    Returns:
        The parsed YAML mapping; an empty or falsy document yields ``{}``.

    Raises:
        ValueError: If the document parses to something other than a mapping.
    """
    with profile_path(context, profile).open("r", encoding="utf-8") as stream:
        definition = yaml.safe_load(stream) or {}
    if isinstance(definition, dict):
        return definition
    raise ValueError(f"Profile must be a YAML mapping: {profile}")
def profile_artifact_records(context: Any, profile: str) -> list[Any]:
    """Select the artifacts from ``context.infospace.artifacts`` tied to a profile.

    An artifact is selected when its id is ``profile/<profile>`` or when its
    ``provenance`` mapping has ``profile`` equal to the given name. The
    original artifact order is kept.
    """
    profile_record_id = f"profile/{profile}"
    selected: list[Any] = []
    for artifact in context.infospace.artifacts:
        if artifact.id == profile_record_id:
            # The id alone is enough; provenance is not consulted here.
            selected.append(artifact)
        elif artifact.provenance.get("profile") == profile:
            selected.append(artifact)
    return selected
def load_profile_artifacts(context: Any, records: list[Any]) -> dict[str, dict[str, Any]]:
    """Load the YAML payload behind each artifact record.

    Args:
        context: Object exposing ``infospace_root``; each record's ``path``
            is resolved against it.
        records: Artifact records exposing ``id`` and ``path``.

    Returns:
        A dict mapping record id to its parsed YAML mapping. An empty
        document becomes ``{}``.
    """
    loaded: dict[str, dict[str, Any]] = {}
    for record in records:
        source = context.infospace_root / record.path
        with source.open("r", encoding="utf-8") as stream:
            document = yaml.safe_load(stream) or {}
        if not isinstance(document, dict):
            # Non-mapping documents are left out of the result without an error.
            continue
        loaded[record.id] = document
    return loaded
def _check_profile_definition(
profile: str,
definition: dict[str, Any],
records: list[Any],
errors: list[dict[str, Any]],
) -> None:
for field in ("id", "title", "scope", "conformance_level", "required_standards"):
if not definition.get(field):
errors.append(
{
"code": "missing_profile_field",
"profile": profile,
"field": field,
}
)
declared = set(definition.get("artifact_ids") or [])
actual = {artifact.id for artifact in records}
missing = sorted(declared - actual)
for artifact_id in missing:
errors.append(
{
"code": "missing_profile_artifact_record",
"profile": profile,
"artifact_id": artifact_id,
}
)
def _check_required_artifact_kinds(
    profile: str,
    payloads: dict[str, dict[str, Any]],
    errors: list[dict[str, Any]],
) -> None:
    """Report every required kind that no payload declares.

    Appends one ``missing_required_profile_kind`` error, in sorted kind order,
    for each entry of ``REQUIRED_SMALL_SAAS_KINDS`` that does not appear as
    the ``kind`` of any payload.

    Args:
        profile: Profile name, copied into each error.
        payloads: Artifact payloads keyed by artifact id.
        errors: List that receives the error dicts; mutated in place.
    """
    present = set()
    for payload in payloads.values():
        present.add(payload.get("kind"))

    absent = [kind for kind in REQUIRED_SMALL_SAAS_KINDS if kind not in present]
    absent.sort()
    for kind in absent:
        errors.append(
            {"code": "missing_required_profile_kind", "profile": profile, "kind": kind}
        )
def _check_artifact_payloads(
profile: str,
payloads: dict[str, dict[str, Any]],
errors: list[dict[str, Any]],
) -> None:
ids = set(payloads)
for artifact_id, payload in payloads.items():
for field in ("id", "kind", "title", "profile"):
if not payload.get(field):
errors.append(
{
"code": "missing_profile_artifact_field",
"profile": profile,
"artifact_id": artifact_id,
"field": field,
}
)
if payload.get("profile") not in {profile, None}:
errors.append(
{
"code": "profile_artifact_profile_mismatch",
"profile": profile,
"artifact_id": artifact_id,
"value": payload.get("profile"),
}
)
for relationship in payload.get("relationships") or []:
target = relationship.get("target")
if target and target not in ids:
errors.append(
{
"code": "profile_relationship_target_not_in_payloads",
"profile": profile,
"artifact_id": artifact_id,
"target": target,
}
)
def _check_service_ownership(
    payloads: dict[str, dict[str, Any]],
    errors: list[dict[str, Any]],
) -> None:
    """Check that the service is owned by a team that is owned by a user.

    Only the first payload of kind ``service`` (as returned by `_one_kind`)
    is examined; with no service payload nothing is checked.

    * ``invalid_service_owner_team`` is appended when the service's
      ``owner_team`` does not name a payload of kind ``team``.
    * ``invalid_team_owner_user`` is appended when that team's ``owner_user``
      does not name a payload of kind ``user``.

    The team-owner check runs only for a valid team. If ``owner_team`` points
    at some other kind of artifact, that artifact is not inspected as a team,
    so no misleading ``invalid_team_owner_user`` is produced for it.

    Args:
        payloads: Artifact payloads keyed by artifact id.
        errors: List that receives the error dicts; mutated in place.
    """
    service = _one_kind(payloads, "service")
    if not service:
        return

    owner_team = service.get("owner_team")
    if not _exists_kind(payloads, owner_team, "team"):
        errors.append(
            {
                "code": "invalid_service_owner_team",
                "artifact_id": service.get("id"),
                "owner_team": owner_team,
            }
        )
        # Without a valid team there is no team owner to verify.
        return

    # `_exists_kind` looked the id up via str(); use the same key here.
    team = payloads[str(owner_team)]
    owner_user = team.get("owner_user")
    if not _exists_kind(payloads, owner_user, "user"):
        errors.append(
            {
                "code": "invalid_team_owner_user",
                "artifact_id": team.get("id"),
                "owner_user": owner_user,
            }
        )
def _check_tenant_namespace_separation(
    payloads: dict[str, dict[str, Any]],
    errors: list[dict[str, Any]],
) -> None:
    """Check that tenants are kept apart in namespaces, deployment and dataset.

    Errors appended:

    * ``duplicate_tenant_namespace`` when two tenant payloads share the same
      ``namespace`` value (a missing namespace counts as ``None``);
    * ``invalid_namespace_strategy`` when the deployment's
      ``namespace_strategy`` is not ``"namespace-per-tenant"``;
    * ``tenant_missing_deployment_namespace`` for each tenant whose id is not
      in the deployment's ``tenant_namespaces``;
    * ``dataset_not_per_tenant`` when the dataset's ``tenant_scope`` is not
      ``"per-tenant"``;
    * ``tenant_missing_dataset_partition`` for each tenant whose id is not in
      the dataset's ``tenant_ids``.

    Only the first ``deployment`` and first ``dataset`` payload (as returned
    by `_one_kind`) are examined.

    Args:
        payloads: Artifact payloads keyed by artifact id.
        errors: List that receives the error dicts; mutated in place.
    """
    tenants = [entry for entry in payloads.values() if entry.get("kind") == "tenant"]

    declared_namespaces = [tenant.get("namespace") for tenant in tenants]
    # A set drops repeats, so a shorter set means some namespace occurs twice.
    if len(set(declared_namespaces)) < len(declared_namespaces):
        errors.append({"code": "duplicate_tenant_namespace"})

    deployment = _one_kind(payloads, "deployment")
    if deployment:
        strategy = deployment.get("namespace_strategy")
        if strategy != "namespace-per-tenant":
            errors.append(
                {
                    "code": "invalid_namespace_strategy",
                    "artifact_id": deployment.get("id"),
                    "namespace_strategy": strategy,
                }
            )
        mapped_tenants = deployment.get("tenant_namespaces") or {}
        errors.extend(
            {
                "code": "tenant_missing_deployment_namespace",
                "tenant_id": tenant.get("id"),
                "deployment_id": deployment.get("id"),
            }
            for tenant in tenants
            if tenant.get("id") not in mapped_tenants
        )

    dataset = _one_kind(payloads, "dataset")
    if dataset:
        if dataset.get("tenant_scope") != "per-tenant":
            errors.append(
                {"code": "dataset_not_per_tenant", "artifact_id": dataset.get("id")}
            )
        partitioned_tenants = set(dataset.get("tenant_ids") or [])
        errors.extend(
            {
                "code": "tenant_missing_dataset_partition",
                "tenant_id": tenant.get("id"),
                "dataset_id": dataset.get("id"),
            }
            for tenant in tenants
            if tenant.get("id") not in partitioned_tenants
        )
def _check_user_management_and_access(
    payloads: dict[str, dict[str, Any]],
    errors: list[dict[str, Any]],
) -> None:
    """Check each user's team membership and access grants.

    For every payload of kind ``user`` this appends:

    * ``user_missing_team_membership`` when ``teams`` is absent or empty;
    * ``user_missing_access_grant`` when ``access_grants`` is absent or empty;
    * for each grant, ``access_grant_missing_tenant`` /
      ``access_grant_missing_policy`` / ``access_grant_missing_evidence``
      when its ``tenant_id`` / ``policy_id`` / ``evidence_id`` is not the id
      of a payload of the matching kind.

    Args:
        payloads: Artifact payloads keyed by artifact id.
        errors: List that receives the error dicts; mutated in place.
    """
    evidence_ids = _ids_by_kind(payloads, "evidence")
    policy_ids = _ids_by_kind(payloads, "policy")
    tenant_ids = _ids_by_kind(payloads, "tenant")

    # (grant key, ids it must resolve to, error code), in reporting order.
    grant_checks = (
        ("tenant_id", tenant_ids, "access_grant_missing_tenant"),
        ("policy_id", policy_ids, "access_grant_missing_policy"),
        ("evidence_id", evidence_ids, "access_grant_missing_evidence"),
    )

    for user in payloads.values():
        if user.get("kind") != "user":
            continue

        if not user.get("teams"):
            errors.append(
                {"code": "user_missing_team_membership", "artifact_id": user.get("id")}
            )

        grants = user.get("access_grants") or []
        if not grants:
            errors.append(
                {"code": "user_missing_access_grant", "artifact_id": user.get("id")}
            )

        for grant in grants:
            for key, known_ids, code in grant_checks:
                reference = grant.get(key)
                if reference not in known_ids:
                    errors.append(
                        {"code": code, "artifact_id": user.get("id"), key: reference}
                    )
def _check_governance_evidence(
    payloads: dict[str, dict[str, Any]],
    errors: list[dict[str, Any]],
) -> None:
    """Check that governed artifacts cite evidence that exists.

    Payloads of kind ``policy``, ``control``, ``incident``, ``service``,
    ``dataset``, ``deployment`` and ``task`` are visited kind by kind, in that
    order. For each one this appends:

    * ``artifact_missing_evidence`` when ``evidence_ids`` is absent or empty;
    * ``artifact_missing_evidence_target`` for each cited id that is not the
      id of a payload of kind ``evidence``.

    Cited ids are de-duplicated but reported in the order they were declared,
    so the error list is identical from run to run. Iterating a ``set`` of
    strings instead would make the order depend on hash randomization.

    Args:
        payloads: Artifact payloads keyed by artifact id.
        errors: List that receives the error dicts; mutated in place.
    """
    evidence_ids = _ids_by_kind(payloads, "evidence")
    governed_kinds = (
        "policy",
        "control",
        "incident",
        "service",
        "dataset",
        "deployment",
        "task",
    )
    for kind in governed_kinds:
        for payload in payloads.values():
            if payload.get("kind") != kind:
                continue
            # dict.fromkeys drops repeats while keeping first-seen order.
            cited = list(dict.fromkeys(payload.get("evidence_ids") or []))
            if not cited:
                errors.append(
                    {
                        "code": "artifact_missing_evidence",
                        "artifact_id": payload.get("id"),
                        "kind": kind,
                    }
                )
            for evidence_id in cited:
                if evidence_id not in evidence_ids:
                    errors.append(
                        {
                            "code": "artifact_missing_evidence_target",
                            "artifact_id": payload.get("id"),
                            "evidence_id": evidence_id,
                        }
                    )
def _kind_counts(payloads: dict[str, dict[str, Any]]) -> dict[str, int]:
counts: dict[str, int] = {}
for payload in payloads.values():
kind = str(payload.get("kind") or "unknown")
counts[kind] = counts.get(kind, 0) + 1
return dict(sorted(counts.items()))
def _one_kind(payloads: dict[str, dict[str, Any]], kind: str) -> dict[str, Any] | None:
for payload in payloads.values():
if payload.get("kind") == kind:
return payload
return None
def _exists_kind(
payloads: dict[str, dict[str, Any]],
artifact_id: Any,
kind: str,
) -> bool:
payload = payloads.get(str(artifact_id))
return bool(payload and payload.get("kind") == kind)
def _ids_by_kind(payloads: dict[str, dict[str, Any]], kind: str) -> set[str]:
return {
str(payload["id"])
for payload in payloads.values()
if payload.get("kind") == kind and payload.get("id")
}