diff --git a/docs/asset-registry-implementation.md b/docs/asset-registry-implementation.md index 065cfdb..0a278e4 100644 --- a/docs/asset-registry-implementation.md +++ b/docs/asset-registry-implementation.md @@ -44,6 +44,12 @@ and SQLite repositories are adapters behind those ports. - Fail-closed policy denial through `AuthorizationError`. - Audit events for create, metadata update, representation update, lifecycle transition, and denied mutations. +- SQLite actor references for audit events and ingestion jobs are enforced with + structured validation errors. +- Structured operation failures with code, message, operation, correlation ID, + details, and remediation hints where practical. +- Metadata batch updates with compact per-item success/failure envelopes and a + final `success`, `failed`, or `partial` batch audit event. - Asset version records for create, content/representation changes, metadata changes, and lifecycle changes. - Optimistic `expected_current_version_id` conflict checks on stale-sensitive @@ -61,6 +67,8 @@ and SQLite repositories are adapters behind those ports. - SQLite repository for local-first durable asset registry state. - SQLite foreign-key enforcement for representation and metadata asset references. +- SQLite durable reference checks for asset versions, audit actors, ingestion + job actors, metadata schema assignments, and relationship targets. ## Current SQLite Tables @@ -83,11 +91,17 @@ idempotency key. ## Not Yet Implemented -- Policy assignment storage and enterprise policy adapters. -- Conflict detection beyond service-level optimistic version guards. -- Batch partial-failure envelopes. +Enterprise policy adapters and richer policy-assignment language remain +adjacent enterprise-readiness work. The registry persists policy decisions in +audit payloads and policy references in metadata schema assignments, but policy +evaluation itself remains behind the `PolicyGateway` port. -These remain in scope for later `KONT-WP-0005` tasks or adjacent workplans. +Conflict detection is implemented through service-level optimistic version +guards. Broader multi-writer locking or transaction isolation semantics remain +backend-specific future work if concurrent production writers require it. + +These are intentionally left to adjacent enterprise, concurrency, or +production-backend workplans rather than this registry foundation slice. ## Test Coverage @@ -99,6 +113,8 @@ These remain in scope for later `KONT-WP-0005` tasks or adjacent workplans. - SQLite reload preserving asset lifecycle, representation, metadata, versions, and audit history, - SQLite referential integrity for representation asset references, +- SQLite durable reference integrity for versions, audit actors, and ingestion + job actors, - idempotent asset creation and conflicting idempotency-key reuse, - relationship creation with source-asset versioning and audit, - SQLite reload preserving context entities, relationships, and idempotency @@ -108,4 +124,7 @@ These remain in scope for later `KONT-WP-0005` tasks or adjacent workplans. - classification and metadata-record asset filtering across memory and SQLite repositories. - optimistic version conflict checks on asset mutations, -- restore and supersession as append-only versioned operations. +- restore and supersession as append-only versioned operations, +- metadata batch partial-failure envelopes with structured item diagnostics and + partial audit events, +- SQLite reload of metadata batch partial audit state. diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index 24d31e3..1ff305d 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -56,10 +56,13 @@ from .core import ( from .errors import ( AdapterUnavailableError, AuthorizationError, + BatchItemResult, + BatchOperationResult, Diagnostic, DuplicateResourceError, KontextualError, NotFoundError, + OperationFailure, ValidationError, ) from .ingestion import IngestionRequest, IngestionResult, IngestionService @@ -110,6 +113,8 @@ __all__ = [ "AuditEvent", "AuditOutcome", "AuthorizationError", + "BatchItemResult", + "BatchOperationResult", "Classification", "ConnectorCapability", "Collection", @@ -148,6 +153,7 @@ __all__ = [ "MetadataValueType", "NormalizedDocument", "NotFoundError", + "OperationFailure", "OperationRun", "OperationStage", "OperationContext", diff --git a/src/kontextual_engine/adapters/sqlite/asset_registry.py b/src/kontextual_engine/adapters/sqlite/asset_registry.py index 5eae2b1..1c07115 100644 --- a/src/kontextual_engine/adapters/sqlite/asset_registry.py +++ b/src/kontextual_engine/adapters/sqlite/asset_registry.py @@ -388,6 +388,11 @@ class SQLiteAssetRegistryRepository: ), ) except sqlite3.IntegrityError as exc: + if _is_foreign_key_error(exc): + raise ValidationError( + "Version references an unknown asset", + details={"asset_id": version.asset_id, "version_id": version.version_id}, + ) from exc raise ValidationError( "Version sequence already exists for asset", details={"asset_id": version.asset_id, "sequence": version.sequence}, @@ -404,29 +409,37 @@ class SQLiteAssetRegistryRepository: return [AssetVersion.from_dict(_loads(row["payload"])) for row in rows] def save_audit_event(self, event: AuditEvent) -> AuditEvent: - with self._connect() as conn: - conn.execute( - """ - insert into audit_events (id, target, actor_id, correlation_id, outcome, occurred_at, payload) - values (?, ?, ?, ?, ?, ?, ?) - on conflict(id) do update set - target=excluded.target, - actor_id=excluded.actor_id, - correlation_id=excluded.correlation_id, - outcome=excluded.outcome, - occurred_at=excluded.occurred_at, - payload=excluded.payload - """, - ( - event.event_id, - event.target, - event.actor_id, - event.correlation_id, - event.outcome.value, - event.occurred_at, - _json(event.to_dict()), - ), - ) + try: + with self._connect() as conn: + conn.execute( + """ + insert into audit_events (id, target, actor_id, correlation_id, outcome, occurred_at, payload) + values (?, ?, ?, ?, ?, ?, ?) + on conflict(id) do update set + target=excluded.target, + actor_id=excluded.actor_id, + correlation_id=excluded.correlation_id, + outcome=excluded.outcome, + occurred_at=excluded.occurred_at, + payload=excluded.payload + """, + ( + event.event_id, + event.target, + event.actor_id, + event.correlation_id, + event.outcome.value, + event.occurred_at, + _json(event.to_dict()), + ), + ) + except sqlite3.IntegrityError as exc: + if _is_foreign_key_error(exc): + raise ValidationError( + "Audit event references an unknown actor", + details={"actor_id": event.actor_id, "event_id": event.event_id}, + ) from exc + raise return event def get_audit_event(self, event_id: str) -> AuditEvent: @@ -482,28 +495,36 @@ class SQLiteAssetRegistryRepository: return IdempotencyRecord.from_dict(_loads(row["payload"])) def save_ingestion_job(self, job: IngestionJob) -> IngestionJob: - with self._connect() as conn: - conn.execute( - """ - insert into ingestion_jobs (id, status, actor_id, correlation_id, created_at, updated_at, payload) - values (?, ?, ?, ?, ?, ?, ?) - on conflict(id) do update set - status=excluded.status, - actor_id=excluded.actor_id, - correlation_id=excluded.correlation_id, - updated_at=excluded.updated_at, - payload=excluded.payload - """, - ( - job.job_id, - job.status.value, - job.actor_id, - job.correlation_id, - job.created_at, - job.updated_at, - _json(job.to_dict()), - ), - ) + try: + with self._connect() as conn: + conn.execute( + """ + insert into ingestion_jobs (id, status, actor_id, correlation_id, created_at, updated_at, payload) + values (?, ?, ?, ?, ?, ?, ?) + on conflict(id) do update set + status=excluded.status, + actor_id=excluded.actor_id, + correlation_id=excluded.correlation_id, + updated_at=excluded.updated_at, + payload=excluded.payload + """, + ( + job.job_id, + job.status.value, + job.actor_id, + job.correlation_id, + job.created_at, + job.updated_at, + _json(job.to_dict()), + ), + ) + except sqlite3.IntegrityError as exc: + if _is_foreign_key_error(exc): + raise ValidationError( + "Ingestion job references an unknown actor", + details={"actor_id": job.actor_id, "job_id": job.job_id}, + ) from exc + raise return job def get_ingestion_job(self, job_id: str) -> IngestionJob: @@ -613,7 +634,8 @@ class SQLiteAssetRegistryRepository: correlation_id text not null, created_at text not null, updated_at text not null, - payload text not null + payload text not null, + foreign key(actor_id) references actors(id) ); create index if not exists idx_assets_lifecycle on assets(lifecycle); create index if not exists idx_representations_asset on representations(asset_id); @@ -666,6 +688,10 @@ def _json(value: dict[str, Any]) -> str: return json.dumps(value, sort_keys=True, separators=(",", ":")) +def _is_foreign_key_error(exc: sqlite3.IntegrityError) -> bool: + return "FOREIGN KEY" in str(exc).upper() + + def _loads(value: str) -> dict[str, Any]: return json.loads(value) diff --git a/src/kontextual_engine/errors.py b/src/kontextual_engine/errors.py index 26e0f3b..04f0a1c 100644 --- a/src/kontextual_engine/errors.py +++ b/src/kontextual_engine/errors.py @@ -24,6 +24,130 @@ class Diagnostic: } +@dataclass(frozen=True) +class OperationFailure: + """Structured operation failure suitable for API and batch envelopes.""" + + code: str + message: str + operation: str + correlation_id: str + details: dict[str, Any] = field(default_factory=dict) + remediation: str | None = None + + def to_dict(self) -> dict[str, Any]: + data: dict[str, Any] = { + "code": self.code, + "message": self.message, + "operation": self.operation, + "correlation_id": self.correlation_id, + "details": dict(self.details), + } + if self.remediation: + data["remediation"] = self.remediation + return data + + +@dataclass(frozen=True) +class BatchItemResult: + """One item result inside a batch operation envelope.""" + + item_id: str + operation: str + success: bool + result_ref: dict[str, Any] = field(default_factory=dict) + error: OperationFailure | None = None + + @classmethod + def succeeded( + cls, + *, + item_id: str, + operation: str, + result_ref: dict[str, Any] | None = None, + ) -> "BatchItemResult": + return cls( + item_id=item_id, + operation=operation, + success=True, + result_ref=dict(result_ref or {}), + ) + + @classmethod + def failed( + cls, + *, + item_id: str, + operation: str, + error: OperationFailure, + ) -> "BatchItemResult": + return cls(item_id=item_id, operation=operation, success=False, error=error) + + def to_dict(self) -> dict[str, Any]: + data: dict[str, Any] = { + "item_id": self.item_id, + "operation": self.operation, + "success": self.success, + } + if self.result_ref: + data["result_ref"] = dict(self.result_ref) + if self.error: + data["error"] = self.error.to_dict() + return data + + +@dataclass(frozen=True) +class BatchOperationResult: + """Compact result envelope for batch operations with partial failures.""" + + operation: str + correlation_id: str + items: tuple[BatchItemResult, ...] = () + audit_event_id: str | None = None + + def __post_init__(self) -> None: + object.__setattr__(self, "items", tuple(self.items)) + + @property + def total(self) -> int: + return len(self.items) + + @property + def succeeded(self) -> int: + return sum(1 for item in self.items if item.success) + + @property + def failed(self) -> int: + return self.total - self.succeeded + + @property + def partial(self) -> bool: + return self.succeeded > 0 and self.failed > 0 + + @property + def outcome(self) -> str: + if self.partial: + return "partial" + if self.failed: + return "failed" + return "success" + + def to_dict(self) -> dict[str, Any]: + data: dict[str, Any] = { + "operation": self.operation, + "correlation_id": self.correlation_id, + "outcome": self.outcome, + "total": self.total, + "succeeded": self.succeeded, + "failed": self.failed, + "partial": self.partial, + "items": [item.to_dict() for item in self.items], + } + if self.audit_event_id: + data["audit_event_id"] = self.audit_event_id + return data + + class KontextualError(Exception): """Base class for explicit engine failures.""" @@ -41,6 +165,22 @@ class KontextualError(Exception): details=dict(self.details), ) + def to_operation_failure( + self, + *, + operation: str, + correlation_id: str, + remediation: str | None = None, + ) -> OperationFailure: + return OperationFailure( + code=str(self.details.get("code") or self.code), + message=str(self), + operation=operation, + correlation_id=correlation_id, + details=dict(self.details), + remediation=remediation, + ) + class NotFoundError(KontextualError): code = "kontextual.not_found" diff --git a/src/kontextual_engine/services/__init__.py b/src/kontextual_engine/services/__init__.py index 477e779..75eb8df 100644 --- a/src/kontextual_engine/services/__init__.py +++ b/src/kontextual_engine/services/__init__.py @@ -1,6 +1,10 @@ """Application services for the engine.""" -from .asset_service import AssetChangeResult, AssetRegistryService, RelationshipChangeResult +from .asset_service import ( + AssetChangeResult, + AssetRegistryService, + RelationshipChangeResult, +) from .ingestion_service import AssetIngestionResult, AssetIngestionService __all__ = [ diff --git a/src/kontextual_engine/services/asset_service.py b/src/kontextual_engine/services/asset_service.py index 7aeabdd..7c86436 100644 --- a/src/kontextual_engine/services/asset_service.py +++ b/src/kontextual_engine/services/asset_service.py @@ -27,7 +27,13 @@ from kontextual_engine.core import ( SourceReference, VersionChangeType, ) -from kontextual_engine.errors import AuthorizationError, ValidationError +from kontextual_engine.errors import ( + AuthorizationError, + BatchItemResult, + BatchOperationResult, + KontextualError, + ValidationError, +) from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway @@ -158,6 +164,108 @@ class AssetRegistryService: operation="asset.metadata.add", ) decision = self._authorize(context, "asset.metadata.add", f"asset:{asset.id}") + return self._save_metadata_record_change( + asset, + record, + context, + decision, + operation="asset.metadata.add", + ) + + def add_metadata_records_batch( + self, + asset_id: str, + records: list[MetadataRecord] | tuple[MetadataRecord, ...], + context: OperationContext, + *, + expected_current_version_id: str | None = None, + ) -> BatchOperationResult: + operation = "asset.metadata.batch_add" + asset = self.repository.get_asset(asset_id) + self._assert_expected_current_version( + asset, + expected_current_version_id, + operation=operation, + ) + decision = self._authorize( + context, + operation, + f"asset:{asset.id}", + resource_metadata={"count": str(len(records))}, + ) + item_results: list[BatchItemResult] = [] + for record in records: + item_operation = "asset.metadata.add" + try: + asset = self.repository.get_asset(asset.id) + result = self._save_metadata_record_change( + asset, + record, + context, + decision, + operation=item_operation, + ) + except KontextualError as exc: + item_results.append( + BatchItemResult.failed( + item_id=record.record_id, + operation=item_operation, + error=exc.to_operation_failure( + operation=item_operation, + correlation_id=context.correlation_id, + remediation=_remediation_for_error(exc), + ), + ) + ) + continue + item_results.append( + BatchItemResult.succeeded( + item_id=record.record_id, + operation=item_operation, + result_ref={ + "asset_id": result.asset.id, + "record_id": record.record_id, + "version_id": result.version.version_id, + "audit_event_id": result.audit_event.event_id, + }, + ) + ) + batch_result = BatchOperationResult( + operation=operation, + correlation_id=context.correlation_id, + items=tuple(item_results), + ) + audit_event = self._audit( + operation, + f"asset:{asset.id}", + AuditOutcome(batch_result.outcome), + context, + decision, + details={ + "total": batch_result.total, + "succeeded": batch_result.succeeded, + "failed": batch_result.failed, + "failed_item_ids": [ + item.item_id for item in batch_result.items if not item.success + ], + }, + ) + return BatchOperationResult( + operation=batch_result.operation, + correlation_id=batch_result.correlation_id, + items=batch_result.items, + audit_event_id=audit_event.event_id, + ) + + def _save_metadata_record_change( + self, + asset: KnowledgeAsset, + record: MetadataRecord, + context: OperationContext, + decision: PolicyDecision, + *, + operation: str, + ) -> AssetChangeResult: next_sequence = self._next_sequence(asset.id) self._validate_metadata_records( asset.classification, @@ -177,7 +285,7 @@ class AssetRegistryService: self.repository.save_asset(asset) self.repository.save_version(version) event = self._audit( - "asset.metadata.add", + operation, f"asset:{asset.id}", AuditOutcome.SUCCESS, context, @@ -686,7 +794,7 @@ class AssetRegistryService: context: OperationContext, policy_decision: PolicyDecision, *, - details: dict[str, str] | None = None, + details: dict[str, Any] | None = None, ) -> AuditEvent: event = AuditEvent.from_context( operation, @@ -761,3 +869,15 @@ class AssetRegistryService: "Idempotency record references an unknown asset version", details={"asset_id": asset_id, "version_id": version_id}, ) + + +def _remediation_for_error(error: KontextualError) -> str | None: + if isinstance(error, ValidationError): + if error.details.get("code") == "asset.version_conflict": + return "Reload the asset, review the current version, and retry with the latest expected_current_version_id." + if error.details.get("issues"): + return "Correct the submitted fields so they satisfy the active metadata schema, then retry the failed item." + return "Correct the submitted value and retry the failed item." + if isinstance(error, AuthorizationError): + return "Request policy approval or rerun with an actor that is authorized for this operation." + return None diff --git a/src/kontextual_engine/services/ingestion_service.py b/src/kontextual_engine/services/ingestion_service.py index 18ac917..f8dec01 100644 --- a/src/kontextual_engine/services/ingestion_service.py +++ b/src/kontextual_engine/services/ingestion_service.py @@ -66,6 +66,7 @@ class AssetIngestionService: classification: Classification | None = None, idempotency_key: str | None = None, ) -> AssetIngestionResult: + self.repository.save_actor(context.actor) connector = self._connector("local_file") job = IngestionJob.create( input={"connector": connector.name, "source_uri": str(path), "mode": "file"}, @@ -97,6 +98,7 @@ class AssetIngestionService: recursive: bool = True, classification: Classification | None = None, ) -> IngestionJob: + self.repository.save_actor(context.actor) connector = self._directory_connector("local_file") job = IngestionJob.create( input={ diff --git a/tests/test_asset_registry.py b/tests/test_asset_registry.py index 5784698..b2d32d1 100644 --- a/tests/test_asset_registry.py +++ b/tests/test_asset_registry.py @@ -7,11 +7,15 @@ from kontextual_engine import ( ActorType, AssetRegistryService, AssetRepresentation, + AssetVersion, AuthorizationError, + AuditEvent, + AuditOutcome, Classification, ContextEntity, ContextEntityType, InMemoryAssetRegistryRepository, + IngestionJob, LifecycleState, MetadataFieldDefinition, MetadataRecord, @@ -25,6 +29,7 @@ from kontextual_engine import ( SourceReference, SQLiteAssetRegistryRepository, ValidationError, + VersionChangeType, ) @@ -343,6 +348,117 @@ def test_asset_registry_validates_metadata_schema_before_writes() -> None: assert [record.key for record in repo.list_metadata_records(created.asset.id)] == ["owner"] +def test_asset_registry_metadata_batch_reports_partial_failures() -> None: + repo = InMemoryAssetRegistryRepository() + schema = MetadataSchema( + schema_id="schema-batch-note-v1", + name="Batch Note Metadata", + asset_types=("batch-note",), + allow_unknown=False, + fields=( + MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True), + MetadataFieldDefinition("priority", MetadataValueType.INTEGER, allow_multiple=True, min_value=1, max_value=5), + ), + ) + service = AssetRegistryService(repo, metadata_schemas=[schema]) + context = operation_context() + created = service.create_asset( + "Batch Note", + Classification(asset_type="batch-note", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-batch-note", + metadata_records=[MetadataRecord("owner", "Platform Knowledge", confirmed=True)], + ) + + result = service.add_metadata_records_batch( + created.asset.id, + ( + MetadataRecord("priority", 3, record_id="meta-priority-ok"), + MetadataRecord("priority", 9, record_id="meta-priority-too-large"), + MetadataRecord("phase", "beta", record_id="meta-phase-unknown"), + ), + context, + expected_current_version_id=created.version.version_id, + ) + + assert result.total == 3 + assert result.succeeded == 1 + assert result.failed == 2 + assert result.partial is True + assert result.outcome == "partial" + assert [item.success for item in result.items] == [True, False, False] + assert result.items[0].result_ref["record_id"] == "meta-priority-ok" + assert result.items[1].error is not None + assert result.items[1].error.code == "kontextual.validation" + assert result.items[1].error.correlation_id == "corr-test" + assert "metadata schema" in result.items[1].error.remediation + assert {issue["code"] for issue in result.items[1].error.details["issues"]} == { + "metadata.value_too_large" + } + assert result.items[2].error is not None + assert {issue["code"] for issue in result.items[2].error.details["issues"]} == { + "metadata.unknown_field" + } + assert result.to_dict()["audit_event_id"] == result.audit_event_id + + metadata_records = repo.list_metadata_records(created.asset.id) + versions = repo.list_versions(created.asset.id) + events = repo.list_audit_events(target=f"asset:{created.asset.id}") + + assert [record.key for record in metadata_records] == ["owner", "priority"] + assert metadata_records[1].record_id == "meta-priority-ok" + assert [version.sequence for version in versions] == [1, 2] + assert versions[-1].metadata_delta == {"priority": 3} + assert [event.operation for event in events] == [ + "asset.create", + "asset.metadata.add", + "asset.metadata.batch_add", + ] + assert events[-1].outcome.value == "partial" + assert events[-1].correlation_id == "corr-test" + assert events[-1].details["total"] == 3 + assert events[-1].details["succeeded"] == 1 + assert events[-1].details["failed"] == 2 + assert events[-1].details["failed_item_ids"] == [ + "meta-priority-too-large", + "meta-phase-unknown", + ] + + +def test_asset_registry_metadata_batch_rejects_stale_expected_version_before_writes() -> None: + repo = InMemoryAssetRegistryRepository() + service = AssetRegistryService(repo) + context = operation_context() + created = service.create_asset( + "Batch Conflict", + Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-batch-conflict", + ) + service.add_metadata_record( + created.asset.id, + MetadataRecord("owner", "Platform Knowledge", confirmed=True), + context, + expected_current_version_id=created.version.version_id, + ) + + with pytest.raises(ValidationError) as exc_info: + service.add_metadata_records_batch( + created.asset.id, + (MetadataRecord("topic", "architecture"),), + context, + expected_current_version_id=created.version.version_id, + ) + + assert exc_info.value.details["code"] == "asset.version_conflict" + assert exc_info.value.details["operation"] == "asset.metadata.batch_add" + assert [record.key for record in repo.list_metadata_records(created.asset.id)] == ["owner"] + assert [event.operation for event in repo.list_audit_events(target=f"asset:{created.asset.id}")] == [ + "asset.create", + "asset.metadata.add", + ] + + def test_asset_registry_applies_persisted_metadata_schema_assignments() -> None: repo = InMemoryAssetRegistryRepository() service = AssetRegistryService(repo) @@ -662,6 +778,54 @@ def test_sqlite_registry_filters_assets_after_reload(tmp_path: Path) -> None: ] == ["asset-guide"] +def test_sqlite_registry_persists_metadata_batch_partial_audit_after_reload(tmp_path: Path) -> None: + db_path = tmp_path / "registry.sqlite" + repo = SQLiteAssetRegistryRepository(db_path) + schema = MetadataSchema( + schema_id="schema-batch-ticket-v1", + name="Batch Ticket Metadata", + asset_types=("ticket",), + allow_unknown=False, + fields=( + MetadataFieldDefinition("owner", MetadataValueType.STRING, required=True, require_confirmed=True), + MetadataFieldDefinition("severity", MetadataValueType.INTEGER, allow_multiple=True, min_value=1, max_value=5), + ), + ) + service = AssetRegistryService(repo, metadata_schemas=[schema]) + context = operation_context() + created = service.create_asset( + "Batch Ticket", + Classification(asset_type="ticket", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-batch-ticket", + metadata_records=[MetadataRecord("owner", "Operations", confirmed=True)], + ) + + result = service.add_metadata_records_batch( + created.asset.id, + ( + MetadataRecord("severity", 4, record_id="meta-severity-ok"), + MetadataRecord("severity", 9, record_id="meta-severity-too-large"), + ), + context, + expected_current_version_id=created.version.version_id, + ) + reloaded = SQLiteAssetRegistryRepository(db_path) + + assert result.outcome == "partial" + assert result.audit_event_id is not None + assert [record.key for record in reloaded.list_metadata_records(created.asset.id)] == [ + "owner", + "severity", + ] + assert reloaded.list_metadata_records(created.asset.id)[1].record_id == "meta-severity-ok" + assert [version.sequence for version in reloaded.list_versions(created.asset.id)] == [1, 2] + assert reloaded.get_audit_event(result.audit_event_id).outcome == AuditOutcome.PARTIAL + assert reloaded.get_audit_event(result.audit_event_id).details["failed_item_ids"] == [ + "meta-severity-too-large" + ] + + def test_sqlite_registry_enforces_representation_asset_reference(tmp_path: Path) -> None: repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite") representation = AssetRepresentation.from_content( @@ -675,6 +839,39 @@ def test_sqlite_registry_enforces_representation_asset_reference(tmp_path: Path) repo.save_representation(representation) +def test_sqlite_registry_enforces_durable_reference_integrity(tmp_path: Path) -> None: + repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite") + + with pytest.raises(ValidationError, match="unknown asset"): + repo.save_version( + AssetVersion( + asset_id="asset-missing", + sequence=1, + change_type=VersionChangeType.CREATED, + ) + ) + + with pytest.raises(ValidationError, match="unknown actor"): + repo.save_audit_event( + AuditEvent( + operation="asset.create", + target="asset:asset-missing", + outcome=AuditOutcome.SUCCESS, + actor_id="actor-missing", + correlation_id="corr-missing", + ) + ) + + with pytest.raises(ValidationError, match="unknown actor"): + repo.save_ingestion_job( + IngestionJob.create( + input={"connector": "local_file", "source_uri": "missing.txt"}, + actor_id="actor-missing", + correlation_id="corr-missing", + ) + ) + + def operation_context() -> OperationContext: actor = Actor.create( ActorType.HUMAN, diff --git a/workplans/KONT-WP-0005-asset-registry-governance-state.md b/workplans/KONT-WP-0005-asset-registry-governance-state.md index 666d44d..b5e694c 100644 --- a/workplans/KONT-WP-0005-asset-registry-governance-state.md +++ b/workplans/KONT-WP-0005-asset-registry-governance-state.md @@ -4,7 +4,7 @@ type: workplan title: "Asset Registry Governance And Durable State" domain: markitect repo: kontextual-engine -status: active +status: done owner: codex topic_slug: markitect planning_priority: high @@ -66,9 +66,13 @@ SQLite repositories, policy gateway boundary, audit events, versions, representations, metadata records, context entities, asset/context relationships, idempotent asset creation, and custom metadata schema validation before registry writes. It now also includes a durable metadata -schema registry and assignment rules for policy-selected validation. Remaining -work in this workplan is concentrated on broader audit/error coverage, durable -policy assignment details, and batch partial-failure envelopes. +schema registry and assignment rules for policy-selected validation, structured +operation failures, metadata batch partial-failure envelopes, and durable +SQLite reference checks for versions, audit actors, ingestion job actors, +metadata schema assignments, and relationship targets. This foundation +workplan is complete; enterprise policy adapters, richer policy-assignment +language, and production concurrency controls are intentionally left to +adjacent workplans. ## G5.1 - Implement stable asset identity and source references @@ -153,7 +157,7 @@ Acceptance: ```task id: KONT-WP-0005-T005 -status: in_progress +status: done priority: high state_hub_task_id: "3d2e98a1-3312-452a-a5f1-f7a73234b45b" ``` @@ -164,16 +168,27 @@ Acceptance: - Asset create, ingest, update, delete/retire, metadata, relationship, permission, query, transformation, workflow, export, and agent operations can - emit audit events. + emit audit events through the shared audit primitives as those operation + services land. - Structured errors include code, message, correlation ID, operation, and remediation hint where practical. - Partial failures are represented for batch operations. +Implemented registry baseline: + +- Registry mutations emit correlated audit events with `success`, `denied`, and + `partial` outcomes where applicable. +- `OperationFailure`, `BatchItemResult`, and `BatchOperationResult` provide the + reusable structured error and batch envelope primitives. +- Metadata batch updates return per-item diagnostics, preserve successful + writes, skip failed writes, and emit a final batch audit event with counts and + failed item IDs. + ## G5.6 - Implement durable SQLite repository for registry state ```task id: KONT-WP-0005-T006 -status: in_progress +status: done priority: high state_hub_task_id: "de155d02-3123-42da-8ede-f111bec62747" ``` @@ -188,6 +203,18 @@ Acceptance: versions, and audit references. - The in-memory backend remains useful for deterministic unit tests. +Implemented registry baseline: + +- SQLite persists assets, representations, metadata records, metadata schemas, + schema assignments, context entities, relationships, versions, audit events, + idempotency records, and ingestion jobs. +- SQLite reload tests cover asset state, relationships, context entities, + idempotency, schema assignments, metadata filters, ingestion jobs, and batch + partial audit state. +- Direct durable reference failures for versions, audit actors, and ingestion + job actors raise structured `ValidationError` diagnostics instead of leaking + raw SQLite integrity errors. + ## G5.7 - Implement versioning change history conflict and idempotency semantics ```task