generated from coulomb/repo-seed
903 lines
33 KiB
Python
903 lines
33 KiB
Python
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from kontextual_engine import (
|
|
Actor,
|
|
ActorType,
|
|
AssetRegistryService,
|
|
AssetRepresentation,
|
|
AssetVersion,
|
|
AuthorizationError,
|
|
AuditEvent,
|
|
AuditOutcome,
|
|
Classification,
|
|
ContextEntity,
|
|
ContextEntityType,
|
|
InMemoryAssetRegistryRepository,
|
|
IngestionJob,
|
|
LifecycleState,
|
|
MetadataFieldDefinition,
|
|
MetadataRecord,
|
|
MetadataSchema,
|
|
MetadataSchemaAssignment,
|
|
MetadataValueType,
|
|
OperationContext,
|
|
PolicyDecision,
|
|
RepresentationKind,
|
|
Sensitivity,
|
|
SourceReference,
|
|
SQLiteAssetRegistryRepository,
|
|
ValidationError,
|
|
VersionChangeType,
|
|
)
|
|
|
|
|
|
def test_asset_registry_service_creates_assets_with_versions_and_audit() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
source_ref = SourceReference(
|
|
source_system="repo",
|
|
path="docs/intent.md",
|
|
checksum="sha256:source",
|
|
)
|
|
representation = AssetRepresentation.from_content(
|
|
"asset-intent",
|
|
RepresentationKind.SOURCE,
|
|
"text/markdown",
|
|
"# Intent\n",
|
|
storage_ref="object://intent-source",
|
|
source_ref_id=source_ref.id,
|
|
)
|
|
metadata = MetadataRecord(
|
|
"topic",
|
|
"architecture",
|
|
provenance={"producer": "human"},
|
|
confirmed=True,
|
|
)
|
|
|
|
result = service.create_asset(
|
|
"Intent",
|
|
Classification(
|
|
asset_type="document",
|
|
sensitivity=Sensitivity.INTERNAL,
|
|
owner="Platform Knowledge",
|
|
),
|
|
context,
|
|
asset_id="asset-intent",
|
|
source_refs=[source_ref],
|
|
representations=[representation],
|
|
metadata_records=[metadata],
|
|
)
|
|
|
|
assert result.asset.id == "asset-intent"
|
|
assert result.asset.current_version_id == result.version.version_id
|
|
assert result.version.sequence == 1
|
|
assert result.audit_event.outcome.value == "success"
|
|
assert result.policy_decision.allowed is True
|
|
assert repo.get_asset("asset-intent").source_refs[0].path == "docs/intent.md"
|
|
assert repo.list_representations(asset_id="asset-intent")[0].storage_ref == "object://intent-source"
|
|
assert repo.list_metadata_records("asset-intent")[0].confirmed is True
|
|
assert repo.list_audit_events(target="asset:asset-intent")[0].operation == "asset.create"
|
|
|
|
|
|
def test_asset_registry_lifecycle_policy_denial_fails_closed_and_audits() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo, policy_gateway=DenyLifecyclePolicy())
|
|
context = operation_context()
|
|
created = service.create_asset(
|
|
"Governed Asset",
|
|
Classification(asset_type="document", sensitivity=Sensitivity.CONFIDENTIAL),
|
|
context,
|
|
asset_id="asset-governed",
|
|
)
|
|
|
|
with pytest.raises(AuthorizationError) as exc_info:
|
|
service.transition_lifecycle(created.asset.id, LifecycleState.RETIRED, context)
|
|
|
|
events = repo.list_audit_events(target="asset:asset-governed")
|
|
|
|
assert exc_info.value.details["correlation_id"] == "corr-test"
|
|
assert exc_info.value.details["policy_decision"]["effect"] == "fail_closed"
|
|
assert [event.outcome.value for event in events] == ["success", "denied"]
|
|
assert repo.get_asset("asset-governed").lifecycle == LifecycleState.ACTIVE
|
|
|
|
|
|
def test_asset_registry_create_is_idempotent_for_same_key_and_payload() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
classification = Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC)
|
|
source_ref = SourceReference(source_system="repo", path="README.md")
|
|
representation = AssetRepresentation.from_content(
|
|
"asset-readme",
|
|
RepresentationKind.SOURCE,
|
|
"text/markdown",
|
|
"# Readme\n",
|
|
)
|
|
|
|
first = service.create_asset(
|
|
"Readme",
|
|
classification,
|
|
context,
|
|
asset_id="asset-readme",
|
|
source_refs=[source_ref],
|
|
representations=[representation],
|
|
idempotency_key="create-readme",
|
|
)
|
|
second = service.create_asset(
|
|
"Readme",
|
|
classification,
|
|
context,
|
|
asset_id="asset-readme",
|
|
source_refs=[source_ref],
|
|
representations=[representation],
|
|
idempotency_key="create-readme",
|
|
)
|
|
|
|
assert second.asset.id == first.asset.id
|
|
assert second.version.version_id == first.version.version_id
|
|
assert second.audit_event.event_id == first.audit_event.event_id
|
|
assert len(repo.list_versions("asset-readme")) == 1
|
|
assert len(repo.list_audit_events(target="asset:asset-readme")) == 1
|
|
|
|
with pytest.raises(ValidationError, match="Idempotency key"):
|
|
service.create_asset(
|
|
"Readme renamed",
|
|
classification,
|
|
context,
|
|
asset_id="asset-readme",
|
|
source_refs=[source_ref],
|
|
representations=[representation],
|
|
idempotency_key="create-readme",
|
|
)
|
|
|
|
|
|
def test_asset_registry_rejects_stale_expected_current_version_for_mutations() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
created = service.create_asset(
|
|
"Conflict Guard",
|
|
Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-conflict",
|
|
)
|
|
updated = service.add_metadata_record(
|
|
created.asset.id,
|
|
MetadataRecord("owner", "Platform Knowledge", confirmed=True),
|
|
context,
|
|
expected_current_version_id=created.version.version_id,
|
|
)
|
|
|
|
with pytest.raises(ValidationError) as exc_info:
|
|
service.transition_lifecycle(
|
|
created.asset.id,
|
|
LifecycleState.RETIRED,
|
|
context,
|
|
expected_current_version_id=created.version.version_id,
|
|
)
|
|
|
|
assert exc_info.value.details["code"] == "asset.version_conflict"
|
|
assert exc_info.value.details["expected_current_version_id"] == created.version.version_id
|
|
assert exc_info.value.details["current_version_id"] == updated.version.version_id
|
|
assert repo.get_asset(created.asset.id).lifecycle == LifecycleState.ACTIVE
|
|
assert [version.sequence for version in repo.list_versions(created.asset.id)] == [1, 2]
|
|
|
|
|
|
def test_asset_registry_restore_creates_new_version_without_erasing_history() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
created = service.create_asset(
|
|
"Restorable Asset",
|
|
Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-restorable",
|
|
)
|
|
retired = service.transition_lifecycle(
|
|
created.asset.id,
|
|
LifecycleState.RETIRED,
|
|
context,
|
|
expected_current_version_id=created.version.version_id,
|
|
)
|
|
|
|
restored = service.restore_asset_version(
|
|
created.asset.id,
|
|
created.version.version_id,
|
|
context,
|
|
expected_current_version_id=retired.version.version_id,
|
|
)
|
|
|
|
versions = repo.list_versions(created.asset.id)
|
|
|
|
assert restored.version.change_type.value == "restored"
|
|
assert restored.version.parent_version_id == retired.version.version_id
|
|
assert restored.version.metadata_delta["restored_from_version_id"] == created.version.version_id
|
|
assert restored.version.metadata_delta["restored_from_sequence"] == 1
|
|
assert restored.asset.lifecycle == LifecycleState.ACTIVE
|
|
assert repo.get_asset(created.asset.id).current_version_id == restored.version.version_id
|
|
assert [version.sequence for version in versions] == [1, 2, 3]
|
|
assert [version.change_type.value for version in versions] == [
|
|
"created",
|
|
"lifecycle_changed",
|
|
"restored",
|
|
]
|
|
assert [event.operation for event in repo.list_audit_events(target="asset:asset-restorable")] == [
|
|
"asset.create",
|
|
"asset.lifecycle.transition",
|
|
"asset.version.restore",
|
|
]
|
|
|
|
|
|
def test_asset_registry_supersede_creates_relationship_version_and_audit() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
source = service.create_asset(
|
|
"Old Guide",
|
|
Classification(asset_type="guide", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-old-guide",
|
|
)
|
|
successor = service.create_asset(
|
|
"New Guide",
|
|
Classification(asset_type="guide", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-new-guide",
|
|
)
|
|
|
|
superseded = service.supersede_asset(
|
|
source.asset.id,
|
|
successor.asset.id,
|
|
context,
|
|
reason="New canonical guide",
|
|
expected_current_version_id=source.version.version_id,
|
|
)
|
|
|
|
source_asset = repo.get_asset(source.asset.id)
|
|
relationships = repo.list_relationships(source_id=source.asset.id)
|
|
|
|
assert superseded.version.change_type.value == "superseded"
|
|
assert superseded.relationship.predicate == "superseded_by"
|
|
assert superseded.relationship.target_id == successor.asset.id
|
|
assert relationships == [superseded.relationship]
|
|
assert source_asset.lifecycle == LifecycleState.RETIRED
|
|
assert source_asset.metadata["superseded_by"] == successor.asset.id
|
|
assert source_asset.metadata["supersession_reason"] == "New canonical guide"
|
|
assert superseded.version.relationship_delta["added"]["relationship_id"] == superseded.relationship.relationship_id
|
|
assert superseded.version.metadata_delta["superseded_by"] == successor.asset.id
|
|
assert repo.get_asset(successor.asset.id).current_version_id == successor.version.version_id
|
|
assert repo.list_audit_events(target="asset:asset-old-guide")[-1].operation == "asset.supersede"
|
|
|
|
|
|
def test_asset_registry_relationships_create_versions_and_audit() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
classification = Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL)
|
|
source = service.create_asset("Source", classification, context, asset_id="asset-source")
|
|
target = service.create_asset("Target", classification, context, asset_id="asset-target")
|
|
|
|
result = service.link_asset_to_asset(
|
|
source.asset.id,
|
|
target.asset.id,
|
|
"depends_on",
|
|
context,
|
|
confidence=0.91,
|
|
provenance={"producer": "test"},
|
|
)
|
|
|
|
relationships = repo.list_relationships(source_id=source.asset.id)
|
|
versions = repo.list_versions(source.asset.id)
|
|
|
|
assert relationships == [result.relationship]
|
|
assert result.relationship.target_id == target.asset.id
|
|
assert result.relationship.confidence == 0.91
|
|
assert versions[-1].change_type.value == "relationship_changed"
|
|
assert versions[-1].relationship_delta["added"]["predicate"] == "depends_on"
|
|
assert repo.get_asset(source.asset.id).current_version_id == result.version.version_id
|
|
assert repo.get_asset(target.asset.id).current_version_id == target.version.version_id
|
|
assert repo.list_audit_events(target=f"asset:{source.asset.id}")[-1].operation == "asset.relationship.add"
|
|
|
|
|
|
def test_asset_registry_validates_metadata_schema_before_writes() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
schema = MetadataSchema(
|
|
schema_id="schema-note-v1",
|
|
name="Note Metadata",
|
|
asset_types=("note",),
|
|
allow_unknown=False,
|
|
fields=(
|
|
MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True),
|
|
MetadataFieldDefinition("priority", MetadataValueType.INTEGER, min_value=1, max_value=5),
|
|
),
|
|
)
|
|
service = AssetRegistryService(repo, metadata_schemas=[schema])
|
|
context = operation_context()
|
|
|
|
with pytest.raises(ValidationError) as exc_info:
|
|
service.create_asset(
|
|
"Note",
|
|
Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-invalid-note",
|
|
metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=False)],
|
|
)
|
|
|
|
assert {issue["code"] for issue in exc_info.value.details["issues"]} == {
|
|
"metadata.confirmation_required"
|
|
}
|
|
assert repo.list_assets() == []
|
|
|
|
created = service.create_asset(
|
|
"Note",
|
|
Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-note",
|
|
metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=True)],
|
|
)
|
|
|
|
with pytest.raises(ValidationError) as update_exc:
|
|
service.add_metadata_record(created.asset.id, MetadataRecord("priority", 9), context)
|
|
|
|
assert {issue["code"] for issue in update_exc.value.details["issues"]} == {
|
|
"metadata.value_too_large"
|
|
}
|
|
assert [record.key for record in repo.list_metadata_records(created.asset.id)] == ["owner"]
|
|
|
|
|
|
def test_asset_registry_metadata_batch_reports_partial_failures() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
schema = MetadataSchema(
|
|
schema_id="schema-batch-note-v1",
|
|
name="Batch Note Metadata",
|
|
asset_types=("batch-note",),
|
|
allow_unknown=False,
|
|
fields=(
|
|
MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True),
|
|
MetadataFieldDefinition("priority", MetadataValueType.INTEGER, allow_multiple=True, min_value=1, max_value=5),
|
|
),
|
|
)
|
|
service = AssetRegistryService(repo, metadata_schemas=[schema])
|
|
context = operation_context()
|
|
created = service.create_asset(
|
|
"Batch Note",
|
|
Classification(asset_type="batch-note", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-batch-note",
|
|
metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=True)],
|
|
)
|
|
|
|
result = service.add_metadata_records_batch(
|
|
created.asset.id,
|
|
(
|
|
MetadataRecord("priority", 3, record_id="meta-priority-ok"),
|
|
MetadataRecord("priority", 9, record_id="meta-priority-too-large"),
|
|
MetadataRecord("phase", "beta", record_id="meta-phase-unknown"),
|
|
),
|
|
context,
|
|
expected_current_version_id=created.version.version_id,
|
|
)
|
|
|
|
assert result.total == 3
|
|
assert result.succeeded == 1
|
|
assert result.failed == 2
|
|
assert result.partial is True
|
|
assert result.outcome == "partial"
|
|
assert [item.success for item in result.items] == [True, False, False]
|
|
assert result.items[0].result_ref["record_id"] == "meta-priority-ok"
|
|
assert result.items[1].error is not None
|
|
assert result.items[1].error.code == "kontextual.validation"
|
|
assert result.items[1].error.correlation_id == "corr-test"
|
|
assert "metadata schema" in result.items[1].error.remediation
|
|
assert {issue["code"] for issue in result.items[1].error.details["issues"]} == {
|
|
"metadata.value_too_large"
|
|
}
|
|
assert result.items[2].error is not None
|
|
assert {issue["code"] for issue in result.items[2].error.details["issues"]} == {
|
|
"metadata.unknown_field"
|
|
}
|
|
assert result.to_dict()["audit_event_id"] == result.audit_event_id
|
|
|
|
metadata_records = repo.list_metadata_records(created.asset.id)
|
|
versions = repo.list_versions(created.asset.id)
|
|
events = repo.list_audit_events(target=f"asset:{created.asset.id}")
|
|
|
|
assert [record.key for record in metadata_records] == ["owner", "priority"]
|
|
assert metadata_records[1].record_id == "meta-priority-ok"
|
|
assert [version.sequence for version in versions] == [1, 2]
|
|
assert versions[-1].metadata_delta == {"priority": 3}
|
|
assert [event.operation for event in events] == [
|
|
"asset.create",
|
|
"asset.metadata.add",
|
|
"asset.metadata.batch_add",
|
|
]
|
|
assert events[-1].outcome.value == "partial"
|
|
assert events[-1].correlation_id == "corr-test"
|
|
assert events[-1].details["total"] == 3
|
|
assert events[-1].details["succeeded"] == 1
|
|
assert events[-1].details["failed"] == 2
|
|
assert events[-1].details["failed_item_ids"] == [
|
|
"meta-priority-too-large",
|
|
"meta-phase-unknown",
|
|
]
|
|
|
|
|
|
def test_asset_registry_metadata_batch_rejects_stale_expected_version_before_writes() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
created = service.create_asset(
|
|
"Batch Conflict",
|
|
Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-batch-conflict",
|
|
)
|
|
service.add_metadata_record(
|
|
created.asset.id,
|
|
MetadataRecord("owner", "Platform Knowledge", confirmed=True),
|
|
context,
|
|
expected_current_version_id=created.version.version_id,
|
|
)
|
|
|
|
with pytest.raises(ValidationError) as exc_info:
|
|
service.add_metadata_records_batch(
|
|
created.asset.id,
|
|
(MetadataRecord("topic", "architecture"),),
|
|
context,
|
|
expected_current_version_id=created.version.version_id,
|
|
)
|
|
|
|
assert exc_info.value.details["code"] == "asset.version_conflict"
|
|
assert exc_info.value.details["operation"] == "asset.metadata.batch_add"
|
|
assert [record.key for record in repo.list_metadata_records(created.asset.id)] == ["owner"]
|
|
assert [event.operation for event in repo.list_audit_events(target=f"asset:{created.asset.id}")] == [
|
|
"asset.create",
|
|
"asset.metadata.add",
|
|
]
|
|
|
|
|
|
def test_asset_registry_applies_persisted_metadata_schema_assignments() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
schema = MetadataSchema(
|
|
schema_id="schema-policy-note-v1",
|
|
name="Policy Note Metadata",
|
|
allow_unknown=False,
|
|
fields=(
|
|
MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True),
|
|
MetadataFieldDefinition("state", MetadataValueType.STRING, allowed_values=("draft", "approved")),
|
|
),
|
|
)
|
|
assignment = MetadataSchemaAssignment(
|
|
assignment_id="assignment-policy-note",
|
|
schema_id=schema.schema_id,
|
|
asset_types=("policy-note",),
|
|
sensitivities=(Sensitivity.INTERNAL,),
|
|
policy_ref="local://metadata-policy/policy-note",
|
|
)
|
|
|
|
service.register_metadata_schema(schema, context)
|
|
service.assign_metadata_schema(assignment, context)
|
|
|
|
with pytest.raises(ValidationError) as exc_info:
|
|
service.create_asset(
|
|
"Policy Note",
|
|
Classification(asset_type="policy-note", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-policy-note-invalid",
|
|
metadata_records=[MetadataRecord("state", "published")],
|
|
)
|
|
|
|
assert {issue["code"] for issue in exc_info.value.details["issues"]} == {
|
|
"metadata.required_missing",
|
|
"metadata.value_not_allowed",
|
|
}
|
|
assert repo.list_assets() == []
|
|
|
|
created = service.create_asset(
|
|
"Policy Note",
|
|
Classification(asset_type="policy-note", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-policy-note",
|
|
metadata_records=[
|
|
MetadataRecord("owner", "Platform Knowledge", confirmed=True),
|
|
MetadataRecord("state", "approved", confirmed=True),
|
|
],
|
|
)
|
|
|
|
assert created.asset.id == "asset-policy-note"
|
|
assert service.list_metadata_schema_assignments()[0].policy_ref == "local://metadata-policy/policy-note"
|
|
|
|
|
|
def test_asset_registry_filters_assets_by_standard_metadata_and_records() -> None:
|
|
repo = InMemoryAssetRegistryRepository()
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
|
|
service.create_asset(
|
|
"Architecture ADR",
|
|
Classification(
|
|
asset_type="document",
|
|
sensitivity=Sensitivity.PUBLIC,
|
|
topics=("architecture", "adr"),
|
|
owner="Platform Knowledge",
|
|
review_state="approved",
|
|
),
|
|
context,
|
|
asset_id="asset-adr",
|
|
metadata_records=[
|
|
MetadataRecord("status", "accepted", confirmed=True),
|
|
MetadataRecord("tags", ["governance", "markdown"], confirmed=True),
|
|
],
|
|
)
|
|
service.create_asset(
|
|
"Internal Risk Note",
|
|
Classification(
|
|
asset_type="note",
|
|
sensitivity=Sensitivity.CONFIDENTIAL,
|
|
topics=("risk",),
|
|
owner="Security",
|
|
review_state="draft",
|
|
),
|
|
context,
|
|
asset_id="asset-risk",
|
|
metadata_records=[MetadataRecord("status", "accepted", confirmed=False)],
|
|
)
|
|
service.create_asset(
|
|
"Architecture Brief",
|
|
Classification(
|
|
asset_type="document",
|
|
sensitivity=Sensitivity.INTERNAL,
|
|
topics=("architecture",),
|
|
owner="Platform Knowledge",
|
|
review_state="draft",
|
|
),
|
|
context,
|
|
asset_id="asset-brief",
|
|
metadata_records=[MetadataRecord("status", "draft", confirmed=True)],
|
|
)
|
|
|
|
assert [asset.id for asset in service.list_assets(sensitivity=Sensitivity.PUBLIC)] == ["asset-adr"]
|
|
assert [asset.id for asset in service.list_assets(owner="Security")] == ["asset-risk"]
|
|
assert [asset.id for asset in service.list_assets(topic="architecture")] == [
|
|
"asset-adr",
|
|
"asset-brief",
|
|
]
|
|
assert [asset.id for asset in service.list_assets(review_state="draft")] == [
|
|
"asset-brief",
|
|
"asset-risk",
|
|
]
|
|
assert [asset.id for asset in service.list_assets(metadata_filters={"status": "accepted"})] == [
|
|
"asset-adr",
|
|
"asset-risk",
|
|
]
|
|
assert [
|
|
asset.id
|
|
for asset in service.list_assets(
|
|
metadata_filters={"status": "accepted"},
|
|
confirmed_metadata_only=True,
|
|
)
|
|
] == ["asset-adr"]
|
|
assert [asset.id for asset in service.list_assets(metadata_filters={"tags": "markdown"})] == [
|
|
"asset-adr"
|
|
]
|
|
|
|
|
|
def test_sqlite_asset_registry_survives_reinstantiation(tmp_path: Path) -> None:
|
|
db_path = tmp_path / "registry.sqlite"
|
|
repo = SQLiteAssetRegistryRepository(db_path)
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
source_ref = SourceReference(source_system="repo", path="README.md", checksum="sha256:readme")
|
|
source = AssetRepresentation.from_content(
|
|
"asset-readme",
|
|
RepresentationKind.SOURCE,
|
|
"text/markdown",
|
|
"# Readme\n",
|
|
storage_ref="object://readme-source",
|
|
)
|
|
created = service.create_asset(
|
|
"Readme",
|
|
Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC),
|
|
context,
|
|
asset_id="asset-readme",
|
|
source_refs=[source_ref],
|
|
representations=[source],
|
|
)
|
|
service.add_metadata_record(
|
|
created.asset.id,
|
|
MetadataRecord("owner", "Platform Knowledge", confirmed=True),
|
|
context,
|
|
)
|
|
service.request_delete(created.asset.id, context)
|
|
|
|
reloaded = SQLiteAssetRegistryRepository(db_path)
|
|
asset = reloaded.get_asset("asset-readme")
|
|
|
|
assert asset.lifecycle == LifecycleState.DELETE_REQUESTED
|
|
assert asset.source_refs[0].path == "README.md"
|
|
assert [item.kind for item in reloaded.list_representations(asset_id=asset.id)] == [
|
|
RepresentationKind.SOURCE
|
|
]
|
|
assert [item.key for item in reloaded.list_metadata_records(asset.id)] == ["owner"]
|
|
assert [version.sequence for version in reloaded.list_versions(asset.id)] == [1, 2, 3]
|
|
assert [event.operation for event in reloaded.list_audit_events(target="asset:asset-readme")] == [
|
|
"asset.create",
|
|
"asset.metadata.add",
|
|
"asset.lifecycle.transition",
|
|
]
|
|
|
|
|
|
def test_sqlite_registry_persists_context_entities_relationships_and_idempotency(tmp_path: Path) -> None:
|
|
db_path = tmp_path / "registry.sqlite"
|
|
repo = SQLiteAssetRegistryRepository(db_path)
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
created = service.create_asset(
|
|
"Knowledge Policy",
|
|
Classification(asset_type="policy", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-policy",
|
|
idempotency_key="create-policy",
|
|
)
|
|
entity = ContextEntity(
|
|
entity_type=ContextEntityType.PROJECT,
|
|
name="Kontextual Engine",
|
|
entity_id="entity-kontextual",
|
|
)
|
|
linked = service.link_asset_to_context_entity(
|
|
created.asset.id,
|
|
entity,
|
|
"about_project",
|
|
context,
|
|
)
|
|
|
|
reloaded = SQLiteAssetRegistryRepository(db_path)
|
|
|
|
assert reloaded.get_idempotency_record("create-policy").result_refs["asset_id"] == "asset-policy"
|
|
assert reloaded.list_context_entities()[0].entity_id == "entity-kontextual"
|
|
assert reloaded.list_relationships(source_id="asset-policy")[0].relationship_id == linked.relationship.relationship_id
|
|
assert reloaded.list_versions("asset-policy")[-1].relationship_delta["added"]["target_kind"] == "context_entity"
|
|
assert [event.operation for event in reloaded.list_audit_events(target="asset:asset-policy")] == [
|
|
"asset.create",
|
|
"asset.relationship.add",
|
|
]
|
|
|
|
|
|
def test_sqlite_registry_persists_metadata_schemas_and_assignments(tmp_path: Path) -> None:
|
|
db_path = tmp_path / "registry.sqlite"
|
|
repo = SQLiteAssetRegistryRepository(db_path)
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
schema = MetadataSchema(
|
|
schema_id="schema-review-v1",
|
|
name="Review Metadata",
|
|
allow_unknown=False,
|
|
fields=(
|
|
MetadataFieldDefinition("reviewer", MetadataValueType.STRING, required=True, require_confirmed=True),
|
|
MetadataFieldDefinition("score", MetadataValueType.NUMBER, min_value=0, max_value=1),
|
|
),
|
|
)
|
|
|
|
service.register_metadata_schema(schema, context)
|
|
service.assign_metadata_schema(
|
|
MetadataSchemaAssignment(
|
|
assignment_id="assignment-review-documents",
|
|
schema_id=schema.schema_id,
|
|
asset_types=("review",),
|
|
),
|
|
context,
|
|
)
|
|
|
|
reloaded_service = AssetRegistryService(SQLiteAssetRegistryRepository(db_path))
|
|
|
|
with pytest.raises(ValidationError) as exc_info:
|
|
reloaded_service.create_asset(
|
|
"Review",
|
|
Classification(asset_type="review", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-review-invalid",
|
|
metadata_records=[
|
|
MetadataRecord("reviewer", "Ada", confirmed=False),
|
|
MetadataRecord("score", 1.7),
|
|
],
|
|
)
|
|
|
|
assert {issue["code"] for issue in exc_info.value.details["issues"]} == {
|
|
"metadata.confirmation_required",
|
|
"metadata.value_too_large",
|
|
}
|
|
|
|
created = reloaded_service.create_asset(
|
|
"Review",
|
|
Classification(asset_type="review", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-review",
|
|
metadata_records=[
|
|
MetadataRecord("reviewer", "Ada", confirmed=True),
|
|
MetadataRecord("score", 0.92),
|
|
],
|
|
)
|
|
|
|
reloaded_repo = SQLiteAssetRegistryRepository(db_path)
|
|
assert created.asset.id == "asset-review"
|
|
assert reloaded_repo.get_metadata_schema("schema-review-v1").name == "Review Metadata"
|
|
assert reloaded_repo.get_metadata_schema_assignment("assignment-review-documents").schema_id == "schema-review-v1"
|
|
|
|
|
|
def test_sqlite_registry_filters_assets_after_reload(tmp_path: Path) -> None:
|
|
db_path = tmp_path / "registry.sqlite"
|
|
repo = SQLiteAssetRegistryRepository(db_path)
|
|
service = AssetRegistryService(repo)
|
|
context = operation_context()
|
|
|
|
service.create_asset(
|
|
"Public Guide",
|
|
Classification(
|
|
asset_type="guide",
|
|
sensitivity=Sensitivity.PUBLIC,
|
|
topics=("markdown", "proxy"),
|
|
owner="Docs",
|
|
review_state="approved",
|
|
),
|
|
context,
|
|
asset_id="asset-guide",
|
|
metadata_records=[MetadataRecord("channel", "public", confirmed=True)],
|
|
)
|
|
service.create_asset(
|
|
"Internal Guide",
|
|
Classification(
|
|
asset_type="guide",
|
|
sensitivity=Sensitivity.INTERNAL,
|
|
topics=("markdown",),
|
|
owner="Docs",
|
|
review_state="draft",
|
|
),
|
|
context,
|
|
asset_id="asset-internal-guide",
|
|
metadata_records=[MetadataRecord("channel", "public", confirmed=False)],
|
|
)
|
|
|
|
reloaded = AssetRegistryService(SQLiteAssetRegistryRepository(db_path))
|
|
|
|
assert [asset.id for asset in reloaded.list_assets(asset_type="guide", owner="Docs")] == [
|
|
"asset-internal-guide",
|
|
"asset-guide",
|
|
]
|
|
assert [asset.id for asset in reloaded.list_assets(topic="proxy")] == ["asset-guide"]
|
|
assert [
|
|
asset.id
|
|
for asset in reloaded.list_assets(
|
|
metadata_filters={"channel": "public"},
|
|
confirmed_metadata_only=True,
|
|
)
|
|
] == ["asset-guide"]
|
|
|
|
|
|
def test_sqlite_registry_persists_metadata_batch_partial_audit_after_reload(tmp_path: Path) -> None:
|
|
db_path = tmp_path / "registry.sqlite"
|
|
repo = SQLiteAssetRegistryRepository(db_path)
|
|
schema = MetadataSchema(
|
|
schema_id="schema-batch-ticket-v1",
|
|
name="Batch Ticket Metadata",
|
|
asset_types=("ticket",),
|
|
allow_unknown=False,
|
|
fields=(
|
|
MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True),
|
|
MetadataFieldDefinition("severity", MetadataValueType.INTEGER, allow_multiple=True, min_value=1, max_value=5),
|
|
),
|
|
)
|
|
service = AssetRegistryService(repo, metadata_schemas=[schema])
|
|
context = operation_context()
|
|
created = service.create_asset(
|
|
"Batch Ticket",
|
|
Classification(asset_type="ticket", sensitivity=Sensitivity.INTERNAL),
|
|
context,
|
|
asset_id="asset-batch-ticket",
|
|
metadata_records=[MetadataRecord("owner", "Operations", confirmed=True)],
|
|
)
|
|
|
|
result = service.add_metadata_records_batch(
|
|
created.asset.id,
|
|
(
|
|
MetadataRecord("severity", 4, record_id="meta-severity-ok"),
|
|
MetadataRecord("severity", 9, record_id="meta-severity-too-large"),
|
|
),
|
|
context,
|
|
expected_current_version_id=created.version.version_id,
|
|
)
|
|
reloaded = SQLiteAssetRegistryRepository(db_path)
|
|
|
|
assert result.outcome == "partial"
|
|
assert result.audit_event_id is not None
|
|
assert [record.key for record in reloaded.list_metadata_records(created.asset.id)] == [
|
|
"owner",
|
|
"severity",
|
|
]
|
|
assert reloaded.list_metadata_records(created.asset.id)[1].record_id == "meta-severity-ok"
|
|
assert [version.sequence for version in reloaded.list_versions(created.asset.id)] == [1, 2]
|
|
assert reloaded.get_audit_event(result.audit_event_id).outcome == AuditOutcome.PARTIAL
|
|
assert reloaded.get_audit_event(result.audit_event_id).details["failed_item_ids"] == [
|
|
"meta-severity-too-large"
|
|
]
|
|
|
|
|
|
def test_sqlite_registry_enforces_representation_asset_reference(tmp_path: Path) -> None:
|
|
repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite")
|
|
representation = AssetRepresentation.from_content(
|
|
"missing-asset",
|
|
RepresentationKind.NORMALIZED,
|
|
"text/plain",
|
|
"normalized",
|
|
)
|
|
|
|
with pytest.raises(ValidationError, match="unknown asset"):
|
|
repo.save_representation(representation)
|
|
|
|
|
|
def test_sqlite_registry_enforces_durable_reference_integrity(tmp_path: Path) -> None:
|
|
repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite")
|
|
|
|
with pytest.raises(ValidationError, match="unknown asset"):
|
|
repo.save_version(
|
|
AssetVersion(
|
|
asset_id="asset-missing",
|
|
sequence=1,
|
|
change_type=VersionChangeType.CREATED,
|
|
)
|
|
)
|
|
|
|
with pytest.raises(ValidationError, match="unknown actor"):
|
|
repo.save_audit_event(
|
|
AuditEvent(
|
|
operation="asset.create",
|
|
target="asset:asset-missing",
|
|
outcome=AuditOutcome.SUCCESS,
|
|
actor_id="actor-missing",
|
|
correlation_id="corr-missing",
|
|
)
|
|
)
|
|
|
|
with pytest.raises(ValidationError, match="unknown actor"):
|
|
repo.save_ingestion_job(
|
|
IngestionJob.create(
|
|
input={"connector": "local_file", "source_uri": "missing.txt"},
|
|
actor_id="actor-missing",
|
|
correlation_id="corr-missing",
|
|
)
|
|
)
|
|
|
|
|
|
def operation_context() -> OperationContext:
|
|
actor = Actor.create(
|
|
ActorType.HUMAN,
|
|
actor_id="user-test",
|
|
display_name="Test User",
|
|
groups=["engineering"],
|
|
)
|
|
return OperationContext.create(actor, correlation_id="corr-test")
|
|
|
|
|
|
class DenyLifecyclePolicy:
|
|
def authorize(
|
|
self,
|
|
context: OperationContext,
|
|
action: str,
|
|
resource: str,
|
|
*,
|
|
resource_metadata: dict[str, str] | None = None,
|
|
) -> PolicyDecision:
|
|
if action == "asset.lifecycle.transition":
|
|
return PolicyDecision.fail_closed(
|
|
context.actor.id,
|
|
action,
|
|
resource,
|
|
reason="lifecycle transitions require review",
|
|
context={"resource_metadata": resource_metadata or {}},
|
|
)
|
|
return PolicyDecision.allow(context.actor.id, action, resource)
|