# File: net-kingdom/tools/iam-profile-conformance/tests/test_iam_profile_conformance.py
# (311 lines, 11 KiB, Python)

import base64
import http.server
import importlib.util
import json
import sys
import threading
import time
import urllib.parse
from pathlib import Path
import pytest
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
# Load the tool under test directly from its file path rather than through a
# package import: the script sits one directory above this test file's folder.
TOOL_PATH = Path(__file__).resolve().parents[1] / "iam_profile_conformance.py"
SPEC = importlib.util.spec_from_file_location("iam_profile_conformance", TOOL_PATH)
conformance = importlib.util.module_from_spec(SPEC)
# The loader is used below to execute the module, so it must be present.
assert SPEC.loader is not None
# The module is registered under its spec name *before* it is executed;
# presumably so code that looks the module up by name while it is being
# executed can find it -- confirm against the tool's needs.
sys.modules[SPEC.name] = conformance
SPEC.loader.exec_module(conformance)
def b64url(data: bytes) -> str:
    """Return *data* as URL-safe base64 text with trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    text = encoded.decode("ascii")
    return text.rstrip("=")
def jwk_from_key(private_key, kid: str) -> dict:
    """Return the RS256 signing JWK for the public half of *private_key*.

    The modulus and exponent are serialised as minimal-length big-endian
    integers and encoded with ``b64url``; *kid* is copied into the key.
    """
    public_numbers = private_key.public_key().public_numbers()

    def encode_uint(value: int) -> str:
        # Smallest whole number of bytes that can hold the integer.
        width = (value.bit_length() + 7) // 8
        return b64url(value.to_bytes(width, "big"))

    return {
        "kty": "RSA",
        "use": "sig",
        "alg": "RS256",
        "kid": kid,
        "n": encode_uint(public_numbers.n),
        "e": encode_uint(public_numbers.e),
    }
def sign_jwt(private_key, kid: str, payload: dict) -> str:
    """Return *payload* as a compact RS256-signed JWT whose header carries *kid*.

    Header and payload are serialised as compact JSON (no whitespace),
    base64url-encoded and joined with a dot; that text is signed with
    PKCS#1 v1.5 over SHA-256 and the encoded signature is appended.
    """
    segments = [
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in ({"alg": "RS256", "typ": "JWT", "kid": kid}, payload)
    ]
    signing_input = ".".join(segments)
    signature = private_key.sign(
        signing_input.encode("ascii"),
        padding.PKCS1v15(),
        hashes.SHA256(),
    )
    return f"{signing_input}.{b64url(signature)}"
def default_payload(issuer: str, audience: str, **overrides) -> dict:
    """Return a baseline claim set for a human user, with *overrides* applied.

    All time-based claims derive from a single clock reading: ``iat`` and
    ``assurance.at`` are "now", ``nbf`` is 5 seconds earlier and ``exp`` is
    600 seconds later. ``aud`` lists *audience* followed by
    ``"profile-consumer"``. Keyword overrides replace top-level claims of the
    same name or add new ones.
    """
    issued_at = int(time.time())
    assurance = {
        "level": "aal2",
        "methods": ["pwd", "otp"],
        "mfa": True,
        "source": "key-cape",
        "at": issued_at,
    }
    claims = {
        "iss": issuer,
        "sub": "user:alice",
        "aud": [audience, "profile-consumer"],
        "exp": issued_at + 600,
        "iat": issued_at,
        "nbf": issued_at - 5,
        "jti": "test-token",
        "tenant": "tenant:platform",
        "principal_type": "human",
        "preferred_username": "alice",
        "email": "alice@example.test",
        "groups": ["netkingdom-admins"],
        "roles": ["admin"],
        "scope": "openid profile email",
        "assurance": assurance,
    }
    # Overrides win over the defaults; unknown keys are appended.
    return {**claims, **overrides}
@pytest.fixture
def issuer_fixture():
    """Provide a factory that starts throwaway local OIDC issuers.

    The yielded ``start(name, discovery_overrides=None)`` callable generates a
    fresh 2048-bit RSA key and boots an HTTP server on an ephemeral
    ``127.0.0.1`` port that answers GET requests under ``/{name}``:

    * ``/.well-known/openid-configuration`` -- the discovery document, with
      ``discovery_overrides`` merged on top when given;
    * ``/jwks`` -- a key set containing the issuer's public key;
    * ``/auth`` -- a 302 redirect to the request's ``redirect_uri`` (default
      ``http://localhost/callback``) carrying ``code=test-code``, or an
      ``invalid_request`` error when no ``code_challenge`` was supplied.

    Every other path gets a 404. ``start`` returns
    ``(issuer_url, private_key, kid)`` where ``kid`` is ``"{name}-kid"``.

    On teardown each server is stopped, its listening socket is closed and
    its serving thread is joined, so no sockets or threads leak between tests.
    """
    running = []  # (server, serving thread) pairs, in start order

    def start(name: str, discovery_overrides: dict | None = None):
        """Start one issuer mounted at ``/{name}``."""
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        kid = f"{name}-kid"

        class Handler(http.server.BaseHTTPRequestHandler):
            def log_message(self, fmt, *args):
                # Keep test output clean: drop the per-request log lines.
                pass

            def do_GET(self):
                parsed = urllib.parse.urlparse(self.path)
                if parsed.path == f"/{name}/.well-known/openid-configuration":
                    self._json(self._discovery())
                    return
                if parsed.path == f"/{name}/jwks":
                    self._json({"keys": [jwk_from_key(private_key, kid)]})
                    return
                if parsed.path == f"/{name}/auth":
                    query = urllib.parse.parse_qs(parsed.query)
                    redirect_uri = query.get("redirect_uri", ["http://localhost/callback"])[0]
                    if "code_challenge" not in query:
                        # Requests without a PKCE challenge are bounced back
                        # with an error instead of an authorization code.
                        location = redirect_uri + "?error=invalid_request&error_description=pkce_code_challenge_required"
                    else:
                        location = redirect_uri + "?code=test-code"
                    self.send_response(302)
                    self.send_header("Location", location)
                    self.end_headers()
                    return
                self.send_response(404)
                self.end_headers()

            def _discovery(self):
                # The issuer URL is derived from the port the server is bound
                # to, which is only known once the server exists.
                base = f"http://127.0.0.1:{self.server.server_port}/{name}"
                discovery = {
                    "issuer": base,
                    "authorization_endpoint": f"{base}/auth",
                    "token_endpoint": f"{base}/token",
                    "userinfo_endpoint": f"{base}/userinfo",
                    "jwks_uri": f"{base}/jwks",
                    "end_session_endpoint": f"{base}/logout",
                    "response_types_supported": ["code"],
                    "subject_types_supported": ["public"],
                    "id_token_signing_alg_values_supported": ["RS256"],
                    "scopes_supported": ["openid", "profile", "email"],
                    "grant_types_supported": ["authorization_code", "client_credentials"],
                    "code_challenge_methods_supported": ["S256"],
                    "claims_supported": [
                        "iss",
                        "sub",
                        "aud",
                        "exp",
                        "iat",
                        "tenant",
                        "principal_type",
                        "groups",
                        "roles",
                        "assurance",
                    ],
                }
                if discovery_overrides:
                    discovery.update(discovery_overrides)
                return discovery

            def _json(self, payload):
                data = json.dumps(payload).encode()
                self.send_response(200)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(data)))
                self.end_headers()
                self.wfile.write(data)

        httpd = http.server.HTTPServer(("127.0.0.1", 0), Handler)
        thread = threading.Thread(target=httpd.serve_forever, daemon=True)
        thread.start()
        # Only register servers whose loop is running: shutdown() waits for
        # serve_forever() to exit and would block on a server never started.
        running.append((httpd, thread))
        return f"http://127.0.0.1:{httpd.server_port}/{name}", private_key, kid

    try:
        yield start
    finally:
        for httpd, thread in running:
            httpd.shutdown()  # stop the serve_forever() loop
            httpd.server_close()  # release the listening socket
            thread.join(timeout=5)
def statuses(results):
    """Map each result's ``name`` to its ``status``; later duplicates win."""
    by_name = {}
    for entry in results:
        by_name[entry.name] = entry.status
    return by_name
def messages(results):
    """Map each result's ``name`` to its ``message``; later duplicates win."""
    by_name = {}
    for entry in results:
        by_name[entry.name] = entry.message
    return by_name
def test_keycape_fixture_passes_full_conformance(issuer_fixture):
    """Default claims from the ``keycape`` issuer yield no FAIL result.

    Also pins ``pkce-probe`` and ``roles-claim`` to PASS.
    """
    issuer_url, signing_key, key_id = issuer_fixture("keycape")
    claims = default_payload(issuer_url, "service-a")
    cfg = conformance.Config(
        issuer=issuer_url,
        audience="service-a",
        access_token=sign_jwt(signing_key, key_id, claims),
        client_id="iam-profile-conformance",
        redirect_uri="http://localhost/callback",
        environment="local",
    )

    report = conformance.run_suite(cfg)
    outcome = statuses(report)

    assert "FAIL" not in outcome.values(), messages(report)
    assert outcome["pkce-probe"] == "PASS"
    assert outcome["roles-claim"] == "PASS"
def test_keycloak_fixture_passes_with_canonical_roles_and_realm_roles(issuer_fixture):
    """A token with both ``roles`` and ``realm_access.roles`` yields no FAIL.

    The issuer is mounted at ``realms/platform`` and the assurance block names
    ``keycloak`` as its source; ``roles-claim`` must be PASS.
    """
    issuer_url, signing_key, key_id = issuer_fixture("realms/platform")
    keycloak_assurance = {
        "level": "aal2",
        "methods": ["upstream_mfa"],
        "mfa": True,
        "source": "keycloak",
        "at": int(time.time()),
    }
    claims = default_payload(
        issuer_url,
        "service-a",
        assurance=keycloak_assurance,
        realm_access={"roles": ["admin", "offline_access"]},
    )
    cfg = conformance.Config(
        issuer=issuer_url,
        audience="service-a",
        access_token=sign_jwt(signing_key, key_id, claims),
        client_id="iam-profile-conformance",
        redirect_uri="http://localhost/callback",
        environment="local",
    )

    report = conformance.run_suite(cfg)
    outcome = statuses(report)

    assert "FAIL" not in outcome.values(), messages(report)
    assert outcome["roles-claim"] == "PASS"
def test_provider_native_roles_warn_without_canonical_roles(issuer_fixture):
    """With only ``realm_access.roles`` present, ``roles-claim`` is WARN, not FAIL."""
    issuer_url, signing_key, key_id = issuer_fixture("realms/legacy")
    claims = default_payload(
        issuer_url,
        "service-a",
        realm_access={"roles": ["admin"]},
    )
    # Drop the canonical claim so only the provider-native roles remain.
    del claims["roles"]
    cfg = conformance.Config(
        issuer=issuer_url,
        audience="service-a",
        access_token=sign_jwt(signing_key, key_id, claims),
        client_id="iam-profile-conformance",
        redirect_uri="http://localhost/callback",
        environment="local",
    )

    report = conformance.run_suite(cfg)
    outcome = statuses(report)

    assert "FAIL" not in outcome.values(), messages(report)
    assert outcome["roles-claim"] == "WARN"
def test_production_rejects_local_issuer(issuer_fixture):
    """In the ``production`` environment the loopback issuer fails ``local-issuer-policy``."""
    issuer_url, signing_key, key_id = issuer_fixture("keycape")
    claims = default_payload(issuer_url, "service-a")
    cfg = conformance.Config(
        issuer=issuer_url,
        audience="service-a",
        access_token=sign_jwt(signing_key, key_id, claims),
        client_id="iam-profile-conformance",
        redirect_uri="http://localhost/callback",
        environment="production",
    )

    outcome = statuses(conformance.run_suite(cfg))

    assert outcome["local-issuer-policy"] == "FAIL"
def test_missing_tenant_fails_claim_contract(issuer_fixture):
    """A token without ``tenant`` fails both ``claim-shape`` and ``tenant-claim``."""
    issuer_url, signing_key, key_id = issuer_fixture("keycape")
    claims = default_payload(issuer_url, "service-a")
    del claims["tenant"]
    cfg = conformance.Config(
        issuer=issuer_url,
        audience="service-a",
        access_token=sign_jwt(signing_key, key_id, claims),
        client_id="iam-profile-conformance",
        redirect_uri="http://localhost/callback",
        environment="local",
    )

    outcome = statuses(conformance.run_suite(cfg))

    assert outcome["claim-shape"] == "FAIL"
    assert outcome["tenant-claim"] == "FAIL"
def test_delegated_agent_shape_passes(issuer_fixture):
    """A delegated agent token yields no FAIL result and a PASS for ``principal-shape``.

    The claims describe ``agent:build-runner`` acting for ``user:alice``.
    ``preferred_username`` is overridden to ``None``, so the claim stays in
    the token with a null value rather than being removed.
    """
    issuer_url, signing_key, key_id = issuer_fixture("agent-issuer")
    agent_assurance = {
        "level": "aal2",
        "methods": ["workload_identity", "delegated_user_mfa"],
        "mfa": True,
        "source": "key-cape",
        "at": int(time.time()),
    }
    claims = default_payload(
        issuer_url,
        "service-a",
        sub="agent:build-runner",
        principal_type="agent",
        preferred_username=None,
        roles=["operator"],
        agent={"id": "agent:build-runner", "mode": "delegated"},
        actor_sub="user:alice",
        assurance=agent_assurance,
    )
    cfg = conformance.Config(
        issuer=issuer_url,
        audience="service-a",
        access_token=sign_jwt(signing_key, key_id, claims),
        client_id="iam-profile-conformance",
        redirect_uri="http://localhost/callback",
        environment="local",
    )

    report = conformance.run_suite(cfg)
    outcome = statuses(report)

    assert "FAIL" not in outcome.values(), messages(report)
    assert outcome["principal-shape"] == "PASS"