metadata schema validation

This commit is contained in:
2026-05-06 02:50:08 +02:00
parent 565a5643a3
commit c271385e35
8 changed files with 415 additions and 7 deletions

View File

@@ -13,7 +13,10 @@ from kontextual_engine import (
ContextEntityType,
InMemoryAssetRegistryRepository,
LifecycleState,
MetadataFieldDefinition,
MetadataRecord,
MetadataSchema,
MetadataValueType,
OperationContext,
PolicyDecision,
RepresentationKind,
@@ -175,6 +178,52 @@ def test_asset_registry_relationships_create_versions_and_audit() -> None:
assert repo.list_audit_events(target=f"asset:{source.asset.id}")[-1].operation == "asset.relationship.add"
def test_asset_registry_validates_metadata_schema_before_writes() -> None:
repo = InMemoryAssetRegistryRepository()
schema = MetadataSchema(
schema_id="schema-note-v1",
name="Note Metadata",
asset_types=("note",),
allow_unknown=False,
fields=(
MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True),
MetadataFieldDefinition("priority", MetadataValueType.INTEGER, min_value=1, max_value=5),
),
)
service = AssetRegistryService(repo, metadata_schemas=[schema])
context = operation_context()
with pytest.raises(ValidationError) as exc_info:
service.create_asset(
"Note",
Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
context,
asset_id="asset-invalid-note",
metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=False)],
)
assert {issue["code"] for issue in exc_info.value.details["issues"]} == {
"metadata.confirmation_required"
}
assert repo.list_assets() == []
created = service.create_asset(
"Note",
Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
context,
asset_id="asset-note",
metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=True)],
)
with pytest.raises(ValidationError) as update_exc:
service.add_metadata_record(created.asset.id, MetadataRecord("priority", 9), context)
assert {issue["code"] for issue in update_exc.value.details["issues"]} == {
"metadata.value_too_large"
}
assert [record.key for record in repo.list_metadata_records(created.asset.id)] == ["owner"]
def test_sqlite_asset_registry_survives_reinstantiation(tmp_path: Path) -> None:
db_path = tmp_path / "registry.sqlite"
repo = SQLiteAssetRegistryRepository(db_path)