generated from coulomb/repo-seed
Add small SaaS profile proof
This commit is contained in:
402
src/info_tech_canon/profiles.py
Normal file
402
src/info_tech_canon/profiles.py
Normal file
@@ -0,0 +1,402 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from .bench import export_mermaid, relationship_summary
|
||||
|
||||
|
||||
REQUIRED_SMALL_SAAS_KINDS = {
|
||||
"service",
|
||||
"system",
|
||||
"tenant",
|
||||
"user",
|
||||
"team",
|
||||
"dataset",
|
||||
"deployment",
|
||||
"task",
|
||||
"policy",
|
||||
"control",
|
||||
"evidence",
|
||||
"incident",
|
||||
}
|
||||
|
||||
|
||||
def inspect_profile(context: Any, profile: str) -> dict[str, Any]:
|
||||
definition = load_profile_definition(context, profile)
|
||||
records = profile_artifact_records(context, profile)
|
||||
return {
|
||||
"ok": True,
|
||||
"profile": definition,
|
||||
"path": str(profile_path(context, profile)),
|
||||
"artifact_count": len(records),
|
||||
"artifacts": [artifact.to_dict() for artifact in records],
|
||||
}
|
||||
|
||||
|
||||
def validate_profile(context: Any, profile: str) -> dict[str, Any]:
|
||||
definition = load_profile_definition(context, profile)
|
||||
records = profile_artifact_records(context, profile)
|
||||
payloads = load_profile_artifacts(context, records)
|
||||
errors: list[dict[str, Any]] = []
|
||||
warnings: list[dict[str, Any]] = []
|
||||
|
||||
_check_profile_definition(profile, definition, records, errors)
|
||||
_check_required_artifact_kinds(profile, payloads, errors)
|
||||
_check_artifact_payloads(profile, payloads, errors)
|
||||
_check_service_ownership(payloads, errors)
|
||||
_check_tenant_namespace_separation(payloads, errors)
|
||||
_check_user_management_and_access(payloads, errors)
|
||||
_check_governance_evidence(payloads, errors)
|
||||
|
||||
return {
|
||||
"ok": not errors,
|
||||
"profile": profile,
|
||||
"errors": errors,
|
||||
"warnings": warnings,
|
||||
"details": {
|
||||
"artifact_count": len(records),
|
||||
"payload_count": len(payloads),
|
||||
"kinds": _kind_counts(payloads),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def profile_graph(
|
||||
context: Any,
|
||||
profile: str,
|
||||
*,
|
||||
output_format: str = "json",
|
||||
) -> dict[str, Any]:
|
||||
records = profile_artifact_records(context, profile)
|
||||
record_ids = {artifact.id for artifact in records}
|
||||
include_ids = set(record_ids)
|
||||
for artifact in records:
|
||||
for relationship in artifact.relationships:
|
||||
target = relationship.get("target")
|
||||
if isinstance(target, str):
|
||||
include_ids.add(target)
|
||||
artifacts = [
|
||||
artifact for artifact in context.infospace.artifacts if artifact.id in include_ids
|
||||
]
|
||||
summary = relationship_summary(artifacts)
|
||||
if output_format == "mermaid":
|
||||
return {"ok": True, "profile": profile, "format": "mermaid", "graph": export_mermaid(summary)}
|
||||
if output_format != "json":
|
||||
raise ValueError(f"Unsupported graph format: {output_format}")
|
||||
return {
|
||||
"ok": True,
|
||||
"profile": profile,
|
||||
"format": "json",
|
||||
"graph": {
|
||||
"node_count": summary.node_count,
|
||||
"edge_count": summary.edge_count,
|
||||
"nodes": summary.nodes,
|
||||
"edges": [asdict(edge) for edge in summary.edges],
|
||||
"relationship_types": summary.relationship_types,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def profile_path(context: Any, profile: str) -> Path:
|
||||
return context.infospace_root / "profiles" / profile / "profile.yaml"
|
||||
|
||||
|
||||
def load_profile_definition(context: Any, profile: str) -> dict[str, Any]:
|
||||
path = profile_path(context, profile)
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
data = yaml.safe_load(handle) or {}
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError(f"Profile must be a YAML mapping: {profile}")
|
||||
return data
|
||||
|
||||
|
||||
def profile_artifact_records(context: Any, profile: str) -> list[Any]:
|
||||
return [
|
||||
artifact
|
||||
for artifact in context.infospace.artifacts
|
||||
if artifact.id == f"profile/{profile}"
|
||||
or artifact.provenance.get("profile") == profile
|
||||
]
|
||||
|
||||
|
||||
def load_profile_artifacts(context: Any, records: list[Any]) -> dict[str, dict[str, Any]]:
|
||||
payloads: dict[str, dict[str, Any]] = {}
|
||||
for artifact in records:
|
||||
path = context.infospace_root / artifact.path
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
data = yaml.safe_load(handle) or {}
|
||||
if isinstance(data, dict):
|
||||
payloads[artifact.id] = data
|
||||
return payloads
|
||||
|
||||
|
||||
def _check_profile_definition(
|
||||
profile: str,
|
||||
definition: dict[str, Any],
|
||||
records: list[Any],
|
||||
errors: list[dict[str, Any]],
|
||||
) -> None:
|
||||
for field in ("id", "title", "scope", "conformance_level", "required_standards"):
|
||||
if not definition.get(field):
|
||||
errors.append(
|
||||
{
|
||||
"code": "missing_profile_field",
|
||||
"profile": profile,
|
||||
"field": field,
|
||||
}
|
||||
)
|
||||
declared = set(definition.get("artifact_ids") or [])
|
||||
actual = {artifact.id for artifact in records}
|
||||
missing = sorted(declared - actual)
|
||||
for artifact_id in missing:
|
||||
errors.append(
|
||||
{
|
||||
"code": "missing_profile_artifact_record",
|
||||
"profile": profile,
|
||||
"artifact_id": artifact_id,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _check_required_artifact_kinds(
|
||||
profile: str,
|
||||
payloads: dict[str, dict[str, Any]],
|
||||
errors: list[dict[str, Any]],
|
||||
) -> None:
|
||||
kinds = {payload.get("kind") for payload in payloads.values()}
|
||||
for kind in sorted(REQUIRED_SMALL_SAAS_KINDS - kinds):
|
||||
errors.append(
|
||||
{
|
||||
"code": "missing_required_profile_kind",
|
||||
"profile": profile,
|
||||
"kind": kind,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _check_artifact_payloads(
|
||||
profile: str,
|
||||
payloads: dict[str, dict[str, Any]],
|
||||
errors: list[dict[str, Any]],
|
||||
) -> None:
|
||||
ids = set(payloads)
|
||||
for artifact_id, payload in payloads.items():
|
||||
for field in ("id", "kind", "title", "profile"):
|
||||
if not payload.get(field):
|
||||
errors.append(
|
||||
{
|
||||
"code": "missing_profile_artifact_field",
|
||||
"profile": profile,
|
||||
"artifact_id": artifact_id,
|
||||
"field": field,
|
||||
}
|
||||
)
|
||||
if payload.get("profile") not in {profile, None}:
|
||||
errors.append(
|
||||
{
|
||||
"code": "profile_artifact_profile_mismatch",
|
||||
"profile": profile,
|
||||
"artifact_id": artifact_id,
|
||||
"value": payload.get("profile"),
|
||||
}
|
||||
)
|
||||
for relationship in payload.get("relationships") or []:
|
||||
target = relationship.get("target")
|
||||
if target and target not in ids:
|
||||
errors.append(
|
||||
{
|
||||
"code": "profile_relationship_target_not_in_payloads",
|
||||
"profile": profile,
|
||||
"artifact_id": artifact_id,
|
||||
"target": target,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _check_service_ownership(
|
||||
payloads: dict[str, dict[str, Any]],
|
||||
errors: list[dict[str, Any]],
|
||||
) -> None:
|
||||
service = _one_kind(payloads, "service")
|
||||
if not service:
|
||||
return
|
||||
owner_team = service.get("owner_team")
|
||||
if not _exists_kind(payloads, owner_team, "team"):
|
||||
errors.append(
|
||||
{
|
||||
"code": "invalid_service_owner_team",
|
||||
"artifact_id": service.get("id"),
|
||||
"owner_team": owner_team,
|
||||
}
|
||||
)
|
||||
team = payloads.get(str(owner_team))
|
||||
if team and not _exists_kind(payloads, team.get("owner_user"), "user"):
|
||||
errors.append(
|
||||
{
|
||||
"code": "invalid_team_owner_user",
|
||||
"artifact_id": team.get("id"),
|
||||
"owner_user": team.get("owner_user"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _check_tenant_namespace_separation(
|
||||
payloads: dict[str, dict[str, Any]],
|
||||
errors: list[dict[str, Any]],
|
||||
) -> None:
|
||||
tenants = [payload for payload in payloads.values() if payload.get("kind") == "tenant"]
|
||||
namespaces = [tenant.get("namespace") for tenant in tenants]
|
||||
if len(namespaces) != len(set(namespaces)):
|
||||
errors.append({"code": "duplicate_tenant_namespace"})
|
||||
deployment = _one_kind(payloads, "deployment")
|
||||
if deployment:
|
||||
if deployment.get("namespace_strategy") != "namespace-per-tenant":
|
||||
errors.append(
|
||||
{
|
||||
"code": "invalid_namespace_strategy",
|
||||
"artifact_id": deployment.get("id"),
|
||||
"namespace_strategy": deployment.get("namespace_strategy"),
|
||||
}
|
||||
)
|
||||
tenant_namespaces = deployment.get("tenant_namespaces") or {}
|
||||
for tenant in tenants:
|
||||
if tenant.get("id") not in tenant_namespaces:
|
||||
errors.append(
|
||||
{
|
||||
"code": "tenant_missing_deployment_namespace",
|
||||
"tenant_id": tenant.get("id"),
|
||||
"deployment_id": deployment.get("id"),
|
||||
}
|
||||
)
|
||||
dataset = _one_kind(payloads, "dataset")
|
||||
if dataset:
|
||||
if dataset.get("tenant_scope") != "per-tenant":
|
||||
errors.append(
|
||||
{
|
||||
"code": "dataset_not_per_tenant",
|
||||
"artifact_id": dataset.get("id"),
|
||||
}
|
||||
)
|
||||
tenant_ids = set(dataset.get("tenant_ids") or [])
|
||||
for tenant in tenants:
|
||||
if tenant.get("id") not in tenant_ids:
|
||||
errors.append(
|
||||
{
|
||||
"code": "tenant_missing_dataset_partition",
|
||||
"tenant_id": tenant.get("id"),
|
||||
"dataset_id": dataset.get("id"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _check_user_management_and_access(
|
||||
payloads: dict[str, dict[str, Any]],
|
||||
errors: list[dict[str, Any]],
|
||||
) -> None:
|
||||
evidence_ids = _ids_by_kind(payloads, "evidence")
|
||||
policy_ids = _ids_by_kind(payloads, "policy")
|
||||
tenant_ids = _ids_by_kind(payloads, "tenant")
|
||||
for user in [payload for payload in payloads.values() if payload.get("kind") == "user"]:
|
||||
if not user.get("teams"):
|
||||
errors.append(
|
||||
{
|
||||
"code": "user_missing_team_membership",
|
||||
"artifact_id": user.get("id"),
|
||||
}
|
||||
)
|
||||
grants = user.get("access_grants") or []
|
||||
if not grants:
|
||||
errors.append(
|
||||
{
|
||||
"code": "user_missing_access_grant",
|
||||
"artifact_id": user.get("id"),
|
||||
}
|
||||
)
|
||||
for grant in grants:
|
||||
if grant.get("tenant_id") not in tenant_ids:
|
||||
errors.append(
|
||||
{
|
||||
"code": "access_grant_missing_tenant",
|
||||
"artifact_id": user.get("id"),
|
||||
"tenant_id": grant.get("tenant_id"),
|
||||
}
|
||||
)
|
||||
if grant.get("policy_id") not in policy_ids:
|
||||
errors.append(
|
||||
{
|
||||
"code": "access_grant_missing_policy",
|
||||
"artifact_id": user.get("id"),
|
||||
"policy_id": grant.get("policy_id"),
|
||||
}
|
||||
)
|
||||
if grant.get("evidence_id") not in evidence_ids:
|
||||
errors.append(
|
||||
{
|
||||
"code": "access_grant_missing_evidence",
|
||||
"artifact_id": user.get("id"),
|
||||
"evidence_id": grant.get("evidence_id"),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _check_governance_evidence(
|
||||
payloads: dict[str, dict[str, Any]],
|
||||
errors: list[dict[str, Any]],
|
||||
) -> None:
|
||||
evidence_ids = _ids_by_kind(payloads, "evidence")
|
||||
for kind in ("policy", "control", "incident", "service", "dataset", "deployment", "task"):
|
||||
for payload in [item for item in payloads.values() if item.get("kind") == kind]:
|
||||
ids = set(payload.get("evidence_ids") or [])
|
||||
if not ids:
|
||||
errors.append(
|
||||
{
|
||||
"code": "artifact_missing_evidence",
|
||||
"artifact_id": payload.get("id"),
|
||||
"kind": kind,
|
||||
}
|
||||
)
|
||||
for evidence_id in ids:
|
||||
if evidence_id not in evidence_ids:
|
||||
errors.append(
|
||||
{
|
||||
"code": "artifact_missing_evidence_target",
|
||||
"artifact_id": payload.get("id"),
|
||||
"evidence_id": evidence_id,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _kind_counts(payloads: dict[str, dict[str, Any]]) -> dict[str, int]:
|
||||
counts: dict[str, int] = {}
|
||||
for payload in payloads.values():
|
||||
kind = str(payload.get("kind") or "unknown")
|
||||
counts[kind] = counts.get(kind, 0) + 1
|
||||
return dict(sorted(counts.items()))
|
||||
|
||||
|
||||
def _one_kind(payloads: dict[str, dict[str, Any]], kind: str) -> dict[str, Any] | None:
|
||||
for payload in payloads.values():
|
||||
if payload.get("kind") == kind:
|
||||
return payload
|
||||
return None
|
||||
|
||||
|
||||
def _exists_kind(
|
||||
payloads: dict[str, dict[str, Any]],
|
||||
artifact_id: Any,
|
||||
kind: str,
|
||||
) -> bool:
|
||||
payload = payloads.get(str(artifact_id))
|
||||
return bool(payload and payload.get("kind") == kind)
|
||||
|
||||
|
||||
def _ids_by_kind(payloads: dict[str, dict[str, Any]], kind: str) -> set[str]:
|
||||
return {
|
||||
str(payload["id"])
|
||||
for payload in payloads.values()
|
||||
if payload.get("kind") == kind and payload.get("id")
|
||||
}
|
||||
Reference in New Issue
Block a user