feat: add durable store record serialization

This commit is contained in:
2026-06-16 03:43:55 +02:00
parent abb3c5bd34
commit 6810d9a3aa
7 changed files with 576 additions and 0 deletions

View File

@@ -249,3 +249,10 @@ with a factory that returns a fresh store. The harness covers readiness,
idempotent migration, core save/read/query behavior, transaction rollback,
outbox ordering, and diagnostics that expose counts without raw factor or
profile values.
`user_engine.store_records` defines the JSONB serialization contract for the
generic record table. `store_record_for` turns supported domain dataclasses
into `StoreRecord` envelopes with deterministic keys and index metadata, while
`domain_record_from_store_record` restores those payloads to domain objects.
These payloads are durable state and may contain sensitive values, so they must
not be emitted as diagnostics.

View File

@@ -17,6 +17,7 @@ src/user_engine/
ports.py adapter protocols for identity, authorization, events, audit,
membership export, application bindings, and secrets
service.py headless service API for the isolated MVP
store_records.py JSON-safe durable-store record serialization
testing/ local fixtures for tests and examples
tests/ standard-library unittest suite
```

View File

@@ -289,6 +289,18 @@ bootstrap schema, and `user_engine.testing.store_conformance` exposes a
reusable harness that future adapters can run with their own store factory.
The standard local suite runs that harness against `InMemoryUserEngineStore`.
USER-WP-0017 adds the provider-neutral serialization layer. Future Postgres
adapters should use `user_engine.store_records.store_record_for` before writing
to `user_engine_records` and `domain_record_from_store_record` after reading
JSONB payloads back. The `StoreRecord` envelope maps directly to the generic
record table columns: `record_type`, `record_key`, `tenant`, `user_id`,
`application_id`, `scope_type`, `scope_id`, and `payload`.
Durable payloads are raw state, not diagnostics. They can include factor
values, profile values, prepared-account matches, and access-profile defaults.
Adapters must avoid logging payloads and should use `record_counts` or other
redacted diagnostics for observability.
Likely future follow-up work should be:
- Add a Postgres adapter behind the existing store boundary.

View File

@@ -0,0 +1,348 @@
"""JSONB-oriented durable store record serialization."""
from __future__ import annotations
import json
from collections.abc import Mapping as MappingABC
from dataclasses import dataclass, fields, is_dataclass
from datetime import datetime
from enum import Enum
from types import NoneType, UnionType
from typing import Any, Callable, Mapping, Union, get_args, get_origin, get_type_hints
from user_engine.domain import (
Account,
AccessProfile,
ActiveAccessContext,
Application,
ApplicationBinding,
AuditRecord,
Catalog,
ExternalIdentity,
FamilyInvitation,
IdentityFactor,
Membership,
OnboardingJourney,
OutboxEvent,
PreparedAccount,
ProfileValue,
RegistrationSession,
TenantAccount,
User,
WelcomeProtocol,
)
from user_engine.migrations import USER_ENGINE_STORE_RECORD_TYPES
@dataclass(frozen=True)
class StoreRecord:
"""One generic durable-store row for a domain object payload."""
record_type: str
record_key: str
payload: Mapping[str, Any]
tenant: str | None = None
user_id: str | None = None
application_id: str | None = None
scope_type: str | None = None
scope_id: str | None = None
@dataclass(frozen=True)
class StoreRecordCodec:
"""Codec rule for one user-engine store record type."""
record_type: str
model_type: type[Any]
record_key: Callable[[Any], str]
metadata: Callable[[Any], Mapping[str, str | None]]
def store_record_for(value: Any) -> StoreRecord:
"""Return a generic durable-store record for a supported domain object."""
codec = _CODECS_BY_MODEL.get(type(value))
if codec is None:
raise TypeError(f"unsupported store record type: {type(value).__name__}")
metadata = dict(codec.metadata(value))
return StoreRecord(
record_type=codec.record_type,
record_key=codec.record_key(value),
payload=_encode_dataclass(value),
tenant=metadata.get("tenant"),
user_id=metadata.get("user_id"),
application_id=metadata.get("application_id"),
scope_type=metadata.get("scope_type"),
scope_id=metadata.get("scope_id"),
)
def domain_record_from_store_record(record: StoreRecord) -> Any:
"""Decode a durable-store record payload into its domain dataclass."""
codec = _CODECS_BY_RECORD_TYPE.get(record.record_type)
if codec is None:
raise ValueError(f"unsupported store record type: {record.record_type}")
return _decode_dataclass(codec.model_type, record.payload)
def validate_store_record_codecs() -> tuple[str, ...]:
"""Return codec coverage errors against the durable-store manifest."""
errors: list[str] = []
manifest_types = set(USER_ENGINE_STORE_RECORD_TYPES)
codec_types = set(_CODECS_BY_RECORD_TYPE)
missing = sorted(manifest_types - codec_types)
extra = sorted(codec_types - manifest_types)
if missing:
errors.append(f"missing codecs for: {', '.join(missing)}")
if extra:
errors.append(f"extra codecs for: {', '.join(extra)}")
return tuple(errors)
def _encode_dataclass(value: Any) -> Mapping[str, Any]:
if not is_dataclass(value):
raise TypeError(f"expected dataclass, got {type(value).__name__}")
return {
field.name: _encode_value(getattr(value, field.name))
for field in fields(value)
}
def _encode_value(value: Any) -> Any:
if is_dataclass(value):
return _encode_dataclass(value)
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, Enum):
return value.value
if isinstance(value, tuple):
return [_encode_value(item) for item in value]
if isinstance(value, list):
return [_encode_value(item) for item in value]
if isinstance(value, MappingABC):
return {str(key): _encode_value(item) for key, item in value.items()}
return value
def _decode_dataclass(model_type: type[Any], payload: Mapping[str, Any]) -> Any:
hints = get_type_hints(model_type)
kwargs = {
field.name: _decode_value(payload[field.name], hints[field.name])
for field in fields(model_type)
if field.name in payload
}
return model_type(**kwargs)
def _decode_value(value: Any, type_hint: Any) -> Any:
if value is None:
return None
if type_hint is Any:
return value
origin = get_origin(type_hint)
args = get_args(type_hint)
if origin in (UnionType, Union):
non_none_args = tuple(arg for arg in args if arg is not NoneType)
if len(non_none_args) == 1 and len(non_none_args) != len(args):
return _decode_value(value, non_none_args[0])
for arg in non_none_args:
try:
return _decode_value(value, arg)
except (TypeError, ValueError):
continue
return value
if type_hint is datetime:
return datetime.fromisoformat(str(value))
if isinstance(type_hint, type) and issubclass(type_hint, Enum):
return type_hint(value)
if isinstance(type_hint, type) and is_dataclass(type_hint):
return _decode_dataclass(type_hint, value)
if origin is tuple:
if not args:
return tuple(value)
item_hint = args[0] if len(args) == 2 and args[1] is Ellipsis else None
if item_hint is not None:
return tuple(_decode_value(item, item_hint) for item in value)
return tuple(
_decode_value(item, args[index])
for index, item in enumerate(value)
)
if origin is list:
item_hint = args[0] if args else Any
return [_decode_value(item, item_hint) for item in value]
if origin in (dict, Mapping, MappingABC):
return dict(value)
if type_hint in (str, int, float, bool):
return type_hint(value)
return value
def _single_key(value: str) -> str:
return value
def _composite_key(*parts: str | None) -> str:
return json.dumps(list(parts), separators=(",", ":"), ensure_ascii=True)
def _enum_value(value: Any) -> str | None:
if value is None:
return None
if isinstance(value, Enum):
return str(value.value)
return str(value)
_CODECS = (
StoreRecordCodec(
"users",
User,
lambda value: _single_key(value.user_id),
lambda value: {"user_id": value.user_id},
),
StoreRecordCodec(
"accounts",
Account,
lambda value: _single_key(value.user_id),
lambda value: {"user_id": value.user_id},
),
StoreRecordCodec(
"external_identities",
ExternalIdentity,
lambda value: _composite_key(value.issuer, value.subject),
lambda value: {"user_id": value.user_id},
),
StoreRecordCodec(
"tenant_accounts",
TenantAccount,
lambda value: _composite_key(value.tenant, value.user_id),
lambda value: {"tenant": value.tenant, "user_id": value.user_id},
),
StoreRecordCodec(
"memberships",
Membership,
lambda value: _single_key(value.membership_id),
lambda value: {
"tenant": value.tenant,
"user_id": value.user_id,
"scope_type": value.scope_type,
"scope_id": value.scope_id,
},
),
StoreRecordCodec(
"applications",
Application,
lambda value: _single_key(value.application_id),
lambda value: {"application_id": value.application_id},
),
StoreRecordCodec(
"application_bindings",
ApplicationBinding,
lambda value: _single_key(value.application_id),
lambda value: {"application_id": value.application_id},
),
StoreRecordCodec(
"catalogs",
Catalog,
lambda value: _single_key(value.catalog_id),
lambda value: {"application_id": value.owning_application_id},
),
StoreRecordCodec(
"family_invitations",
FamilyInvitation,
lambda value: _single_key(value.invitation_id),
lambda value: {
"tenant": value.tenant,
"user_id": value.user_id,
"application_id": value.application_id,
"scope_type": "family",
"scope_id": value.family_scope_id,
},
),
StoreRecordCodec(
"registration_sessions",
RegistrationSession,
lambda value: _single_key(value.registration_id),
lambda value: {"tenant": value.tenant, "user_id": value.user_id},
),
StoreRecordCodec(
"identity_factors",
IdentityFactor,
lambda value: _single_key(value.factor_id),
lambda value: {"user_id": value.user_id},
),
StoreRecordCodec(
"prepared_accounts",
PreparedAccount,
lambda value: _single_key(value.prepared_account_id),
lambda value: {"tenant": value.tenant},
),
StoreRecordCodec(
"access_profiles",
AccessProfile,
lambda value: _single_key(value.access_profile_id),
lambda value: {
"tenant": value.tenant,
"scope_type": _enum_value(value.scope_type),
"scope_id": value.scope_id,
},
),
StoreRecordCodec(
"active_access_contexts",
ActiveAccessContext,
lambda value: _composite_key(value.user_id, value.tenant),
lambda value: {
"tenant": value.tenant,
"user_id": value.user_id,
"scope_type": _enum_value(value.scope_type),
"scope_id": value.scope_id,
},
),
StoreRecordCodec(
"welcome_protocols",
WelcomeProtocol,
lambda value: _single_key(value.protocol_id),
lambda value: {
"tenant": value.tenant,
"application_id": value.application_id,
},
),
StoreRecordCodec(
"onboarding_journeys",
OnboardingJourney,
lambda value: _single_key(value.journey_id),
lambda value: {"tenant": value.tenant, "user_id": value.user_id},
),
StoreRecordCodec(
"profile_values",
ProfileValue,
lambda value: _composite_key(
value.user_id,
value.attribute_key,
_enum_value(value.scope),
value.scope_id,
),
lambda value: {
"user_id": value.user_id,
"scope_type": _enum_value(value.scope),
"scope_id": value.scope_id,
},
),
StoreRecordCodec(
"audit_records",
AuditRecord,
lambda value: _single_key(value.audit_id),
lambda value: {"tenant": value.tenant, "user_id": value.subject},
),
StoreRecordCodec(
"outbox_events",
OutboxEvent,
lambda value: _single_key(value.event_id),
lambda value: {"tenant": value.tenant},
),
)
_CODECS_BY_RECORD_TYPE = {codec.record_type: codec for codec in _CODECS}
_CODECS_BY_MODEL = {codec.model_type: codec for codec in _CODECS}

View File

@@ -89,6 +89,11 @@ def assert_user_engine_store_conformance(
_assert_diagnostics_contract(testcase, store_factory)
def reference_store_records(store: UserEngineStore) -> dict[str, Any]:
"""Write and return a representative record for every store record type."""
return _write_reference_records(store)
def _assert_readiness_contract(testcase: TestCase, store_factory: StoreFactory) -> None:
store = store_factory()
if store.schema_version is None:

View File

@@ -0,0 +1,93 @@
import json
import unittest
from user_engine.adapters.local import InMemoryUserEngineStore
from user_engine.migrations import USER_ENGINE_STORE_RECORD_TYPES
from user_engine.store_records import (
StoreRecord,
domain_record_from_store_record,
store_record_for,
validate_store_record_codecs,
)
from user_engine.testing.store_conformance import (
PROFILE_SECRET_VALUE,
RAW_FACTOR_VALUE,
TENANT,
USER_ID,
reference_store_records,
)
class StoreRecordSerializationTests(unittest.TestCase):
def test_codecs_cover_migration_manifest_record_types(self):
self.assertEqual(validate_store_record_codecs(), ())
def test_reference_records_round_trip_through_json_safe_payloads(self):
store = InMemoryUserEngineStore()
store.migrate()
records = reference_store_records(store)
expected_types = set(USER_ENGINE_STORE_RECORD_TYPES)
encoded_types = set()
for value in records.values():
record = store_record_for(value)
encoded_types.add(record.record_type)
json.dumps(record.payload)
decoded = domain_record_from_store_record(
StoreRecord(
record_type=record.record_type,
record_key=record.record_key,
payload=json.loads(json.dumps(record.payload)),
tenant=record.tenant,
user_id=record.user_id,
application_id=record.application_id,
scope_type=record.scope_type,
scope_id=record.scope_id,
)
)
self.assertEqual(decoded, value)
self.assertEqual(encoded_types, expected_types)
def test_record_metadata_supports_provider_indexes(self):
store = InMemoryUserEngineStore()
store.migrate()
records = reference_store_records(store)
tenant_account = store_record_for(records["tenant_account"])
active_context = store_record_for(records["access_context"])
profile_value = store_record_for(records["profile_value"])
factor = store_record_for(records["factor"])
self.assertEqual(tenant_account.record_key, f'["{TENANT}","{USER_ID}"]')
self.assertEqual(tenant_account.tenant, TENANT)
self.assertEqual(tenant_account.user_id, USER_ID)
self.assertEqual(active_context.tenant, TENANT)
self.assertEqual(active_context.user_id, USER_ID)
self.assertEqual(active_context.scope_type, "tenant")
self.assertEqual(profile_value.scope_type, "global")
self.assertEqual(factor.user_id, USER_ID)
def test_durable_payloads_are_raw_state_not_diagnostics(self):
store = InMemoryUserEngineStore()
store.migrate()
records = reference_store_records(store)
factor = store_record_for(records["factor"])
access_profile = store_record_for(records["access_profile"])
self.assertIn(RAW_FACTOR_VALUE, json.dumps(factor.payload))
self.assertIn(PROFILE_SECRET_VALUE, json.dumps(access_profile.payload))
def test_unknown_record_type_fails_closed(self):
with self.assertRaises(ValueError):
domain_record_from_store_record(
StoreRecord(record_type="unknown", record_key="1", payload={})
)
with self.assertRaises(TypeError):
store_record_for(object())
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,110 @@
---
id: USER-WP-0017
type: workplan
title: "Durable Store Record Serialization"
domain: netkingdom
repo: user-engine
status: finished
owner: codex
topic_slug: netkingdom
planning_priority: medium
planning_order: 17
created: "2026-06-16"
updated: "2026-06-16"
depends_on:
- USER-WP-0016
---
# USER-WP-0017 - Durable Store Record Serialization
## Goal
Define a dependency-free serialization contract for the generic durable store
record shape introduced by USER-WP-0016 so a future Postgres adapter can persist
and restore domain dataclasses through JSONB without embedding ad hoc codecs.
## Scope Direction
This workplan should cover deterministic record keys, adapter metadata columns,
JSON-safe payload encoding, and round-trip decoding for every logical record
type in the migration manifest.
## Non-Goals
- Do not add a production Postgres driver.
- Do not implement connection pooling, migrations, locks, or outbox claiming.
- Do not redact durable payloads; adapters must avoid logging raw payloads.
## Tasks
```task
id: USER-WP-0017-T1
status: done
priority: high
```
Add a store-record envelope matching the generic Postgres bootstrap table
columns.
```task
id: USER-WP-0017-T2
status: done
priority: high
```
Add deterministic record-key and metadata extraction rules for all manifest
record types.
```task
id: USER-WP-0017-T3
status: done
priority: high
```
Add JSON-safe payload encoding for dataclasses, enums, datetimes, tuples, and
mapping fields.
```task
id: USER-WP-0017-T4
status: done
priority: high
```
Add payload decoding back into the original domain dataclasses.
```task
id: USER-WP-0017-T5
status: done
priority: medium
```
Document how future Postgres adapters should use the serialization contract.
## Acceptance Criteria
- Every logical record type in the migration manifest has a codec.
- Encoded payloads can be passed through `json.dumps`.
- Domain records round-trip through `StoreRecord` without losing enum,
datetime, tuple, or nested dataclass structure.
- Composite keys are deterministic and collision-resistant for scoped records.
- Documentation warns that durable payloads may contain sensitive values and
must not be used as diagnostics output.
## Expected Outputs
- `user_engine.store_records` module.
- Store-record serialization tests.
- Durable-store documentation updates.
## Implementation Notes
Implemented on 2026-06-16:
- Added `StoreRecord`, `store_record_for`, `domain_record_from_store_record`,
and manifest validation helpers.
- Added JSON-safe recursive encode/decode support for all current domain
dataclasses used by `UserEngineStore`.
- Added round-trip tests using the same reference records as the conformance
harness.
- Documented the serialization layer as the provider-neutral prerequisite to a
live Postgres adapter.