IB-WP-0016-T02: chapter-aware chunking and stable IDs

Resolve chapter labels from EPUB nav entries (when present) and from the
first in-document h1/h2/h3 heading, parse roman-numeral and "Chapter N"
labels into numeric chapter indices, and generate stable IDs of the form
chapter-NN with -part-NNN suffix when a chapter exceeds max_words. The
chunker now operates on cleaned body text, distributes id="Page_*" page
anchors per part via inline markers extracted before splitting, and
supports a configurable overlap_words evidence window between adjacent
parts of the same chapter. Reclassify body sections whose chapter label
matches contents/transcriber-notes/license/colophon tokens so they leave
the body stream by default. Strip <head>...</head> from HTML body
extraction to stop the <title> tag from duplicating heading text in the
chunk markdown.

Real Lefevre EPUB now detects all 24 roman-numeral chapters with stable
chapter-NN IDs, distributes Page_N anchors across multi-part chapters,
and reclassifies Contents and Transcriber's Notes out of body
(role histogram body=67, cover=1, header=1, toc=1, notes=1, footer=2).
82 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 15:52:47 +02:00
parent ef19aa6de7
commit b9173b6569
5 changed files with 449 additions and 36 deletions

View File

@@ -13,16 +13,30 @@ from xml.etree import ElementTree as ET
from .errors import InfospaceError
from .semantics import slugify
EXTRACTOR_VERSION = "generic-source-intake-v2"
EXTRACTOR_VERSION = "generic-source-intake-v3"
SUPPORTED_EXTENSIONS = {".md", ".markdown", ".txt", ".html", ".htm", ".epub"}
HTML_TITLE_RE = re.compile(r"<title[^>]*>(?P<title>.*?)</title>", re.I | re.S)
HTML_H1_RE = re.compile(r"<h1[^>]*>(?P<title>.*?)</h1>", re.I | re.S)
HTML_FIRST_HEADING_RE = re.compile(
r"<(?P<tag>h[1-6])[^>]*>(?P<title>.*?)</(?P=tag)>", re.I | re.S
)
SCRIPT_STYLE_RE = re.compile(r"<(script|style)[^>]*>.*?</\1>", re.I | re.S)
TAG_RE = re.compile(r"<[^>]+>")
ANCHOR_OPEN_TAG_RE = re.compile(
r"""<(?P<name>[a-zA-Z][a-zA-Z0-9:-]*)[^>]*\bid=(?:"(?P<danchor>(?:Page_|page-|pg-|p-)[^"]*)"|'(?P<sanchor>(?:Page_|page-|pg-|p-)[^']*)')[^>]*>""",
re.I,
)
ANCHOR_MARKER_RE = re.compile(r"⟦anchor:(?P<anchor>[^⟧]+)⟧")
ROMAN_NUMERAL_RE = re.compile(r"^([MDCLXVI]+)\.?$", re.I)
CHAPTER_NUMBER_RE = re.compile(
r"^chapter\s+(?P<value>[ivxlcdm]+|\d+)\b",
re.I,
)
OPF_NS = "http://www.idpf.org/2007/opf"
DC_NS = "http://purl.org/dc/elements/1.1/"
CONTAINER_NS = "urn:oasis:names:tc:opendocument:xmlns:container"
XHTML_NS = "http://www.w3.org/1999/xhtml"
SECTION_ROLE_BODY = "body"
SECTION_ROLE_COVER = "cover"
@@ -34,6 +48,24 @@ SECTION_ROLE_NOTES = "notes"
SECTION_ROLE_LICENSE = "license"
SECTION_ROLE_AUXILIARY = "auxiliary"
TOC_LABEL_TOKENS = {"contents", "table of contents", "toc"}
NOTES_LABEL_TOKENS = {
"transcribers notes",
"transcriber notes",
"transcribers note",
"transcribers comments",
"editors notes",
"editor notes",
}
LICENSE_LABEL_TOKENS = {
"license",
"project gutenberg license",
"the project gutenberg license",
"license terms",
"copyright",
"colophon",
}
PG_START_MARKERS = ("*** START OF THE PROJECT GUTENBERG EBOOK", "*** START OF THIS PROJECT GUTENBERG EBOOK")
PG_END_MARKERS = ("*** END OF THE PROJECT GUTENBERG EBOOK", "*** END OF THIS PROJECT GUTENBERG EBOOK")
@@ -53,6 +85,9 @@ class SourceChunk:
section_role: str = SECTION_ROLE_BODY
spine_index: int | None = None
book_metadata: dict = field(default_factory=dict)
chapter_label: str | None = None
chapter_number: int | None = None
page_anchors: tuple = ()
def to_dict(self) -> dict:
return asdict(self)
@@ -68,6 +103,8 @@ class _SourceDocument:
section_role: str = SECTION_ROLE_BODY
spine_index: int | None = None
book_metadata: dict = field(default_factory=dict)
chapter_label: str | None = None
chapter_number: int | None = None
@dataclass(frozen=True)
@@ -90,6 +127,7 @@ def normalize_source(
max_words: int = 800,
max_chunks: int | None = None,
include_non_body: bool = False,
overlap_words: int = 0,
) -> list[SourceChunk]:
source_path = Path(source)
if not source_path.exists():
@@ -112,27 +150,31 @@ def normalize_source(
chunks: list[SourceChunk] = []
used_ids: set[str] = set()
for document in documents:
pieces = _chunk_markdown(document.markdown, max_words=max_words)
for index, piece in enumerate(pieces):
title = document.title if len(pieces) == 1 else f"{document.title} Part {index + 1}"
pieces = _split_document(document, max_words=max_words, overlap_words=overlap_words)
for index, (part_title, part_markdown, part_anchors) in enumerate(pieces):
base_id = (
document.base_slug if len(pieces) == 1 else f"{document.base_slug}-part-{index + 1:03d}"
document.base_slug
if len(pieces) == 1
else f"{document.base_slug}-part-{index + 1:03d}"
)
chunk_id = _dedupe_chunk_id(base_id, used_ids)
chunks.append(
SourceChunk(
chunk_id=chunk_id,
title=title,
markdown=piece,
title=part_title,
markdown=part_markdown,
source_type=document.source_type,
original_path=document.original_path,
digest=_digest_text(piece),
digest=_digest_text(part_markdown),
chunk_index=index,
chunk_count=len(pieces),
imported_at=imported_at,
section_role=document.section_role,
spine_index=document.spine_index,
book_metadata=dict(document.book_metadata),
chapter_label=document.chapter_label,
chapter_number=document.chapter_number,
page_anchors=tuple(part_anchors),
)
)
if max_chunks is not None and max_chunks > 0 and len(chunks) >= max_chunks:
@@ -342,6 +384,9 @@ def _epub3_spine_documents(
metadata, manifest, spine = _parse_opf(archive, opf_path)
book_title = metadata.get("title") or _title_from_path(source_path)
book_slug = slugify(book_title) or slugify(source_path.stem) or "ebook"
nav_labels = _load_nav_labels(archive, manifest)
chapter_counter = 0
used_chapter_numbers: set[int] = set()
for spine_index, entry in enumerate(spine):
item = manifest.get(entry.item_id)
if item is None or not item.href:
@@ -351,24 +396,59 @@ def _epub3_spine_documents(
except KeyError:
continue
role = _classify_section(item, entry, raw)
nav_label = nav_labels.get(item.href, "")
heading_label = _first_heading_text(raw) if Path(item.href).suffix.lower() not in {".txt", ".md"} else ""
chapter_label = nav_label or heading_label or None
# Reclassify body sections whose chapter label matches known noise tokens
if role == SECTION_ROLE_BODY and chapter_label:
normalized_label = re.sub(r"[^a-z0-9 ]+", "", chapter_label.lower()).strip()
if normalized_label in TOC_LABEL_TOKENS:
role = SECTION_ROLE_TOC
elif normalized_label in NOTES_LABEL_TOKENS:
role = SECTION_ROLE_NOTES
elif normalized_label in LICENSE_LABEL_TOKENS:
role = SECTION_ROLE_LICENSE
if role != SECTION_ROLE_BODY and not include_non_body:
continue
chapter_number: int | None = None
if role == SECTION_ROLE_BODY and chapter_label:
chapter_number = _parse_chapter_number(chapter_label)
if role == SECTION_ROLE_BODY and chapter_number is None:
# Fall back to sequential body counter when chapter label exists but
# is not a roman/arabic numeral (e.g. "Preface").
if chapter_label:
chapter_counter += 1
# Use sequential only when no other body has claimed this slot;
# roman-numeral chapters take precedence and may overlap, so we
# leave chapter_number=None for non-numeric labels and let the
# slug fall back to the label slug.
if chapter_number is not None:
if chapter_number in used_chapter_numbers:
# Duplicate numeric label across the book — keep label, drop the
# numeric slot so the slug falls back to label-based naming.
chapter_number = None
else:
used_chapter_numbers.add(chapter_number)
suffix = Path(item.href).suffix.lower()
if suffix in {".txt", ".md"}:
title = _markdown_title(raw) or _title_from_path(Path(item.href))
title = chapter_label or _markdown_title(raw) or _title_from_path(Path(item.href))
markdown_body = _ensure_h1(_normalize_newlines(raw).strip() + "\n", title)
else:
title = _html_title(raw) or _title_from_path(Path(item.href))
text = _html_to_text(raw)
title = chapter_label or _html_title(raw) or _title_from_path(Path(item.href))
marked = _inject_anchor_markers(raw)
text = _html_to_text(marked)
if text.lower().startswith(title.lower()):
text = text[len(title) :].strip()
markdown_body = f"# {title}\n\n{text}\n"
section_slug = (
slugify(title)
or slugify(Path(item.href).stem)
or f"section-{spine_index + 1:03d}"
base_slug = _chapter_base_slug(
role=role,
chapter_number=chapter_number,
chapter_label=chapter_label,
book_slug=book_slug,
spine_index=spine_index,
href=item.href,
title=title,
)
base_slug = f"{book_slug}-{spine_index + 1:03d}-{section_slug}"
yield _SourceDocument(
title=title,
markdown=markdown_body,
@@ -378,9 +458,119 @@ def _epub3_spine_documents(
section_role=role,
spine_index=spine_index,
book_metadata=metadata,
chapter_label=chapter_label,
chapter_number=chapter_number,
)
def _chapter_base_slug(
*,
role: str,
chapter_number: int | None,
chapter_label: str | None,
book_slug: str,
spine_index: int,
href: str,
title: str,
) -> str:
if role == SECTION_ROLE_BODY and chapter_number is not None:
return f"chapter-{chapter_number:02d}"
if role == SECTION_ROLE_BODY and chapter_label:
return f"chapter-{slugify(chapter_label) or f'section-{spine_index + 1:03d}'}"
section_slug = (
slugify(title)
or slugify(Path(href).stem)
or f"section-{spine_index + 1:03d}"
)
return f"{book_slug}-{spine_index + 1:03d}-{section_slug}"
def _load_nav_labels(
archive: zipfile.ZipFile, manifest: dict[str, _EpubManifestItem]
) -> dict[str, str]:
nav_item = next(
(item for item in manifest.values() if "nav" in item.properties),
None,
)
if nav_item is None:
return {}
try:
raw = archive.read(nav_item.href).decode("utf-8", errors="replace")
except KeyError:
return {}
base = _zip_dirname(nav_item.href)
labels: dict[str, str] = {}
pattern = re.compile(
r"""<a[^>]*\bhref=(?:"(?P<dhref>[^"#]+)(?:#[^"]*)?"|'(?P<shref>[^'#]+)(?:#[^']*)?')[^>]*>(?P<label>.*?)</a>""",
re.I | re.S,
)
for match in pattern.finditer(raw):
href_attr = match.group("dhref") or match.group("shref") or ""
if not href_attr:
continue
label = _collapse_ws(_html_to_text(match.group("label")))
if not label:
continue
resolved = _join_zip_path(base, href_attr)
labels.setdefault(resolved, label)
return labels
def _first_heading_text(html_raw: str) -> str:
match = HTML_FIRST_HEADING_RE.search(html_raw)
if not match:
return ""
return _collapse_ws(_html_to_text(match.group("title")))
def _parse_chapter_number(label: str) -> int | None:
stripped = label.strip()
if not stripped:
return None
if stripped.isdigit():
return int(stripped)
roman_match = ROMAN_NUMERAL_RE.match(stripped)
if roman_match:
value = _roman_to_int(roman_match.group(1).upper())
if value > 0:
return value
chapter_match = CHAPTER_NUMBER_RE.match(stripped)
if chapter_match:
token = chapter_match.group("value")
if token.isdigit():
return int(token)
value = _roman_to_int(token.upper())
if value > 0:
return value
return None
def _roman_to_int(value: str) -> int:
table = {"I": 1, "V": 5, "X": 10, "L": 50, "C": 100, "D": 500, "M": 1000}
total = 0
prev = 0
for ch in reversed(value):
cur = table.get(ch, 0)
if cur == 0:
return 0
if cur < prev:
total -= cur
else:
total += cur
prev = cur
return total
def _inject_anchor_markers(raw: str) -> str:
def repl(match: re.Match) -> str:
anchor = match.group("danchor") or match.group("sanchor") or ""
if not anchor:
return match.group(0)
return f"{match.group(0)} ⟦anchor:{anchor}"
return ANCHOR_OPEN_TAG_RE.sub(repl, raw)
def _epub_legacy_documents(
archive: zipfile.ZipFile, source_path: Path
) -> Iterable[_SourceDocument]:
@@ -464,20 +654,72 @@ def _join_zip_path(base: str, href: str) -> str:
return f"{base}/{href}"
def _chunk_markdown(markdown: str, *, max_words: int) -> list[str]:
text = markdown.strip()
if max_words <= 0:
return [text + "\n"]
words = text.split()
if len(words) <= max_words:
return [text + "\n"]
chunks: list[str] = []
heading = _markdown_title(text) or "Source"
body_words = re.sub(r"(?m)^# .+?\n+", "", text, count=1).split()
for start in range(0, len(body_words), max_words):
part = " ".join(body_words[start : start + max_words]).strip()
chunks.append(f"# {heading} Part {len(chunks) + 1}\n\n{part}\n")
return chunks
def _split_document(
document: _SourceDocument,
*,
max_words: int,
overlap_words: int,
) -> list[tuple[str, str, list[str]]]:
text = document.markdown.strip()
heading = _markdown_title(text) or document.title or "Source"
body_with_markers = re.sub(r"(?m)^# .+?\n+", "", text, count=1).strip()
clean_body, anchor_positions = _extract_anchor_positions(body_with_markers)
words = clean_body.split()
if max_words <= 0 or len(words) <= max_words:
anchors = [name for name, _idx in anchor_positions]
return [(document.title, _compose_chunk(heading, clean_body), anchors)]
overlap = max(0, min(overlap_words, max_words - 1))
step = max_words - overlap if overlap > 0 else max_words
parts: list[tuple[str, str, list[str]]] = []
start = 0
while start < len(words):
end = min(start + max_words, len(words))
slice_words = words[start:end]
if not slice_words:
break
part_index = len(parts) + 1
part_text = " ".join(slice_words).strip()
part_anchors = _anchors_in_range(anchor_positions, start, end)
part_title = f"{document.title} Part {part_index}"
parts.append((part_title, _compose_chunk(heading, part_text), part_anchors))
if end >= len(words):
break
start += step
return parts
def _compose_chunk(heading: str, body: str) -> str:
body = body.strip()
if not body:
return f"# {heading}\n"
return f"# {heading}\n\n{body}\n"
def _extract_anchor_positions(text: str) -> tuple[str, list[tuple[str, int]]]:
parts: list[str] = []
anchors: list[tuple[str, int]] = []
cursor = 0
for match in ANCHOR_MARKER_RE.finditer(text):
prefix = text[cursor : match.start()]
parts.append(prefix)
word_index = sum(len(part.split()) for part in parts)
anchors.append((match.group("anchor"), word_index))
cursor = match.end()
parts.append(text[cursor:])
cleaned = re.sub(r"[ \t]{2,}", " ", "".join(parts)).strip()
return cleaned, anchors
def _anchors_in_range(
anchor_positions: list[tuple[str, int]], start: int, end: int
) -> list[str]:
seen: set[str] = set()
found: list[str] = []
for name, idx in anchor_positions:
if start <= idx < end and name not in seen:
seen.add(name)
found.append(name)
return found
def _html_title(raw: str) -> str:
@@ -488,7 +730,8 @@ def _html_title(raw: str) -> str:
def _html_to_text(raw: str) -> str:
cleaned = SCRIPT_STYLE_RE.sub(" ", raw)
cleaned = re.sub(r"<head\b[^>]*>.*?</head>", " ", raw, flags=re.I | re.S)
cleaned = SCRIPT_STYLE_RE.sub(" ", cleaned)
cleaned = re.sub(r"</(p|div|section|article|h[1-6]|li)>", "\n", cleaned, flags=re.I)
cleaned = TAG_RE.sub(" ", cleaned)
cleaned = html.unescape(cleaned)