Implement credentialed drill packaging workplan

This commit is contained in:
2026-05-19 01:27:59 +02:00
parent 022cd8d37e
commit 6e0372d21a
23 changed files with 924 additions and 43 deletions

View File

@@ -70,3 +70,58 @@ def test_audit_retention_plan_identifies_eligible_records() -> None:
assert plan["valid"] is True
assert plan["plan"]["eligible_operation_ids"] == ["op:old"]
assert plan["plan"]["eligible_count"] == 1
def test_audit_retention_apply_prunes_eligible_records_and_records_apply() -> None:
runtime = PhaseMemoryRuntime()
runtime.audit_sink.record(
{
"schema_version": "phase_memory.audit.event.v1",
"operation_id": "op:old",
"operation": "manual",
"timestamp": "2026-01-01T00:00:00+00:00",
"subject": {"kind": "audit_events", "id": "old"},
"source": {"ref": "test"},
"dry_run": True,
"allowed": True,
}
)
runtime.audit_sink.record(
{
"schema_version": "phase_memory.audit.event.v1",
"operation_id": "op:new",
"operation": "manual",
"timestamp": "2026-05-18T00:00:00+00:00",
"subject": {"kind": "audit_events", "id": "new"},
"source": {"ref": "test"},
"dry_run": True,
"allowed": True,
}
)
plan = runtime.audit_retention_plan(retention_days=30, now=datetime(2026, 5, 19, tzinfo=timezone.utc))
applied = runtime.apply_audit_retention(plan["plan"])
remaining_ids = [event["operation_id"] for event in runtime.audit_sink.query()]
assert applied["valid"] is True
assert applied["result"]["pruned_operation_ids"] == ["op:old"]
assert "op:old" not in remaining_ids
assert "op:new" in remaining_ids
assert any(event["operation"] == "audit.retention.apply" for event in runtime.audit_sink.query())
def test_audit_retention_apply_noop_and_unsupported_paths() -> None:
runtime = PhaseMemoryRuntime()
noop = runtime.apply_audit_retention(retention_days=30, now=datetime(2026, 5, 19, tzinfo=timezone.utc))
assert noop["valid"] is True
assert noop["result"]["changed"] is False
class UnsupportedAuditSink:
def record(self, event):
return {"recorded": True, "event": event}
unsupported = PhaseMemoryRuntime(audit_sink=UnsupportedAuditSink())
result = unsupported.apply_audit_retention(retention_days=30)
assert result["valid"] is False
assert result["diagnostics"][0]["code"] == "audit_retention_apply_unsupported"