Structured OperationFailure, BatchItemResult, and BatchOperationResult envelopes

This commit is contained in:
2026-05-06 10:26:37 +02:00
parent df3b43d311
commit 48dffedc09
9 changed files with 603 additions and 62 deletions

View File

@@ -1,6 +1,10 @@
"""Application services for the engine."""
from .asset_service import AssetChangeResult, AssetRegistryService, RelationshipChangeResult
from .asset_service import (
AssetChangeResult,
AssetRegistryService,
RelationshipChangeResult,
)
from .ingestion_service import AssetIngestionResult, AssetIngestionService
__all__ = [

View File

@@ -27,7 +27,13 @@ from kontextual_engine.core import (
SourceReference,
VersionChangeType,
)
from kontextual_engine.errors import AuthorizationError, ValidationError
from kontextual_engine.errors import (
AuthorizationError,
BatchItemResult,
BatchOperationResult,
KontextualError,
ValidationError,
)
from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway
@@ -158,6 +164,108 @@ class AssetRegistryService:
operation="asset.metadata.add",
)
decision = self._authorize(context, "asset.metadata.add", f"asset:{asset.id}")
return self._save_metadata_record_change(
asset,
record,
context,
decision,
operation="asset.metadata.add",
)
def add_metadata_records_batch(
self,
asset_id: str,
records: list[MetadataRecord] | tuple[MetadataRecord, ...],
context: OperationContext,
*,
expected_current_version_id: str | None = None,
) -> BatchOperationResult:
operation = "asset.metadata.batch_add"
asset = self.repository.get_asset(asset_id)
self._assert_expected_current_version(
asset,
expected_current_version_id,
operation=operation,
)
decision = self._authorize(
context,
operation,
f"asset:{asset.id}",
resource_metadata={"count": str(len(records))},
)
item_results: list[BatchItemResult] = []
for record in records:
item_operation = "asset.metadata.add"
try:
asset = self.repository.get_asset(asset.id)
result = self._save_metadata_record_change(
asset,
record,
context,
decision,
operation=item_operation,
)
except KontextualError as exc:
item_results.append(
BatchItemResult.failed(
item_id=record.record_id,
operation=item_operation,
error=exc.to_operation_failure(
operation=item_operation,
correlation_id=context.correlation_id,
remediation=_remediation_for_error(exc),
),
)
)
continue
item_results.append(
BatchItemResult.succeeded(
item_id=record.record_id,
operation=item_operation,
result_ref={
"asset_id": result.asset.id,
"record_id": record.record_id,
"version_id": result.version.version_id,
"audit_event_id": result.audit_event.event_id,
},
)
)
batch_result = BatchOperationResult(
operation=operation,
correlation_id=context.correlation_id,
items=tuple(item_results),
)
audit_event = self._audit(
operation,
f"asset:{asset.id}",
AuditOutcome(batch_result.outcome),
context,
decision,
details={
"total": batch_result.total,
"succeeded": batch_result.succeeded,
"failed": batch_result.failed,
"failed_item_ids": [
item.item_id for item in batch_result.items if not item.success
],
},
)
return BatchOperationResult(
operation=batch_result.operation,
correlation_id=batch_result.correlation_id,
items=batch_result.items,
audit_event_id=audit_event.event_id,
)
def _save_metadata_record_change(
self,
asset: KnowledgeAsset,
record: MetadataRecord,
context: OperationContext,
decision: PolicyDecision,
*,
operation: str,
) -> AssetChangeResult:
next_sequence = self._next_sequence(asset.id)
self._validate_metadata_records(
asset.classification,
@@ -177,7 +285,7 @@ class AssetRegistryService:
self.repository.save_asset(asset)
self.repository.save_version(version)
event = self._audit(
"asset.metadata.add",
operation,
f"asset:{asset.id}",
AuditOutcome.SUCCESS,
context,
@@ -686,7 +794,7 @@ class AssetRegistryService:
context: OperationContext,
policy_decision: PolicyDecision,
*,
details: dict[str, str] | None = None,
details: dict[str, Any] | None = None,
) -> AuditEvent:
event = AuditEvent.from_context(
operation,
@@ -761,3 +869,15 @@ class AssetRegistryService:
"Idempotency record references an unknown asset version",
details={"asset_id": asset_id, "version_id": version_id},
)
def _remediation_for_error(error: KontextualError) -> str | None:
if isinstance(error, ValidationError):
if error.details.get("code") == "asset.version_conflict":
return "Reload the asset, review the current version, and retry with the latest expected_current_version_id."
if error.details.get("issues"):
return "Correct the submitted fields so they satisfy the active metadata schema, then retry the failed item."
return "Correct the submitted value and retry the failed item."
if isinstance(error, AuthorizationError):
return "Request policy approval or rerun with an actor that is authorized for this operation."
return None

View File

@@ -66,6 +66,7 @@ class AssetIngestionService:
classification: Classification | None = None,
idempotency_key: str | None = None,
) -> AssetIngestionResult:
self.repository.save_actor(context.actor)
connector = self._connector("local_file")
job = IngestionJob.create(
input={"connector": connector.name, "source_uri": str(path), "mode": "file"},
@@ -97,6 +98,7 @@ class AssetIngestionService:
recursive: bool = True,
classification: Classification | None = None,
) -> IngestionJob:
self.repository.save_actor(context.actor)
connector = self._directory_connector("local_file")
job = IngestionJob.create(
input={