"""Tests for the markitect_tool policy layer and the ``mkt`` policy-aware CLI.

Covers the local label policy gateway, enterprise identity/claims adapters,
group resolution, policy mapping, resource manifests, the decision log store,
and the CLI commands that accept ``--policy`` / ``--policy-map``.
"""

import json
from pathlib import Path

from click.testing import CliRunner

from markitect_tool.cli import main
from markitect_tool.policy import (
    DirectoryGroupResolution,
    DirectoryGroupResolutionRequest,
    EnterprisePolicyError,
    EnterprisePolicyMap,
    EnterpriseIdentity,
    EnterprisePolicyMapRequest,
    FlexAuthResourceManifest,
    LocalDecisionLogStore,
    LocalEnterprisePolicyMapper,
    LocalLabelPolicy,
    LocalLabelPolicyGateway,
    NetKingdomIdentityClaimsAdapter,
    StaticDirectoryGroupResolver,
)
from markitect_tool.policy.models import PolicyDecision


# YAML form of the policy used by the CLI tests; it mirrors the dict returned
# by ``_policy_mapping()`` below.
POLICY_TEXT = """id: example-policy
mode: enforce
default_labels: [public]
default_subject: public-agent
subjects:
  public-agent:
    allowed_labels: [public]
    trust_zones: [public]
  internal-agent:
    allowed_labels: [public, internal]
    trust_zones: [public, internal]
path_rules:
  - id: private-path
    pattern: private/**
    labels: [internal]
    trust_zone: internal
"""


def test_local_label_policy_authorizes_by_labels_and_path_rules():
    """Public subject is denied on ``private/**``; internal subject is allowed."""
    gateway = LocalLabelPolicyGateway(LocalLabelPolicy.from_mapping(_policy_mapping()))

    public = gateway.authorize(
        "public-agent",
        "query",
        "public.md",
        context={"object": {"path": "public.md", "labels": ["public"], "trust_zone": "public"}},
    )
    private = gateway.authorize(
        "public-agent",
        "query",
        "private/doc.md",
        context={"object": {"path": "private/doc.md"}},
    )
    internal = gateway.authorize(
        "internal-agent",
        "query",
        "private/doc.md",
        context={"object": {"path": "private/doc.md"}},
    )

    assert public["allowed"] is True
    assert private["allowed"] is False
    assert private["effect"] == "deny"
    assert "lacks labels" in private["reason"]
    assert internal["allowed"] is True


def test_policy_filter_can_redact_denied_results():
    """With ``on_denied: redact`` denied results stay in place but are redacted."""
    policy = LocalLabelPolicy.from_mapping(_policy_mapping() | {"on_denied": "redact"})
    gateway = LocalLabelPolicyGateway(policy)

    result = gateway.filter_results(
        "public-agent",
        "search",
        [
            {"path": "public.md", "text": "Visible", "policy": {"labels": ["public"]}},
            {"path": "private/doc.md", "text": "Secret"},
        ],
    )

    assert result["policy"]["redacted"] == 1
    assert len(result["results"]) == 2
    assert result["results"][1]["text"] == "[redacted by policy]"
    assert result["diagnostics"][0]["code"] == "policy.result.redacted"


def test_policy_audit_mode_keeps_results_but_records_would_deny():
    """In audit mode the result is kept and tagged with ``audit_denied``."""
    gateway = LocalLabelPolicyGateway(LocalLabelPolicy.from_mapping(_policy_mapping()), mode="audit")

    result = gateway.filter_results(
        "public-agent",
        "query",
        [{"path": "private/doc.md", "text": "Internal"}],
    )

    assert len(result["results"]) == 1
    assert result["policy"]["audit_denied"] == 1
    assert result["results"][0]["policy"]["effect"] == "audit_denied"


def test_mkt_policy_check_reports_denied_decision(tmp_path: Path):
    """``mkt policy check`` exits with 1 and prints the denial reason."""
    policy_file = tmp_path / "policy.yaml"
    policy_file.write_text(POLICY_TEXT, encoding="utf-8")

    result = CliRunner().invoke(
        main,
        [
            "policy",
            "check",
            "public-agent",
            "query",
            "private/doc.md",
            "--policy",
            str(policy_file),
            "--path",
            "private/doc.md",
        ],
    )

    assert result.exit_code == 1
    assert "denied" in result.output
    assert "lacks labels" in result.output


def test_mkt_search_filters_local_index_results_by_policy(tmp_path: Path):
    """``mkt search`` omits the private document for the public subject."""
    policy_file = tmp_path / "policy.yaml"
    private_dir = tmp_path / "private"
    private_dir.mkdir()
    policy_file.write_text(POLICY_TEXT, encoding="utf-8")
    (tmp_path / "public.md").write_text("# Public\n\nKnowledge for everyone.\n", encoding="utf-8")
    (private_dir / "restricted.md").write_text(
        "# Restricted\n\nKnowledge for internal work.\n",
        encoding="utf-8",
    )
    runner = CliRunner()

    indexed = runner.invoke(main, ["cache", "index", str(tmp_path), "--root", str(tmp_path)])
    result = runner.invoke(
        main,
        [
            "search",
            "Knowledge",
            "--root",
            str(tmp_path),
            "--policy",
            str(policy_file),
            "--subject",
            "public-agent",
            "--format",
            "json",
        ],
    )

    # Check exit codes before parsing so a failed command reports its own
    # output instead of a JSON decode error.
    assert indexed.exit_code == 0, indexed.output
    assert result.exit_code == 0, result.output
    data = json.loads(result.output)
    assert data["count"] >= 1
    assert all("private/restricted.md" != match["path"] for match in data["matches"])
    assert data["policy"]["denied"] >= 1


def test_mkt_cache_query_filters_indexed_documents_by_policy(tmp_path: Path):
    """``mkt cache query`` returns only the public section for the public subject."""
    policy_file = tmp_path / "policy.yaml"
    private_dir = tmp_path / "private"
    private_dir.mkdir()
    policy_file.write_text(POLICY_TEXT, encoding="utf-8")
    (tmp_path / "public.md").write_text("# Public\n\n## Decision\n\nShare it.\n", encoding="utf-8")
    (private_dir / "restricted.md").write_text(
        "# Restricted\n\n## Decision\n\nKeep it internal.\n",
        encoding="utf-8",
    )
    runner = CliRunner()

    indexed = runner.invoke(main, ["cache", "index", str(tmp_path), "--root", str(tmp_path)])
    result = runner.invoke(
        main,
        [
            "cache",
            "query",
            "sections[heading=Decision]",
            "--root",
            str(tmp_path),
            "--policy",
            str(policy_file),
            "--subject",
            "public-agent",
            "--format",
            "json",
        ],
    )

    # Check exit codes before parsing so a failed command reports its own
    # output instead of a JSON decode error.
    assert indexed.exit_code == 0, indexed.output
    assert result.exit_code == 0, result.output
    data = json.loads(result.output)
    assert data["count"] == 1
    assert data["matches"][0]["source_path"] == "public.md"
    assert data["policy"]["denied"] == 1


def test_enterprise_identity_maps_to_policy_subject():
    """``to_policy_subject`` carries identity fields into the subject and its attributes."""
    identity = EnterpriseIdentity(
        issuer="https://sso.example.test/realms/netkingdom",
        subject="user-123",
        preferred_username="ada",
        roles=["viewer"],
        scopes=["markitect:read"],
        groups=["/markitect/readers"],
        assurance={"mfa": True},
        directory={"source": "keycloak"},
    )

    subject = identity.to_policy_subject(
        allowed_labels=["public", "internal"],
        trust_zones=["public", "internal"],
        allowed_actions=["query", "search"],
    )

    assert subject.id == "oidc:https://sso.example.test/realms/netkingdom#user-123"
    assert subject.roles == ["viewer"]
    assert subject.allowed_labels == ["public", "internal"]
    assert subject.allowed_actions == ["query", "search"]
    assert subject.attributes["issuer"] == "https://sso.example.test/realms/netkingdom"
    assert subject.attributes["groups"] == ["/markitect/readers"]
    assert subject.attributes["assurance"]["mfa"] is True


def test_enterprise_policy_adapter_requests_serialize_cleanly():
    """Request/result objects expose their fields through ``to_dict()``."""
    group_request = DirectoryGroupResolutionRequest(
        subject_id="oidc:https://sso.example.test/realms/netkingdom#user-123",
        issuer="https://sso.example.test/realms/netkingdom",
        claims={"hasgroups": True},
    )
    group_result = DirectoryGroupResolution(
        groups=["/markitect/readers"],
        source="keycloak",
        refreshed_at="2026-05-04T10:00:00Z",
        overage=True,
    )
    map_request = EnterprisePolicyMapRequest(
        identity=EnterpriseIdentity(
            issuer="https://sso.example.test/realms/netkingdom",
            subject="user-123",
        ),
        policy_map={"groups": {"/markitect/readers": {"allowed_labels": ["internal"]}}},
        groups=group_result.groups,
    )

    assert group_request.to_dict()["claims"] == {"hasgroups": True}
    assert group_result.to_dict()["overage"] is True
    assert map_request.to_dict()["identity"]["canonical_id"].endswith("#user-123")


def test_netkingdom_claims_adapter_validates_required_claims_and_audience():
    """``verify`` turns the sample claims into an identity with roles, scopes and groups."""
    adapter = NetKingdomIdentityClaimsAdapter(
        issuer="https://sso.example.test/realms/netkingdom",
        audiences=["markitect-tool"],
    )

    identity = adapter.verify(_claims())

    assert identity.canonical_id == "oidc:https://sso.example.test/realms/netkingdom#user-123"
    assert identity.roles == ["viewer"]
    assert identity.scopes == ["openid", "profile", "markitect:read"]
    assert identity.groups == ["/markitect/readers"]
    assert identity.assurance["mfa"] is True


def test_netkingdom_claims_adapter_rejects_local_issuer_in_production():
    """A localhost issuer raises ``EnterprisePolicyError`` when environment is production."""
    adapter = NetKingdomIdentityClaimsAdapter(audiences=["markitect-tool"])
    claims = _claims() | {"iss": "http://localhost:8080/realms/dev"}

    try:
        adapter.verify(claims, context={"environment": "production"})
    except EnterprisePolicyError as exc:
        assert "Local development issuer" in str(exc)
    else:
        # Reaching here means verify() accepted the claims, which is the failure case.
        raise AssertionError("expected local production issuer rejection")


def test_static_group_resolver_reports_group_overage_and_freshness():
    """The static resolver returns the configured groups, overage flag and timestamp."""
    request = DirectoryGroupResolutionRequest(
        subject_id="oidc:https://sso.example.test/realms/netkingdom#user-123",
        issuer="https://sso.example.test/realms/netkingdom",
        claims={"hasgroups": True},
    )
    resolver = StaticDirectoryGroupResolver(
        groups_by_subject={request.subject_id: ["/markitect/readers"]},
        refreshed_at="2026-05-04T10:00:00Z",
    )

    result = resolver.resolve(request)

    assert result.groups == ["/markitect/readers"]
    assert result.overage is True
    assert result.refreshed_at == "2026-05-04T10:00:00Z"


def test_local_enterprise_policy_mapper_maps_groups_roles_and_scopes():
    """The mapper merges defaults with the matched group, role and scope rules."""
    identity = NetKingdomIdentityClaimsAdapter(
        issuer="https://sso.example.test/realms/netkingdom",
        audiences=["markitect-tool"],
    ).verify(_claims())
    policy_map = EnterprisePolicyMap.from_mapping(_enterprise_policy_map())

    subject = LocalEnterprisePolicyMapper(policy_map).map_subject(
        EnterprisePolicyMapRequest(identity=identity)
    )

    assert subject.allowed_labels == ["public", "internal"]
    assert subject.trust_zones == ["public", "internal"]
    assert subject.allowed_actions == ["read", "query", "search"]
    assert subject.attributes["matched_policy_rules"] == [
        "group:/markitect/readers",
        "role:viewer",
        "scope:markitect:read",
    ]


def test_flex_auth_resource_manifest_serializes_resources():
    """A manifest built from a mapping round-trips its id and resource ids via ``to_dict()``."""
    manifest = FlexAuthResourceManifest.from_mapping(
        {
            "id": "markitect-example",
            "system": "markitect-tool",
            "actions": ["read", "query"],
            "resources": [
                {
                    "id": "document:public-note",
                    "type": "document",
                    "path": "examples/policy/public-note.md",
                    "labels": ["public"],
                    "trust_zone": "public",
                }
            ],
        }
    )

    data = manifest.to_dict()

    assert data["id"] == "markitect-example"
    assert data["resources"][0]["id"] == "document:public-note"


def test_local_decision_log_store_redacts_token_context(tmp_path: Path):
    """Recording a decision stores the context with the token value replaced."""
    store = LocalDecisionLogStore(tmp_path / "decisions.jsonl")
    decision = PolicyDecision(
        subject="subject-1",
        action="query",
        object_id="document:internal",
        effect="deny",
        reason="missing label",
    )

    audit_id = store.record(decision, context={"token": "secret", "policy_version": "v1"})
    entry = store.get(decision.decision_id)

    assert audit_id.startswith("audit:")
    assert entry is not None
    # NOTE(review): asserts the stored token is an empty string — confirm this
    # matches the redaction placeholder LocalDecisionLogStore actually writes.
    assert entry["context"]["token"] == ""
    assert entry["context"]["policy_version"] == "v1"


def test_mkt_policy_subject_maps_claims_file_to_subject(tmp_path: Path):
    """``mkt policy subject`` maps a claims file through a policy map to a subject."""
    claims_file = tmp_path / "claims.yaml"
    map_file = tmp_path / "policy-map.yaml"
    claims_file.write_text(yaml_dump(_claims()), encoding="utf-8")
    map_file.write_text(yaml_dump(_enterprise_policy_map()), encoding="utf-8")

    result = CliRunner().invoke(
        main,
        [
            "policy",
            "subject",
            str(claims_file),
            "--policy-map",
            str(map_file),
            "--format",
            "json",
        ],
    )

    # Check the exit code before parsing so a failed command reports its own
    # output instead of a JSON decode error.
    assert result.exit_code == 0, result.output
    data = json.loads(result.output)
    assert data["subject"]["allowed_labels"] == ["public", "internal"]
    assert data["subject"]["allowed_actions"] == ["read", "query", "search"]


def test_mkt_policy_resource_manifest_inspects_manifest(tmp_path: Path):
    """``mkt policy resource-manifest`` echoes the manifest resources as JSON."""
    manifest_file = tmp_path / "resources.yaml"
    manifest_file.write_text(
        yaml_dump(
            {
                "id": "manifest-1",
                "system": "markitect-tool",
                "actions": ["read"],
                "resources": [{"id": "document:one", "type": "document"}],
            }
        ),
        encoding="utf-8",
    )

    result = CliRunner().invoke(
        main,
        ["policy", "resource-manifest", str(manifest_file), "--format", "json"],
    )

    # Check the exit code before parsing so a failed command reports its own
    # output instead of a JSON decode error.
    assert result.exit_code == 0, result.output
    data = json.loads(result.output)
    assert data["manifest"]["resources"][0]["id"] == "document:one"


def yaml_dump(value: dict) -> str:
    """Serialize *value* to YAML text, keeping the mapping's key order."""
    import yaml

    return yaml.safe_dump(value, sort_keys=False)


def _claims() -> dict:
    """Return a sample claims mapping for user ``user-123`` of the netkingdom realm."""
    return {
        "iss": "https://sso.example.test/realms/netkingdom",
        "sub": "user-123",
        "aud": ["markitect-tool"],
        "exp": 4102444800,  # 2100-01-01T00:00:00Z
        "iat": 1767225600,  # 2026-01-01T00:00:00Z
        "preferred_username": "ada",
        "scope": "openid profile markitect:read",
        "realm_access": {"roles": ["viewer"]},
        "groups": ["/markitect/readers"],
        "amr": ["pwd", "otp"],
    }


def _enterprise_policy_map() -> dict:
    """Return a sample enterprise policy map with group, role and scope rules."""
    return {
        "id": "markitect-enterprise-policy-map",
        "issuer": "https://sso.example.test/realms/netkingdom",
        "audiences": ["markitect-tool"],
        "defaults": {"allowed_labels": ["public"], "trust_zones": ["public"]},
        "groups": {
            "/markitect/readers": {
                "allowed_labels": ["internal"],
                "trust_zones": ["internal"],
                "actions": ["read", "query", "search"],
            }
        },
        "roles": {"viewer": {"actions": ["read", "query", "search"]}},
        "scopes": {"markitect:read": {"actions": ["read", "query", "search"]}},
        "trust_zones": {"internal": {"required_groups": ["/markitect/readers"]}},
    }


def _policy_mapping() -> dict:
    """Return the label policy as a dict; same content as ``POLICY_TEXT``."""
    return {
        "id": "example-policy",
        "mode": "enforce",
        "default_labels": ["public"],
        "default_subject": "public-agent",
        "subjects": {
            "public-agent": {"allowed_labels": ["public"], "trust_zones": ["public"]},
            "internal-agent": {
                "allowed_labels": ["public", "internal"],
                "trust_zones": ["public", "internal"],
            },
        },
        "path_rules": [
            {
                "id": "private-path",
                "pattern": "private/**",
                "labels": ["internal"],
                "trust_zone": "internal",
            }
        ],
    }