diff --git a/docs/asset-registry-implementation.md b/docs/asset-registry-implementation.md index 10746d8..065cfdb 100644 --- a/docs/asset-registry-implementation.md +++ b/docs/asset-registry-implementation.md @@ -46,6 +46,11 @@ and SQLite repositories are adapters behind those ports. transition, and denied mutations. - Asset version records for create, content/representation changes, metadata changes, and lifecycle changes. +- Optimistic `expected_current_version_id` conflict checks on stale-sensitive + asset mutations. +- Append-only asset restore operations that create new auditable versions. +- Asset supersession operations that create `superseded_by` relationships, + retire the source asset by default, and record a supersession version. - Context entity persistence. - Relationship persistence for asset-to-asset and asset-to-context-entity links. @@ -79,8 +84,7 @@ idempotency key. ## Not Yet Implemented - Policy assignment storage and enterprise policy adapters. -- Conflict detection beyond version-sequence uniqueness. -- Restore and supersession service operations. +- Conflict detection beyond service-level optimistic version guards. - Batch partial-failure envelopes. These remain in scope for later `KONT-WP-0005` tasks or adjacent workplans. @@ -103,3 +107,5 @@ These remain in scope for later `KONT-WP-0005` tasks or adjacent workplans. - persistent metadata schema registry and assignment reload behavior, - classification and metadata-record asset filtering across memory and SQLite repositories. +- optimistic version conflict checks on asset mutations, +- restore and supersession as append-only versioned operations. diff --git a/src/kontextual_engine/services/asset_service.py b/src/kontextual_engine/services/asset_service.py index 5f8285f..7aeabdd 100644 --- a/src/kontextual_engine/services/asset_service.py +++ b/src/kontextual_engine/services/asset_service.py @@ -148,8 +148,15 @@ class AssetRegistryService: asset_id: str, record: MetadataRecord, context: OperationContext, + *, + expected_current_version_id: str | None = None, ) -> AssetChangeResult: asset = self.repository.get_asset(asset_id) + self._assert_expected_current_version( + asset, + expected_current_version_id, + operation="asset.metadata.add", + ) decision = self._authorize(context, "asset.metadata.add", f"asset:{asset.id}") next_sequence = self._next_sequence(asset.id) self._validate_metadata_records( @@ -235,8 +242,15 @@ class AssetRegistryService: asset_id: str, representation: AssetRepresentation, context: OperationContext, + *, + expected_current_version_id: str | None = None, ) -> AssetChangeResult: asset = self.repository.get_asset(asset_id) + self._assert_expected_current_version( + asset, + expected_current_version_id, + operation="asset.representation.add", + ) decision = self._authorize( context, "asset.representation.add", @@ -273,8 +287,15 @@ class AssetRegistryService: asset_id: str, lifecycle: LifecycleState, context: OperationContext, + *, + expected_current_version_id: str | None = None, ) -> AssetChangeResult: asset = self.repository.get_asset(asset_id) + self._assert_expected_current_version( + asset, + expected_current_version_id, + operation="asset.lifecycle.transition", + ) decision = self._authorize( context, "asset.lifecycle.transition", @@ -304,8 +325,162 @@ class AssetRegistryService: ) return AssetChangeResult(updated, version, event, decision) - def request_delete(self, asset_id: str, context: OperationContext) -> AssetChangeResult: - return self.transition_lifecycle(asset_id, LifecycleState.DELETE_REQUESTED, context) + def request_delete( + self, + asset_id: str, + context: OperationContext, + *, + expected_current_version_id: str | None = None, + ) -> AssetChangeResult: + return self.transition_lifecycle( + asset_id, + LifecycleState.DELETE_REQUESTED, + context, + expected_current_version_id=expected_current_version_id, + ) + + def restore_asset_version( + self, + asset_id: str, + target_version_id: str, + context: OperationContext, + *, + expected_current_version_id: str | None = None, + ) -> AssetChangeResult: + asset = self.repository.get_asset(asset_id) + self._assert_expected_current_version( + asset, + expected_current_version_id, + operation="asset.version.restore", + ) + target_version = self._version_by_id(asset.id, target_version_id) + decision = self._authorize( + context, + "asset.version.restore", + f"asset:{asset.id}", + resource_metadata={"target_version_id": target_version.version_id}, + ) + restored_lifecycle = ( + LifecycleState(target_version.lifecycle) + if target_version.lifecycle is not None + else asset.lifecycle + ) + updated = ( + asset.transition_lifecycle(restored_lifecycle) + if restored_lifecycle != asset.lifecycle + else asset + ) + version = AssetVersion( + asset_id=asset.id, + sequence=self._next_sequence(asset.id), + change_type=VersionChangeType.RESTORED, + representation_ids=target_version.representation_ids, + actor_id=context.actor.id, + parent_version_id=asset.current_version_id, + metadata_delta={ + "restored_from_version_id": target_version.version_id, + "restored_from_sequence": target_version.sequence, + "restored_from_change_type": target_version.change_type.value, + }, + lifecycle=updated.lifecycle.value, + ) + updated = updated.with_current_version(version.version_id) + self.repository.save_asset(updated) + self.repository.save_version(version) + event = self._audit( + "asset.version.restore", + f"asset:{asset.id}", + AuditOutcome.SUCCESS, + context, + decision, + details={ + "version_id": version.version_id, + "restored_from_version_id": target_version.version_id, + }, + ) + return AssetChangeResult(updated, version, event, decision) + + def supersede_asset( + self, + asset_id: str, + successor_asset_id: str, + context: OperationContext, + *, + reason: str | None = None, + retire: bool = True, + expected_current_version_id: str | None = None, + ) -> RelationshipChangeResult: + source_asset = self.repository.get_asset(asset_id) + successor_asset = self.repository.get_asset(successor_asset_id) + if source_asset.id == successor_asset.id: + raise ValidationError( + "Asset cannot supersede itself", + details={"asset_id": source_asset.id, "code": "asset.supersession.self"}, + ) + self._assert_expected_current_version( + source_asset, + expected_current_version_id, + operation="asset.supersede", + ) + decision = self._authorize( + context, + "asset.supersede", + f"asset:{source_asset.id}", + resource_metadata={"successor_asset_id": successor_asset.id}, + ) + provenance = {"operation": "asset.supersede"} + if reason: + provenance["reason"] = reason + relationship = CoreRelationship( + source_id=source_asset.id, + target_id=successor_asset.id, + predicate="superseded_by", + target_kind=RelationshipTargetKind.ASSET, + confidence=1.0, + actor_id=context.actor.id, + provenance=provenance, + ) + saved = self.repository.save_relationship(relationship) + updated_metadata = { + **source_asset.metadata, + "superseded_by": successor_asset.id, + } + metadata_delta: dict[str, Any] = {"superseded_by": successor_asset.id} + if reason: + updated_metadata["supersession_reason"] = reason + metadata_delta["supersession_reason"] = reason + updated_asset = ( + source_asset.transition_lifecycle(LifecycleState.RETIRED) + if retire + else source_asset + ) + updated_asset = replace(updated_asset, metadata=updated_metadata) + version = AssetVersion( + asset_id=source_asset.id, + sequence=self._next_sequence(source_asset.id), + change_type=VersionChangeType.SUPERSEDED, + actor_id=context.actor.id, + parent_version_id=source_asset.current_version_id, + metadata_delta=metadata_delta, + relationship_delta={"added": saved.to_dict()}, + lifecycle=updated_asset.lifecycle.value, + ) + updated_asset = updated_asset.with_current_version(version.version_id) + self.repository.save_asset(updated_asset) + self.repository.save_version(version) + event = self._audit( + "asset.supersede", + f"asset:{source_asset.id}", + AuditOutcome.SUCCESS, + context, + decision, + details={ + "version_id": version.version_id, + "relationship_id": saved.relationship_id, + "successor_asset_id": successor_asset.id, + }, + ) + return RelationshipChangeResult(saved, version, event, decision) def get_asset(self, asset_id: str) -> KnowledgeAsset: return self.repository.get_asset(asset_id) @@ -360,6 +535,7 @@ class AssetRegistryService: *, confidence: float | None = None, provenance: dict[str, str] | None = None, + expected_current_version_id: str | None = None, ) -> RelationshipChangeResult: relationship = CoreRelationship( source_id=source_asset_id, @@ -370,7 +546,11 @@ class AssetRegistryService: actor_id=context.actor.id, provenance=dict(provenance or {}), ) - return self._save_relationship(relationship, context) + return self._save_relationship( + relationship, + context, + expected_current_version_id=expected_current_version_id, + ) def link_asset_to_context_entity( self, @@ -381,6 +561,7 @@ class AssetRegistryService: *, confidence: float | None = None, provenance: dict[str, str] | None = None, + expected_current_version_id: str | None = None, ) -> RelationshipChangeResult: self.repository.save_context_entity(entity) relationship = CoreRelationship( @@ -392,14 +573,25 @@ class AssetRegistryService: actor_id=context.actor.id, provenance=dict(provenance or {}), ) - return self._save_relationship(relationship, context) + return self._save_relationship( + relationship, + context, + expected_current_version_id=expected_current_version_id, + ) def _save_relationship( self, relationship: CoreRelationship, context: OperationContext, + *, + expected_current_version_id: str | None = None, ) -> RelationshipChangeResult: source_asset = self.repository.get_asset(relationship.source_id) + self._assert_expected_current_version( + source_asset, + expected_current_version_id, + operation="asset.relationship.add", + ) decision = self._authorize( context, "asset.relationship.add", @@ -437,6 +629,27 @@ class AssetRegistryService: ) return RelationshipChangeResult(saved, version, event, decision) + def _assert_expected_current_version( + self, + asset: KnowledgeAsset, + expected_current_version_id: str | None, + *, + operation: str, + ) -> None: + if expected_current_version_id is None: + return + if asset.current_version_id != expected_current_version_id: + raise ValidationError( + "Asset version conflict", + details={ + "code": "asset.version_conflict", + "operation": operation, + "asset_id": asset.id, + "expected_current_version_id": expected_current_version_id, + "current_version_id": asset.current_version_id, + }, + ) + def _authorize( self, context: OperationContext, diff --git a/tests/test_asset_registry.py b/tests/test_asset_registry.py index 5495a88..5784698 100644 --- a/tests/test_asset_registry.py +++ b/tests/test_asset_registry.py @@ -149,6 +149,124 @@ def test_asset_registry_create_is_idempotent_for_same_key_and_payload() -> None: ) +def test_asset_registry_rejects_stale_expected_current_version_for_mutations() -> None: + repo = InMemoryAssetRegistryRepository() + service = AssetRegistryService(repo) + context = operation_context() + created = service.create_asset( + "Conflict Guard", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-conflict", + ) + updated = service.add_metadata_record( + created.asset.id, + MetadataRecord("owner", "Platform Knowledge", confirmed=True), + context, + expected_current_version_id=created.version.version_id, + ) + + with pytest.raises(ValidationError) as exc_info: + service.transition_lifecycle( + created.asset.id, + LifecycleState.RETIRED, + context, + expected_current_version_id=created.version.version_id, + ) + + assert exc_info.value.details["code"] == "asset.version_conflict" + assert exc_info.value.details["expected_current_version_id"] == created.version.version_id + assert exc_info.value.details["current_version_id"] == updated.version.version_id + assert repo.get_asset(created.asset.id).lifecycle == LifecycleState.ACTIVE + assert [version.sequence for version in repo.list_versions(created.asset.id)] == [1, 2] + + +def test_asset_registry_restore_creates_new_version_without_erasing_history() -> None: + repo = InMemoryAssetRegistryRepository() + service = AssetRegistryService(repo) + context = operation_context() + created = service.create_asset( + "Restorable Asset", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-restorable", + ) + retired = service.transition_lifecycle( + created.asset.id, + LifecycleState.RETIRED, + context, + expected_current_version_id=created.version.version_id, + ) + + restored = service.restore_asset_version( + created.asset.id, + created.version.version_id, + context, + expected_current_version_id=retired.version.version_id, + ) + + versions = repo.list_versions(created.asset.id) + + assert restored.version.change_type.value == "restored" + assert restored.version.parent_version_id == retired.version.version_id + assert restored.version.metadata_delta["restored_from_version_id"] == created.version.version_id + assert restored.version.metadata_delta["restored_from_sequence"] == 1 + assert restored.asset.lifecycle == LifecycleState.ACTIVE + assert repo.get_asset(created.asset.id).current_version_id == restored.version.version_id + assert [version.sequence for version in versions] == [1, 2, 3] + assert [version.change_type.value for version in versions] == [ + "created", + "lifecycle_changed", + "restored", + ] + assert [event.operation for event in repo.list_audit_events(target="asset:asset-restorable")] == [ + "asset.create", + "asset.lifecycle.transition", + "asset.version.restore", + ] + + +def test_asset_registry_supersede_creates_relationship_version_and_audit() -> None: + repo = InMemoryAssetRegistryRepository() + service = AssetRegistryService(repo) + context = operation_context() + source = service.create_asset( + "Old Guide", + Classification(asset_type="guide", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-old-guide", + ) + successor = service.create_asset( + "New Guide", + Classification(asset_type="guide", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-new-guide", + ) + + superseded = service.supersede_asset( + source.asset.id, + successor.asset.id, + context, + reason="New canonical guide", + expected_current_version_id=source.version.version_id, + ) + + source_asset = repo.get_asset(source.asset.id) + relationships = repo.list_relationships(source_id=source.asset.id) + + assert superseded.version.change_type.value == "superseded" + assert superseded.relationship.predicate == "superseded_by" + assert superseded.relationship.target_id == successor.asset.id + assert relationships == [superseded.relationship] + assert source_asset.lifecycle == LifecycleState.RETIRED + assert source_asset.metadata["superseded_by"] == successor.asset.id + assert source_asset.metadata["supersession_reason"] == "New canonical guide" + assert superseded.version.relationship_delta["added"]["relationship_id"] == superseded.relationship.relationship_id + assert superseded.version.metadata_delta["superseded_by"] == successor.asset.id + assert repo.get_asset(successor.asset.id).current_version_id == successor.version.version_id + assert repo.list_audit_events(target="asset:asset-old-guide")[-1].operation == "asset.supersede" + + def test_asset_registry_relationships_create_versions_and_audit() -> None: repo = InMemoryAssetRegistryRepository() service = AssetRegistryService(repo) diff --git a/workplans/KONT-WP-0005-asset-registry-governance-state.md b/workplans/KONT-WP-0005-asset-registry-governance-state.md index edea8e9..666d44d 100644 --- a/workplans/KONT-WP-0005-asset-registry-governance-state.md +++ b/workplans/KONT-WP-0005-asset-registry-governance-state.md @@ -67,9 +67,8 @@ representations, metadata records, context entities, asset/context relationships, idempotent asset creation, and custom metadata schema validation before registry writes. It now also includes a durable metadata schema registry and assignment rules for policy-selected validation. Remaining -work in this workplan is concentrated on restore/supersession operations, -conflict semantics beyond sequence/idempotency checks, and batch -partial-failure envelopes. +work in this workplan is concentrated on broader audit/error coverage, durable +policy assignment details, and batch partial-failure envelopes. ## G5.1 - Implement stable asset identity and source references @@ -193,7 +192,7 @@ Acceptance: ```task id: KONT-WP-0005-T007 -status: in_progress +status: done priority: medium state_hub_task_id: "5288b136-05c1-449c-9215-f8b34db8b274" ```