IB-WP-0016-T01: spine-aware EPUB3 intake

Parse META-INF/container.xml and the OPF package document, then iterate
documents in spine reading order instead of archive-name sort. Classify
each spine item (body, cover, nav, toc, header, footer, notes, license,
auxiliary) and exclude non-body sections by default; include_non_body=True
opts them back in for inspection. Capture OPF book metadata (title,
creator, language, subjects, rights, identifier, source_url, modified)
onto every chunk and propagate it through source artifact provenance.
Preserve the legacy zip-without-OPF fallback for malformed EPUBs.

Real Lefevre EPUB now yields 148 body chunks in spine order (was 155
mixed, archive-sorted) with cover=1, header=1, footer=4 detected and
dropped. 78 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 13:52:24 +02:00
parent ead2f335f3
commit 5b6a63fb7a
5 changed files with 496 additions and 35 deletions

View File

@@ -4,21 +4,39 @@ import hashlib
import html
import re
import zipfile
from dataclasses import asdict, dataclass
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from xml.etree import ElementTree as ET
from .errors import InfospaceError
from .semantics import slugify
EXTRACTOR_VERSION = "generic-source-intake-v1"
EXTRACTOR_VERSION = "generic-source-intake-v2"
SUPPORTED_EXTENSIONS = {".md", ".markdown", ".txt", ".html", ".htm", ".epub"}
HTML_TITLE_RE = re.compile(r"<title[^>]*>(?P<title>.*?)</title>", re.I | re.S)
HTML_H1_RE = re.compile(r"<h1[^>]*>(?P<title>.*?)</h1>", re.I | re.S)
SCRIPT_STYLE_RE = re.compile(r"<(script|style)[^>]*>.*?</\1>", re.I | re.S)
TAG_RE = re.compile(r"<[^>]+>")
# XML namespaces used to qualify element lookups in the OPF package document,
# its Dublin Core metadata children, and META-INF/container.xml.
OPF_NS = "http://www.idpf.org/2007/opf"
DC_NS = "http://purl.org/dc/elements/1.1/"
CONTAINER_NS = "urn:oasis:names:tc:opendocument:xmlns:container"
# Roles assigned to EPUB spine items by _classify_section.  "body" is also the
# default role carried by chunks and documents.
SECTION_ROLE_BODY = "body"
SECTION_ROLE_COVER = "cover"
SECTION_ROLE_NAV = "nav"
SECTION_ROLE_TOC = "toc"
SECTION_ROLE_HEADER = "header"
SECTION_ROLE_FOOTER = "footer"
SECTION_ROLE_NOTES = "notes"
SECTION_ROLE_LICENSE = "license"
SECTION_ROLE_AUXILIARY = "auxiliary"
# Project Gutenberg boilerplate delimiters.  They are written in upper case
# because they are matched against upper-cased section content.
PG_START_MARKERS = ("*** START OF THE PROJECT GUTENBERG EBOOK", "*** START OF THIS PROJECT GUTENBERG EBOOK")
PG_END_MARKERS = ("*** END OF THE PROJECT GUTENBERG EBOOK", "*** END OF THIS PROJECT GUTENBERG EBOOK")
@dataclass(frozen=True)
class SourceChunk:
@@ -32,6 +50,9 @@ class SourceChunk:
chunk_count: int
imported_at: str
extractor_version: str = EXTRACTOR_VERSION
section_role: str = SECTION_ROLE_BODY
spine_index: int | None = None
book_metadata: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return asdict(self)
@@ -44,6 +65,23 @@ class _SourceDocument:
source_type: str
original_path: str
base_slug: str
section_role: str = SECTION_ROLE_BODY
spine_index: int | None = None
book_metadata: dict = field(default_factory=dict)
@dataclass(frozen=True)
class _EpubManifestItem:
    """One ``<item>`` entry read from the OPF manifest."""

    item_id: str  # value of the item's ``id`` attribute
    href: str  # ``href`` joined onto the directory of the OPF file
    media_type: str  # ``media-type`` attribute, "" when absent
    properties: frozenset  # whitespace-split tokens of the ``properties`` attribute
@dataclass(frozen=True)
class _EpubSpineEntry:
    """One ``<itemref>`` entry read from the OPF spine."""

    item_id: str  # ``idref`` attribute; key into the parsed manifest
    linear: bool  # False only when the ``linear`` attribute is exactly "no"
def normalize_source(
@@ -51,6 +89,7 @@ def normalize_source(
*,
max_words: int = 800,
max_chunks: int | None = None,
include_non_body: bool = False,
) -> list[SourceChunk]:
source_path = Path(source)
if not source_path.exists():
@@ -59,7 +98,7 @@ def normalize_source(
f"Source path does not exist: {source_path}",
{"source": str(source_path)},
)
documents = list(_iter_documents(source_path))
documents = list(_iter_documents(source_path, include_non_body=include_non_body))
if not documents:
raise InfospaceError(
"unsupported_source",
@@ -91,6 +130,9 @@ def normalize_source(
chunk_index=index,
chunk_count=len(pieces),
imported_at=imported_at,
section_role=document.section_role,
spine_index=document.spine_index,
book_metadata=dict(document.book_metadata),
)
)
if max_chunks is not None and max_chunks > 0 and len(chunks) >= max_chunks:
@@ -98,11 +140,13 @@ def normalize_source(
return chunks
def _iter_documents(source_path: Path) -> Iterable[_SourceDocument]:
def _iter_documents(
source_path: Path, *, include_non_body: bool
) -> Iterable[_SourceDocument]:
if source_path.is_dir():
for path in sorted(source_path.rglob("*")):
if path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS:
yield from _iter_documents(path)
yield from _iter_documents(path, include_non_body=include_non_body)
return
suffix = source_path.suffix.lower()
@@ -113,7 +157,7 @@ def _iter_documents(source_path: Path) -> Iterable[_SourceDocument]:
elif suffix in (".html", ".htm"):
yield _html_document(source_path, source_type="html")
elif suffix == ".epub":
yield from _epub_documents(source_path)
yield from _epub_documents(source_path, include_non_body=include_non_body)
def _markdown_document(path: Path) -> _SourceDocument:
@@ -163,35 +207,18 @@ def _html_document(
)
def _epub_documents(path: Path) -> Iterable[_SourceDocument]:
def _epub_documents(
path: Path, *, include_non_body: bool
) -> Iterable[_SourceDocument]:
try:
with zipfile.ZipFile(path) as archive:
names = [
name
for name in sorted(archive.namelist())
if Path(name).suffix.lower() in {".html", ".htm", ".xhtml", ".txt", ".md"}
and not name.endswith("/")
]
for name in names:
raw = archive.read(name).decode("utf-8", errors="replace")
pseudo_path = Path(name)
if pseudo_path.suffix.lower() in {".txt", ".md"}:
title = _markdown_title(raw) or _title_from_path(pseudo_path)
markdown = _ensure_h1(_normalize_newlines(raw).strip() + "\n", title)
yield _SourceDocument(
title=title,
markdown=markdown,
source_type="epub",
original_path=f"{path}!{name}",
base_slug=slugify(title) or slugify(pseudo_path.stem) or "source",
)
else:
yield _html_document(
pseudo_path,
source_type="epub",
original_path=f"{path}!{name}",
text=raw,
)
opf_path = _resolve_opf_path(archive)
if opf_path is not None:
yield from _epub3_spine_documents(
archive, path, opf_path, include_non_body=include_non_body
)
else:
yield from _epub_legacy_documents(archive, path)
except zipfile.BadZipFile as exc:
raise InfospaceError(
"invalid_epub_source",
@@ -200,6 +227,243 @@ def _epub_documents(path: Path) -> Iterable[_SourceDocument]:
) from exc
def _resolve_opf_path(archive: zipfile.ZipFile) -> str | None:
try:
raw = archive.read("META-INF/container.xml")
except KeyError:
return None
try:
root = ET.fromstring(raw)
except ET.ParseError:
return None
rootfile = root.find(f"{{{CONTAINER_NS}}}rootfiles/{{{CONTAINER_NS}}}rootfile")
if rootfile is None:
return None
full_path = rootfile.attrib.get("full-path")
if not full_path:
return None
if full_path not in archive.namelist():
return None
return full_path
def _parse_opf(
    archive: zipfile.ZipFile, opf_path: str
) -> tuple[dict, dict[str, _EpubManifestItem], list[_EpubSpineEntry]]:
    """Parse the OPF package document into metadata, manifest and spine.

    Args:
        archive: Open EPUB archive.
        opf_path: Archive member name of the OPF package document.

    Returns:
        A ``(metadata, manifest, spine)`` tuple: the book metadata dict, the
        manifest items keyed by item id (hrefs resolved against the OPF
        directory), and the spine entries in document order.  Manifest items
        lacking an id or href and spine entries lacking an ``idref`` are
        skipped.

    Raises:
        KeyError: ``opf_path`` is not a member of the archive.
        xml.etree.ElementTree.ParseError: the OPF is not well-formed XML.
    """
    from urllib.parse import unquote

    raw = archive.read(opf_path)
    try:
        # Parse the bytes so the XML declaration / BOM selects the encoding
        # instead of assuming UTF-8.
        root = ET.fromstring(raw)
    except ET.ParseError:
        # Lenient retry: replace undecodable bytes and parse as UTF-8 text.
        root = ET.fromstring(raw.decode("utf-8", errors="replace"))

    metadata = _parse_opf_metadata(root)
    base = _zip_dirname(opf_path)

    manifest: dict[str, _EpubManifestItem] = {}
    for item in root.findall(f"{{{OPF_NS}}}manifest/{{{OPF_NS}}}item"):
        item_id = item.attrib.get("id", "")
        # Manifest hrefs are URL references: drop any fragment and decode
        # percent-escapes so the result matches the archive member name.
        href = unquote(item.attrib.get("href", "").split("#", 1)[0])
        if not href or not item_id:
            continue
        manifest[item_id] = _EpubManifestItem(
            item_id=item_id,
            href=_join_zip_path(base, href),
            media_type=item.attrib.get("media-type", ""),
            properties=frozenset((item.attrib.get("properties") or "").split()),
        )

    spine: list[_EpubSpineEntry] = []
    for entry in root.findall(f"{{{OPF_NS}}}spine/{{{OPF_NS}}}itemref"):
        idref = entry.attrib.get("idref")
        if not idref:
            continue
        # Anything other than an explicit linear="no" counts as linear.
        spine.append(
            _EpubSpineEntry(
                item_id=idref,
                linear=entry.attrib.get("linear", "yes") != "no",
            )
        )
    return metadata, manifest, spine
def _parse_opf_metadata(opf_root: ET.Element) -> dict:
    """Extract book-level metadata from the OPF ``<metadata>`` element.

    Reads Dublin Core children (title, creator, language, rights, subject,
    identifier, source) and OPF ``<meta property=...>`` refinements
    (``dcterms:modified``, ``dcterms:source``).  Only keys with a non-empty
    value are present in the result.

    Args:
        opf_root: Root element of the parsed OPF package document.

    Returns:
        A dict with any of the keys ``title``, ``creator``, ``creators`` (only
        when there is more than one creator), ``language``, ``rights``,
        ``subjects``, ``identifier``, ``source_url`` and ``modified``.  Empty
        when the document has no ``<metadata>`` element.
    """
    md = opf_root.find(f"{{{OPF_NS}}}metadata")
    if md is None:
        return {}

    def _all_text(tag: str) -> list[str]:
        # Keep only entries that still have content after whitespace
        # collapsing, so blank elements never produce "" values.
        values: list[str] = []
        for el in md.findall(f"{{{DC_NS}}}{tag}"):
            text = _collapse_ws(el.text) if el.text else ""
            if text:
                values.append(text)
        return values

    def _first_text(tag: str) -> str:
        # First non-empty value, so a blank leading element does not hide a
        # later populated one.
        values = _all_text(tag)
        return values[0] if values else ""

    out: dict = {}
    title = _first_text("title")
    if title:
        out["title"] = title
    creators = _all_text("creator")
    if creators:
        out["creator"] = creators[0]
        if len(creators) > 1:
            out["creators"] = creators
    language = _first_text("language")
    if language:
        out["language"] = language
    rights = _first_text("rights")
    if rights:
        out["rights"] = rights
    subjects = _all_text("subject")
    if subjects:
        out["subjects"] = subjects
    identifier = _first_text("identifier")
    if identifier:
        out["identifier"] = identifier
    source_url = _first_text("source")
    if source_url:
        out["source_url"] = source_url

    for meta in md.findall(f"{{{OPF_NS}}}meta"):
        prop = meta.attrib.get("property", "")
        text = _collapse_ws(meta.text) if meta.text else ""
        if not text:
            continue
        if prop == "dcterms:modified":
            out["modified"] = text
        elif prop == "dcterms:source" and "source_url" not in out:
            # dc:source takes precedence; the refinement only fills a gap.
            out["source_url"] = text
    return out
def _epub3_spine_documents(
    archive: zipfile.ZipFile,
    source_path: Path,
    opf_path: str,
    *,
    include_non_body: bool,
) -> Iterable[_SourceDocument]:
    """Yield one document per spine item, in spine reading order.

    Each spine item is resolved through the manifest, read from the archive,
    classified with ``_classify_section`` and converted to markdown.  Items
    that are missing from the manifest or the archive are skipped.  Items
    whose role is not ``body`` are skipped unless ``include_non_body`` is true.

    When the OPF is not well-formed XML, or declares no usable spine entries,
    the function falls back to the archive-name scan of
    ``_epub_legacy_documents`` instead of raising or yielding nothing.

    Args:
        archive: Open EPUB archive.
        source_path: Filesystem path of the EPUB, used for titles, slugs and
            ``original_path`` provenance.
        opf_path: Archive member name of the OPF package document.
        include_non_body: Also yield cover/nav/toc/header/footer/notes/
            license/auxiliary sections.
    """
    try:
        parsed = _parse_opf(archive, opf_path)
    except ET.ParseError:
        parsed = None
    if parsed is None or not parsed[2]:
        # Malformed package document: treat the archive like an OPF-less zip.
        yield from _epub_legacy_documents(archive, source_path)
        return
    metadata, manifest, spine = parsed

    book_title = metadata.get("title") or _title_from_path(source_path)
    book_slug = slugify(book_title) or slugify(source_path.stem) or "ebook"

    for spine_index, entry in enumerate(spine):
        item = manifest.get(entry.item_id)
        if item is None or not item.href:
            continue
        try:
            raw = archive.read(item.href).decode("utf-8", errors="replace")
        except KeyError:
            # Manifest points at a member that is not in the archive.
            continue

        role = _classify_section(item, entry, raw)
        if role != SECTION_ROLE_BODY and not include_non_body:
            continue

        suffix = Path(item.href).suffix.lower()
        if suffix in {".txt", ".md"}:
            title = _markdown_title(raw) or _title_from_path(Path(item.href))
            markdown_body = _ensure_h1(_normalize_newlines(raw).strip() + "\n", title)
        else:
            title = _html_title(raw) or _title_from_path(Path(item.href))
            text = _html_to_text(raw)
            # Avoid repeating the title when the extracted text opens with it.
            if text.lower().startswith(title.lower()):
                text = text[len(title) :].strip()
            markdown_body = f"# {title}\n\n{text}\n"

        section_slug = (
            slugify(title)
            or slugify(Path(item.href).stem)
            or f"section-{spine_index + 1:03d}"
        )
        # The 1-based, zero-padded spine position keeps slugs unique and
        # sortable in reading order.
        base_slug = f"{book_slug}-{spine_index + 1:03d}-{section_slug}"
        yield _SourceDocument(
            title=title,
            markdown=markdown_body,
            source_type="epub",
            original_path=f"{source_path}!{item.href}",
            base_slug=base_slug,
            section_role=role,
            spine_index=spine_index,
            book_metadata=metadata,
        )
def _epub_legacy_documents(
    archive: zipfile.ZipFile, source_path: Path
) -> Iterable[_SourceDocument]:
    """Yield documents from an EPUB by scanning archive members by name.

    Members are visited in sorted name order.  Only ``.html``, ``.htm``,
    ``.xhtml``, ``.txt`` and ``.md`` members are considered; directory entries
    are ignored.  Plain-text and markdown members become markdown documents
    directly, everything else goes through ``_html_document``.
    """
    plain_suffixes = {".txt", ".md"}
    accepted_suffixes = plain_suffixes | {".html", ".htm", ".xhtml"}

    for member in sorted(archive.namelist()):
        member_path = Path(member)
        extension = member_path.suffix.lower()
        if member.endswith("/") or extension not in accepted_suffixes:
            continue

        content = archive.read(member).decode("utf-8", errors="replace")
        location = f"{source_path}!{member}"

        if extension not in plain_suffixes:
            yield _html_document(
                member_path,
                source_type="epub",
                original_path=location,
                text=content,
            )
            continue

        heading = _markdown_title(content) or _title_from_path(member_path)
        body = _ensure_h1(_normalize_newlines(content).strip() + "\n", heading)
        yield _SourceDocument(
            title=heading,
            markdown=body,
            source_type="epub",
            original_path=location,
            base_slug=slugify(heading) or slugify(member_path.stem) or "source",
        )
def _classify_section(
    item: _EpubManifestItem,
    spine_entry: _EpubSpineEntry,
    content: str,
) -> str:
    """Assign a section role to a spine item.

    Rules are evaluated in a fixed order and the first match wins: manifest
    properties, cover-like file names, cover-like HTML titles, nav/toc/
    license/notes file names, Project Gutenberg start/end markers in the
    content, Gutenberg header/footer file names, and finally the spine
    ``linear`` flag.  Items matching nothing are ``body`` when linear and
    ``auxiliary`` otherwise.
    """
    filename = Path(item.href).name.lower()

    def _name_has(*needles: str) -> bool:
        return any(needle in filename for needle in needles)

    # Manifest properties and cover-like file names.
    if "nav" in item.properties:
        return SECTION_ROLE_NAV
    if (
        "cover-image" in item.properties
        or filename.startswith("cover")
        or "titlepage" in filename
    ):
        return SECTION_ROLE_COVER

    # HTML title reduced to lower-case letters, digits and spaces.
    normalized_title = re.sub(
        r"[^a-z0-9 ]+", "", _html_title(content).lower()
    ).strip()
    if normalized_title in {"cover", "cover page", "title page", "titlepage"}:
        return SECTION_ROLE_COVER

    # File-name heuristics.
    if filename.startswith("nav"):
        return SECTION_ROLE_NAV
    if _name_has("toc", "contents"):
        return SECTION_ROLE_TOC
    if _name_has("license", "copyright", "rights"):
        return SECTION_ROLE_LICENSE
    if _name_has("transcriber", "notes"):
        return SECTION_ROLE_NOTES

    # Project Gutenberg boilerplate: content markers first, then file names.
    shouted = content.upper()
    if any(marker in shouted for marker in PG_START_MARKERS):
        return SECTION_ROLE_HEADER
    if any(marker in shouted for marker in PG_END_MARKERS):
        return SECTION_ROLE_FOOTER
    if _name_has("pgheader", "pg-header", "gutenberg-header"):
        return SECTION_ROLE_HEADER
    if _name_has("pgfooter", "pg-footer", "gutenberg-footer"):
        return SECTION_ROLE_FOOTER

    return SECTION_ROLE_BODY if spine_entry.linear else SECTION_ROLE_AUXILIARY
def _zip_dirname(zip_path: str) -> str:
    """Return the directory part of an archive member path.

    Backslashes are treated as separators.  The result uses ``/`` and has no
    trailing slash; it is ``""`` when the path has no directory component.
    """
    directory, _, _ = zip_path.replace("\\", "/").rpartition("/")
    return directory
def _join_zip_path(base: str, href: str) -> str:
    """Resolve ``href`` against the archive directory ``base``.

    Backslashes are treated as separators, leading slashes on ``href`` are
    dropped, and ``.`` / ``..`` segments are collapsed so that a reference
    such as ``../Text/ch1.xhtml`` resolves to the member name actually stored
    in the archive.

    Args:
        base: Directory of the referring file inside the archive; ``""`` or
            ``"."`` means the archive root.
        href: Relative reference to resolve.

    Returns:
        The normalized archive member path using ``/`` separators.
    """
    import posixpath

    base = base.replace("\\", "/").strip("/")
    href = href.replace("\\", "/").lstrip("/")
    joined = href if not base or base == "." else f"{base}/{href}"
    if not href:
        # Nothing to normalize; keep the historical result for empty hrefs.
        return joined
    # Zip member names never contain "." or ".." segments, so collapse them.
    return posixpath.normpath(joined)
def _chunk_markdown(markdown: str, *, max_words: int) -> list[str]:
text = markdown.strip()
if max_words <= 0: