first ingestion/normalization slice

This commit is contained in:
2026-05-06 02:35:40 +02:00
parent 286ebc3cb6
commit 565a5643a3
19 changed files with 1231 additions and 10 deletions

View File

@@ -1,5 +1,12 @@
"""Application services for the engine."""
from .asset_service import AssetChangeResult, AssetRegistryService, RelationshipChangeResult
from .ingestion_service import AssetIngestionResult, AssetIngestionService
# Public names re-exported by the services package (kept alphabetically sorted).
# A single assignment only: an earlier, shorter ``__all__`` list placed before
# this one would be overwritten immediately and is therefore dead code.
__all__ = [
    "AssetChangeResult",
    "AssetIngestionResult",
    "AssetIngestionService",
    "AssetRegistryService",
    "RelationshipChangeResult",
]

View File

@@ -0,0 +1,304 @@
"""Application service for governed asset ingestion."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
from kontextual_engine.adapters.builtin_extractors import PlainTextExtractor
from kontextual_engine.adapters.local_files import LocalFileConnector
from kontextual_engine.adapters.markitect_tool import MarkitectMarkdownExtractor
from kontextual_engine.core import (
AssetRepresentation,
Classification,
IngestionFailure,
IngestionJob,
IngestionJobStatus,
KnowledgeAsset,
MetadataRecord,
OperationContext,
RepresentationKind,
Sensitivity,
SourcePayload,
mapping_digest,
)
from kontextual_engine.errors import AdapterUnavailableError, KontextualError
from kontextual_engine.ports import AssetRegistryRepository, DirectorySourceConnector, FormatExtractor, SourceConnector
from .asset_service import AssetChangeResult, AssetRegistryService
@dataclass(frozen=True)
class AssetIngestionResult:
    """Immutable outcome of a single ingestion attempt.

    ``job`` is always supplied. ``asset`` and ``asset_change`` default to
    ``None``, which is how a result is built when ingestion did not produce
    an asset (for example when only a failed job is passed in).
    """

    # Tracking record describing how the ingestion attempt ended.
    job: IngestionJob

    # Asset produced by the attempt, when there is one.
    asset: KnowledgeAsset | None = None

    # Change result returned by the asset registry service, when there is one.
    asset_change: AssetChangeResult | None = None
class AssetIngestionService:
    """Ingest source content into the asset registry under a tracked job.

    The service wires together source connectors (which fetch payloads),
    format extractors (which normalise payloads) and the asset registry
    service (which registers the resulting asset). Every ingestion attempt is
    recorded as an ``IngestionJob`` saved through the repository.
    """

    def __init__(
        self,
        repository: AssetRegistryRepository,
        *,
        asset_service: AssetRegistryService | None = None,
        connectors: Iterable[SourceConnector] | None = None,
        extractors: Iterable[FormatExtractor] | None = None,
    ) -> None:
        """Create the service.

        Args:
            repository: Store used to save and look up ingestion jobs.
            asset_service: Registry service used to create assets; a new
                ``AssetRegistryService(repository)`` is built when omitted.
            connectors: Source connectors, indexed by their ``name``. Falls
                back to a single ``LocalFileConnector`` when omitted or empty.
            extractors: Format extractors, tried in the given order. Falls
                back to the plain-text and Markitect markdown extractors when
                omitted or empty.
        """
        self.repository = repository
        self.asset_service = asset_service or AssetRegistryService(repository)
        self.connectors = {connector.name: connector for connector in (connectors or [LocalFileConnector()])}
        self.extractors = list(extractors or [PlainTextExtractor(), MarkitectMarkdownExtractor()])

    def connector_capabilities(self) -> list[dict]:
        """Return the capabilities of every registered connector as dicts."""
        return [connector.capabilities().to_dict() for connector in self.connectors.values()]

    def extractor_capabilities(self) -> list[dict]:
        """Return the capabilities of every registered extractor as dicts."""
        return [extractor.capabilities().to_dict() for extractor in self.extractors]

    def ingest_file(
        self,
        path: str | Path,
        context: OperationContext,
        *,
        asset_id: str | None = None,
        title: str | None = None,
        classification: Classification | None = None,
        idempotency_key: str | None = None,
    ) -> AssetIngestionResult:
        """Ingest a single file through the ``local_file`` connector.

        Args:
            path: Location handed to the connector (converted with ``str``).
            context: Operation context supplying the actor and correlation id.
            asset_id: Explicit asset id; derived from the source reference
                when omitted.
            title: Explicit asset title; the payload title is used otherwise.
            classification: Explicit classification; an internal "document"
                classification is used otherwise.
            idempotency_key: Forwarded to the asset registry service.

        Returns:
            A result carrying the completed job and the created asset, or a
            result carrying only the failed job when fetching, extracting or
            registering raised.

        Raises:
            AdapterUnavailableError: If the ``local_file`` connector is not
                registered (raised before any job is created).
        """
        connector = self._connector("local_file")
        job = IngestionJob.create(
            input={"connector": connector.name, "source_uri": str(path), "mode": "file"},
            actor_id=context.actor.id,
            correlation_id=context.correlation_id,
        )
        self.repository.save_ingestion_job(job)
        try:
            payload = connector.fetch(str(path))
            # Rebind ``job`` to the running state *here* so that the failure
            # branch below derives the failed job from the running one and
            # keeps its source reference. NOTE(review): this assumes job
            # transitions return new objects, as the ``job = job.running()``
            # usage elsewhere in this class suggests.
            job = job.running(source_ref=payload.source_ref)
            self.repository.save_ingestion_job(job)
            return self._ingest_payload(
                job,
                payload,
                context,
                asset_id=asset_id,
                title=title,
                classification=classification,
                idempotency_key=idempotency_key,
            )
        except Exception as exc:
            # Deliberate catch-all boundary: any failure is recorded on the
            # job and reported through the result instead of being raised.
            failed = job.failed(_failure_from_exception(exc))
            self.repository.save_ingestion_job(failed)
            return AssetIngestionResult(failed)

    def ingest_directory(
        self,
        path: str | Path,
        context: OperationContext,
        *,
        recursive: bool = True,
        classification: Classification | None = None,
    ) -> IngestionJob:
        """Ingest every file the ``local_file`` connector lists under ``path``.

        Each file is ingested through :meth:`ingest_file` (and therefore gets
        its own job); the returned job aggregates the per-file outcomes.

        Args:
            path: Directory location handed to the connector.
            context: Operation context supplying the actor and correlation id.
            recursive: Forwarded to the connector's ``iter_files``.
            classification: Classification applied to every ingested file.

        Returns:
            The aggregate job: partially completed when some files failed and
            some produced assets, failed when there were failures and no
            assets, completed otherwise.

        Raises:
            AdapterUnavailableError: If the connector is missing or cannot
                iterate directories.
            Exception: Whatever the connector raises while listing files; the
                aggregate job is saved as failed before re-raising.
        """
        connector = self._directory_connector("local_file")
        job = IngestionJob.create(
            input={
                "connector": connector.name,
                "source_uri": str(path),
                "mode": "directory",
                "recursive": recursive,
            },
            actor_id=context.actor.id,
            correlation_id=context.correlation_id,
        )
        job = job.running()
        self.repository.save_ingestion_job(job)
        try:
            # Materialise the listing once: ``iter_files`` may be lazy, and
            # the summary below needs ``len()``. This also surfaces listing
            # errors here rather than midway through the loop.
            files = list(connector.iter_files(str(path), recursive=recursive))
        except Exception as exc:
            # Do not leave the job stuck in the running state when the
            # directory cannot be enumerated; callers still see the error.
            self.repository.save_ingestion_job(job.failed(_failure_from_exception(exc)))
            raise
        output_asset_ids: list[str] = []
        failures: list[IngestionFailure] = []
        item_results: list[dict] = []
        for source_uri in files:
            result = self.ingest_file(source_uri, context, classification=classification)
            item = {
                "source_uri": source_uri,
                "job_id": result.job.job_id,
                "status": result.job.status.value,
            }
            if result.asset is not None:
                output_asset_ids.append(result.asset.id)
                item["asset_id"] = result.asset.id
            if result.job.failures:
                failures.extend(result.job.failures)
                item["failures"] = [failure.to_dict() for failure in result.job.failures]
            item_results.append(item)
        partial_results = {
            "files_total": len(files),
            "succeeded": sum(1 for item in item_results if item["status"] == IngestionJobStatus.COMPLETED.value),
            "failed": sum(1 for item in item_results if item["status"] == IngestionJobStatus.FAILED.value),
            "quarantined": sum(1 for item in item_results if item["status"] == IngestionJobStatus.QUARANTINED.value),
            "skipped": 0,
            "items": item_results,
        }
        if failures and output_asset_ids:
            job = job.partially_completed(
                output_asset_ids=tuple(output_asset_ids),
                failures=tuple(failures),
                partial_results=partial_results,
            )
        elif failures:
            job = job.failed(
                IngestionFailure(
                    code="ingestion.directory_failed",
                    message="Directory ingestion failed for all files",
                    retriable=True,
                    details=partial_results,
                ),
                partial_results=partial_results,
            )
        else:
            job = job.completed(output_asset_ids=tuple(output_asset_ids), partial_results=partial_results)
        self.repository.save_ingestion_job(job)
        return job

    def get_job(self, job_id: str) -> IngestionJob:
        """Look up one ingestion job in the repository by its id."""
        return self.repository.get_ingestion_job(job_id)

    def list_jobs(self, *, status: IngestionJobStatus | None = None) -> list[IngestionJob]:
        """List ingestion jobs from the repository, passing ``status`` through as a filter."""
        return self.repository.list_ingestion_jobs(status=status)

    def _ingest_payload(
        self,
        job: IngestionJob,
        payload: SourcePayload,
        context: OperationContext,
        *,
        asset_id: str | None,
        title: str | None,
        classification: Classification | None,
        idempotency_key: str | None,
    ) -> AssetIngestionResult:
        """Extract a fetched payload, register the asset and complete the job.

        Args:
            job: Job for this ingestion; the caller has already moved it to
                the running state and saved it.
            payload: Content fetched by a connector.
            context: Operation context forwarded to the asset registry service.
            asset_id: Explicit asset id, or ``None`` to derive one from the
                payload's source reference.
            title: Explicit title, or ``None`` to use the payload title.
            classification: Explicit classification, or ``None`` for the
                default internal "document" classification.
            idempotency_key: Forwarded to the asset registry service.

        Returns:
            The completed job together with the created asset and the change
            result from the registry service.

        Raises:
            AdapterUnavailableError: If no extractor supports the payload's
                media type. Errors from the extractor or the registry service
                propagate unchanged.
        """
        extractor = self._extractor(payload.media_type)
        extraction = extractor.extract(payload)
        resolved_asset_id = asset_id or _stable_asset_id(payload)
        # Representation of the content exactly as fetched from the source.
        source_representation = AssetRepresentation.from_content(
            resolved_asset_id,
            RepresentationKind.SOURCE,
            payload.media_type,
            payload.content,
            storage_ref=payload.source_uri,
            producer=payload.connector_name,
            source_ref_id=payload.source_ref.id,
            metadata={
                "connector": payload.connector_name,
                "source_digest": payload.content_digest,
                "source_size_bytes": payload.size_bytes,
                # Spread last: payload metadata wins on key collisions.
                **payload.metadata,
            },
        )
        # Representation of the extractor's normalised output, stored as JSON.
        normalized_representation = AssetRepresentation.from_content(
            resolved_asset_id,
            RepresentationKind.NORMALIZED,
            extraction.normalized.media_type,
            extraction.normalized.to_json(),
            producer=extractor.name,
            source_ref_id=payload.source_ref.id,
            metadata={
                "extractor": extractor.name,
                "normalized_hash": extraction.normalized.normalized_hash,
                # Spread last: extraction metadata wins on key collisions.
                **extraction.metadata,
            },
        )
        asset_change = self.asset_service.create_asset(
            title or payload.title,
            classification or Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
            context,
            asset_id=resolved_asset_id,
            source_refs=[payload.source_ref],
            representations=[source_representation, normalized_representation],
            metadata_records=_metadata_records(payload, extractor.name, extraction.metadata),
            idempotency_key=idempotency_key,
        )
        completed = job.completed(
            output_asset_ids=(asset_change.asset.id,),
            partial_results={
                "connector": payload.connector_name,
                "extractor": extractor.name,
                "source_digest": payload.content_digest,
                "representations": [
                    source_representation.representation_id,
                    normalized_representation.representation_id,
                ],
                "diagnostics": [diagnostic.to_dict() for diagnostic in extraction.diagnostics],
            },
        )
        self.repository.save_ingestion_job(completed)
        return AssetIngestionResult(completed, asset_change.asset, asset_change)

    def _connector(self, name: str) -> SourceConnector:
        """Return the connector registered under ``name``.

        Raises:
            AdapterUnavailableError: If no connector has that name.
        """
        try:
            return self.connectors[name]
        except KeyError as exc:
            raise AdapterUnavailableError("Source connector is not registered", details={"connector": name}) from exc

    def _directory_connector(self, name: str) -> DirectorySourceConnector:
        """Return the connector registered under ``name`` if it can list files.

        Raises:
            AdapterUnavailableError: If the connector is missing or has no
                ``iter_files`` attribute.
        """
        connector = self._connector(name)
        # Capability is detected structurally, by attribute presence only.
        if not hasattr(connector, "iter_files"):
            raise AdapterUnavailableError(
                "Source connector does not support directory iteration",
                details={"connector": name},
            )
        return connector  # type: ignore[return-value]

    def _extractor(self, media_type: str) -> FormatExtractor:
        """Return the first registered extractor that supports ``media_type``.

        Raises:
            AdapterUnavailableError: If no registered extractor supports it.
        """
        for extractor in self.extractors:
            if extractor.supports(media_type):
                return extractor
        raise AdapterUnavailableError(
            "No extractor registered for media type",
            details={"media_type": media_type},
        )
def _stable_asset_id(payload: SourcePayload) -> str:
    """Build an asset id from a digest of the payload's source reference.

    The id is ``"asset-"`` followed by the first 20 characters of the digest
    with any ``"sha256:"`` prefix removed. Only fields of the source
    reference feed the digest; the payload content does not.
    """
    ref = payload.source_ref
    identity_fields = ("source_system", "path", "uri", "external_id", "connector_ref")
    identity = {field: getattr(ref, field) for field in identity_fields}
    fingerprint = mapping_digest(identity).removeprefix("sha256:")
    return "asset-" + fingerprint[:20]
def _metadata_records(
    payload: SourcePayload,
    extractor_name: str,
    extraction_metadata: dict,
) -> list[MetadataRecord]:
    """Build the metadata records attached to a newly ingested asset.

    Four records are attributed to the connector (media type, digest, size,
    connector name) and two to the extractor (extractor name, a copy of the
    extraction metadata). Only the two name records are marked confirmed.
    """

    def produced_by(producer: str) -> dict:
        # A fresh provenance dict per record, so records never share one.
        return {"producer": producer}

    connector_name = payload.connector_name
    connector_records = [
        MetadataRecord("source_media_type", payload.media_type, provenance=produced_by(connector_name)),
        MetadataRecord("source_digest", payload.content_digest, provenance=produced_by(connector_name)),
        MetadataRecord("source_size_bytes", payload.size_bytes, provenance=produced_by(connector_name)),
        MetadataRecord("connector", connector_name, provenance=produced_by(connector_name), confirmed=True),
    ]
    extractor_records = [
        MetadataRecord("extractor", extractor_name, provenance=produced_by(extractor_name), confirmed=True),
        # Shallow copy so the record does not alias the caller's dict.
        MetadataRecord("extraction", dict(extraction_metadata), provenance=produced_by(extractor_name)),
    ]
    return connector_records + extractor_records
def _failure_from_exception(exc: Exception) -> IngestionFailure:
    """Convert an exception raised during ingestion into an ``IngestionFailure``.

    ``KontextualError`` instances keep their own ``code`` and ``details`` and
    are retriable only when they are ``AdapterUnavailableError``. Any other
    exception maps to the non-retriable code ``"ingestion.unexpected"`` with
    the exception type name recorded in the details.

    Args:
        exc: The exception caught by the ingestion boundary.

    Returns:
        The failure record describing ``exc``.
    """
    # ``str()`` of an exception raised without arguments is empty; fall back
    # to the class name so the failure never carries a blank message.
    message = str(exc) or type(exc).__name__
    if isinstance(exc, KontextualError):
        return IngestionFailure(
            code=exc.code,
            message=message,
            retriable=isinstance(exc, AdapterUnavailableError),
            # Guard against ``details`` being None so that building the
            # failure cannot itself raise inside the error handler.
            details=dict(exc.details or {}),
        )
    return IngestionFailure(
        code="ingestion.unexpected",
        message=message,
        retriable=False,
        details={"exception_type": type(exc).__name__},
    )