richer normalized structure, permission context preservation

This commit is contained in:
2026-05-06 13:43:16 +02:00
parent a4a4759ac4
commit 24cb3c5b6a
10 changed files with 636 additions and 15 deletions

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Iterable
@@ -16,6 +16,7 @@ from kontextual_engine.adapters.markitect_tool import MarkitectMarkdownExtractor
from kontextual_engine.core import (
AssetRepresentation,
Classification,
ExtractionResult,
IngestionFailure,
IngestionIdentityPolicy,
IngestionJob,
@@ -178,14 +179,28 @@ class AssetIngestionService:
"failed": sum(1 for item in item_results if item["status"] == IngestionJobStatus.FAILED.value),
"quarantined": sum(1 for item in item_results if item["status"] == IngestionJobStatus.QUARANTINED.value),
"skipped": sum(1 for item in item_results if item["status"] == "skipped"),
"retriable": sum(1 for item in item_results if item.get("retry_state") == "retriable"),
"items": item_results,
}
failed_count = partial_results["failed"]
quarantined_count = partial_results["quarantined"]
if failures and output_asset_ids:
job = job.partially_completed(
output_asset_ids=tuple(output_asset_ids),
failures=tuple(failures),
partial_results=partial_results,
)
elif quarantined_count and not failed_count:
job = job.failed(
IngestionFailure(
code="ingestion.directory_quarantined",
message="Directory ingestion quarantined all non-skipped files",
retriable=False,
details=partial_results,
),
status=IngestionJobStatus.QUARANTINED,
partial_results=partial_results,
)
elif failures:
job = job.failed(
IngestionFailure(
@@ -224,6 +239,21 @@ class AssetIngestionService:
self.repository.save_ingestion_job(job)
extractor = self._extractor(payload.media_type)
extraction = extractor.extract(payload)
validation_failures = _validate_extraction(payload, extraction)
if validation_failures:
quarantined = _quarantined_job(
job,
validation_failures,
partial_results={
"action": "quarantined",
"connector": payload.connector_name,
"extractor": extraction.metadata.get("extractor"),
"source_digest": payload.content_digest,
"diagnostics": [failure.to_dict() for failure in validation_failures],
},
)
self.repository.save_ingestion_job(quarantined)
return AssetIngestionResult(quarantined, action="quarantined")
resolved_asset_id = asset_id or _stable_asset_id(payload, identity_policy)
existing_asset = _get_asset_or_none(self.repository, resolved_asset_id)
if existing_asset and skip_unchanged and _asset_has_source_reference(existing_asset, payload.source_ref):
@@ -254,6 +284,7 @@ class AssetIngestionService:
"connector": payload.connector_name,
"source_digest": payload.content_digest,
"source_size_bytes": payload.size_bytes,
"permission_context": dict(payload.permission_context),
**payload.metadata,
},
)
@@ -267,6 +298,7 @@ class AssetIngestionService:
metadata={
"extractor": extractor.name,
"normalized_hash": extraction.normalized.normalized_hash,
"permission_context": dict(payload.permission_context),
**extraction.metadata,
},
)
@@ -380,7 +412,7 @@ def _metadata_records(
extractor_name: str,
extraction_metadata: dict,
) -> list[MetadataRecord]:
return [
records = [
MetadataRecord("source_media_type", payload.media_type, provenance={"producer": payload.connector_name}),
MetadataRecord("source_digest", payload.content_digest, provenance={"producer": payload.connector_name}),
MetadataRecord("source_size_bytes", payload.size_bytes, provenance={"producer": payload.connector_name}),
@@ -388,6 +420,120 @@ def _metadata_records(
MetadataRecord("extractor", extractor_name, provenance={"producer": extractor_name}, confirmed=True),
MetadataRecord("extraction", dict(extraction_metadata), provenance={"producer": extractor_name}),
]
if payload.permission_context:
records.append(
MetadataRecord(
"source_permission_context",
dict(payload.permission_context),
provenance={"producer": payload.connector_name},
confirmed=True,
)
)
return records
def _validate_extraction(
    payload: SourcePayload,
    extraction: ExtractionResult,
    *,
    min_confidence: float = 0.2,
) -> list[IngestionFailure]:
    """Collect every validation failure for a payload and its extraction.

    All checks always run; each failing check appends one
    ``IngestionFailure``. The order of the returned list is fixed:
    permission, source system, checksum (missing or mismatch), extractor
    provenance, empty normalized output, low confidence.

    Args:
        payload: Source payload. Reads ``permission_context``,
            ``source_ref`` (``source_system``, ``checksum``), ``source_uri``,
            ``content_digest`` and ``media_type``.
        extraction: Extraction result. Reads ``metadata`` and ``normalized``.
        min_confidence: Extractions reporting a confidence strictly below
            this value are rejected. The same value is echoed as
            ``threshold`` in the failure details, so the reported threshold
            always matches the one that was applied.

    Returns:
        The failures found, or an empty list when every check passes.
    """
    failures: list[IngestionFailure] = []

    def reject(
        code: str,
        message: str,
        details: dict,
        *,
        retriable: bool = False,
    ) -> None:
        # Single place where failures are built, so each check below reads
        # as "condition -> code/message/details".
        failures.append(
            IngestionFailure(
                code=code,
                message=message,
                retriable=retriable,
                details=details,
            )
        )

    # Only an explicit ``False`` denies ingestion; a missing key or any
    # other value does not trigger this check.
    if payload.permission_context.get("ingest_allowed") is False:
        reject(
            "ingestion.permission_denied",
            "Source permission context does not allow ingestion",
            {"permission_context": dict(payload.permission_context)},
        )

    source_ref = payload.source_ref
    if not source_ref.source_system:
        reject(
            "ingestion.source_system_missing",
            "Source reference is missing source system",
            {"source_uri": payload.source_uri},
        )

    # Missing and mismatching checksums are mutually exclusive outcomes.
    if not source_ref.checksum:
        reject(
            "ingestion.source_checksum_missing",
            "Source reference is missing checksum provenance",
            {"source_uri": payload.source_uri},
        )
    elif source_ref.checksum != payload.content_digest:
        # The only failure in this function that is flagged as retriable.
        reject(
            "ingestion.source_checksum_mismatch",
            "Source reference checksum does not match payload content",
            {
                "source_uri": payload.source_uri,
                "source_checksum": source_ref.checksum,
                "payload_digest": payload.content_digest,
            },
            retriable=True,
        )

    if not extraction.metadata.get("extractor"):
        reject(
            "ingestion.extractor_metadata_missing",
            "Extraction result is missing extractor provenance",
            {"source_uri": payload.source_uri},
        )

    normalized = extraction.normalized
    # Any one non-empty facet counts as output, including the list of
    # unsupported elements (i.e. diagnostics alone are enough).
    has_normalized_output = bool(
        normalized.text.strip()
        or normalized.structure
        or normalized.tables
        or normalized.links
        or normalized.fields
        or normalized.unsupported_elements
    )
    if not has_normalized_output:
        reject(
            "ingestion.normalized_empty",
            "Extraction produced no normalized content, structure, fields, or diagnostics",
            {"source_uri": payload.source_uri, "media_type": payload.media_type},
        )

    confidence = normalized.confidence
    # A confidence of ``None`` is not judged, and extractions that report
    # unsupported elements are exempt from the low-confidence rejection.
    if (
        confidence is not None
        and confidence < min_confidence
        and not normalized.unsupported_elements
    ):
        reject(
            "ingestion.extraction_low_confidence",
            "Extraction confidence is below trusted ingestion threshold",
            {"confidence": confidence, "threshold": min_confidence},
        )

    return failures
def _quarantined_job(
    job: IngestionJob,
    failures: list[IngestionFailure],
    *,
    partial_results: dict,
) -> IngestionJob:
    """Return a copy of *job* marked as quarantined with *failures* attached.

    The job is failed with the first failure under the ``QUARANTINED``
    status. When more than one failure was collected, the job's
    ``failures`` field is then overwritten with the full tuple so none of
    them is lost.

    Args:
        job: Job to transition; only its ``failed`` method is called.
        failures: Failures to record. Indexing ``failures[0]`` means an
            empty list raises ``IndexError``.
        partial_results: Passed through unchanged to ``job.failed``.

    Returns:
        The quarantined job.
    """
    result = job.failed(
        failures[0],
        status=IngestionJobStatus.QUARANTINED,
        partial_results=partial_results,
    )
    if len(failures) <= 1:
        return result
    # ``job.failed`` accepts one failure; attach the complete set afterwards.
    return replace(result, failures=tuple(failures))
def _failure_from_exception(exc: Exception) -> IngestionFailure: