Files
kontextual-engine/src/kontextual_engine/ingestion.py

125 lines
4.1 KiB
Python

"""Format-agnostic ingestion contracts and first adapters."""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Protocol
from .artifacts import Artifact, ArtifactMetadata, ArtifactType
from .errors import AdapterUnavailableError
@dataclass
class IngestionRequest:
    """Describe one item to ingest, supplied inline or by file path.

    When both ``content`` and ``path`` are set, ``content`` wins; ``path`` is
    only read as a fallback (see :meth:`read_text`).
    """

    collection_id: str
    name: str
    # Inline payload; bytes are decoded as UTF-8 when read as text.
    content: str | bytes | None = None
    # File consulted by read_text() only when ``content`` is None.
    path: str | Path | None = None
    media_type: str = "text/plain"
    artifact_type: ArtifactType = ArtifactType.CONTENT
    # Caller-supplied metadata; every request gets its own fresh dict.
    metadata: dict[str, Any] = field(default_factory=dict)

    def read_text(self) -> str:
        """Return the payload as text.

        Resolution order: inline ``content`` (bytes decoded as UTF-8), then
        the file at ``path`` read as UTF-8, then the empty string when
        neither is provided.
        """
        payload = self.content
        if isinstance(payload, bytes):
            return payload.decode("utf-8")
        if payload is not None:
            return payload
        if self.path is not None:
            return Path(self.path).read_text(encoding="utf-8")
        return ""
@dataclass
class IngestionResult:
    """Outcome returned by an ingestion adapter."""

    # Artifacts produced for the request.
    artifacts: list[Artifact]
    # Adapter-specific normalized view; defaults to a fresh empty dict.
    normalized: dict[str, Any] = field(default_factory=dict)
    # Label of the adapter that produced this result ("" when unset).
    adapter: str = ""
class IngestionAdapter(Protocol):
    """Structural interface every ingestion adapter must satisfy."""

    # Media types this adapter accepts; IngestionService matches
    # ``request.media_type`` against this tuple.
    media_types: tuple[str, ...]

    def ingest(self, request: IngestionRequest) -> IngestionResult:
        """Turn *request* into an :class:`IngestionResult`."""
        ...
class PlainTextIngestionAdapter:
    """Ingest ``text/plain`` requests as a single artifact holding the raw text."""

    media_types = ("text/plain",)

    def ingest(self, request: IngestionRequest) -> IngestionResult:
        """Read the request's text and wrap it in one artifact.

        The request's metadata dict is shallow-copied into the artifact
        metadata, and the text is echoed back under ``normalized["text"]``.
        """
        body = request.read_text()
        # A falsy path (None or "") is recorded as "no source URI".
        origin = str(request.path) if request.path else None
        artifact_metadata = ArtifactMetadata(
            media_type=request.media_type,
            source_uri=origin,
            custom=dict(request.metadata),
        )
        produced = Artifact.create(
            request.collection_id,
            request.name,
            body,
            artifact_type=request.artifact_type,
            metadata=artifact_metadata,
        )
        normalized_view = {"text": body, "media_type": request.media_type}
        return IngestionResult(
            artifacts=[produced],
            normalized=normalized_view,
            adapter="plain-text",
        )
class MarkdownIngestionAdapter:
    """Adapter boundary to markitect-tool; no markdown parsing lives here."""

    media_types = ("text/markdown", "text/x-markdown")

    def ingest(self, request: IngestionRequest) -> IngestionResult:
        """Parse the request's markdown with markitect-tool and build one artifact.

        The artifact stores the raw markdown text. Frontmatter and headings
        from the parsed document are recorded both in the artifact metadata
        and in ``IngestionResult.normalized``; ``normalized`` additionally
        carries the number of parsed sections.

        Raises:
            AdapterUnavailableError: If importing ``markitect_tool`` fails.
        """
        # Import at call time so the optional dependency is only needed
        # when markdown is actually ingested.
        try:
            from markitect_tool import parse_markdown
        except Exception as exc:  # pragma: no cover - exercised when optional dep absent
            raise AdapterUnavailableError(
                "markitect-tool is required for markdown ingestion",
                details={"adapter": "markitect-tool", "media_type": request.media_type},
            ) from exc

        text = request.read_text()
        # A falsy path (None or "") means there is no source location to report.
        source = str(request.path) if request.path else None
        document = parse_markdown(text, source_path=source)

        # Snapshot the parsed structure exactly once. Each heading's attribute
        # dict is shallow-copied so the stored data no longer aliases the live
        # heading objects, and a one-shot ``headings`` iterable is not consumed
        # twice.
        frontmatter = dict(document.frontmatter)
        headings = [dict(heading.__dict__) for heading in document.headings]

        metadata = ArtifactMetadata(
            media_type=request.media_type,
            source_uri=source,
            custom={
                **request.metadata,
                "frontmatter": frontmatter,
                "headings": headings,
            },
        )
        artifact = Artifact.create(
            request.collection_id,
            request.name,
            text,
            artifact_type=request.artifact_type,
            metadata=metadata,
        )
        return IngestionResult(
            artifacts=[artifact],
            normalized={
                # Independent shallow copies: editing the normalized view must
                # not silently rewrite the artifact's metadata.
                "frontmatter": dict(frontmatter),
                "headings": [dict(row) for row in headings],
                "sections": len(document.sections),
            },
            adapter="markitect-tool",
        )
class IngestionService:
    """Dispatch ingestion requests to the first adapter accepting their media type."""

    def __init__(self, adapters: list[IngestionAdapter] | None = None) -> None:
        """Create a service over *adapters*.

        Args:
            adapters: Adapters to consult, in priority order. ``None`` selects
                the built-in plain-text and markdown adapters. An explicitly
                empty list is honoured as "no adapters registered" rather
                than being replaced by the defaults.
        """
        # Compare against None instead of relying on truthiness, so an empty
        # list passed on purpose is not mistaken for "not provided".
        if adapters is None:
            adapters = [PlainTextIngestionAdapter(), MarkdownIngestionAdapter()]
        self.adapters = adapters

    def ingest(self, request: IngestionRequest) -> IngestionResult:
        """Ingest *request* with the first adapter whose ``media_types`` match.

        Raises:
            AdapterUnavailableError: If no registered adapter lists
                ``request.media_type``.
        """
        for adapter in self.adapters:
            if request.media_type in adapter.media_types:
                return adapter.ingest(request)
        raise AdapterUnavailableError(
            "No ingestion adapter registered for media type",
            details={"media_type": request.media_type},
        )