generated from coulomb/repo-seed
Access controlled knowledge gateway functionality
This commit is contained in:
@@ -123,6 +123,20 @@ from markitect_tool.processor import (
|
||||
discover_fenced_processors,
|
||||
run_fenced_processors,
|
||||
)
|
||||
from markitect_tool.policy import (
|
||||
LocalLabelPolicy,
|
||||
LocalLabelPolicyGateway,
|
||||
LocalPathPolicyRule,
|
||||
PolicyDecision,
|
||||
PolicyFilterResult,
|
||||
PolicyObject,
|
||||
PolicySubject,
|
||||
RelationshipPolicyAdapter,
|
||||
RelationshipPolicyRequest,
|
||||
RulePolicyAdapter,
|
||||
RulePolicyRequest,
|
||||
policy_metadata_from_document,
|
||||
)
|
||||
from markitect_tool.query import (
|
||||
InvalidQueryError,
|
||||
QueryMatch,
|
||||
@@ -294,6 +308,18 @@ __all__ = [
|
||||
"default_processor_registry",
|
||||
"discover_fenced_processors",
|
||||
"run_fenced_processors",
|
||||
"LocalLabelPolicy",
|
||||
"LocalLabelPolicyGateway",
|
||||
"LocalPathPolicyRule",
|
||||
"PolicyDecision",
|
||||
"PolicyFilterResult",
|
||||
"PolicyObject",
|
||||
"PolicySubject",
|
||||
"RelationshipPolicyAdapter",
|
||||
"RelationshipPolicyRequest",
|
||||
"RulePolicyAdapter",
|
||||
"RulePolicyRequest",
|
||||
"policy_metadata_from_document",
|
||||
"InvalidQueryError",
|
||||
"QueryMatch",
|
||||
"extract_document",
|
||||
|
||||
@@ -294,6 +294,32 @@ class LocalSnapshotStore:
|
||||
for row in rows
|
||||
]
|
||||
|
||||
def search_with_policy(
|
||||
self,
|
||||
query: str,
|
||||
*,
|
||||
subject: str,
|
||||
gateway: Any,
|
||||
action: str = "search",
|
||||
limit: int = 20,
|
||||
context: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Search and apply a policy gateway before returning result rows."""
|
||||
|
||||
matches = []
|
||||
for result in self.search(query, limit=limit):
|
||||
item = result.to_dict()
|
||||
item["policy"] = self.policy_metadata(result.path)
|
||||
matches.append(item)
|
||||
return gateway.filter_results(subject, action, matches, context=context)
|
||||
|
||||
def policy_metadata(self, path: str) -> dict[str, Any]:
|
||||
"""Return document-derived policy metadata for an indexed source path."""
|
||||
|
||||
from markitect_tool.policy import policy_metadata_from_document
|
||||
|
||||
return policy_metadata_from_document(self.get_document(path), path=path)
|
||||
|
||||
def build(
|
||||
self,
|
||||
paths: list[str | Path],
|
||||
|
||||
@@ -52,6 +52,7 @@ from markitect_tool.generation import (
|
||||
from markitect_tool.literate import tangle_markdown, weave_markdown, write_tangle_files
|
||||
from markitect_tool.ops import IncludeError, compose_files, resolve_includes, transform_markdown
|
||||
from markitect_tool.processor import ProcessorContext, run_fenced_processors
|
||||
from markitect_tool.policy import LocalLabelPolicyGateway
|
||||
from markitect_tool.query import (
|
||||
InvalidQueryError,
|
||||
extract_document,
|
||||
@@ -727,6 +728,69 @@ def backend_refresh_plan(
|
||||
raise click.exceptions.Exit(1 if plan.dirty else 0)
|
||||
|
||||
|
||||
@main.group()
|
||||
def policy() -> None:
|
||||
"""Check local access policy decisions."""
|
||||
|
||||
|
||||
@policy.command("check")
|
||||
@click.argument("subject")
|
||||
@click.argument("action")
|
||||
@click.argument("object_id")
|
||||
@click.option(
|
||||
"--policy",
|
||||
"policy_file",
|
||||
type=click.Path(exists=True, dir_okay=False, path_type=Path),
|
||||
help="Local label policy file.",
|
||||
)
|
||||
@click.option("--label", "labels", multiple=True, help="Object policy label. May be repeated.")
|
||||
@click.option("--path", "object_path", help="Object path for path ACL and path-label rules.")
|
||||
@click.option("--trust-zone", help="Object trust zone.")
|
||||
@click.option(
|
||||
"--policy-mode",
|
||||
type=click.Choice(["off", "audit", "enforce"], case_sensitive=False),
|
||||
help="Override policy mode for this check.",
|
||||
)
|
||||
@click.option(
|
||||
"--format",
|
||||
"output_format",
|
||||
type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
|
||||
default="text",
|
||||
show_default=True,
|
||||
)
|
||||
def policy_check(
|
||||
subject: str,
|
||||
action: str,
|
||||
object_id: str,
|
||||
policy_file: Path | None,
|
||||
labels: tuple[str, ...],
|
||||
object_path: str | None,
|
||||
trust_zone: str | None,
|
||||
policy_mode: str | None,
|
||||
output_format: str,
|
||||
) -> None:
|
||||
"""Authorize one subject/action/object tuple with local label policy."""
|
||||
|
||||
try:
|
||||
gateway = _load_policy_gateway(policy_file, policy_mode) or LocalLabelPolicyGateway()
|
||||
decision = gateway.authorize(
|
||||
subject,
|
||||
action,
|
||||
object_id,
|
||||
context={
|
||||
"object": {
|
||||
"labels": list(labels),
|
||||
"path": object_path,
|
||||
"trust_zone": trust_zone,
|
||||
}
|
||||
},
|
||||
)
|
||||
except ValueError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
_emit_policy_result({"decision": decision}, output_format)
|
||||
raise click.exceptions.Exit(0 if decision.get("allowed") else 1)
|
||||
|
||||
|
||||
@main.group("class")
|
||||
def class_group() -> None:
|
||||
"""Resolve deterministic content classes."""
|
||||
@@ -1028,6 +1092,18 @@ def cache_index(
|
||||
multiple=True,
|
||||
help="Restrict query to one or more indexed relative paths.",
|
||||
)
|
||||
@click.option(
|
||||
"--policy",
|
||||
"policy_file",
|
||||
type=click.Path(exists=True, dir_okay=False, path_type=Path),
|
||||
help="Local label policy file used to filter results.",
|
||||
)
|
||||
@click.option("--subject", default="anonymous", help="Policy subject id.")
|
||||
@click.option(
|
||||
"--policy-mode",
|
||||
type=click.Choice(["off", "audit", "enforce"], case_sensitive=False),
|
||||
help="Override policy mode for this query.",
|
||||
)
|
||||
@click.option(
|
||||
"--engine",
|
||||
type=click.Choice(["selector", "jsonpath"], case_sensitive=False),
|
||||
@@ -1047,17 +1123,22 @@ def cache_query(
|
||||
root: Path,
|
||||
index_path: Path | None,
|
||||
paths: tuple[str, ...],
|
||||
policy_file: Path | None,
|
||||
subject: str,
|
||||
policy_mode: str | None,
|
||||
engine: str,
|
||||
output_format: str,
|
||||
) -> None:
|
||||
"""Run a selector or JSONPath query over indexed document snapshots."""
|
||||
|
||||
store = LocalSnapshotStore(local_index_path_for(root, index_path))
|
||||
policy_gateway = _load_policy_gateway(policy_file, policy_mode)
|
||||
indexed_paths = sorted(paths or [state.path for state in store.load_state()])
|
||||
all_matches = []
|
||||
try:
|
||||
for indexed_path in indexed_paths:
|
||||
document = Document.from_dict(store.get_document(indexed_path))
|
||||
policy_metadata = store.policy_metadata(indexed_path) if policy_gateway else {}
|
||||
matches = (
|
||||
query_document_jsonpath(document, selector)
|
||||
if engine == "jsonpath"
|
||||
@@ -1066,11 +1147,17 @@ def cache_query(
|
||||
for match in matches:
|
||||
item = match.to_dict()
|
||||
item["source_path"] = indexed_path
|
||||
if policy_metadata:
|
||||
item["policy"] = policy_metadata
|
||||
all_matches.append(item)
|
||||
except KeyError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
except InvalidQueryError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
policy_result = None
|
||||
if policy_gateway:
|
||||
policy_result = policy_gateway.filter_results(subject, "query", all_matches)
|
||||
all_matches = policy_result["results"]
|
||||
data = {
|
||||
"selector": selector,
|
||||
"engine": engine,
|
||||
@@ -1078,6 +1165,10 @@ def cache_query(
|
||||
"count": len(all_matches),
|
||||
"matches": all_matches,
|
||||
}
|
||||
if policy_result:
|
||||
data["policy"] = policy_result.get("policy")
|
||||
data["policy_decisions"] = policy_result.get("decisions")
|
||||
data["diagnostics"] = policy_result.get("diagnostics")
|
||||
_emit_query(data, output_format)
|
||||
|
||||
|
||||
@@ -1096,6 +1187,18 @@ def cache_query(
|
||||
help="SQLite index path. Defaults to .markitect/cache/index.sqlite3 under root.",
|
||||
)
|
||||
@click.option("--limit", type=int, default=20, show_default=True)
|
||||
@click.option(
|
||||
"--policy",
|
||||
"policy_file",
|
||||
type=click.Path(exists=True, dir_okay=False, path_type=Path),
|
||||
help="Local label policy file used to filter results.",
|
||||
)
|
||||
@click.option("--subject", default="anonymous", help="Policy subject id.")
|
||||
@click.option(
|
||||
"--policy-mode",
|
||||
type=click.Choice(["off", "audit", "enforce"], case_sensitive=False),
|
||||
help="Override policy mode for this search.",
|
||||
)
|
||||
@click.option(
|
||||
"--format",
|
||||
"output_format",
|
||||
@@ -1108,21 +1211,39 @@ def search(
|
||||
root: Path,
|
||||
index_path: Path | None,
|
||||
limit: int,
|
||||
policy_file: Path | None,
|
||||
subject: str,
|
||||
policy_mode: str | None,
|
||||
output_format: str,
|
||||
) -> None:
|
||||
"""Search the local SQLite index with FTS5."""
|
||||
|
||||
try:
|
||||
store = LocalSnapshotStore(local_index_path_for(root, index_path))
|
||||
results = store.search(text, limit=limit)
|
||||
policy_gateway = _load_policy_gateway(policy_file, policy_mode)
|
||||
if policy_gateway:
|
||||
policy_result = store.search_with_policy(
|
||||
text,
|
||||
subject=subject,
|
||||
gateway=policy_gateway,
|
||||
limit=limit,
|
||||
)
|
||||
matches = policy_result["results"]
|
||||
else:
|
||||
policy_result = None
|
||||
matches = [result.to_dict() for result in store.search(text, limit=limit)]
|
||||
except ValueError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
data = {
|
||||
"query": text,
|
||||
"index_path": str(local_index_path_for(root, index_path)),
|
||||
"count": len(results),
|
||||
"matches": [result.to_dict() for result in results],
|
||||
"count": len(matches),
|
||||
"matches": matches,
|
||||
}
|
||||
if policy_result:
|
||||
data["policy"] = policy_result.get("policy")
|
||||
data["policy_decisions"] = policy_result.get("decisions")
|
||||
data["diagnostics"] = policy_result.get("diagnostics")
|
||||
_emit_search_results(data, output_format)
|
||||
|
||||
|
||||
@@ -1529,6 +1650,20 @@ def contract_form_state(
|
||||
raise click.exceptions.Exit(0 if form_state.valid else 1)
|
||||
|
||||
|
||||
def _load_policy_gateway(
|
||||
policy_file: Path | None,
|
||||
policy_mode: str | None,
|
||||
) -> LocalLabelPolicyGateway | None:
|
||||
if policy_file is None and policy_mode is None:
|
||||
return None
|
||||
try:
|
||||
if policy_file:
|
||||
return LocalLabelPolicyGateway.from_file(policy_file, mode=policy_mode)
|
||||
return LocalLabelPolicyGateway(mode=policy_mode)
|
||||
except ValueError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
|
||||
|
||||
def _emit_result(data: dict, output_format: str) -> None:
|
||||
if output_format == "json":
|
||||
click.echo(json.dumps(data, indent=2, ensure_ascii=False))
|
||||
@@ -1588,6 +1723,19 @@ def _emit_form_state(data: dict, output_format: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def _emit_policy_result(data: dict, output_format: str) -> None:
|
||||
if output_format == "json":
|
||||
click.echo(json.dumps(data, indent=2, ensure_ascii=False))
|
||||
elif output_format == "yaml":
|
||||
click.echo(yaml.safe_dump(data, sort_keys=False))
|
||||
else:
|
||||
decision = data["decision"]
|
||||
click.echo("allowed" if decision.get("allowed") else "denied")
|
||||
click.echo(f"effect: {decision.get('effect')}")
|
||||
click.echo(f"decision_id: {decision.get('decision_id')}")
|
||||
click.echo(f"reason: {decision.get('reason')}")
|
||||
|
||||
|
||||
def _emit_metrics(data: dict, output_format: str) -> None:
|
||||
if output_format == "json":
|
||||
click.echo(json.dumps(data, indent=2, ensure_ascii=False))
|
||||
@@ -1615,11 +1763,15 @@ def _emit_query(data: dict, output_format: str) -> None:
|
||||
click.echo(yaml.safe_dump(data, sort_keys=False))
|
||||
else:
|
||||
click.echo(f"{data['count']} match(es)")
|
||||
if data.get("policy"):
|
||||
_emit_policy_summary(data["policy"])
|
||||
for match in data["matches"]:
|
||||
location = f":{match['line']}" if match.get("line") else ""
|
||||
click.echo(f"- {match['kind']} {match['path']}{location}")
|
||||
if match.get("text"):
|
||||
click.echo(f" {match['text'].splitlines()[0]}")
|
||||
for diagnostic in data.get("diagnostics", []):
|
||||
click.echo(f"! [{diagnostic['severity']}] {diagnostic['code']}: {diagnostic['message']}")
|
||||
|
||||
|
||||
def _emit_extract(data: dict, output_format: str) -> None:
|
||||
@@ -1709,6 +1861,8 @@ def _emit_search_results(data: dict, output_format: str) -> None:
|
||||
click.echo(yaml.safe_dump(data, sort_keys=False))
|
||||
else:
|
||||
click.echo(f"{data['count']} match(es)")
|
||||
if data.get("policy"):
|
||||
_emit_policy_summary(data["policy"])
|
||||
for match in data["matches"]:
|
||||
span = ""
|
||||
if match.get("line_start"):
|
||||
@@ -1720,6 +1874,19 @@ def _emit_search_results(data: dict, output_format: str) -> None:
|
||||
preview = " ".join(str(match.get("text", "")).split())
|
||||
if preview:
|
||||
click.echo(f" {preview[:160]}")
|
||||
for diagnostic in data.get("diagnostics", []):
|
||||
click.echo(f"! [{diagnostic['severity']}] {diagnostic['code']}: {diagnostic['message']}")
|
||||
|
||||
|
||||
def _emit_policy_summary(policy_data: dict) -> None:
|
||||
click.echo(
|
||||
"policy: "
|
||||
f"mode={policy_data.get('mode')} "
|
||||
f"subject={policy_data.get('subject')} "
|
||||
f"allowed={policy_data.get('allowed', 0)} "
|
||||
f"denied={policy_data.get('denied', 0)} "
|
||||
f"redacted={policy_data.get('redacted', 0)}"
|
||||
)
|
||||
|
||||
|
||||
def _emit_workflow_result(data: dict, output_format: str) -> None:
|
||||
|
||||
@@ -17,6 +17,7 @@ def builtin_extension_registry() -> ExtensionRegistry:
|
||||
_runtime_context_descriptor(),
|
||||
_runtime_form_state_descriptor(),
|
||||
_runtime_assessment_descriptor(),
|
||||
_local_label_policy_descriptor(),
|
||||
]:
|
||||
registry.register(descriptor)
|
||||
return registry
|
||||
@@ -86,6 +87,7 @@ def _local_sqlite_backend_descriptor() -> ExtensionDescriptor:
|
||||
ProcessingCapability(id="fts", kind="backend"),
|
||||
ProcessingCapability(id="sql", kind="backend"),
|
||||
ProcessingCapability(id="provenance", kind="backend"),
|
||||
ProcessingCapability(id="policy_filter", kind="backend"),
|
||||
],
|
||||
safety={"reads_files": True, "writes_local_cache": True, "network": False},
|
||||
input_contract="Markdown files/directories",
|
||||
@@ -188,3 +190,37 @@ def _runtime_assessment_descriptor() -> ExtensionDescriptor:
|
||||
examples=["examples/runtime/concept-note-assessment.contract.md"],
|
||||
metadata={"provider_implementation": "external adapter required"},
|
||||
)
|
||||
|
||||
|
||||
def _local_label_policy_descriptor() -> ExtensionDescriptor:
|
||||
return ExtensionDescriptor(
|
||||
id="policy.local-label",
|
||||
kind="policy-gateway",
|
||||
summary="Local label, trust-zone, and path policy gateway.",
|
||||
capabilities=[
|
||||
ProcessingCapability(id="policy", kind="authorize"),
|
||||
ProcessingCapability(id="policy_filter", kind="filter"),
|
||||
ProcessingCapability(id="diagnostics", kind="emit"),
|
||||
ProcessingCapability(id="provenance", kind="emit"),
|
||||
],
|
||||
safety={"network": False, "external_policy_engine": False},
|
||||
input_contract="PolicySubject + PolicyObject + local label policy",
|
||||
output_contract="PolicyDecision | PolicyFilterResult",
|
||||
diagnostics_namespace="policy",
|
||||
provenance_prefix="policy.local_label",
|
||||
cli={
|
||||
"commands": [
|
||||
"mkt policy check",
|
||||
"mkt cache query --policy",
|
||||
"mkt search --policy",
|
||||
]
|
||||
},
|
||||
docs=["docs/access-control-policy-gateway.md"],
|
||||
examples=["examples/policy/local-label-policy.yaml"],
|
||||
metadata={
|
||||
"external_adapters": [
|
||||
"RelationshipPolicyAdapter",
|
||||
"RulePolicyAdapter",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
35
src/markitect_tool/policy/__init__.py
Normal file
35
src/markitect_tool/policy/__init__.py
Normal file
@@ -0,0 +1,35 @@
|
||||
"""Access policy gateways and adapter protocols."""
|
||||
|
||||
from markitect_tool.policy.adapters import (
|
||||
RelationshipPolicyAdapter,
|
||||
RelationshipPolicyRequest,
|
||||
RulePolicyAdapter,
|
||||
RulePolicyRequest,
|
||||
)
|
||||
from markitect_tool.policy.local import (
|
||||
LocalLabelPolicy,
|
||||
LocalLabelPolicyGateway,
|
||||
LocalPathPolicyRule,
|
||||
policy_metadata_from_document,
|
||||
)
|
||||
from markitect_tool.policy.models import (
|
||||
PolicyDecision,
|
||||
PolicyFilterResult,
|
||||
PolicyObject,
|
||||
PolicySubject,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"LocalLabelPolicy",
|
||||
"LocalLabelPolicyGateway",
|
||||
"LocalPathPolicyRule",
|
||||
"PolicyDecision",
|
||||
"PolicyFilterResult",
|
||||
"PolicyObject",
|
||||
"PolicySubject",
|
||||
"RelationshipPolicyAdapter",
|
||||
"RelationshipPolicyRequest",
|
||||
"RulePolicyAdapter",
|
||||
"RulePolicyRequest",
|
||||
"policy_metadata_from_document",
|
||||
]
|
||||
65
src/markitect_tool/policy/adapters.py
Normal file
65
src/markitect_tool/policy/adapters.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""Protocol boundaries for external authorization engines."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from typing import Any, Protocol
|
||||
|
||||
from markitect_tool.policy.models import PolicyDecision
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RelationshipPolicyRequest:
|
||||
"""Relationship-based authorization request.
|
||||
|
||||
This maps cleanly to Zanzibar/OpenFGA/SpiceDB-style checks without binding
|
||||
Markitect core to one service or tuple schema.
|
||||
"""
|
||||
|
||||
subject: str
|
||||
relation: str
|
||||
object_id: str
|
||||
namespace: str | None = None
|
||||
context: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return _drop_empty(asdict(self))
|
||||
|
||||
|
||||
class RelationshipPolicyAdapter(Protocol):
|
||||
"""Adapter boundary for relationship authorization systems."""
|
||||
|
||||
def check(self, request: RelationshipPolicyRequest) -> PolicyDecision | dict[str, Any]:
|
||||
"""Return a policy decision for a relationship check."""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RulePolicyRequest:
|
||||
"""Attribute/rule policy evaluation request.
|
||||
|
||||
This can be mapped to OPA/Rego, Cedar, or local policy-as-data engines.
|
||||
"""
|
||||
|
||||
subject: dict[str, Any]
|
||||
action: str
|
||||
object: dict[str, Any]
|
||||
context: dict[str, Any] = field(default_factory=dict)
|
||||
policy_id: str | None = None
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return _drop_empty(asdict(self))
|
||||
|
||||
|
||||
class RulePolicyAdapter(Protocol):
|
||||
"""Adapter boundary for rule/attribute policy systems."""
|
||||
|
||||
def evaluate(self, request: RulePolicyRequest) -> PolicyDecision | dict[str, Any]:
|
||||
"""Return a policy decision for a rule evaluation."""
|
||||
|
||||
|
||||
def _drop_empty(data: dict[str, Any]) -> dict[str, Any]:
|
||||
return {
|
||||
key: value
|
||||
for key, value in data.items()
|
||||
if value not in (None, [], {}, "")
|
||||
}
|
||||
482
src/markitect_tool/policy/local.py
Normal file
482
src/markitect_tool/policy/local.py
Normal file
@@ -0,0 +1,482 @@
|
||||
"""Local label policy gateway for cache, query, and context-package results."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import fnmatch
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from markitect_tool.diagnostics import Diagnostic
|
||||
from markitect_tool.policy.models import (
|
||||
PolicyDecision,
|
||||
PolicyFilterResult,
|
||||
PolicyObject,
|
||||
PolicySubject,
|
||||
)
|
||||
|
||||
|
||||
POLICY_MODES = {"off", "audit", "enforce"}
|
||||
DENIED_BEHAVIOR = {"drop", "redact"}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LocalPathPolicyRule:
|
||||
"""Path rule that can add labels, set trust zone, or deny directly."""
|
||||
|
||||
pattern: str
|
||||
labels: list[str] = field(default_factory=list)
|
||||
trust_zone: str | None = None
|
||||
deny: bool = False
|
||||
id: str | None = None
|
||||
|
||||
@classmethod
|
||||
def from_mapping(cls, raw: dict[str, Any], *, fallback_id: str) -> "LocalPathPolicyRule":
|
||||
return cls(
|
||||
pattern=str(raw.get("pattern") or raw.get("glob") or raw.get("path") or "*"),
|
||||
labels=_string_list(raw.get("labels") or raw.get("label")),
|
||||
trust_zone=raw.get("trust_zone") or raw.get("zone"),
|
||||
deny=bool(raw.get("deny", False)),
|
||||
id=raw.get("id") or fallback_id,
|
||||
)
|
||||
|
||||
def matches(self, path: str | None) -> bool:
|
||||
return bool(path) and fnmatch.fnmatch(path, self.pattern)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LocalLabelPolicy:
|
||||
"""Declarative local policy for labels, trust zones, and path ACLs."""
|
||||
|
||||
id: str = "local-label-policy"
|
||||
mode: str = "enforce"
|
||||
default_labels: list[str] = field(default_factory=lambda: ["public"])
|
||||
default_trust_zone: str | None = None
|
||||
default_subject: str = "anonymous"
|
||||
on_denied: str = "drop"
|
||||
subjects: dict[str, PolicySubject] = field(default_factory=dict)
|
||||
path_rules: list[LocalPathPolicyRule] = field(default_factory=list)
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
@classmethod
|
||||
def from_mapping(cls, raw: dict[str, Any]) -> "LocalLabelPolicy":
|
||||
policy = raw.get("policy") if isinstance(raw.get("policy"), dict) else raw
|
||||
subjects = _subjects_from_mapping(policy.get("subjects"))
|
||||
default_subject = str(policy.get("default_subject", "anonymous"))
|
||||
if default_subject not in subjects:
|
||||
subjects[default_subject] = PolicySubject(
|
||||
id=default_subject,
|
||||
allowed_labels=_string_list(
|
||||
policy.get("default_allowed_labels") or policy.get("default_labels") or ["public"]
|
||||
),
|
||||
trust_zones=_string_list(policy.get("default_trust_zones")),
|
||||
)
|
||||
mode = str(policy.get("mode", "enforce")).strip().lower()
|
||||
if mode not in POLICY_MODES:
|
||||
mode = "enforce"
|
||||
on_denied = str(policy.get("on_denied", "drop")).strip().lower()
|
||||
if on_denied not in DENIED_BEHAVIOR:
|
||||
on_denied = "drop"
|
||||
return cls(
|
||||
id=str(policy.get("id", "local-label-policy")),
|
||||
mode=mode,
|
||||
default_labels=_string_list(policy.get("default_labels") or ["public"]),
|
||||
default_trust_zone=policy.get("default_trust_zone"),
|
||||
default_subject=default_subject,
|
||||
on_denied=on_denied,
|
||||
subjects=subjects,
|
||||
path_rules=_path_rules_from_value(policy.get("path_rules") or policy.get("paths")),
|
||||
metadata=dict(policy.get("metadata") or {}),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, path: str | Path) -> "LocalLabelPolicy":
|
||||
policy_path = Path(path)
|
||||
data = yaml.safe_load(policy_path.read_text(encoding="utf-8")) or {}
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("Policy file must contain a mapping.")
|
||||
return cls.from_mapping(data)
|
||||
|
||||
|
||||
class LocalLabelPolicyGateway:
|
||||
"""AccessPolicyGateway implementation for local label policies."""
|
||||
|
||||
gateway_id = "policy.local-label"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
policy: LocalLabelPolicy | dict[str, Any] | None = None,
|
||||
*,
|
||||
mode: str | None = None,
|
||||
) -> None:
|
||||
if isinstance(policy, LocalLabelPolicy):
|
||||
loaded = policy
|
||||
elif isinstance(policy, dict):
|
||||
loaded = LocalLabelPolicy.from_mapping(policy)
|
||||
else:
|
||||
loaded = LocalLabelPolicy()
|
||||
if mode:
|
||||
normalized = mode.strip().lower()
|
||||
if normalized not in POLICY_MODES:
|
||||
raise ValueError(f"Unsupported policy mode `{mode}`.")
|
||||
loaded = LocalLabelPolicy(
|
||||
id=loaded.id,
|
||||
mode=normalized,
|
||||
default_labels=loaded.default_labels,
|
||||
default_trust_zone=loaded.default_trust_zone,
|
||||
default_subject=loaded.default_subject,
|
||||
on_denied=loaded.on_denied,
|
||||
subjects=loaded.subjects,
|
||||
path_rules=loaded.path_rules,
|
||||
metadata=loaded.metadata,
|
||||
)
|
||||
self.policy = loaded
|
||||
self._decisions: dict[str, PolicyDecision] = {}
|
||||
|
||||
@classmethod
|
||||
def from_file(
|
||||
cls,
|
||||
path: str | Path,
|
||||
*,
|
||||
mode: str | None = None,
|
||||
) -> "LocalLabelPolicyGateway":
|
||||
return cls(LocalLabelPolicy.from_file(path), mode=mode)
|
||||
|
||||
def authorize(
|
||||
self,
|
||||
subject: str,
|
||||
action: str,
|
||||
object_id: str,
|
||||
context: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Authorize one action against one object."""
|
||||
|
||||
decision = self.decide(subject, action, object_id, context=context)
|
||||
return decision.to_dict()
|
||||
|
||||
def decide(
|
||||
self,
|
||||
subject: str,
|
||||
action: str,
|
||||
object_id: str,
|
||||
context: dict[str, Any] | None = None,
|
||||
) -> PolicyDecision:
|
||||
subject_model = self._subject(subject, context)
|
||||
object_model = self._object(object_id, context)
|
||||
decision = self._evaluate(subject_model, action, object_model)
|
||||
self._decisions[decision.decision_id] = decision
|
||||
return decision
|
||||
|
||||
def filter_results(
|
||||
self,
|
||||
subject: str,
|
||||
action: str,
|
||||
results: list[dict[str, Any]],
|
||||
context: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Filter results and return policy decisions."""
|
||||
|
||||
kept: list[dict[str, Any]] = []
|
||||
diagnostics: list[Diagnostic] = []
|
||||
decisions: list[PolicyDecision] = []
|
||||
for index, item in enumerate(results):
|
||||
object_id = _object_id_for_result(item, index)
|
||||
item_context = dict(context or {})
|
||||
item_context["result"] = item
|
||||
item_context.setdefault("object", {}).update(_policy_object_mapping(item))
|
||||
decision = self.decide(subject, action, object_id, context=item_context)
|
||||
decisions.append(decision)
|
||||
|
||||
annotated = _annotate_result(item, decision)
|
||||
if decision.effect == "allow" or decision.effect == "audit_denied":
|
||||
kept.append(annotated)
|
||||
elif decision.effect == "redact":
|
||||
kept.append(_redact_result(annotated))
|
||||
diagnostics.append(_denied_diagnostic(decision, redacted=True))
|
||||
else:
|
||||
diagnostics.append(_denied_diagnostic(decision))
|
||||
|
||||
result = PolicyFilterResult(
|
||||
results=kept,
|
||||
decisions=decisions,
|
||||
diagnostics=diagnostics,
|
||||
mode=self.policy.mode,
|
||||
subject=subject,
|
||||
action=action,
|
||||
)
|
||||
return result.to_dict()
|
||||
|
||||
def explain_decision(self, decision_id: str) -> dict[str, Any]:
|
||||
"""Explain one policy decision made by this gateway instance."""
|
||||
|
||||
try:
|
||||
return self._decisions[decision_id].to_dict()
|
||||
except KeyError as exc:
|
||||
raise KeyError(f"Unknown policy decision `{decision_id}`") from exc
|
||||
|
||||
def _subject(self, subject: str, context: dict[str, Any] | None) -> PolicySubject:
|
||||
if context and isinstance(context.get("subject"), dict):
|
||||
merged = self.policy.subjects.get(subject) or self.policy.subjects[self.policy.default_subject]
|
||||
override = context["subject"]
|
||||
return PolicySubject(
|
||||
id=subject,
|
||||
allowed_labels=_unique(merged.allowed_labels + _string_list(override.get("allowed_labels") or override.get("labels"))),
|
||||
trust_zones=_unique(merged.trust_zones + _string_list(override.get("trust_zones") or override.get("zones"))),
|
||||
roles=_unique(merged.roles + _string_list(override.get("roles"))),
|
||||
allowed_actions=_unique(merged.allowed_actions + _string_list(override.get("allowed_actions") or override.get("actions"))),
|
||||
path_allow=_unique(merged.path_allow + _string_list(override.get("path_allow") or override.get("allow_paths"))),
|
||||
path_deny=_unique(merged.path_deny + _string_list(override.get("path_deny") or override.get("deny_paths"))),
|
||||
attributes=merged.attributes | dict(override.get("attributes") or {}),
|
||||
)
|
||||
return self.policy.subjects.get(subject) or self.policy.subjects[self.policy.default_subject]
|
||||
|
||||
def _object(self, object_id: str, context: dict[str, Any] | None) -> PolicyObject:
|
||||
raw_object = context.get("object", {}) if context else {}
|
||||
result = context.get("result", {}) if context else {}
|
||||
path = raw_object.get("path") or result.get("source_path") or result.get("path")
|
||||
labels = _unique(
|
||||
self.policy.default_labels
|
||||
+ _string_list(raw_object.get("labels") or raw_object.get("label"))
|
||||
+ _string_list(result.get("labels"))
|
||||
)
|
||||
trust_zone = raw_object.get("trust_zone") or self.policy.default_trust_zone
|
||||
deny_by_path = False
|
||||
matched_rules: list[str] = []
|
||||
for rule in self.policy.path_rules:
|
||||
if not rule.matches(path):
|
||||
continue
|
||||
matched_rules.append(rule.id or rule.pattern)
|
||||
labels = _unique(labels + rule.labels)
|
||||
trust_zone = rule.trust_zone or trust_zone
|
||||
deny_by_path = deny_by_path or rule.deny
|
||||
attributes = dict(raw_object.get("attributes") or {})
|
||||
attributes["matched_path_rules"] = matched_rules
|
||||
attributes["deny_by_path_rule"] = deny_by_path
|
||||
return PolicyObject(
|
||||
id=object_id,
|
||||
path=path,
|
||||
labels=labels,
|
||||
trust_zone=trust_zone,
|
||||
attributes=attributes,
|
||||
)
|
||||
|
||||
def _evaluate(
|
||||
self,
|
||||
subject: PolicySubject,
|
||||
action: str,
|
||||
policy_object: PolicyObject,
|
||||
) -> PolicyDecision:
|
||||
if self.policy.mode == "off":
|
||||
return self._decision(subject, action, policy_object, "allow", "policy mode is off")
|
||||
|
||||
denial_reason, rule_id = self._denial_reason(subject, action, policy_object)
|
||||
if denial_reason is None:
|
||||
return self._decision(subject, action, policy_object, "allow", "label policy allowed")
|
||||
|
||||
if self.policy.mode == "audit":
|
||||
return self._decision(subject, action, policy_object, "audit_denied", denial_reason, rule_id)
|
||||
if self.policy.on_denied == "redact":
|
||||
return self._decision(subject, action, policy_object, "redact", denial_reason, rule_id)
|
||||
return self._decision(subject, action, policy_object, "deny", denial_reason, rule_id)
|
||||
|
||||
def _denial_reason(
|
||||
self,
|
||||
subject: PolicySubject,
|
||||
action: str,
|
||||
policy_object: PolicyObject,
|
||||
) -> tuple[str | None, str | None]:
|
||||
if policy_object.attributes.get("deny_by_path_rule"):
|
||||
return "object path is denied by local path policy", "path.deny"
|
||||
if subject.allowed_actions and action not in subject.allowed_actions:
|
||||
return f"subject `{subject.id}` is not allowed to perform `{action}`", "subject.action"
|
||||
if policy_object.path:
|
||||
if any(fnmatch.fnmatch(policy_object.path, pattern) for pattern in subject.path_deny):
|
||||
return "object path is denied for subject", "subject.path_deny"
|
||||
if subject.path_allow and not any(
|
||||
fnmatch.fnmatch(policy_object.path, pattern) for pattern in subject.path_allow
|
||||
):
|
||||
return "object path is outside subject allow list", "subject.path_allow"
|
||||
missing_labels = sorted(set(policy_object.labels) - set(subject.allowed_labels))
|
||||
if missing_labels:
|
||||
return (
|
||||
f"subject `{subject.id}` lacks labels {missing_labels}",
|
||||
"labels",
|
||||
)
|
||||
if policy_object.trust_zone and subject.trust_zones and policy_object.trust_zone not in subject.trust_zones:
|
||||
return (
|
||||
f"subject `{subject.id}` is outside trust zone `{policy_object.trust_zone}`",
|
||||
"trust_zone",
|
||||
)
|
||||
return None, None
|
||||
|
||||
def _decision(
|
||||
self,
|
||||
subject: PolicySubject,
|
||||
action: str,
|
||||
policy_object: PolicyObject,
|
||||
effect: str,
|
||||
reason: str,
|
||||
rule_id: str | None = None,
|
||||
) -> PolicyDecision:
|
||||
return PolicyDecision(
|
||||
subject=subject.id,
|
||||
action=action,
|
||||
object_id=policy_object.id,
|
||||
effect=effect,
|
||||
reason=reason,
|
||||
mode=self.policy.mode,
|
||||
rule_id=rule_id,
|
||||
labels=policy_object.labels,
|
||||
trust_zone=policy_object.trust_zone,
|
||||
metadata={"path": policy_object.path, "policy_id": self.policy.id},
|
||||
)
|
||||
|
||||
|
||||
def policy_metadata_from_document(
|
||||
document: dict[str, Any],
|
||||
*,
|
||||
path: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Extract stable policy metadata from parsed document frontmatter."""
|
||||
|
||||
frontmatter = document.get("frontmatter", {}) if isinstance(document, dict) else {}
|
||||
policy = frontmatter.get("policy") if isinstance(frontmatter.get("policy"), dict) else {}
|
||||
labels = _unique(
|
||||
_string_list(policy.get("labels") or policy.get("label"))
|
||||
+ _string_list(frontmatter.get("labels") or frontmatter.get("label"))
|
||||
+ _string_list(frontmatter.get("classification"))
|
||||
)
|
||||
data = {
|
||||
"path": path or document.get("source_path"),
|
||||
"labels": labels,
|
||||
"trust_zone": policy.get("trust_zone") or policy.get("zone") or frontmatter.get("trust_zone"),
|
||||
"attributes": {
|
||||
"document_type": frontmatter.get("document_type") or frontmatter.get("type"),
|
||||
"title": frontmatter.get("title"),
|
||||
},
|
||||
}
|
||||
return {key: value for key, value in data.items() if value not in (None, [], {})}
|
||||
|
||||
|
||||
def _subjects_from_mapping(value: Any) -> dict[str, PolicySubject]:
|
||||
if not isinstance(value, dict):
|
||||
return {}
|
||||
return {
|
||||
str(subject_id): PolicySubject(
|
||||
id=str(subject_id),
|
||||
allowed_labels=_string_list(raw.get("allowed_labels") or raw.get("labels") or raw.get("clearance"))
|
||||
if isinstance(raw, dict)
|
||||
else [],
|
||||
trust_zones=_string_list(raw.get("trust_zones") or raw.get("zones"))
|
||||
if isinstance(raw, dict)
|
||||
else [],
|
||||
roles=_string_list(raw.get("roles")) if isinstance(raw, dict) else [],
|
||||
allowed_actions=_string_list(raw.get("allowed_actions") or raw.get("actions"))
|
||||
if isinstance(raw, dict)
|
||||
else [],
|
||||
path_allow=_string_list(raw.get("path_allow") or raw.get("allow_paths"))
|
||||
if isinstance(raw, dict)
|
||||
else [],
|
||||
path_deny=_string_list(raw.get("path_deny") or raw.get("deny_paths"))
|
||||
if isinstance(raw, dict)
|
||||
else [],
|
||||
attributes=dict(raw.get("attributes") or {}) if isinstance(raw, dict) else {},
|
||||
)
|
||||
for subject_id, raw in value.items()
|
||||
}
|
||||
|
||||
|
||||
def _path_rules_from_value(value: Any) -> list[LocalPathPolicyRule]:
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, dict):
|
||||
return [
|
||||
LocalPathPolicyRule.from_mapping(raw if isinstance(raw, dict) else {"pattern": pattern}, fallback_id=str(pattern))
|
||||
for pattern, raw in value.items()
|
||||
]
|
||||
if isinstance(value, list):
|
||||
return [
|
||||
LocalPathPolicyRule.from_mapping(raw, fallback_id=f"path-{index + 1}")
|
||||
for index, raw in enumerate(value)
|
||||
if isinstance(raw, dict)
|
||||
]
|
||||
return []
|
||||
|
||||
|
||||
def _policy_object_mapping(item: dict[str, Any]) -> dict[str, Any]:
|
||||
policy = item.get("policy") if isinstance(item.get("policy"), dict) else {}
|
||||
return {
|
||||
"path": policy.get("path") or item.get("source_path") or item.get("path"),
|
||||
"labels": policy.get("labels") or item.get("labels"),
|
||||
"trust_zone": policy.get("trust_zone"),
|
||||
"attributes": policy.get("attributes", {}),
|
||||
}
|
||||
|
||||
|
||||
def _object_id_for_result(item: dict[str, Any], index: int) -> str:
|
||||
path = item.get("source_path") or item.get("path") or "<memory>"
|
||||
unit = item.get("path") if item.get("source_path") else item.get("unit_index", index)
|
||||
return f"{path}#{unit}"
|
||||
|
||||
|
||||
def _annotate_result(item: dict[str, Any], decision: PolicyDecision) -> dict[str, Any]:
|
||||
annotated = dict(item)
|
||||
policy = dict(annotated.get("policy") or {})
|
||||
policy.update(
|
||||
{
|
||||
"decision_id": decision.decision_id,
|
||||
"effect": decision.effect,
|
||||
"labels": decision.labels,
|
||||
"trust_zone": decision.trust_zone,
|
||||
}
|
||||
)
|
||||
annotated["policy"] = {key: value for key, value in policy.items() if value not in (None, [], {})}
|
||||
return annotated
|
||||
|
||||
|
||||
def _redact_result(item: dict[str, Any]) -> dict[str, Any]:
|
||||
redacted = dict(item)
|
||||
if "text" in redacted:
|
||||
redacted["text"] = "[redacted by policy]"
|
||||
if "value" in redacted:
|
||||
redacted["value"] = None
|
||||
policy = dict(redacted.get("policy") or {})
|
||||
policy["redacted"] = True
|
||||
redacted["policy"] = policy
|
||||
return redacted
|
||||
|
||||
|
||||
def _denied_diagnostic(decision: PolicyDecision, *, redacted: bool = False) -> Diagnostic:
|
||||
return Diagnostic(
|
||||
severity="warning",
|
||||
code="policy.result.redacted" if redacted else "policy.result.denied",
|
||||
message=(
|
||||
f"Policy redacted `{decision.object_id}`: {decision.reason}"
|
||||
if redacted
|
||||
else f"Policy denied `{decision.object_id}`: {decision.reason}"
|
||||
),
|
||||
rule_id=decision.rule_id,
|
||||
details={"decision_id": decision.decision_id, "effect": decision.effect},
|
||||
)
|
||||
|
||||
|
||||
def _string_list(value: Any) -> list[str]:
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, list):
|
||||
return [str(item) for item in value if item is not None]
|
||||
return [str(value)]
|
||||
|
||||
|
||||
def _unique(values: list[str]) -> list[str]:
|
||||
seen: set[str] = set()
|
||||
result: list[str] = []
|
||||
for value in values:
|
||||
normalized = str(value).strip()
|
||||
key = normalized.lower()
|
||||
if normalized and key not in seen:
|
||||
result.append(normalized)
|
||||
seen.add(key)
|
||||
return result
|
||||
149
src/markitect_tool/policy/models.py
Normal file
149
src/markitect_tool/policy/models.py
Normal file
@@ -0,0 +1,149 @@
|
||||
"""Policy gateway models shared by local and external policy adapters."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PolicySubject:
|
||||
"""Actor asking to read, query, search, or package knowledge."""
|
||||
|
||||
id: str
|
||||
allowed_labels: list[str] = field(default_factory=list)
|
||||
trust_zones: list[str] = field(default_factory=list)
|
||||
roles: list[str] = field(default_factory=list)
|
||||
allowed_actions: list[str] = field(default_factory=list)
|
||||
path_allow: list[str] = field(default_factory=list)
|
||||
path_deny: list[str] = field(default_factory=list)
|
||||
attributes: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return _drop_empty(asdict(self))
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PolicyObject:
|
||||
"""Knowledge object considered by a policy decision."""
|
||||
|
||||
id: str
|
||||
path: str | None = None
|
||||
labels: list[str] = field(default_factory=list)
|
||||
trust_zone: str | None = None
|
||||
attributes: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return _drop_empty(asdict(self))
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PolicyDecision:
|
||||
"""Explainable policy decision for one subject/action/object tuple."""
|
||||
|
||||
subject: str
|
||||
action: str
|
||||
object_id: str
|
||||
effect: str
|
||||
reason: str
|
||||
mode: str = "enforce"
|
||||
rule_id: str | None = None
|
||||
labels: list[str] = field(default_factory=list)
|
||||
trust_zone: str | None = None
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
@property
|
||||
def allowed(self) -> bool:
|
||||
return self.effect in {"allow", "audit_denied"}
|
||||
|
||||
@property
|
||||
def denied(self) -> bool:
|
||||
return self.effect in {"deny", "audit_denied"}
|
||||
|
||||
@property
|
||||
def redacted(self) -> bool:
|
||||
return self.effect == "redact"
|
||||
|
||||
@property
|
||||
def decision_id(self) -> str:
|
||||
payload = {
|
||||
"subject": self.subject,
|
||||
"action": self.action,
|
||||
"object_id": self.object_id,
|
||||
"effect": self.effect,
|
||||
"reason": self.reason,
|
||||
"mode": self.mode,
|
||||
"rule_id": self.rule_id,
|
||||
"labels": self.labels,
|
||||
"trust_zone": self.trust_zone,
|
||||
"metadata": self.metadata,
|
||||
}
|
||||
return "policy:" + hashlib.sha256(
|
||||
json.dumps(payload, sort_keys=True, ensure_ascii=False, default=str).encode("utf-8")
|
||||
).hexdigest()
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
data = asdict(self)
|
||||
data["decision_id"] = self.decision_id
|
||||
data["allowed"] = self.allowed
|
||||
return _drop_empty(data)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PolicyFilterResult:
|
||||
"""Results after policy filtering plus decision and diagnostic metadata."""
|
||||
|
||||
results: list[dict[str, Any]]
|
||||
decisions: list[PolicyDecision] = field(default_factory=list)
|
||||
diagnostics: list[Any] = field(default_factory=list)
|
||||
mode: str = "enforce"
|
||||
subject: str | None = None
|
||||
action: str | None = None
|
||||
|
||||
@property
|
||||
def filtered(self) -> bool:
|
||||
return any(decision.denied for decision in self.decisions if not decision.allowed)
|
||||
|
||||
@property
|
||||
def denied_count(self) -> int:
|
||||
return sum(1 for decision in self.decisions if decision.effect == "deny")
|
||||
|
||||
@property
|
||||
def redacted_count(self) -> int:
|
||||
return sum(1 for decision in self.decisions if decision.effect == "redact")
|
||||
|
||||
@property
|
||||
def audit_denied_count(self) -> int:
|
||||
return sum(1 for decision in self.decisions if decision.effect == "audit_denied")
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
data = {
|
||||
"results": self.results,
|
||||
"decisions": [decision.to_dict() for decision in self.decisions],
|
||||
"diagnostics": [
|
||||
diagnostic.to_dict() if hasattr(diagnostic, "to_dict") else diagnostic
|
||||
for diagnostic in self.diagnostics
|
||||
],
|
||||
"policy": {
|
||||
"mode": self.mode,
|
||||
"subject": self.subject,
|
||||
"action": self.action,
|
||||
"filtered": self.filtered,
|
||||
"allowed": sum(1 for decision in self.decisions if decision.effect == "allow"),
|
||||
"denied": self.denied_count,
|
||||
"redacted": self.redacted_count,
|
||||
"audit_denied": self.audit_denied_count,
|
||||
"total_decisions": len(self.decisions),
|
||||
},
|
||||
}
|
||||
return _drop_empty(data)
|
||||
|
||||
|
||||
def _drop_empty(data: dict[str, Any]) -> dict[str, Any]:
|
||||
return {
|
||||
key: value
|
||||
for key, value in data.items()
|
||||
if value not in (None, [], {}, "")
|
||||
}
|
||||
Reference in New Issue
Block a user