generated from coulomb/repo-seed
125 lines
4.1 KiB
Python
125 lines
4.1 KiB
Python
"""Format-agnostic ingestion contracts and first adapters."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Protocol
|
|
|
|
from .artifacts import Artifact, ArtifactMetadata, ArtifactType
|
|
from .errors import AdapterUnavailableError
|
|
|
|
|
|
@dataclass
class IngestionRequest:
    """Describe a single item to ingest, supplied inline or via a file path.

    When both ``content`` and ``path`` are set, ``read_text`` uses
    ``content`` and never touches the filesystem.
    """

    # Target collection and artifact name.
    collection_id: str
    name: str
    # Inline payload; bytes are decoded as UTF-8 by ``read_text``.
    content: str | bytes | None = None
    # Filesystem location consulted by ``read_text`` only when ``content`` is None.
    path: str | Path | None = None
    media_type: str = "text/plain"
    artifact_type: ArtifactType = ArtifactType.CONTENT
    # Free-form caller metadata; each request gets its own dict.
    metadata: dict[str, Any] = field(default_factory=dict)

    def read_text(self) -> str:
        """Return the request payload as text.

        Returns:
            The inline ``content`` (bytes decoded as UTF-8, str unchanged);
            otherwise the UTF-8 text of the file at ``path``; otherwise the
            empty string when neither is provided.
        """
        payload = self.content
        if isinstance(payload, bytes):
            return payload.decode("utf-8")
        if payload is not None:
            return payload

        # No inline content: fall back to the file, if one was named.
        if self.path is None:
            return ""
        source_file = Path(self.path)
        return source_file.read_text(encoding="utf-8")
|
|
|
|
|
|
@dataclass
class IngestionResult:
    """Bundle returned by an adapter's ``ingest`` call."""

    # Artifacts produced for the request (required, first positional field).
    artifacts: list[Artifact]
    # Adapter-specific summary of the ingested input; a fresh dict per instance.
    normalized: dict[str, Any] = field(default_factory=dict)
    # Label identifying the adapter that produced this result.
    adapter: str = field(default="")
|
|
|
|
|
|
class IngestionAdapter(Protocol):
    """Structural interface every ingestion adapter satisfies.

    Any object exposing ``media_types`` and ``ingest`` conforms; no
    inheritance is required.
    """

    # Media types the adapter accepts.
    media_types: tuple[str, ...]

    def ingest(self, request: IngestionRequest) -> IngestionResult:
        """Turn *request* into an ``IngestionResult``."""
        ...
|
|
|
|
|
|
class PlainTextIngestionAdapter:
    """Adapter that stores the request text unchanged as a single artifact."""

    media_types = ("text/plain",)

    def ingest(self, request: IngestionRequest) -> IngestionResult:
        """Create one artifact holding the request's text.

        The caller's metadata dict is copied into the artifact metadata, and
        the text is echoed back under ``normalized["text"]``.
        """
        body = request.read_text()

        # A falsy path (None or "") means there is no source location to record.
        origin = None
        if request.path:
            origin = str(request.path)

        artifact_meta = ArtifactMetadata(
            media_type=request.media_type,
            source_uri=origin,
            custom=dict(request.metadata),
        )
        created = Artifact.create(
            request.collection_id,
            request.name,
            body,
            artifact_type=request.artifact_type,
            metadata=artifact_meta,
        )
        summary = {"text": body, "media_type": request.media_type}
        return IngestionResult(
            artifacts=[created],
            normalized=summary,
            adapter="plain-text",
        )
|
|
|
|
|
|
class MarkdownIngestionAdapter:
    """Adapter boundary to markitect-tool; no markdown parsing lives here."""

    media_types = ("text/markdown", "text/x-markdown")

    @staticmethod
    def _heading_snapshot(heading: Any) -> dict[str, Any]:
        """Return an independent shallow copy of *heading*'s attribute dict.

        Handing out ``heading.__dict__`` itself would expose the object's
        live attribute storage, so edits to the returned mapping would
        mutate the parsed heading and every other place the dict was stored.
        """
        return dict(heading.__dict__)

    def ingest(self, request: IngestionRequest) -> IngestionResult:
        """Parse the request as markdown and create one artifact for it.

        The artifact stores the raw markdown text. Frontmatter and headings
        reported by the parser are recorded both in the artifact metadata
        and in ``IngestionResult.normalized``, as separate copies.

        Raises:
            AdapterUnavailableError: if ``markitect_tool`` cannot be imported.
        """
        try:
            from markitect_tool import parse_markdown
        except Exception as exc:  # pragma: no cover - exercised when optional dep absent
            raise AdapterUnavailableError(
                "markitect-tool is required for markdown ingestion",
                details={"adapter": "markitect-tool", "media_type": request.media_type},
            ) from exc

        text = request.read_text()
        # A falsy path (None or "") means there is no source location to record.
        source = str(request.path) if request.path else None
        document = parse_markdown(text, source_path=source)
        # Materialize once so both outputs below see the same headings.
        headings = list(document.headings)

        metadata = ArtifactMetadata(
            media_type=request.media_type,
            source_uri=source,
            custom={
                # Parser-derived keys come last, so they win over same-named
                # keys supplied in the caller's metadata.
                **request.metadata,
                "frontmatter": dict(document.frontmatter),
                "headings": [self._heading_snapshot(heading) for heading in headings],
            },
        )
        artifact = Artifact.create(
            request.collection_id,
            request.name,
            text,
            artifact_type=request.artifact_type,
            metadata=metadata,
        )
        return IngestionResult(
            artifacts=[artifact],
            normalized={
                "frontmatter": dict(document.frontmatter),
                # Fresh copies: must not alias the dicts stored in the metadata.
                "headings": [self._heading_snapshot(heading) for heading in headings],
                "sections": len(document.sections),
            },
            adapter="markitect-tool",
        )
|
|
|
|
|
|
class IngestionService:
    """Route ingestion requests to the first adapter accepting their media type."""

    def __init__(self, adapters: list[IngestionAdapter] | None = None) -> None:
        """Store *adapters*, or the built-in defaults when none are given.

        A missing or empty list selects the plain-text and markdown adapters.
        """
        if adapters:
            self.adapters = adapters
        else:
            self.adapters = [PlainTextIngestionAdapter(), MarkdownIngestionAdapter()]

    def ingest(self, request: IngestionRequest) -> IngestionResult:
        """Dispatch *request* to the first adapter listing its media type.

        Adapters are tried in registration order and matching is an exact
        membership test against each adapter's ``media_types``.

        Raises:
            AdapterUnavailableError: if no registered adapter lists the
                request's media type.
        """
        handler = next(
            (
                candidate
                for candidate in self.adapters
                if request.media_type in candidate.media_types
            ),
            None,
        )
        if handler is None:
            raise AdapterUnavailableError(
                "No ingestion adapter registered for media type",
                details={"media_type": request.media_type},
            )
        return handler.ingest(request)
|