From c271385e35a14226e232bf0ac14b0cb9a4cf8a18 Mon Sep 17 00:00:00 2001 From: tegwick Date: Wed, 6 May 2026 02:50:08 +0200 Subject: [PATCH] metadata schema validation --- docs/asset-registry-implementation.md | 8 +- src/kontextual_engine/__init__.py | 8 + src/kontextual_engine/core/__init__.py | 15 +- src/kontextual_engine/core/metadata.py | 281 ++++++++++++++++++ .../services/asset_service.py | 17 ++ tests/test_asset_registry.py | 49 +++ tests/test_core_architecture.py | 34 +++ ...WP-0005-asset-registry-governance-state.md | 10 +- 8 files changed, 415 insertions(+), 7 deletions(-) diff --git a/docs/asset-registry-implementation.md b/docs/asset-registry-implementation.md index a25675c..80d16c9 100644 --- a/docs/asset-registry-implementation.md +++ b/docs/asset-registry-implementation.md @@ -33,6 +33,8 @@ and SQLite repositories are adapters behind those ports. - Stable `KnowledgeAsset` creation with explicit source references. - Separate source, normalized, and derived `AssetRepresentation` records. - `MetadataRecord` persistence with inferred/confirmed semantics preserved. +- Custom metadata schema primitives with structured validation issues. +- Metadata schema validation before asset create and metadata update writes. - Actor and `OperationContext` required for material mutations. - Policy gateway authorization before asset mutations. - Fail-closed policy denial through `AuthorizationError`. @@ -70,7 +72,8 @@ idempotency key. ## Not Yet Implemented -- Full custom metadata schema validation. +- Schema registry persistence and policy-assigned schema selection. +- Standard metadata filtering beyond lifecycle and asset type. - Policy assignment storage and enterprise policy adapters. - Conflict detection beyond version-sequence uniqueness. - Restore and supersession service operations. @@ -91,4 +94,5 @@ These remain in scope for later `KONT-WP-0005` tasks or adjacent workplans. - idempotent asset creation and conflicting idempotency-key reuse, - relationship creation with source-asset versioning and audit, - SQLite reload preserving context entities, relationships, and idempotency - records. + records, +- custom metadata schema validation before registry writes. diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index ed8e648..7b7f60f 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -36,7 +36,11 @@ from .core import ( IngestionJobStatus, KnowledgeAsset, LifecycleState, + MetadataFieldDefinition, MetadataRecord, + MetadataSchema, + MetadataValidationIssue, + MetadataValueType, NormalizedDocument, OperationContext, PolicyDecision, @@ -135,7 +139,11 @@ __all__ = [ "KnowledgeAsset", "KontextualError", "LifecycleState", + "MetadataFieldDefinition", "MetadataRecord", + "MetadataSchema", + "MetadataValidationIssue", + "MetadataValueType", "NormalizedDocument", "NotFoundError", "OperationRun", diff --git a/src/kontextual_engine/core/__init__.py b/src/kontextual_engine/core/__init__.py index a2d7e62..0db0aec 100644 --- a/src/kontextual_engine/core/__init__.py +++ b/src/kontextual_engine/core/__init__.py @@ -14,7 +14,16 @@ from .ingestion import ( NormalizedDocument, SourcePayload, ) -from .metadata import Classification, LifecycleState, MetadataRecord, Sensitivity +from .metadata import ( + Classification, + LifecycleState, + MetadataFieldDefinition, + MetadataRecord, + MetadataSchema, + MetadataValidationIssue, + MetadataValueType, + Sensitivity, +) from .policy import PolicyDecision, PolicyEffect from .primitives import content_digest, mapping_digest, new_id, stable_json_dumps, utc_now from .provenance import ( @@ -52,7 +61,11 @@ __all__ = [ "IngestionJobStatus", "KnowledgeAsset", "LifecycleState", + "MetadataFieldDefinition", "MetadataRecord", + "MetadataSchema", + "MetadataValidationIssue", + "MetadataValueType", "NormalizedDocument", "OperationContext", "PolicyDecision", diff --git a/src/kontextual_engine/core/metadata.py b/src/kontextual_engine/core/metadata.py index 2ce5e9f..ab76af5 100644 --- a/src/kontextual_engine/core/metadata.py +++ b/src/kontextual_engine/core/metadata.py @@ -25,6 +25,271 @@ class Sensitivity(str, Enum): RESTRICTED = "restricted" +class MetadataValueType(str, Enum): + STRING = "string" + INTEGER = "integer" + NUMBER = "number" + BOOLEAN = "boolean" + LIST = "list" + OBJECT = "object" + + +@dataclass(frozen=True) +class MetadataValidationIssue: + code: str + message: str + key: str | None = None + details: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "code": self.code, + "message": self.message, + "key": self.key, + "details": dict(self.details), + } + ) + + +@dataclass(frozen=True) +class MetadataFieldDefinition: + key: str + value_type: MetadataValueType | str + required: bool = False + allow_multiple: bool = False + allowed_values: tuple[Any, ...] = () + min_value: int | float | None = None + max_value: int | float | None = None + min_length: int | None = None + max_length: int | None = None + require_confirmed: bool = False + description: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + object.__setattr__(self, "value_type", MetadataValueType(self.value_type)) + object.__setattr__(self, "allowed_values", tuple(self.allowed_values)) + + def validate(self, records: list["MetadataRecord"]) -> list[MetadataValidationIssue]: + issues: list[MetadataValidationIssue] = [] + if self.required and not records: + issues.append( + MetadataValidationIssue( + code="metadata.required_missing", + key=self.key, + message="Required metadata field is missing", + ) + ) + return issues + if not self.allow_multiple and len(records) > 1: + issues.append( + MetadataValidationIssue( + code="metadata.multiple_not_allowed", + key=self.key, + message="Metadata field does not allow multiple values", + details={"count": len(records)}, + ) + ) + for record in records: + issues.extend(self._validate_record(record)) + return issues + + def _validate_record(self, record: "MetadataRecord") -> list[MetadataValidationIssue]: + issues: list[MetadataValidationIssue] = [] + value = record.value + if not _matches_type(value, self.value_type): + issues.append( + MetadataValidationIssue( + code="metadata.type_mismatch", + key=self.key, + message="Metadata value does not match schema type", + details={ + "expected": self.value_type.value, + "actual": type(value).__name__, + }, + ) + ) + return issues + if self.require_confirmed and not record.confirmed: + issues.append( + MetadataValidationIssue( + code="metadata.confirmation_required", + key=self.key, + message="Metadata field requires confirmed value", + ) + ) + if self.allowed_values and value not in self.allowed_values: + issues.append( + MetadataValidationIssue( + code="metadata.value_not_allowed", + key=self.key, + message="Metadata value is not in the allowed set", + details={"allowed_values": list(self.allowed_values), "value": value}, + ) + ) + if isinstance(value, (int, float)) and not isinstance(value, bool): + if self.min_value is not None and value < self.min_value: + issues.append( + MetadataValidationIssue( + code="metadata.value_too_small", + key=self.key, + message="Metadata numeric value is below minimum", + details={"min_value": self.min_value, "value": value}, + ) + ) + if self.max_value is not None and value > self.max_value: + issues.append( + MetadataValidationIssue( + code="metadata.value_too_large", + key=self.key, + message="Metadata numeric value is above maximum", + details={"max_value": self.max_value, "value": value}, + ) + ) + if isinstance(value, (str, list, dict)): + length = len(value) + if self.min_length is not None and length < self.min_length: + issues.append( + MetadataValidationIssue( + code="metadata.length_too_short", + key=self.key, + message="Metadata value length is below minimum", + details={"min_length": self.min_length, "length": length}, + ) + ) + if self.max_length is not None and length > self.max_length: + issues.append( + MetadataValidationIssue( + code="metadata.length_too_long", + key=self.key, + message="Metadata value length is above maximum", + details={"max_length": self.max_length, "length": length}, + ) + ) + return issues + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "key": self.key, + "value_type": self.value_type.value, + "required": self.required, + "allow_multiple": self.allow_multiple, + "allowed_values": list(self.allowed_values), + "min_value": self.min_value, + "max_value": self.max_value, + "min_length": self.min_length, + "max_length": self.max_length, + "require_confirmed": self.require_confirmed, + "description": self.description, + "metadata": dict(self.metadata), + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "MetadataFieldDefinition": + return cls( + key=data["key"], + value_type=MetadataValueType(data["value_type"]), + required=bool(data.get("required", False)), + allow_multiple=bool(data.get("allow_multiple", False)), + allowed_values=tuple(data.get("allowed_values", [])), + min_value=data.get("min_value"), + max_value=data.get("max_value"), + min_length=data.get("min_length"), + max_length=data.get("max_length"), + require_confirmed=bool(data.get("require_confirmed", False)), + description=data.get("description"), + metadata=dict(data.get("metadata", {})), + ) + + +@dataclass(frozen=True) +class MetadataSchema: + schema_id: str + name: str + fields: tuple[MetadataFieldDefinition, ...] + allow_unknown: bool = True + asset_types: tuple[str, ...] = () + version: str = "1" + description: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + object.__setattr__(self, "fields", tuple(self.fields)) + object.__setattr__(self, "asset_types", tuple(self.asset_types)) + + def applies_to(self, classification: "Classification") -> bool: + return not self.asset_types or classification.asset_type in self.asset_types + + def validate( + self, + records: list["MetadataRecord"] | tuple["MetadataRecord", ...], + ) -> list[MetadataValidationIssue]: + by_key: dict[str, list[MetadataRecord]] = {} + for record in records: + by_key.setdefault(record.key, []).append(record) + + issues: list[MetadataValidationIssue] = [] + known_keys = {field.key for field in self.fields} + for field_definition in self.fields: + issues.extend(field_definition.validate(by_key.get(field_definition.key, []))) + if not self.allow_unknown: + for key in sorted(set(by_key) - known_keys): + issues.append( + MetadataValidationIssue( + code="metadata.unknown_field", + key=key, + message="Metadata field is not allowed by schema", + ) + ) + return issues + + def validate_or_raise( + self, + records: list["MetadataRecord"] | tuple["MetadataRecord", ...], + ) -> None: + from kontextual_engine.errors import ValidationError + + issues = self.validate(records) + if issues: + raise ValidationError( + "Metadata schema validation failed", + details={ + "schema_id": self.schema_id, + "issues": [issue.to_dict() for issue in issues], + }, + ) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "schema_id": self.schema_id, + "name": self.name, + "fields": [field_definition.to_dict() for field_definition in self.fields], + "allow_unknown": self.allow_unknown, + "asset_types": list(self.asset_types), + "version": self.version, + "description": self.description, + "metadata": dict(self.metadata), + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "MetadataSchema": + return cls( + schema_id=data["schema_id"], + name=data["name"], + fields=tuple(MetadataFieldDefinition.from_dict(item) for item in data.get("fields", [])), + allow_unknown=bool(data.get("allow_unknown", True)), + asset_types=tuple(data.get("asset_types", [])), + version=str(data.get("version", "1")), + description=data.get("description"), + metadata=dict(data.get("metadata", {})), + ) + + @dataclass(frozen=True) class Classification: asset_type: str @@ -95,3 +360,19 @@ class MetadataRecord: confirmed=bool(data.get("confirmed", False)), created_at=data["created_at"], ) + + +def _matches_type(value: Any, value_type: MetadataValueType) -> bool: + if value_type == MetadataValueType.STRING: + return isinstance(value, str) + if value_type == MetadataValueType.INTEGER: + return isinstance(value, int) and not isinstance(value, bool) + if value_type == MetadataValueType.NUMBER: + return isinstance(value, (int, float)) and not isinstance(value, bool) + if value_type == MetadataValueType.BOOLEAN: + return isinstance(value, bool) + if value_type == MetadataValueType.LIST: + return isinstance(value, list) + if value_type == MetadataValueType.OBJECT: + return isinstance(value, dict) + return False diff --git a/src/kontextual_engine/services/asset_service.py b/src/kontextual_engine/services/asset_service.py index c7d0dbe..ba04ca4 100644 --- a/src/kontextual_engine/services/asset_service.py +++ b/src/kontextual_engine/services/asset_service.py @@ -17,6 +17,7 @@ from kontextual_engine.core import ( LifecycleState, mapping_digest, MetadataRecord, + MetadataSchema, OperationContext, PolicyDecision, RelationshipTargetKind, @@ -49,9 +50,11 @@ class AssetRegistryService: repository: AssetRegistryRepository, *, policy_gateway: PolicyGateway | None = None, + metadata_schemas: list[MetadataSchema] | tuple[MetadataSchema, ...] | None = None, ) -> None: self.repository = repository self.policy_gateway = policy_gateway or AllowAllPolicyGateway() + self.metadata_schemas = tuple(metadata_schemas or ()) def create_asset( self, @@ -79,6 +82,7 @@ class AssetRegistryService: existing = self._idempotent_lookup("asset.create", idempotency_key, request_hash) if existing: return self._asset_change_from_idempotency(existing) + self._validate_metadata_records(classification, list(metadata_records or [])) asset = KnowledgeAsset.create( title, @@ -145,6 +149,10 @@ class AssetRegistryService: asset = self.repository.get_asset(asset_id) decision = self._authorize(context, "asset.metadata.add", f"asset:{asset.id}") next_sequence = self._next_sequence(asset.id) + self._validate_metadata_records( + asset.classification, + self.repository.list_metadata_records(asset.id) + [record], + ) self.repository.save_metadata_record(asset.id, record) version = AssetVersion( asset_id=asset.id, @@ -404,6 +412,15 @@ class AssetRegistryService: versions = self.repository.list_versions(asset_id) return len(versions) + 1 + def _validate_metadata_records( + self, + classification: Classification, + records: list[MetadataRecord], + ) -> None: + for schema in self.metadata_schemas: + if schema.applies_to(classification): + schema.validate_or_raise(records) + def _idempotent_lookup( self, operation: str, diff --git a/tests/test_asset_registry.py b/tests/test_asset_registry.py index 48d0f51..79b4a72 100644 --- a/tests/test_asset_registry.py +++ b/tests/test_asset_registry.py @@ -13,7 +13,10 @@ from kontextual_engine import ( ContextEntityType, InMemoryAssetRegistryRepository, LifecycleState, + MetadataFieldDefinition, MetadataRecord, + MetadataSchema, + MetadataValueType, OperationContext, PolicyDecision, RepresentationKind, @@ -175,6 +178,52 @@ def test_asset_registry_relationships_create_versions_and_audit() -> None: assert repo.list_audit_events(target=f"asset:{source.asset.id}")[-1].operation == "asset.relationship.add" +def test_asset_registry_validates_metadata_schema_before_writes() -> None: + repo = InMemoryAssetRegistryRepository() + schema = MetadataSchema( + schema_id="schema-note-v1", + name="Note Metadata", + asset_types=("note",), + allow_unknown=False, + fields=( + MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True), + MetadataFieldDefinition("priority", MetadataValueType.INTEGER, min_value=1, max_value=5), + ), + ) + service = AssetRegistryService(repo, metadata_schemas=[schema]) + context = operation_context() + + with pytest.raises(ValidationError) as exc_info: + service.create_asset( + "Note", + Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-invalid-note", + metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=False)], + ) + + assert {issue["code"] for issue in exc_info.value.details["issues"]} == { + "metadata.confirmation_required" + } + assert repo.list_assets() == [] + + created = service.create_asset( + "Note", + Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-note", + metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=True)], + ) + + with pytest.raises(ValidationError) as update_exc: + service.add_metadata_record(created.asset.id, MetadataRecord("priority", 9), context) + + assert {issue["code"] for issue in update_exc.value.details["issues"]} == { + "metadata.value_too_large" + } + assert [record.key for record in repo.list_metadata_records(created.asset.id)] == ["owner"] + + def test_sqlite_asset_registry_survives_reinstantiation(tmp_path: Path) -> None: db_path = tmp_path / "registry.sqlite" repo = SQLiteAssetRegistryRepository(db_path) diff --git a/tests/test_core_architecture.py b/tests/test_core_architecture.py index b7293c8..8db59eb 100644 --- a/tests/test_core_architecture.py +++ b/tests/test_core_architecture.py @@ -9,7 +9,10 @@ from kontextual_engine.core import ( DerivedArtifactLineage, KnowledgeAsset, LifecycleState, + MetadataFieldDefinition, MetadataRecord, + MetadataSchema, + MetadataValueType, OperationContext, PolicyDecision, PolicyEffect, @@ -176,3 +179,34 @@ def test_metadata_records_distinguish_inferred_and_confirmed_values() -> None: assert inferred.to_dict()["confidence"] == 0.74 assert inferred.to_dict()["confirmed"] is False assert confirmed.to_dict()["confirmed"] is True + + +def test_metadata_schema_reports_structured_validation_issues() -> None: + schema = MetadataSchema( + schema_id="schema-document-v1", + name="Document Metadata", + asset_types=("document",), + allow_unknown=False, + fields=( + MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True), + MetadataFieldDefinition("confidence", MetadataValueType.NUMBER, min_value=0, max_value=1), + MetadataFieldDefinition("tags", MetadataValueType.LIST, allow_multiple=False), + ), + ) + records = [ + MetadataRecord("owner", "Platform Knowledge", confirmed=False), + MetadataRecord("confidence", 1.4), + MetadataRecord("unknown", "surprise"), + ] + + issues = schema.validate(records) + codes = {issue.code for issue in issues} + + assert codes == { + "metadata.confirmation_required", + "metadata.value_too_large", + "metadata.unknown_field", + } + assert schema.applies_to(Classification(asset_type="document")) is True + assert schema.applies_to(Classification(asset_type="dataset")) is False + assert MetadataSchema.from_dict(schema.to_dict()).fields[0].value_type == MetadataValueType.STRING diff --git a/workplans/KONT-WP-0005-asset-registry-governance-state.md b/workplans/KONT-WP-0005-asset-registry-governance-state.md index 3a0281a..fb87474 100644 --- a/workplans/KONT-WP-0005-asset-registry-governance-state.md +++ b/workplans/KONT-WP-0005-asset-registry-governance-state.md @@ -58,10 +58,12 @@ versions, and durable reload behavior. As of 2026-05-06, the registry core has a working asset service, in-memory and SQLite repositories, policy gateway boundary, audit events, versions, representations, metadata records, context entities, asset/context -relationships, and idempotent asset creation. Remaining work in this workplan -is concentrated on deeper metadata schema validation, policy assignment -persistence, restore/supersession operations, conflict semantics beyond -sequence/idempotency checks, and batch partial-failure envelopes. +relationships, idempotent asset creation, and custom metadata schema +validation before registry writes. Remaining work in this workplan is +concentrated on schema registry/policy assignment, standard metadata filtering +beyond lifecycle and asset type, restore/supersession operations, conflict +semantics beyond sequence/idempotency checks, and batch partial-failure +envelopes. ## G5.1 - Implement stable asset identity and source references