"""Tests for the markitect-tool policy gateway.

Source file: markitect-tool/tests/test_policy_gateway.py
"""

import json
from pathlib import Path
from click.testing import CliRunner
from markitect_tool.cli import main
from markitect_tool.policy import (
DirectoryGroupResolution,
DirectoryGroupResolutionRequest,
EnterpriseIdentity,
EnterprisePolicyMapRequest,
LocalLabelPolicy,
LocalLabelPolicyGateway,
)
# YAML policy fixture written to disk by the CLI tests.  It describes the same
# policy as the mapping returned by ``_policy_mapping()``.  Indentation is
# significant in YAML: the subject entries and the path-rule fields must stay
# nested under their parent keys, otherwise they collapse into (duplicate)
# top-level keys and ``subjects`` ends up empty.
POLICY_TEXT = """id: example-policy
mode: enforce
default_labels: [public]
default_subject: public-agent
subjects:
  public-agent:
    allowed_labels: [public]
    trust_zones: [public]
  internal-agent:
    allowed_labels: [public, internal]
    trust_zones: [public, internal]
path_rules:
  - id: private-path
    pattern: private/**
    labels: [internal]
    trust_zone: internal
"""
def test_local_label_policy_authorizes_by_labels_and_path_rules():
    """Check ``authorize`` decisions for a public and an internal subject.

    Three queries are issued against the example policy:

    * ``public-agent`` on ``public.md`` with explicit public labels -> allowed;
    * ``public-agent`` on ``private/doc.md`` (path only) -> denied, with a
      reason mentioning "lacks labels";
    * ``internal-agent`` on ``private/doc.md`` (path only) -> allowed.

    The private document carries no labels in its context, so the decision
    presumably comes from the ``private/**`` path rule in the policy.
    """
    policy = LocalLabelPolicy.from_mapping(_policy_mapping())
    gateway = LocalLabelPolicyGateway(policy)

    labelled_public_object = {
        "path": "public.md",
        "labels": ["public"],
        "trust_zone": "public",
    }
    public_decision = gateway.authorize(
        "public-agent",
        "query",
        "public.md",
        context={"object": labelled_public_object},
    )
    # Each call gets its own context dict so the two decisions cannot
    # influence each other through a shared object.
    denied_decision = gateway.authorize(
        "public-agent",
        "query",
        "private/doc.md",
        context={"object": {"path": "private/doc.md"}},
    )
    internal_decision = gateway.authorize(
        "internal-agent",
        "query",
        "private/doc.md",
        context={"object": {"path": "private/doc.md"}},
    )

    assert public_decision["allowed"] is True
    assert denied_decision["allowed"] is False
    assert denied_decision["effect"] == "deny"
    assert "lacks labels" in denied_decision["reason"]
    assert internal_decision["allowed"] is True
def test_policy_filter_can_redact_denied_results():
    """With ``on_denied: redact`` a denied result is kept but its text is masked.

    Two search hits are filtered for ``public-agent``: one explicitly labelled
    public and one under ``private/``.  Both entries must remain in the output,
    the second with the placeholder text, and a ``policy.result.redacted``
    diagnostic must be the first diagnostic reported.
    """
    redacting_mapping = _policy_mapping() | {"on_denied": "redact"}
    gateway = LocalLabelPolicyGateway(LocalLabelPolicy.from_mapping(redacting_mapping))

    candidates = [
        {"path": "public.md", "text": "Visible", "policy": {"labels": ["public"]}},
        {"path": "private/doc.md", "text": "Secret"},
    ]
    filtered = gateway.filter_results("public-agent", "search", candidates)

    assert filtered["policy"]["redacted"] == 1
    kept = filtered["results"]
    assert len(kept) == 2
    assert kept[1]["text"] == "[redacted by policy]"
    first_diagnostic = filtered["diagnostics"][0]
    assert first_diagnostic["code"] == "policy.result.redacted"
def test_policy_audit_mode_keeps_results_but_records_would_deny():
    """In ``audit`` mode a would-be-denied result is returned, not removed.

    The gateway is built with ``mode="audit"`` and asked to filter a single
    hit under ``private/`` for ``public-agent``.  The hit must still be
    present, counted under ``audit_denied``, and annotated with the
    ``audit_denied`` effect.
    """
    policy = LocalLabelPolicy.from_mapping(_policy_mapping())
    auditing_gateway = LocalLabelPolicyGateway(policy, mode="audit")

    private_hit = {"path": "private/doc.md", "text": "Internal"}
    outcome = auditing_gateway.filter_results("public-agent", "query", [private_hit])

    assert len(outcome["results"]) == 1
    assert outcome["policy"]["audit_denied"] == 1
    assert outcome["results"][0]["policy"]["effect"] == "audit_denied"
def test_mkt_policy_check_reports_denied_decision(tmp_path: Path):
    """The ``policy check`` CLI command exits 1 and explains a denial.

    The example policy is written to a temporary file and the command is
    asked whether ``public-agent`` may ``query`` ``private/doc.md``.  The
    expected outcome is exit code 1 with "denied" and "lacks labels" in the
    printed output.
    """
    policy_path = tmp_path / "policy.yaml"
    policy_path.write_text(POLICY_TEXT, encoding="utf-8")

    cli_args = ["policy", "check", "public-agent", "query", "private/doc.md"]
    cli_args += ["--policy", str(policy_path)]
    cli_args += ["--path", "private/doc.md"]
    outcome = CliRunner().invoke(main, cli_args)

    assert outcome.exit_code == 1
    for expected_fragment in ("denied", "lacks labels"):
        assert expected_fragment in outcome.output
def test_mkt_search_filters_local_index_results_by_policy(tmp_path: Path):
    """``search`` with a policy must hide documents the subject may not see.

    A public and a private markdown file are indexed under ``tmp_path``; both
    contain the word "Knowledge".  Searching as ``public-agent`` with the
    example policy must return at least one match, none of them from
    ``private/restricted.md``, and must report at least one denied result.
    """
    policy_file = tmp_path / "policy.yaml"
    private_dir = tmp_path / "private"
    private_dir.mkdir()
    policy_file.write_text(POLICY_TEXT, encoding="utf-8")
    (tmp_path / "public.md").write_text(
        "# Public\n\nKnowledge for everyone.\n",
        encoding="utf-8",
    )
    (private_dir / "restricted.md").write_text(
        "# Restricted\n\nKnowledge for internal work.\n",
        encoding="utf-8",
    )

    runner = CliRunner()
    indexed = runner.invoke(main, ["cache", "index", str(tmp_path), "--root", str(tmp_path)])
    # Fail here, with the CLI output attached, rather than later on an
    # unrelated search error caused by a missing index.
    assert indexed.exit_code == 0, indexed.output

    result = runner.invoke(
        main,
        [
            "search",
            "Knowledge",
            "--root",
            str(tmp_path),
            "--policy",
            str(policy_file),
            "--subject",
            "public-agent",
            "--format",
            "json",
        ],
    )
    # Check the exit code before parsing: on failure the output is an error
    # message, and json.loads would mask it behind a JSONDecodeError.
    assert result.exit_code == 0, result.output
    data = json.loads(result.output)

    assert data["count"] >= 1
    matched_paths = [match["path"] for match in data["matches"]]
    assert "private/restricted.md" not in matched_paths
    assert data["policy"]["denied"] >= 1
def test_mkt_cache_query_filters_indexed_documents_by_policy(tmp_path: Path):
    """``cache query`` with a policy must drop sections from denied documents.

    A public and a private markdown file, each with a "Decision" section, are
    indexed under ``tmp_path``.  Querying ``sections[heading=Decision]`` as
    ``public-agent`` must return exactly the section from ``public.md`` and
    report one denied result.
    """
    policy_file = tmp_path / "policy.yaml"
    private_dir = tmp_path / "private"
    private_dir.mkdir()
    policy_file.write_text(POLICY_TEXT, encoding="utf-8")
    (tmp_path / "public.md").write_text(
        "# Public\n\n## Decision\n\nShare it.\n",
        encoding="utf-8",
    )
    (private_dir / "restricted.md").write_text(
        "# Restricted\n\n## Decision\n\nKeep it internal.\n",
        encoding="utf-8",
    )

    runner = CliRunner()
    indexed = runner.invoke(main, ["cache", "index", str(tmp_path), "--root", str(tmp_path)])
    # Fail here, with the CLI output attached, rather than later on an
    # unrelated query error caused by a missing index.
    assert indexed.exit_code == 0, indexed.output

    result = runner.invoke(
        main,
        [
            "cache",
            "query",
            "sections[heading=Decision]",
            "--root",
            str(tmp_path),
            "--policy",
            str(policy_file),
            "--subject",
            "public-agent",
            "--format",
            "json",
        ],
    )
    # Check the exit code before parsing: on failure the output is an error
    # message, and json.loads would mask it behind a JSONDecodeError.
    assert result.exit_code == 0, result.output
    data = json.loads(result.output)

    assert data["count"] == 1
    assert data["matches"][0]["source_path"] == "public.md"
    assert data["policy"]["denied"] == 1
def test_enterprise_identity_maps_to_policy_subject():
    """``EnterpriseIdentity.to_policy_subject`` carries identity data across.

    The resulting subject is expected to have an ``oidc:<issuer>#<subject>``
    id, the identity's roles, the labels/actions passed to the conversion,
    and the issuer, groups and assurance data exposed under ``attributes``.
    """
    issuer = "https://sso.example.test/realms/netkingdom"
    identity = EnterpriseIdentity(
        issuer=issuer,
        subject="user-123",
        preferred_username="ada",
        roles=["viewer"],
        scopes=["markitect:read"],
        groups=["/markitect/readers"],
        assurance={"mfa": True},
        directory={"source": "keycloak"},
    )

    mapped = identity.to_policy_subject(
        allowed_labels=["public", "internal"],
        trust_zones=["public", "internal"],
        allowed_actions=["query", "search"],
    )

    # Directly mapped fields.
    assert mapped.id == "oidc:https://sso.example.test/realms/netkingdom#user-123"
    assert mapped.roles == ["viewer"]
    assert mapped.allowed_labels == ["public", "internal"]
    assert mapped.allowed_actions == ["query", "search"]
    # Identity details preserved as free-form attributes.
    assert mapped.attributes["issuer"] == "https://sso.example.test/realms/netkingdom"
    assert mapped.attributes["groups"] == ["/markitect/readers"]
    assert mapped.attributes["assurance"]["mfa"] is True
def test_enterprise_policy_adapter_requests_serialize_cleanly():
    """The enterprise adapter request/response objects expose ``to_dict()``.

    Builds a group-resolution request, a group-resolution result and a
    policy-map request (fed with the resolved groups), then spot-checks one
    field of each serialized form.
    """
    issuer = "https://sso.example.test/realms/netkingdom"

    resolution_request = DirectoryGroupResolutionRequest(
        subject_id="oidc:https://sso.example.test/realms/netkingdom#user-123",
        issuer=issuer,
        claims={"hasgroups": True},
    )
    resolution = DirectoryGroupResolution(
        groups=["/markitect/readers"],
        source="keycloak",
        refreshed_at="2026-05-04T10:00:00Z",
        overage=True,
    )
    reader_grants = {"/markitect/readers": {"allowed_labels": ["internal"]}}
    policy_map_request = EnterprisePolicyMapRequest(
        identity=EnterpriseIdentity(issuer=issuer, subject="user-123"),
        policy_map={"groups": reader_grants},
        groups=resolution.groups,
    )

    assert resolution_request.to_dict()["claims"] == {"hasgroups": True}
    assert resolution.to_dict()["overage"] is True
    serialized_identity = policy_map_request.to_dict()["identity"]
    assert serialized_identity["canonical_id"].endswith("#user-123")
def _policy_mapping() -> dict:
return {
"id": "example-policy",
"mode": "enforce",
"default_labels": ["public"],
"default_subject": "public-agent",
"subjects": {
"public-agent": {"allowed_labels": ["public"], "trust_zones": ["public"]},
"internal-agent": {
"allowed_labels": ["public", "internal"],
"trust_zones": ["public", "internal"],
},
},
"path_rules": [
{
"id": "private-path",
"pattern": "private/**",
"labels": ["internal"],
"trust_zone": "internal",
}
],
}