Files
kontextual-engine/tests/test_asset_registry.py

465 lines
17 KiB
Python

from pathlib import Path
import pytest
from kontextual_engine import (
Actor,
ActorType,
AssetRegistryService,
AssetRepresentation,
AuthorizationError,
Classification,
ContextEntity,
ContextEntityType,
InMemoryAssetRegistryRepository,
LifecycleState,
MetadataFieldDefinition,
MetadataRecord,
MetadataSchema,
MetadataSchemaAssignment,
MetadataValueType,
OperationContext,
PolicyDecision,
RepresentationKind,
Sensitivity,
SourceReference,
SQLiteAssetRegistryRepository,
ValidationError,
)
def test_asset_registry_service_creates_assets_with_versions_and_audit() -> None:
    """Create an asset with a source ref, a representation, and a metadata record.

    Checks the returned asset/version/audit/policy result, then reads the
    source ref, representation, metadata record, and ``asset.create`` audit
    event back from the in-memory repository.
    """
    registry = InMemoryAssetRegistryRepository()
    svc = AssetRegistryService(registry)
    ctx = operation_context()
    origin = SourceReference(
        source_system="repo",
        path="docs/intent.md",
        checksum="sha256:source",
    )
    source_repr = AssetRepresentation.from_content(
        "asset-intent",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Intent\n",
        storage_ref="object://intent-source",
        source_ref_id=origin.id,
    )
    topic = MetadataRecord(
        "topic",
        "architecture",
        provenance={"producer": "human"},
        confirmed=True,
    )
    classification = Classification(
        asset_type="document",
        sensitivity=Sensitivity.INTERNAL,
        owner="Platform Knowledge",
    )

    outcome = svc.create_asset(
        "Intent",
        classification,
        ctx,
        asset_id="asset-intent",
        source_refs=[origin],
        representations=[source_repr],
        metadata_records=[topic],
    )

    # The returned result describes the first version of the new asset.
    created_asset = outcome.asset
    assert created_asset.id == "asset-intent"
    assert created_asset.current_version_id == outcome.version.version_id
    assert outcome.version.sequence == 1
    assert outcome.audit_event.outcome.value == "success"
    assert outcome.policy_decision.allowed is True

    # Everything handed to create_asset is readable back from the repository.
    stored_asset = registry.get_asset("asset-intent")
    assert stored_asset.source_refs[0].path == "docs/intent.md"
    stored_reprs = registry.list_representations(asset_id="asset-intent")
    assert stored_reprs[0].storage_ref == "object://intent-source"
    stored_metadata = registry.list_metadata_records("asset-intent")
    assert stored_metadata[0].confirmed is True
    audit_trail = registry.list_audit_events(target="asset:asset-intent")
    assert audit_trail[0].operation == "asset.create"
def test_asset_registry_lifecycle_policy_denial_fails_closed_and_audits() -> None:
    """A lifecycle transition refused by the policy gateway raises and is audited.

    The error carries the correlation id and a ``fail_closed`` policy
    decision, the audit trail gains a ``denied`` entry after the initial
    ``success``, and the stored asset is still ``ACTIVE``.
    """
    registry = InMemoryAssetRegistryRepository()
    svc = AssetRegistryService(registry, policy_gateway=DenyLifecyclePolicy())
    ctx = operation_context()
    governed = svc.create_asset(
        "Governed Asset",
        Classification(asset_type="document", sensitivity=Sensitivity.CONFIDENTIAL),
        ctx,
        asset_id="asset-governed",
    )

    with pytest.raises(AuthorizationError) as denied:
        svc.transition_lifecycle(governed.asset.id, LifecycleState.RETIRED, ctx)

    audit_trail = registry.list_audit_events(target="asset:asset-governed")
    assert denied.value.details["correlation_id"] == "corr-test"
    assert denied.value.details["policy_decision"]["effect"] == "fail_closed"
    outcomes = [entry.outcome.value for entry in audit_trail]
    assert outcomes == ["success", "denied"]
    # The refused transition must not have changed the stored lifecycle.
    stored_asset = registry.get_asset("asset-governed")
    assert stored_asset.lifecycle == LifecycleState.ACTIVE
def test_asset_registry_create_is_idempotent_for_same_key_and_payload() -> None:
    """Repeat a create with the same idempotency key and check it is replayed.

    The second call returns the same asset, version, and audit event without
    adding new versions or audit entries. Reusing the key with a different
    title raises ``ValidationError``.
    """
    registry = InMemoryAssetRegistryRepository()
    svc = AssetRegistryService(registry)
    ctx = operation_context()
    classification = Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC)
    readme_ref = SourceReference(source_system="repo", path="README.md")
    readme_repr = AssetRepresentation.from_content(
        "asset-readme",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Readme\n",
    )

    def create(title: str):
        # Every call shares the idempotency key; only the title varies.
        return svc.create_asset(
            title,
            classification,
            ctx,
            asset_id="asset-readme",
            source_refs=[readme_ref],
            representations=[readme_repr],
            idempotency_key="create-readme",
        )

    original = create("Readme")
    replay = create("Readme")

    assert replay.asset.id == original.asset.id
    assert replay.version.version_id == original.version.version_id
    assert replay.audit_event.event_id == original.audit_event.event_id
    version_count = len(registry.list_versions("asset-readme"))
    assert version_count == 1
    audit_count = len(registry.list_audit_events(target="asset:asset-readme"))
    assert audit_count == 1

    # Same key, different payload: the service must refuse the request.
    with pytest.raises(ValidationError, match="Idempotency key"):
        create("Readme renamed")
def test_asset_registry_relationships_create_versions_and_audit() -> None:
    """Link one asset to another and check the relationship, version, and audit.

    The link is stored with its confidence, the source asset gets a
    ``relationship_changed`` version and an ``asset.relationship.add`` audit
    event, and the target asset keeps its creation version.
    """
    registry = InMemoryAssetRegistryRepository()
    svc = AssetRegistryService(registry)
    ctx = operation_context()
    internal_doc = Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL)
    dependent = svc.create_asset("Source", internal_doc, ctx, asset_id="asset-source")
    dependency = svc.create_asset("Target", internal_doc, ctx, asset_id="asset-target")
    dependent_id = dependent.asset.id
    dependency_id = dependency.asset.id

    linked = svc.link_asset_to_asset(
        dependent_id,
        dependency_id,
        "depends_on",
        ctx,
        confidence=0.91,
        provenance={"producer": "test"},
    )

    stored_links = registry.list_relationships(source_id=dependent_id)
    history = registry.list_versions(dependent_id)
    assert stored_links == [linked.relationship]
    assert linked.relationship.target_id == dependency_id
    assert linked.relationship.confidence == 0.91
    latest = history[-1]
    assert latest.change_type.value == "relationship_changed"
    assert latest.relationship_delta["added"]["predicate"] == "depends_on"
    # Only the source side of the link moves to a new current version.
    assert registry.get_asset(dependent_id).current_version_id == linked.version.version_id
    assert registry.get_asset(dependency_id).current_version_id == dependency.version.version_id
    audit_trail = registry.list_audit_events(target=f"asset:{dependent_id}")
    assert audit_trail[-1].operation == "asset.relationship.add"
def test_asset_registry_validates_metadata_schema_before_writes() -> None:
    """Metadata that violates the schema is rejected and nothing is stored.

    Covers both entry points: ``create_asset`` with an unconfirmed required
    field, and ``add_metadata_record`` with a value above the field maximum.
    """

    def issue_codes(raised) -> set:
        # Collapse the reported validation issues down to their codes.
        return {issue["code"] for issue in raised.value.details["issues"]}

    def note_classification() -> Classification:
        return Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL)

    registry = InMemoryAssetRegistryRepository()
    note_schema = MetadataSchema(
        schema_id="schema-note-v1",
        name="Note Metadata",
        asset_types=("note",),
        allow_unknown=False,
        fields=(
            MetadataFieldDefinition(
                "owner",
                MetadataValueType.STRING,
                required=True,
                require_confirmed=True,
            ),
            MetadataFieldDefinition(
                "priority",
                MetadataValueType.INTEGER,
                min_value=1,
                max_value=5,
            ),
        ),
    )
    svc = AssetRegistryService(registry, metadata_schemas=[note_schema])
    ctx = operation_context()

    # "owner" requires confirmation, so an unconfirmed record fails the create.
    with pytest.raises(ValidationError) as create_error:
        svc.create_asset(
            "Note",
            note_classification(),
            ctx,
            asset_id="asset-invalid-note",
            metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=False)],
        )
    assert issue_codes(create_error) == {"metadata.confirmation_required"}
    assert registry.list_assets() == []

    note = svc.create_asset(
        "Note",
        note_classification(),
        ctx,
        asset_id="asset-note",
        metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=True)],
    )

    # "priority" is capped at 5, so 9 is refused and the record is not added.
    with pytest.raises(ValidationError) as update_error:
        svc.add_metadata_record(note.asset.id, MetadataRecord("priority", 9), ctx)
    assert issue_codes(update_error) == {"metadata.value_too_large"}
    stored_keys = [record.key for record in registry.list_metadata_records(note.asset.id)]
    assert stored_keys == ["owner"]
def test_asset_registry_applies_persisted_metadata_schema_assignments() -> None:
    """A schema registered and assigned through the service is enforced on create.

    An asset matching the assignment is rejected when the required field is
    missing and a value is outside the allowed set; a compliant asset is
    created, and the assignment is listed with its ``policy_ref``.
    """
    registry = InMemoryAssetRegistryRepository()
    svc = AssetRegistryService(registry)
    ctx = operation_context()
    policy_note_schema = MetadataSchema(
        schema_id="schema-policy-note-v1",
        name="Policy Note Metadata",
        allow_unknown=False,
        fields=(
            MetadataFieldDefinition(
                "owner",
                MetadataValueType.STRING,
                required=True,
                require_confirmed=True,
            ),
            MetadataFieldDefinition(
                "state",
                MetadataValueType.STRING,
                allowed_values=("draft", "approved"),
            ),
        ),
    )
    policy_note_assignment = MetadataSchemaAssignment(
        assignment_id="assignment-policy-note",
        schema_id=policy_note_schema.schema_id,
        asset_types=("policy-note",),
        sensitivities=(Sensitivity.INTERNAL,),
        policy_ref="local://metadata-policy/policy-note",
    )
    svc.register_metadata_schema(policy_note_schema, ctx)
    svc.assign_metadata_schema(policy_note_assignment, ctx)

    # No "owner" record, and "published" is not an allowed "state" value.
    with pytest.raises(ValidationError) as rejected:
        svc.create_asset(
            "Policy Note",
            Classification(asset_type="policy-note", sensitivity=Sensitivity.INTERNAL),
            ctx,
            asset_id="asset-policy-note-invalid",
            metadata_records=[MetadataRecord("state", "published")],
        )
    reported_codes = {issue["code"] for issue in rejected.value.details["issues"]}
    assert reported_codes == {
        "metadata.required_missing",
        "metadata.value_not_allowed",
    }
    assert registry.list_assets() == []

    compliant_records = [
        MetadataRecord("owner", "Platform Knowledge", confirmed=True),
        MetadataRecord("state", "approved", confirmed=True),
    ]
    accepted = svc.create_asset(
        "Policy Note",
        Classification(asset_type="policy-note", sensitivity=Sensitivity.INTERNAL),
        ctx,
        asset_id="asset-policy-note",
        metadata_records=compliant_records,
    )

    assert accepted.asset.id == "asset-policy-note"
    first_assignment = svc.list_metadata_schema_assignments()[0]
    assert first_assignment.policy_ref == "local://metadata-policy/policy-note"
def test_sqlite_asset_registry_survives_reinstantiation(tmp_path: Path) -> None:
    """State written via one SQLite repository is readable from a second one.

    Creates an asset, adds a metadata record, and requests deletion, then
    opens a new repository on the same database file and checks lifecycle,
    source refs, representations, metadata, version sequence, and audit trail.
    """
    db_path = tmp_path / "registry.sqlite"
    svc = AssetRegistryService(SQLiteAssetRegistryRepository(db_path))
    ctx = operation_context()
    readme_ref = SourceReference(source_system="repo", path="README.md", checksum="sha256:readme")
    readme_repr = AssetRepresentation.from_content(
        "asset-readme",
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Readme\n",
        storage_ref="object://readme-source",
    )

    # Three writes in a row: create, add metadata, request deletion.
    readme = svc.create_asset(
        "Readme",
        Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC),
        ctx,
        asset_id="asset-readme",
        source_refs=[readme_ref],
        representations=[readme_repr],
    )
    owner_record = MetadataRecord("owner", "Platform Knowledge", confirmed=True)
    svc.add_metadata_record(readme.asset.id, owner_record, ctx)
    svc.request_delete(readme.asset.id, ctx)

    # A fresh repository instance on the same file sees all three writes.
    reopened = SQLiteAssetRegistryRepository(db_path)
    stored = reopened.get_asset("asset-readme")
    assert stored.lifecycle == LifecycleState.DELETE_REQUESTED
    assert stored.source_refs[0].path == "README.md"
    kinds = [item.kind for item in reopened.list_representations(asset_id=stored.id)]
    assert kinds == [RepresentationKind.SOURCE]
    metadata_keys = [item.key for item in reopened.list_metadata_records(stored.id)]
    assert metadata_keys == ["owner"]
    sequences = [version.sequence for version in reopened.list_versions(stored.id)]
    assert sequences == [1, 2, 3]
    operations = [
        event.operation
        for event in reopened.list_audit_events(target="asset:asset-readme")
    ]
    assert operations == [
        "asset.create",
        "asset.metadata.add",
        "asset.lifecycle.transition",
    ]
def test_sqlite_registry_persists_context_entities_relationships_and_idempotency(tmp_path: Path) -> None:
    """Idempotency records, context entities, and links are readable after reopening.

    Creates an asset with an idempotency key, links it to a context entity,
    then reads everything back through a second repository instance opened
    on the same database file.
    """
    db_path = tmp_path / "registry.sqlite"
    svc = AssetRegistryService(SQLiteAssetRegistryRepository(db_path))
    ctx = operation_context()
    policy = svc.create_asset(
        "Knowledge Policy",
        Classification(asset_type="policy", sensitivity=Sensitivity.INTERNAL),
        ctx,
        asset_id="asset-policy",
        idempotency_key="create-policy",
    )
    project = ContextEntity(
        entity_type=ContextEntityType.PROJECT,
        name="Kontextual Engine",
        entity_id="entity-kontextual",
    )
    link = svc.link_asset_to_context_entity(
        policy.asset.id,
        project,
        "about_project",
        ctx,
    )

    reopened = SQLiteAssetRegistryRepository(db_path)
    idempotency_record = reopened.get_idempotency_record("create-policy")
    assert idempotency_record.result_refs["asset_id"] == "asset-policy"
    stored_entities = reopened.list_context_entities()
    assert stored_entities[0].entity_id == "entity-kontextual"
    stored_links = reopened.list_relationships(source_id="asset-policy")
    assert stored_links[0].relationship_id == link.relationship.relationship_id
    latest_version = reopened.list_versions("asset-policy")[-1]
    assert latest_version.relationship_delta["added"]["target_kind"] == "context_entity"
    operations = [
        event.operation
        for event in reopened.list_audit_events(target="asset:asset-policy")
    ]
    assert operations == [
        "asset.create",
        "asset.relationship.add",
    ]
def test_sqlite_registry_persists_metadata_schemas_and_assignments(tmp_path: Path) -> None:
    """A schema and assignment stored in SQLite are enforced by a new service.

    Registers and assigns a schema, builds a second service on a fresh
    repository over the same file, and checks that it rejects invalid
    metadata, accepts valid metadata, and can load the schema and assignment.
    """
    db_path = tmp_path / "registry.sqlite"
    writer = AssetRegistryService(SQLiteAssetRegistryRepository(db_path))
    ctx = operation_context()
    review_schema = MetadataSchema(
        schema_id="schema-review-v1",
        name="Review Metadata",
        allow_unknown=False,
        fields=(
            MetadataFieldDefinition(
                "reviewer",
                MetadataValueType.STRING,
                required=True,
                require_confirmed=True,
            ),
            MetadataFieldDefinition(
                "score",
                MetadataValueType.NUMBER,
                min_value=0,
                max_value=1,
            ),
        ),
    )
    writer.register_metadata_schema(review_schema, ctx)
    review_assignment = MetadataSchemaAssignment(
        assignment_id="assignment-review-documents",
        schema_id=review_schema.schema_id,
        asset_types=("review",),
    )
    writer.assign_metadata_schema(review_assignment, ctx)

    # This service never saw the schema in-process; it is handed only the database file.
    reader = AssetRegistryService(SQLiteAssetRegistryRepository(db_path))
    with pytest.raises(ValidationError) as rejected:
        reader.create_asset(
            "Review",
            Classification(asset_type="review", sensitivity=Sensitivity.INTERNAL),
            ctx,
            asset_id="asset-review-invalid",
            metadata_records=[
                MetadataRecord("reviewer", "Ada", confirmed=False),
                MetadataRecord("score", 1.7),
            ],
        )
    reported_codes = {issue["code"] for issue in rejected.value.details["issues"]}
    assert reported_codes == {
        "metadata.confirmation_required",
        "metadata.value_too_large",
    }

    accepted = reader.create_asset(
        "Review",
        Classification(asset_type="review", sensitivity=Sensitivity.INTERNAL),
        ctx,
        asset_id="asset-review",
        metadata_records=[
            MetadataRecord("reviewer", "Ada", confirmed=True),
            MetadataRecord("score", 0.92),
        ],
    )

    reopened = SQLiteAssetRegistryRepository(db_path)
    assert accepted.asset.id == "asset-review"
    stored_schema = reopened.get_metadata_schema("schema-review-v1")
    assert stored_schema.name == "Review Metadata"
    stored_assignment = reopened.get_metadata_schema_assignment("assignment-review-documents")
    assert stored_assignment.schema_id == "schema-review-v1"
def test_sqlite_registry_enforces_representation_asset_reference(tmp_path: Path) -> None:
    """Saving a representation for an asset id that was never created is rejected."""
    db_path = tmp_path / "registry.sqlite"
    registry = SQLiteAssetRegistryRepository(db_path)
    # No asset with id "missing-asset" has been written to this new database.
    orphan = AssetRepresentation.from_content(
        "missing-asset",
        RepresentationKind.NORMALIZED,
        "text/plain",
        "normalized",
    )

    with pytest.raises(ValidationError, match="unknown asset"):
        registry.save_representation(orphan)
def operation_context() -> OperationContext:
    """Build the operation context used by the tests in this module.

    Returns:
        A context for the human actor ``user-test`` (display name "Test User",
        group ``engineering``) with correlation id ``corr-test``.
    """
    return OperationContext.create(
        Actor.create(
            ActorType.HUMAN,
            actor_id="user-test",
            display_name="Test User",
            groups=["engineering"],
        ),
        correlation_id="corr-test",
    )
class DenyLifecyclePolicy:
    """Test policy gateway that fails closed on lifecycle transitions only."""

    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict[str, str] | None = None,
    ) -> PolicyDecision:
        """Return a policy decision for ``action`` on ``resource``.

        Args:
            context: Operation context; its actor id is recorded on the decision.
            action: Name of the action being authorized.
            resource: Identifier of the resource the action targets.
            resource_metadata: Optional metadata echoed into the denial context.

        Returns:
            A fail-closed decision for ``asset.lifecycle.transition``; an allow
            decision for every other action.
        """
        actor_id = context.actor.id
        # Guard clause: anything that is not a lifecycle transition is allowed.
        if action != "asset.lifecycle.transition":
            return PolicyDecision.allow(actor_id, action, resource)

        denial_context = {"resource_metadata": resource_metadata if resource_metadata else {}}
        return PolicyDecision.fail_closed(
            actor_id,
            action,
            resource,
            reason="lifecycle transitions require review",
            context=denial_context,
        )