Files
net-kingdom/local-identity/tests/test_serve.py
tegwick d35823df08 feat(local-identity): Stage 3 — minimal native OIDC provider (NK-WP-0002-T03)
Add local-identity serve command: a minimal Authorization Code flow OIDC
server backed by file-store users.  Implemented natively with no heavy
OIDC library — only stdlib http.server and the cryptography package.

New modules:
  keys.py      RSA-2048 signing key generation + JWKS helpers
  tls.py       Self-signed TLS certificate (localhost/127.0.0.1 SANs)
  jwt_utils.py RS256 JWT creation and verification
  serve.py     OIDCHandler + make_handler() factory + run_server()

Endpoints: /.well-known/openid-configuration, /jwks, /auth, /token,
/userinfo.  Server binds to 127.0.0.1 only; tokens carry iss: local-identity
which production Keycloak rejects by design.

104 tests passing (16 new for Stage 3).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 01:05:50 +01:00

440 lines
14 KiB
Python

"""
Tests for local-identity OIDC serve module.
The tests spin up a plain HTTP (non-TLS) server on a random port using
make_handler() with scheme="http". This keeps tests fast and free of
certificate trust issues while exercising all the OIDC logic.
"""
import http.server
import json
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
import pytest
from local_identity import store
from local_identity.jwt_utils import JWTError, create_token, verify_token
from local_identity.keys import ensure_signing_key, jwk_public, key_id
from local_identity.serve import _BIND_HOST, _ISSUER, make_handler
from local_identity.user import UserRecord, make_test_user
# ------------------------------------------------------------------ #
# Fixtures #
# ------------------------------------------------------------------ #
@pytest.fixture
def populated_store(tmp_store):
    """Seed the store with primary user alice plus two derived test users.

    ``tmp_store`` is requested only for its setup; it is defined outside this
    file (presumably it points the store at a temporary location — confirm
    in conftest).

    Returns the primary ``UserRecord``.
    """
    store.init_dirs()
    alice = UserRecord(
        username="alice",
        fullname="Alice Smith",
        email="alice@example.com",
    )
    # Persist the primary first, then test users number 1 and 2, in order.
    store.write_user(alice)
    for index in (1, 2):
        store.write_user(make_test_user(alice, index))
    return alice
@pytest.fixture
def oidc_server(populated_store, tmp_store):
    """
    Start a plain HTTP OIDC server on a random port.

    Yields (base_url, private_key).

    The server runs in a daemon thread for the duration of the test. On
    teardown the serve loop is stopped, the listening socket is closed and
    the thread is joined, so no sockets or threads leak between tests.
    """
    private_key = ensure_signing_key()
    kid = key_id(private_key)
    HandlerClass = make_handler(private_key, kid, token_ttl=60, scheme="http")
    # Port 0 lets the OS pick a free port; the actual one is read back below.
    httpd = http.server.HTTPServer((_BIND_HOST, 0), HandlerClass)
    port = httpd.server_address[1]
    thread = threading.Thread(target=httpd.serve_forever, daemon=True)
    thread.start()
    try:
        yield f"http://{_BIND_HOST}:{port}", private_key
    finally:
        # shutdown() only stops serve_forever(); server_close() is what
        # actually releases the listening socket.
        httpd.shutdown()
        httpd.server_close()
        thread.join(timeout=5)
# ------------------------------------------------------------------ #
# Helpers #
# ------------------------------------------------------------------ #
def _get_json(url: str) -> dict:
    """Fetch *url* and return its body parsed as JSON."""
    with urllib.request.urlopen(url) as response:
        raw_body = response.read()
    return json.loads(raw_body)
def _post_form(url: str, fields: dict) -> tuple[int, dict]:
    """POST *fields* url-encoded to *url* and return ``(status, parsed JSON body)``.

    Error responses are not handled here; callers in this module catch the
    ``urllib.error.HTTPError`` that ``urlopen`` raises for them.
    """
    encoded_body = urllib.parse.urlencode(fields).encode()
    request = urllib.request.Request(
        url,
        data=encoded_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    with urllib.request.urlopen(request) as response:
        status, raw_body = response.status, response.read()
    return status, json.loads(raw_body)
class _NoRedirect(urllib.request.HTTPRedirectHandler):
    """Redirect handler that declines to follow redirects.

    Installed in an opener, it makes a redirect response surface to the
    caller as an HTTPError, whose status code and Location header can then
    be inspected.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Returning None means no follow-up request is built.
        return None
def _post_form_no_redirect(url: str, fields: dict) -> tuple[int, str]:
    """POST a form, capture the Location header without following the redirect.

    Returns ``(status, location)``:

    * If the server answers with a redirect (or any other error status),
      ``status`` is that status code and ``location`` is the ``Location``
      header, or ``""`` when the header is absent.
    * If the request succeeds without redirecting, ``status`` is the
      response status and ``location`` is ``""``.

    Both the success response and the error response are closed before
    returning so no sockets are left open.
    """
    data = urllib.parse.urlencode(fields).encode()
    req = urllib.request.Request(
        url, data=data,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    opener = urllib.request.build_opener(_NoRedirect())
    try:
        with opener.open(req) as resp:
            return resp.status, ""
    except urllib.error.HTTPError as e:
        # The HTTPError object wraps the response; release it once read.
        try:
            return e.code, e.headers.get("Location", "")
        finally:
            e.close()
def _do_auth_code_flow(base_url: str, username: str = "alice") -> dict:
    """
    Run the authorization code flow end to end for *username*.

    Posts the login form to /auth, extracts the code from the redirect,
    exchanges it at /token and returns the token response dict.
    """
    redirect_uri = "http://localhost:9999/callback"
    auth_form = {
        "username": username,
        "client_id": "testclient",
        "redirect_uri": redirect_uri,
        "state": "s1",
        "nonce": "n1",
    }
    # The GET form is cosmetic only, so go straight to the POST.
    status, location = _post_form_no_redirect(f"{base_url}/auth", auth_form)
    assert status == 302, f"Expected 302, got {status}"
    assert "code=" in location, f"No code in redirect: {location}"

    redirect_query = urllib.parse.urlparse(location).query
    code = urllib.parse.parse_qs(redirect_query)["code"][0]

    token_form = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,
        "client_id": "testclient",
    }
    _, token_resp = _post_form(f"{base_url}/token", token_form)
    return token_resp
# ------------------------------------------------------------------ #
# OIDC discovery #
# ------------------------------------------------------------------ #
def test_discovery_structure(oidc_server):
    """The discovery document names the issuer, local endpoints and capabilities."""
    base_url, _ = oidc_server
    doc = _get_json(f"{base_url}/.well-known/openid-configuration")
    assert doc["issuer"] == _ISSUER
    endpoint_keys = (
        "authorization_endpoint",
        "token_endpoint",
        "userinfo_endpoint",
        "jwks_uri",
    )
    for key in endpoint_keys:
        assert doc[key].startswith(base_url), f"{key} should point to server"
    # Each advertised capability list must contain the one value we rely on.
    capabilities = (
        ("code", "response_types_supported"),
        ("RS256", "id_token_signing_alg_values_supported"),
        ("openid", "scopes_supported"),
    )
    for value, field in capabilities:
        assert value in doc[field]
# ------------------------------------------------------------------ #
# JWKS #
# ------------------------------------------------------------------ #
def test_jwks_contains_rsa_key(oidc_server):
    """/jwks publishes exactly one RS256 signing key matching the server key id."""
    base_url, private_key = oidc_server
    jwks = _get_json(f"{base_url}/jwks")
    assert "keys" in jwks
    published = jwks["keys"]
    assert len(published) == 1
    jwk = published[0]
    for field, expected in (("kty", "RSA"), ("alg", "RS256"), ("use", "sig")):
        assert jwk[field] == expected
    # Modulus and exponent must both be present.
    assert "n" in jwk and "e" in jwk
    assert jwk["kid"] == key_id(private_key)
# ------------------------------------------------------------------ #
# Authorization endpoint (GET) #
# ------------------------------------------------------------------ #
def test_auth_get_returns_html_form(oidc_server):
    """GET /auth with a valid code request renders the login form."""
    base_url, _ = oidc_server
    query = urllib.parse.urlencode({
        "response_type": "code",
        "client_id": "testclient",
        "redirect_uri": "http://localhost:9999/callback",
        "scope": "openid profile email",
        "state": "xyz",
    })
    with urllib.request.urlopen(f"{base_url}/auth?{query}") as resp:
        html = resp.read().decode()
    # A POST form must be present, and the primary user (alice) must be listed.
    for fragment in ("<form", 'method="POST"', "alice"):
        assert fragment in html
def test_auth_get_wrong_response_type_redirects_error(oidc_server):
    """GET /auth with an unsupported response_type redirects with an error.

    The request is sent through an opener that refuses to follow redirects,
    so the redirect surfaces as an HTTPError whose Location header must carry
    ``unsupported_response_type``.
    """
    base_url, _ = oidc_server
    callback = "http://localhost:9999/callback"
    params = urllib.parse.urlencode({
        "response_type": "token",  # implicit — unsupported
        "client_id": "testclient",
        "redirect_uri": callback,
        "state": "xyz",
    })
    # This must be a GET: the response_type check under test is in the GET handler.
    req = urllib.request.Request(f"{base_url}/auth?{params}")
    opener = urllib.request.build_opener(_NoRedirect())
    with pytest.raises(urllib.error.HTTPError) as excinfo:
        opener.open(req)
    loc = excinfo.value.headers.get("Location", "")
    assert "unsupported_response_type" in loc
# ------------------------------------------------------------------ #
# Full authorization code flow #
# ------------------------------------------------------------------ #
def test_full_flow_returns_valid_token(oidc_server):
    """The code flow yields a Bearer token whose signed claims describe alice."""
    base_url, private_key = oidc_server
    token_resp = _do_auth_code_flow(base_url, username="alice")
    assert "access_token" in token_resp
    assert "id_token" in token_resp
    assert token_resp["token_type"] == "Bearer"
    # The server fixture is created with token_ttl=60.
    assert token_resp["expires_in"] == 60
    claims = verify_token(token_resp["access_token"], private_key.public_key())
    expected_claims = {
        "sub": "alice",
        "iss": _ISSUER,
        "email": "alice@example.com",
        "name": "Alice Smith",
        "preferred_username": "alice",
        "nonce": "n1",  # nonce sent by _do_auth_code_flow
    }
    for claim, value in expected_claims.items():
        assert claims[claim] == value
def test_full_flow_userinfo(oidc_server):
    """/userinfo returns alice's claims for a freshly issued access token."""
    base_url, _ = oidc_server
    access_token = _do_auth_code_flow(base_url, username="alice")["access_token"]
    request = urllib.request.Request(
        f"{base_url}/userinfo",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    with urllib.request.urlopen(request) as resp:
        userinfo = json.loads(resp.read())
    expected = {
        "sub": "alice",
        "email": "alice@example.com",
        "preferred_username": "alice",
    }
    for field, value in expected.items():
        assert userinfo[field] == value
def test_full_flow_test_user(oidc_server):
    """The derived test user alice1 can complete the flow as well."""
    base_url, private_key = oidc_server
    access_token = _do_auth_code_flow(base_url, username="alice1")["access_token"]
    claims = verify_token(access_token, private_key.public_key())
    assert claims["sub"] == "alice1"
    assert claims["email"] == "alice+test1@example.com"
# ------------------------------------------------------------------ #
# Code replay / expiry #
# ------------------------------------------------------------------ #
def test_code_cannot_be_reused(oidc_server):
    """An authorization code is single-use: replaying it yields invalid_grant."""
    base_url, _ = oidc_server
    callback = "http://localhost:9999/callback"
    status, location = _post_form_no_redirect(
        f"{base_url}/auth",
        {
            "username": "alice",
            "client_id": "testclient",
            "redirect_uri": callback,
            "state": "s",
            "nonce": "",
        },
    )
    # Fail with a clear message instead of a KeyError on "code" below.
    assert status == 302, f"Expected 302, got {status}"
    qs = urllib.parse.parse_qs(urllib.parse.urlparse(location).query)
    code = qs["code"][0]
    token_fields = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": callback,
        "client_id": "testclient",
    }
    # First use succeeds
    _, first = _post_form(f"{base_url}/token", token_fields)
    assert "access_token" in first
    # Second use fails
    with pytest.raises(urllib.error.HTTPError) as excinfo:
        _post_form(f"{base_url}/token", token_fields)
    err = excinfo.value
    assert err.code == 400
    body = json.loads(err.read())
    assert body["error"] == "invalid_grant"
def test_unknown_code_rejected(oidc_server):
    """Exchanging a code the server never issued yields 400 invalid_grant."""
    base_url, _ = oidc_server
    token_fields = {
        "grant_type": "authorization_code",
        "code": "notacode",
        "redirect_uri": "http://localhost:9999/callback",
        "client_id": "testclient",
    }
    with pytest.raises(urllib.error.HTTPError) as excinfo:
        _post_form(f"{base_url}/token", token_fields)
    err = excinfo.value
    assert err.code == 400
    body = json.loads(err.read())
    assert body["error"] == "invalid_grant"
# ------------------------------------------------------------------ #
# Userinfo — token errors #
# ------------------------------------------------------------------ #
def test_userinfo_no_bearer_returns_401(oidc_server):
    """/userinfo without an Authorization header is rejected with 401."""
    base_url, _ = oidc_server
    with pytest.raises(urllib.error.HTTPError) as excinfo:
        # The inner `with` closes the response if the request unexpectedly succeeds.
        with urllib.request.urlopen(f"{base_url}/userinfo"):
            pass
    assert excinfo.value.code == 401
def test_userinfo_tampered_token_rejected(oidc_server):
    """/userinfo rejects a token whose signature has been altered."""
    base_url, _ = oidc_server
    token_resp = _do_auth_code_flow(base_url)
    good_token = token_resp["access_token"]
    header, payload, signature = good_token.split(".")
    # Tamper with the FIRST signature character, not the last. In unpadded
    # base64url the final character of a 256-byte signature carries only two
    # significant bits; the rest are padding that lenient decoders ignore.
    # Swapping a trailing "A" for "B" therefore changes padding bits only,
    # leaves the decoded signature identical, and makes the test fail at
    # random. All six bits of the first character are significant, so this
    # swap always changes the signature bytes.
    replacement = "A" if signature[0] != "A" else "B"
    tampered = f"{header}.{payload}.{replacement}{signature[1:]}"
    req = urllib.request.Request(
        f"{base_url}/userinfo",
        headers={"Authorization": f"Bearer {tampered}"},
    )
    with pytest.raises(urllib.error.HTTPError) as excinfo:
        with urllib.request.urlopen(req):
            pass
    assert excinfo.value.code == 401
# ------------------------------------------------------------------ #
# Bind-host assertion #
# ------------------------------------------------------------------ #
def test_server_binds_to_localhost_only(oidc_server):
    """The fixture's base URL is rooted at the loopback bind host."""
    base_url, _ = oidc_server
    expected_prefix = f"http://{_BIND_HOST}:"
    assert base_url.startswith(expected_prefix)
def test_bind_host_constant():
    """_BIND_HOST is pinned to the IPv4 loopback address."""
    loopback = "127.0.0.1"
    assert _BIND_HOST == loopback
# ------------------------------------------------------------------ #
# JWT unit tests (independent of HTTP server) #
# ------------------------------------------------------------------ #
def test_jwt_roundtrip(tmp_store):
    """A token from create_token verifies and carries the claims it was given."""
    store.init_dirs()
    signing_key = ensure_signing_key()
    token = create_token(
        private_key=signing_key,
        kid=key_id(signing_key),
        sub="bob",
        iss=_ISSUER,
        aud="myapp",
        email="bob@example.com",
        name="Bob Jones",
        preferred_username="bob",
        ttl=300,
        nonce="abc",
    )
    claims = verify_token(token, signing_key.public_key())
    expected_claims = {
        "sub": "bob",
        "iss": _ISSUER,
        "aud": "myapp",
        "email": "bob@example.com",
        "name": "Bob Jones",
        "nonce": "abc",
    }
    for claim, value in expected_claims.items():
        assert claims[claim] == value
    # Expiry must lie in the future for a positive ttl.
    assert claims["exp"] > int(time.time())
def test_jwt_expired_rejected(tmp_store):
    """verify_token raises JWTError mentioning "expired" for an expired token."""
    store.init_dirs()
    signing_key = ensure_signing_key()
    stale_token = create_token(
        private_key=signing_key,
        kid=key_id(signing_key),
        sub="bob",
        iss=_ISSUER,
        aud="x",
        email="b@x.com",
        name="Bob",
        preferred_username="bob",
        ttl=-1,  # negative ttl: already expired
    )
    public_key = signing_key.public_key()
    with pytest.raises(JWTError, match="expired"):
        verify_token(stale_token, public_key)
def test_jwt_signing_key_persisted(tmp_store):
    """Two consecutive ensure_signing_key() calls yield keys with the same key id."""
    store.init_dirs()
    first_id = key_id(ensure_signing_key())
    second_id = key_id(ensure_signing_key())
    assert first_id == second_id