From 925b36521d8848f2f615fcec9274808c352944d9 Mon Sep 17 00:00:00 2001 From: tegwick Date: Thu, 14 May 2026 22:46:51 +0200 Subject: [PATCH] epub3 inbound filter --- README.md | 23 +- pyproject.toml | 30 + src/markitect_filter/__init__.py | 5 + src/markitect_filter/adapters.py | 51 ++ src/markitect_filter/epub3.py | 602 +++++++++++++++++++ tests/test_epub3_adapter.py | 206 +++++++ workplans/MKTF-WP-0001-epub3-read-adapter.md | 56 ++ 7 files changed, 971 insertions(+), 2 deletions(-) create mode 100644 pyproject.toml create mode 100644 src/markitect_filter/__init__.py create mode 100644 src/markitect_filter/adapters.py create mode 100644 src/markitect_filter/epub3.py create mode 100644 tests/test_epub3_adapter.py create mode 100644 workplans/MKTF-WP-0001-epub3-read-adapter.md diff --git a/README.md b/README.md index fcd7b8f..f9370e5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,22 @@ -# repo-seed +# markitect-filter -A git repository template to bootstrap coulomb projects from. \ No newline at end of file +`markitect-filter` provides concrete source-format adapters for converting +external document formats into canonical Markitect Markdown representations. + +The first adapter is `source.epub3`, a read-only EPUB3 adapter that implements +the `markitect-tool` source adapter contract. + +## Development + +Run tests from this checkout: + +```bash +PYTHONPATH=src:/home/worsch/markitect-tool/src python3 -m pytest +``` + +The EPUB3 adapter is registered through: + +```toml +[project.entry-points."markitect_tool.source_adapters"] +epub3 = "markitect_filter.adapters:epub3_adapter_descriptor" +``` diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..74b1104 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,30 @@ +[build-system] +requires = ["setuptools>=69"] +build-backend = "setuptools.build_meta" + +[project] +name = "markitect-filter" +version = "0.1.0" +description = "Source-format adapters for Markitect normalized Markdown" +readme = "README.md" +requires-python = ">=3.12" +license = { text = "MIT" } +dependencies = [ + "markitect-tool @ file:///home/worsch/markitect-tool", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8", +] +epub3 = [] + +[project.entry-points."markitect_tool.source_adapters"] +epub3 = "markitect_filter.adapters:epub3_adapter_descriptor" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["src", "../markitect-tool/src"] diff --git a/src/markitect_filter/__init__.py b/src/markitect_filter/__init__.py new file mode 100644 index 0000000..3ee7b74 --- /dev/null +++ b/src/markitect_filter/__init__.py @@ -0,0 +1,5 @@ +"""Concrete source-format adapters for Markitect.""" + +from markitect_filter.adapters import epub3_adapter_descriptor + +__all__ = ["epub3_adapter_descriptor"] diff --git a/src/markitect_filter/adapters.py b/src/markitect_filter/adapters.py new file mode 100644 index 0000000..29e236c --- /dev/null +++ b/src/markitect_filter/adapters.py @@ -0,0 +1,51 @@ +"""Adapter descriptors exposed through Markitect entry points.""" + +from __future__ import annotations + +from markitect_tool.source import SourceAdapterDescriptor + + +def epub3_adapter_descriptor() -> SourceAdapterDescriptor: + """Return the lightweight EPUB3 read adapter descriptor.""" + + def factory(): + from markitect_filter.epub3 import Epub3ReadAdapter + + return Epub3ReadAdapter() + + return SourceAdapterDescriptor( + id="source.epub3", + version="1", + name="EPUB3", + summary="Read EPUB3 packages into canonical Markitect Markdown.", + operations=["read"], + media_types=["application/epub+zip"], + extensions=[".epub"], + factory=factory, + option_schema={ + "type": "object", + "properties": { + "skip_boilerplate": { + "type": "boolean", + "default": True, + "description": "Skip cover, nav, toc, headers, footers, and similar boilerplate when detected.", + } + }, + "additionalProperties": False, + }, + safety={ + "reads_files": True, + "writes_files": False, + "network": False, + "external_process": False, + }, + quality_profile={ + "text_extraction": "stdlib-xhtml", + "images": "metadata-only", + "styles": "ignored", + }, + metadata={ + "format": "EPUB3", + "dependency_profile": "stdlib", + }, + ) diff --git a/src/markitect_filter/epub3.py b/src/markitect_filter/epub3.py new file mode 100644 index 0000000..5db4fcf --- /dev/null +++ b/src/markitect_filter/epub3.py @@ -0,0 +1,602 @@ +"""EPUB3 read adapter implementation.""" + +from __future__ import annotations + +import posixpath +import re +import zipfile +from dataclasses import dataclass +from pathlib import Path +from typing import Any +from xml.etree import ElementTree as ET + +from markitect_tool.diagnostics import Diagnostic, SourceLocation, has_error +from markitect_tool.source import ( + NormalizationQuality, + NormalizedMarkdownDocument, + NormalizedMarkdownSegment, + SourceAdapterMatch, + SourceAdapterMatchRequest, + SourceAsset, + SourceInspectRequest, + SourceInspectResult, + SourceMetadata, + SourceProvenance, + SourceReadRequest, + SourceReadResult, + normalization_cache_key, +) + +from markitect_filter.adapters import epub3_adapter_descriptor + + +XHTML_MEDIA_TYPES = { + "application/xhtml+xml", + "text/html", +} +BOILERPLATE_HINTS = { + "cover", + "nav", + "toc", + "table-of-contents", + "titlepage", + "copyright", + "license", + "transcriber", + "header", + "footer", +} + + +@dataclass(frozen=True) +class EpubPackage: + rootfile_path: str + metadata: SourceMetadata + manifest: dict[str, dict[str, str]] + spine: list[str] + nav_labels: dict[str, str] + diagnostics: list[Diagnostic] + + +@dataclass(frozen=True) +class ExtractedSegment: + segment: NormalizedMarkdownSegment + diagnostics: list[Diagnostic] + + +class Epub3ReadAdapter: + """Read EPUB3 packages into normalized Markitect Markdown.""" + + def __init__(self) -> None: + self.descriptor = epub3_adapter_descriptor() + + def can_read(self, request: SourceAdapterMatchRequest) -> SourceAdapterMatch: + asset = request.asset + if asset.media_type == "application/epub+zip": + return SourceAdapterMatch( + adapter_id=self.descriptor.id, + matched=True, + confidence=100, + reason="media_type", + ) + if asset.extension == ".epub": + return SourceAdapterMatch( + adapter_id=self.descriptor.id, + matched=True, + confidence=80, + reason="extension", + ) + return SourceAdapterMatch( + adapter_id=self.descriptor.id, + matched=False, + confidence=0, + reason="unsupported", + ) + + def inspect(self, request: SourceInspectRequest) -> SourceInspectResult: + package = _load_package(request.asset) + diagnostics = package.diagnostics if package else [_malformed(request.asset, "Unable to read EPUB package.")] + metadata = package.metadata if package else SourceMetadata() + return SourceInspectResult( + asset=request.asset, + adapter=_adapter_info(request.options), + metadata=metadata, + capabilities=["read"], + quality=NormalizationQuality( + lossiness="unknown" if has_error(diagnostics) else "low", + confidence=0.9 if not has_error(diagnostics) else 0.0, + warnings=_warning_count(diagnostics), + ), + diagnostics=diagnostics, + valid=not has_error(diagnostics), + ) + + def read(self, request: SourceReadRequest) -> SourceReadResult: + package = _load_package(request.asset) + if package is None or has_error(package.diagnostics): + diagnostics = package.diagnostics if package else [_malformed(request.asset, "Unable to read EPUB package.")] + return SourceReadResult(diagnostics=diagnostics, valid=False) + + diagnostics = list(package.diagnostics) + extracted: list[ExtractedSegment] = [] + skip_boilerplate = bool(request.options.get("skip_boilerplate", True)) + try: + with zipfile.ZipFile(Path(request.asset.path or request.asset.uri)) as archive: + for order, item_id in enumerate(package.spine): + item = package.manifest.get(item_id) + if item is None: + diagnostics.append( + _warning( + request.asset, + "source.epub3.missing_spine_item", + f"Spine item `{item_id}` is missing from the manifest.", + details={"idref": item_id}, + ) + ) + continue + media_type = item.get("media_type", "") + href = item.get("href", "") + package_path = _resolve_package_path(package.rootfile_path, href) + if media_type not in XHTML_MEDIA_TYPES: + diagnostics.append( + _warning( + request.asset, + "source.epub3.unsupported_media", + f"Skipped unsupported spine media type `{media_type}`.", + details={"href": href, "media_type": media_type}, + ) + ) + continue + if skip_boilerplate and _is_boilerplate(item): + diagnostics.append( + _warning( + request.asset, + "source.epub3.skipped_boilerplate", + f"Skipped boilerplate spine item `{href}`.", + details={"href": href}, + ) + ) + continue + extracted.append( + _extract_segment( + archive, + request.asset, + package_path, + href, + order=len(extracted), + nav_label=package.nav_labels.get(href) + or package.nav_labels.get(package_path), + ) + ) + except (OSError, zipfile.BadZipFile) as exc: + return SourceReadResult( + diagnostics=[_malformed(request.asset, "EPUB is not a readable ZIP package.", {"error": str(exc)})], + valid=False, + ) + + segments = [item.segment for item in extracted if item.segment.markdown.strip()] + for item in extracted: + diagnostics.extend(item.diagnostics) + if not segments: + diagnostics.append(_malformed(request.asset, "EPUB did not produce any Markdown segments.")) + return SourceReadResult(diagnostics=diagnostics, valid=False) + + markdown = "\n\n".join(segment.markdown.strip() for segment in segments) + quality = NormalizationQuality( + lossiness="low" if _warning_count(diagnostics) else "none", + confidence=0.9 if not has_error(diagnostics) else 0.0, + skipped_items=sum(1 for diagnostic in diagnostics if diagnostic.code == "source.epub3.skipped_boilerplate"), + warnings=_warning_count(diagnostics), + metadata={"extraction": "epub3-stdlib-xhtml"}, + ) + adapter = _adapter_info(request.options) + document = NormalizedMarkdownDocument( + document_id=_document_id(request.asset, package.metadata), + asset=request.asset, + metadata=package.metadata, + markdown=markdown, + segments=segments, + quality=quality, + diagnostics=diagnostics, + provenance=[ + SourceProvenance( + source_uri=request.asset.uri, + source_path=request.asset.path, + digest=request.asset.digest, + metadata={"rootfile": package.rootfile_path}, + ) + ], + adapter=adapter, + cache_key=normalization_cache_key( + asset=request.asset, + adapter_id=self.descriptor.id, + adapter_version=self.descriptor.version, + options=request.options, + ), + ) + return SourceReadResult(document=document, diagnostics=[], valid=not has_error(diagnostics)) + + +def _load_package(asset: SourceAsset) -> EpubPackage | None: + diagnostics: list[Diagnostic] = [] + try: + with zipfile.ZipFile(Path(asset.path or asset.uri)) as archive: + rootfile_path = _read_container(archive, asset, diagnostics) + if not rootfile_path: + return EpubPackage("", SourceMetadata(), {}, [], {}, diagnostics) + package_xml = _read_xml(archive, rootfile_path, asset, diagnostics) + if package_xml is None: + return EpubPackage(rootfile_path, SourceMetadata(), {}, [], {}, diagnostics) + metadata = _extract_metadata(package_xml) + manifest = _extract_manifest(package_xml) + spine = _extract_spine(package_xml) + nav_labels = _extract_nav_labels(archive, rootfile_path, manifest, asset, diagnostics) + if not spine: + diagnostics.append(_malformed(asset, "EPUB package does not declare a spine.")) + return EpubPackage(rootfile_path, metadata, manifest, spine, nav_labels, diagnostics) + except (OSError, zipfile.BadZipFile) as exc: + return EpubPackage( + "", + SourceMetadata(), + {}, + [], + {}, + [_malformed(asset, "EPUB is not a readable ZIP package.", {"error": str(exc)})], + ) + + +def _read_container( + archive: zipfile.ZipFile, + asset: SourceAsset, + diagnostics: list[Diagnostic], +) -> str | None: + container = _read_xml(archive, "META-INF/container.xml", asset, diagnostics) + if container is None: + diagnostics.append(_malformed(asset, "EPUB is missing META-INF/container.xml.")) + return None + for element in container.iter(): + if _local_name(element.tag) == "rootfile": + full_path = element.attrib.get("full-path") + if full_path: + return full_path + diagnostics.append(_malformed(asset, "EPUB container does not declare a rootfile.")) + return None + + +def _read_xml( + archive: zipfile.ZipFile, + path: str, + asset: SourceAsset, + diagnostics: list[Diagnostic], +) -> ET.Element | None: + try: + return ET.fromstring(archive.read(path)) + except KeyError: + diagnostics.append( + _malformed(asset, f"EPUB package entry `{path}` is missing.", {"path": path}) + ) + except ET.ParseError as exc: + diagnostics.append( + _malformed(asset, f"EPUB XML entry `{path}` is malformed.", {"path": path, "error": str(exc)}) + ) + return None + + +def _extract_metadata(package_xml: ET.Element) -> SourceMetadata: + raw: dict[str, Any] = {} + titles: list[str] = [] + creators: list[str] = [] + identifiers: dict[str, str] = {} + language = None + rights = None + publisher = None + publication_date = None + for element in package_xml.iter(): + name = _local_name(element.tag) + text = _clean_text("".join(element.itertext())) + if not text: + continue + if name == "title": + titles.append(text) + elif name == "creator": + creators.append(text) + elif name == "language" and language is None: + language = text + elif name == "rights" and rights is None: + rights = text + elif name == "publisher" and publisher is None: + publisher = text + elif name == "date" and publication_date is None: + publication_date = text + elif name == "identifier": + identifier_key = element.attrib.get("id") or f"identifier-{len(identifiers) + 1}" + identifiers[identifier_key] = text + elif name == "meta": + key = element.attrib.get("property") or element.attrib.get("name") + value = text or element.attrib.get("content") + if key and value: + raw[key] = value + return SourceMetadata( + title=titles[0] if titles else None, + creators=creators, + language=language, + rights=rights, + publication_date=publication_date, + publisher=publisher, + identifiers=identifiers, + raw=raw, + ) + + +def _extract_manifest(package_xml: ET.Element) -> dict[str, dict[str, str]]: + manifest: dict[str, dict[str, str]] = {} + for element in package_xml.iter(): + if _local_name(element.tag) != "item": + continue + item_id = element.attrib.get("id") + href = element.attrib.get("href") + if not item_id or not href: + continue + manifest[item_id] = { + "id": item_id, + "href": href, + "media_type": element.attrib.get("media-type", ""), + "properties": element.attrib.get("properties", ""), + } + return manifest + + +def _extract_spine(package_xml: ET.Element) -> list[str]: + spine: list[str] = [] + in_spine = False + for element in package_xml.iter(): + name = _local_name(element.tag) + if name == "spine": + in_spine = True + for child in list(element): + if _local_name(child.tag) == "itemref" and child.attrib.get("idref"): + spine.append(child.attrib["idref"]) + break + return spine if in_spine else [] + + +def _extract_nav_labels( + archive: zipfile.ZipFile, + rootfile_path: str, + manifest: dict[str, dict[str, str]], + asset: SourceAsset, + diagnostics: list[Diagnostic], +) -> dict[str, str]: + nav_item = next( + ( + item + for item in manifest.values() + if "nav" in item.get("properties", "").split() + ), + None, + ) + if nav_item is None: + return {} + nav_path = _resolve_package_path(rootfile_path, nav_item["href"]) + nav_xml = _read_xml(archive, nav_path, asset, diagnostics) + if nav_xml is None: + return {} + labels: dict[str, str] = {} + for element in nav_xml.iter(): + if _local_name(element.tag) != "a": + continue + href = element.attrib.get("href") + label = _clean_text("".join(element.itertext())) + if href and label: + labels[href.split("#", 1)[0]] = label + labels[_resolve_package_path(rootfile_path, href.split("#", 1)[0])] = label + return labels + + +def _extract_segment( + archive: zipfile.ZipFile, + asset: SourceAsset, + package_path: str, + href: str, + *, + order: int, + nav_label: str | None, +) -> ExtractedSegment: + diagnostics: list[Diagnostic] = [] + document_xml = _read_xml(archive, package_path, asset, diagnostics) + if document_xml is None: + return ExtractedSegment( + NormalizedMarkdownSegment( + segment_id=f"seg-{order + 1:04d}", + order=order, + markdown="", + ), + diagnostics, + ) + body = _first_descendant(document_xml, "body") + if body is None: + body = document_xml + anchors = _anchors(body) + blocks = _element_blocks(body) + markdown = "\n\n".join(block for block in blocks if block.strip()).strip() + heading, heading_level = _first_heading(body) + heading = heading or nav_label + segment_id = _segment_id(anchors, order) + provenance = SourceProvenance( + source_uri=asset.uri, + source_path=asset.path, + source_href=href, + package_path=package_path, + anchor=anchors[0] if anchors else None, + section=heading, + ) + return ExtractedSegment( + NormalizedMarkdownSegment( + segment_id=segment_id, + order=order, + markdown=markdown, + heading=heading, + heading_level=heading_level, + anchors=anchors, + provenance=[provenance], + metadata={"package_path": package_path, "href": href}, + ), + diagnostics, + ) + + +def _element_blocks(element: ET.Element) -> list[str]: + blocks: list[str] = [] + for child in list(element): + name = _local_name(child.tag) + if name in {"script", "style", "head"}: + continue + if name in {"h1", "h2", "h3", "h4", "h5", "h6"}: + text = _inline_text(child) + if text: + blocks.append(f"{'#' * int(name[1])} {text}") + elif name == "p": + text = _inline_text(child) + if text: + blocks.append(text) + elif name in {"ul", "ol"}: + for item in child: + if _local_name(item.tag) == "li": + text = _inline_text(item) + if text: + blocks.append(f"- {text}") + elif name == "blockquote": + text = _inline_text(child) + if text: + blocks.append("\n".join(f"> {line}" for line in text.splitlines())) + elif name in {"section", "article", "div", "main", "body"}: + blocks.extend(_element_blocks(child)) + elif name in {"br", "hr"}: + continue + else: + blocks.extend(_element_blocks(child)) + text = _direct_text(child) + if text: + blocks.append(text) + return blocks + + +def _inline_text(element: ET.Element) -> str: + return _clean_text(" ".join(part for part in element.itertext())) + + +def _direct_text(element: ET.Element) -> str: + values = [element.text or ""] + for child in list(element): + values.append(child.tail or "") + return _clean_text(" ".join(values)) + + +def _first_heading(element: ET.Element) -> tuple[str | None, int | None]: + for descendant in element.iter(): + name = _local_name(descendant.tag) + if name in {"h1", "h2", "h3", "h4", "h5", "h6"}: + text = _inline_text(descendant) + if text: + return text, int(name[1]) + return None, None + + +def _first_descendant(element: ET.Element, local_name: str) -> ET.Element | None: + for descendant in element.iter(): + if _local_name(descendant.tag) == local_name: + return descendant + return None + + +def _anchors(element: ET.Element) -> list[str]: + anchors: list[str] = [] + for descendant in element.iter(): + value = descendant.attrib.get("id") or descendant.attrib.get("name") + if value and value not in anchors: + anchors.append(value) + return anchors + + +def _is_boilerplate(item: dict[str, str]) -> bool: + haystack = " ".join( + [ + item.get("id", ""), + item.get("href", ""), + item.get("properties", ""), + ] + ).lower() + return any(hint in haystack for hint in BOILERPLATE_HINTS) + + +def _resolve_package_path(rootfile_path: str, href: str) -> str: + base = posixpath.dirname(rootfile_path) + return posixpath.normpath(posixpath.join(base, href)) + + +def _segment_id(anchors: list[str], order: int) -> str: + if anchors: + slug = re.sub(r"[^a-z0-9._-]+", "-", anchors[0].lower()).strip("-") + if slug: + return slug + return f"seg-{order + 1:04d}" + + +def _document_id(asset: SourceAsset, metadata: SourceMetadata) -> str: + identifier = next(iter(metadata.identifiers.values()), None) + if identifier: + return f"source.epub3:{identifier}" + if asset.digest: + return f"source.epub3:{asset.digest.removeprefix('sha256:')}" + return f"source.epub3:{asset.name or asset.uri}" + + +def _adapter_info(options: dict[str, Any]) -> dict[str, Any]: + return { + "id": "source.epub3", + "version": "1", + "options": options, + } + + +def _warning( + asset: SourceAsset, + code: str, + message: str, + *, + details: dict[str, Any] | None = None, +) -> Diagnostic: + return Diagnostic( + severity="warning", + code=code, + message=message, + source=SourceLocation(path=asset.path) if asset.path else None, + details=details or {}, + ) + + +def _malformed( + asset: SourceAsset, + message: str, + details: dict[str, Any] | None = None, +) -> Diagnostic: + return Diagnostic( + severity="error", + code="source.malformed", + message=message, + source=SourceLocation(path=asset.path) if asset.path else None, + details=details or {}, + ) + + +def _warning_count(diagnostics: list[Diagnostic]) -> int: + return sum(1 for diagnostic in diagnostics if diagnostic.severity == "warning") + + +def _local_name(tag: str) -> str: + return tag.rsplit("}", 1)[-1] if "}" in tag else tag + + +def _clean_text(text: str) -> str: + cleaned = re.sub(r"\s+", " ", text).strip() + return re.sub(r"\s+([.,;:!?])", r"\1", cleaned) diff --git a/tests/test_epub3_adapter.py b/tests/test_epub3_adapter.py new file mode 100644 index 0000000..2f753d0 --- /dev/null +++ b/tests/test_epub3_adapter.py @@ -0,0 +1,206 @@ +from pathlib import Path +import zipfile + +from markitect_tool.source import ( + SourceAdapterMatchRequest, + SourceAdapterRegistry, + SourceAsset, + SourceInspectRequest, + SourceReadRequest, + discover_source_adapters, + inspect_source, + normalize_source, +) + +from markitect_filter.adapters import epub3_adapter_descriptor + + +class FakeEntryPoint: + name = "epub3" + + def load(self): + return epub3_adapter_descriptor + + +def test_epub3_descriptor_matches_contract(): + descriptor = epub3_adapter_descriptor() + + assert descriptor.id == "source.epub3" + assert descriptor.operations == ["read"] + assert descriptor.media_types == ["application/epub+zip"] + assert descriptor.extensions == [".epub"] + assert descriptor.safety["network"] is False + assert descriptor.option_schema["properties"]["skip_boilerplate"]["default"] is True + + +def test_epub3_adapter_matches_epub_assets(tmp_path: Path): + epub_path = _write_epub(tmp_path) + asset = SourceAsset.from_path(epub_path, media_type="application/epub+zip") + adapter = epub3_adapter_descriptor().instantiate() + + match = adapter.can_read(SourceAdapterMatchRequest(asset=asset)) + + assert match.matched + assert match.confidence == 100 + + +def test_epub3_adapter_inspects_metadata(tmp_path: Path): + epub_path = _write_epub(tmp_path) + asset = SourceAsset.from_path(epub_path, media_type="application/epub+zip") + adapter = epub3_adapter_descriptor().instantiate() + + result = adapter.inspect(SourceInspectRequest(asset=asset)) + + assert result.is_valid + assert result.metadata.title == "Test Book" + assert result.metadata.creators == ["Ada Lovelace"] + assert result.metadata.language == "en" + assert result.metadata.identifiers["bookid"] == "urn:test-book" + assert result.quality.lossiness == "low" + + +def test_epub3_adapter_normalizes_spine_to_markdown(tmp_path: Path): + epub_path = _write_epub(tmp_path) + asset = SourceAsset.from_path(epub_path, media_type="application/epub+zip") + adapter = epub3_adapter_descriptor().instantiate() + + result = adapter.read(SourceReadRequest(asset=asset, options={"skip_boilerplate": True})) + + assert result.is_valid + assert result.document is not None + assert result.document.document_id == "source.epub3:urn:test-book" + assert result.document.metadata.title == "Test Book" + assert result.document.markdown == ( + "# Opening\n\n" + "First paragraph with emphasis.\n\n" + "- First point\n\n" + "- Second point\n\n" + "## Continuation\n\n" + "Second chapter text." + ) + assert [segment.segment_id for segment in result.document.segments] == [ + "opening", + "continuation", + ] + assert result.document.segments[0].provenance[0].package_path == "EPUB/chapter1.xhtml" + assert result.document.quality.lossiness == "none" + + +def test_markitect_api_can_use_epub3_registry(tmp_path: Path): + epub_path = _write_epub(tmp_path) + registry = SourceAdapterRegistry([epub3_adapter_descriptor()]) + + inspected = inspect_source(epub_path, registry=registry) + normalized = normalize_source(epub_path, registry=registry) + + assert inspected.is_valid + assert inspected.metadata.title == "Test Book" + assert normalized.is_valid + assert normalized.document is not None + assert normalized.document.segments[1].heading == "Continuation" + + +def test_epub3_adapter_reports_malformed_missing_container(tmp_path: Path): + epub_path = tmp_path / "broken.epub" + with zipfile.ZipFile(epub_path, "w") as archive: + archive.writestr("mimetype", "application/epub+zip") + asset = SourceAsset.from_path(epub_path, media_type="application/epub+zip") + adapter = epub3_adapter_descriptor().instantiate() + + result = adapter.read(SourceReadRequest(asset=asset)) + + assert not result.is_valid + assert result.diagnostics[0].code == "source.malformed" + assert "container.xml" in result.diagnostics[0].message + + +def test_epub3_entry_point_discovery_shape(): + registry = discover_source_adapters([FakeEntryPoint()]) + + assert registry.get("source.epub3").name == "EPUB3" + + +def _write_epub(tmp_path: Path) -> Path: + epub_path = tmp_path / "test-book.epub" + with zipfile.ZipFile(epub_path, "w") as archive: + archive.writestr("mimetype", "application/epub+zip") + archive.writestr( + "META-INF/container.xml", + """ + + + + + +""", + ) + archive.writestr( + "EPUB/content.opf", + """ + + + urn:test-book + Test Book + Ada Lovelace + en + Markitect Fixtures + 2026-05-14 + + + + + + + + + + + +""", + ) + archive.writestr( + "EPUB/nav.xhtml", + """ + + + + + +""", + ) + archive.writestr( + "EPUB/chapter1.xhtml", + """ + + +
+

Opening

+

First paragraph with emphasis.

+ +
+ + +""", + ) + archive.writestr( + "EPUB/chapter2.xhtml", + """ + + +
+

Continuation

+

Second chapter text.

+
+ + +""", + ) + return epub_path diff --git a/workplans/MKTF-WP-0001-epub3-read-adapter.md b/workplans/MKTF-WP-0001-epub3-read-adapter.md new file mode 100644 index 0000000..2b75af3 --- /dev/null +++ b/workplans/MKTF-WP-0001-epub3-read-adapter.md @@ -0,0 +1,56 @@ +--- +id: MKTF-WP-0001 +type: workplan +title: "EPUB3 Read Adapter" +domain: markitect +status: done +owner: markitect-filter +topic_slug: markitect +planning_priority: complete +planning_order: 10 +depends_on_workplans: + - MKTT-WP-0018 +created: "2026-05-14" +updated: "2026-05-14" +--- + +# MKTF-WP-0001: EPUB3 Read Adapter + +## Purpose + +Implement the first concrete `markitect-filter` source adapter: +`source.epub3`, a read-only EPUB3 adapter that satisfies the +`markitect-tool` source adapter contract. + +## Implemented Scope + +- Python package scaffold with `pyproject.toml`. +- Entry point group registration: + `markitect_tool.source_adapters`. +- Lightweight `epub3_adapter_descriptor`. +- Stdlib-only EPUB3 package reading with `zipfile` and `ElementTree`. +- `META-INF/container.xml` rootfile discovery. +- OPF metadata, manifest, and spine extraction. +- EPUB nav label extraction. +- XHTML body extraction into ordered Markdown segments. +- Source provenance with package paths, hrefs, anchors, and section labels. +- Structured diagnostics for malformed EPUBs, skipped boilerplate, missing + spine items, unsupported media, and malformed XML. +- Tests for descriptor shape, matching, inspection, normalization, malformed + packages, Markitect API registry use, and entry point shape. + +## Non-Goals + +- PDF, DOCX, ODT, OCR, or browser extraction. +- Write/export adapters. +- Network fetching. +- Styling-preserving conversion. +- Image extraction beyond future metadata/attachment handling. + +## Validation + +Run from `markitect-filter`: + +```bash +PYTHONPATH=src:/home/worsch/markitect-tool/src python3 -m pytest +```