Versioning and restoration of assets

This commit is contained in:
2026-05-06 09:00:07 +02:00
parent b8d087538e
commit df3b43d311
4 changed files with 346 additions and 10 deletions

View File

@@ -148,8 +148,15 @@ class AssetRegistryService:
asset_id: str,
record: MetadataRecord,
context: OperationContext,
*,
expected_current_version_id: str | None = None,
) -> AssetChangeResult:
asset = self.repository.get_asset(asset_id)
self._assert_expected_current_version(
asset,
expected_current_version_id,
operation="asset.metadata.add",
)
decision = self._authorize(context, "asset.metadata.add", f"asset:{asset.id}")
next_sequence = self._next_sequence(asset.id)
self._validate_metadata_records(
@@ -235,8 +242,15 @@ class AssetRegistryService:
asset_id: str,
representation: AssetRepresentation,
context: OperationContext,
*,
expected_current_version_id: str | None = None,
) -> AssetChangeResult:
asset = self.repository.get_asset(asset_id)
self._assert_expected_current_version(
asset,
expected_current_version_id,
operation="asset.representation.add",
)
decision = self._authorize(
context,
"asset.representation.add",
@@ -273,8 +287,15 @@ class AssetRegistryService:
asset_id: str,
lifecycle: LifecycleState,
context: OperationContext,
*,
expected_current_version_id: str | None = None,
) -> AssetChangeResult:
asset = self.repository.get_asset(asset_id)
self._assert_expected_current_version(
asset,
expected_current_version_id,
operation="asset.lifecycle.transition",
)
decision = self._authorize(
context,
"asset.lifecycle.transition",
@@ -304,8 +325,162 @@ class AssetRegistryService:
)
return AssetChangeResult(updated, version, event, decision)
def request_delete(self, asset_id: str, context: OperationContext) -> AssetChangeResult:
return self.transition_lifecycle(asset_id, LifecycleState.DELETE_REQUESTED, context)
def request_delete(
self,
asset_id: str,
context: OperationContext,
*,
expected_current_version_id: str | None = None,
) -> AssetChangeResult:
return self.transition_lifecycle(
asset_id,
LifecycleState.DELETE_REQUESTED,
context,
expected_current_version_id=expected_current_version_id,
)
def restore_asset_version(
self,
asset_id: str,
target_version_id: str,
context: OperationContext,
*,
expected_current_version_id: str | None = None,
) -> AssetChangeResult:
asset = self.repository.get_asset(asset_id)
self._assert_expected_current_version(
asset,
expected_current_version_id,
operation="asset.version.restore",
)
target_version = self._version_by_id(asset.id, target_version_id)
decision = self._authorize(
context,
"asset.version.restore",
f"asset:{asset.id}",
resource_metadata={"target_version_id": target_version.version_id},
)
restored_lifecycle = (
LifecycleState(target_version.lifecycle)
if target_version.lifecycle is not None
else asset.lifecycle
)
updated = (
asset.transition_lifecycle(restored_lifecycle)
if restored_lifecycle != asset.lifecycle
else asset
)
version = AssetVersion(
asset_id=asset.id,
sequence=self._next_sequence(asset.id),
change_type=VersionChangeType.RESTORED,
representation_ids=target_version.representation_ids,
actor_id=context.actor.id,
parent_version_id=asset.current_version_id,
metadata_delta={
"restored_from_version_id": target_version.version_id,
"restored_from_sequence": target_version.sequence,
"restored_from_change_type": target_version.change_type.value,
},
lifecycle=updated.lifecycle.value,
)
updated = updated.with_current_version(version.version_id)
self.repository.save_asset(updated)
self.repository.save_version(version)
event = self._audit(
"asset.version.restore",
f"asset:{asset.id}",
AuditOutcome.SUCCESS,
context,
decision,
details={
"version_id": version.version_id,
"restored_from_version_id": target_version.version_id,
},
)
return AssetChangeResult(updated, version, event, decision)
def supersede_asset(
self,
asset_id: str,
successor_asset_id: str,
context: OperationContext,
*,
reason: str | None = None,
retire: bool = True,
expected_current_version_id: str | None = None,
) -> RelationshipChangeResult:
source_asset = self.repository.get_asset(asset_id)
successor_asset = self.repository.get_asset(successor_asset_id)
if source_asset.id == successor_asset.id:
raise ValidationError(
"Asset cannot supersede itself",
details={"asset_id": source_asset.id, "code": "asset.supersession.self"},
)
self._assert_expected_current_version(
source_asset,
expected_current_version_id,
operation="asset.supersede",
)
decision = self._authorize(
context,
"asset.supersede",
f"asset:{source_asset.id}",
resource_metadata={"successor_asset_id": successor_asset.id},
)
provenance = {"operation": "asset.supersede"}
if reason:
provenance["reason"] = reason
relationship = CoreRelationship(
source_id=source_asset.id,
target_id=successor_asset.id,
predicate="superseded_by",
target_kind=RelationshipTargetKind.ASSET,
confidence=1.0,
actor_id=context.actor.id,
provenance=provenance,
)
saved = self.repository.save_relationship(relationship)
updated_metadata = {
**source_asset.metadata,
"superseded_by": successor_asset.id,
}
metadata_delta: dict[str, Any] = {"superseded_by": successor_asset.id}
if reason:
updated_metadata["supersession_reason"] = reason
metadata_delta["supersession_reason"] = reason
updated_asset = (
source_asset.transition_lifecycle(LifecycleState.RETIRED)
if retire
else source_asset
)
updated_asset = replace(updated_asset, metadata=updated_metadata)
version = AssetVersion(
asset_id=source_asset.id,
sequence=self._next_sequence(source_asset.id),
change_type=VersionChangeType.SUPERSEDED,
actor_id=context.actor.id,
parent_version_id=source_asset.current_version_id,
metadata_delta=metadata_delta,
relationship_delta={"added": saved.to_dict()},
lifecycle=updated_asset.lifecycle.value,
)
updated_asset = updated_asset.with_current_version(version.version_id)
self.repository.save_asset(updated_asset)
self.repository.save_version(version)
event = self._audit(
"asset.supersede",
f"asset:{source_asset.id}",
AuditOutcome.SUCCESS,
context,
decision,
details={
"version_id": version.version_id,
"relationship_id": saved.relationship_id,
"successor_asset_id": successor_asset.id,
},
)
return RelationshipChangeResult(saved, version, event, decision)
def get_asset(self, asset_id: str) -> KnowledgeAsset:
return self.repository.get_asset(asset_id)
@@ -360,6 +535,7 @@ class AssetRegistryService:
*,
confidence: float | None = None,
provenance: dict[str, str] | None = None,
expected_current_version_id: str | None = None,
) -> RelationshipChangeResult:
relationship = CoreRelationship(
source_id=source_asset_id,
@@ -370,7 +546,11 @@ class AssetRegistryService:
actor_id=context.actor.id,
provenance=dict(provenance or {}),
)
return self._save_relationship(relationship, context)
return self._save_relationship(
relationship,
context,
expected_current_version_id=expected_current_version_id,
)
def link_asset_to_context_entity(
self,
@@ -381,6 +561,7 @@ class AssetRegistryService:
*,
confidence: float | None = None,
provenance: dict[str, str] | None = None,
expected_current_version_id: str | None = None,
) -> RelationshipChangeResult:
self.repository.save_context_entity(entity)
relationship = CoreRelationship(
@@ -392,14 +573,25 @@ class AssetRegistryService:
actor_id=context.actor.id,
provenance=dict(provenance or {}),
)
return self._save_relationship(relationship, context)
return self._save_relationship(
relationship,
context,
expected_current_version_id=expected_current_version_id,
)
def _save_relationship(
self,
relationship: CoreRelationship,
context: OperationContext,
*,
expected_current_version_id: str | None = None,
) -> RelationshipChangeResult:
source_asset = self.repository.get_asset(relationship.source_id)
self._assert_expected_current_version(
source_asset,
expected_current_version_id,
operation="asset.relationship.add",
)
decision = self._authorize(
context,
"asset.relationship.add",
@@ -437,6 +629,27 @@ class AssetRegistryService:
)
return RelationshipChangeResult(saved, version, event, decision)
def _assert_expected_current_version(
self,
asset: KnowledgeAsset,
expected_current_version_id: str | None,
*,
operation: str,
) -> None:
if expected_current_version_id is None:
return
if asset.current_version_id != expected_current_version_id:
raise ValidationError(
"Asset version conflict",
details={
"code": "asset.version_conflict",
"operation": operation,
"asset_id": asset.id,
"expected_current_version_id": expected_current_version_id,
"current_version_id": asset.current_version_id,
},
)
def _authorize(
self,
context: OperationContext,