generated from coulomb/repo-seed
epub3 inbound filter
This commit is contained in:
5
src/markitect_filter/__init__.py
Normal file
5
src/markitect_filter/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Concrete source-format adapters for Markitect."""
|
||||
|
||||
from markitect_filter.adapters import epub3_adapter_descriptor
|
||||
|
||||
__all__ = ["epub3_adapter_descriptor"]
|
||||
51
src/markitect_filter/adapters.py
Normal file
51
src/markitect_filter/adapters.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""Adapter descriptors exposed through Markitect entry points."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from markitect_tool.source import SourceAdapterDescriptor
|
||||
|
||||
|
||||
def epub3_adapter_descriptor() -> SourceAdapterDescriptor:
    """Build the descriptor that advertises the EPUB3 read adapter.

    The descriptor carries static facts (id, version, supported media type and
    extension, option schema, safety and quality profiles) plus a factory that
    imports the adapter implementation only when it is called.
    """

    def factory():
        # Imported lazily: the implementation module itself imports this
        # function, so a module-level import here would be circular.
        from markitect_filter.epub3 import Epub3ReadAdapter

        return Epub3ReadAdapter()

    skip_boilerplate_option = {
        "type": "boolean",
        "default": True,
        "description": "Skip cover, nav, toc, headers, footers, and similar boilerplate when detected.",
    }
    option_schema = {
        "type": "object",
        "properties": {"skip_boilerplate": skip_boilerplate_option},
        "additionalProperties": False,
    }
    safety = {
        "reads_files": True,
        "writes_files": False,
        "network": False,
        "external_process": False,
    }
    quality_profile = {
        "text_extraction": "stdlib-xhtml",
        "images": "metadata-only",
        "styles": "ignored",
    }
    metadata = {
        "format": "EPUB3",
        "dependency_profile": "stdlib",
    }

    return SourceAdapterDescriptor(
        id="source.epub3",
        version="1",
        name="EPUB3",
        summary="Read EPUB3 packages into canonical Markitect Markdown.",
        operations=["read"],
        media_types=["application/epub+zip"],
        extensions=[".epub"],
        factory=factory,
        option_schema=option_schema,
        safety=safety,
        quality_profile=quality_profile,
        metadata=metadata,
    )
|
||||
602
src/markitect_filter/epub3.py
Normal file
602
src/markitect_filter/epub3.py
Normal file
@@ -0,0 +1,602 @@
|
||||
"""EPUB3 read adapter implementation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import posixpath
|
||||
import re
|
||||
import zipfile
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
from markitect_tool.diagnostics import Diagnostic, SourceLocation, has_error
|
||||
from markitect_tool.source import (
|
||||
NormalizationQuality,
|
||||
NormalizedMarkdownDocument,
|
||||
NormalizedMarkdownSegment,
|
||||
SourceAdapterMatch,
|
||||
SourceAdapterMatchRequest,
|
||||
SourceAsset,
|
||||
SourceInspectRequest,
|
||||
SourceInspectResult,
|
||||
SourceMetadata,
|
||||
SourceProvenance,
|
||||
SourceReadRequest,
|
||||
SourceReadResult,
|
||||
normalization_cache_key,
|
||||
)
|
||||
|
||||
from markitect_filter.adapters import epub3_adapter_descriptor
|
||||
|
||||
|
||||
# Manifest media types whose spine items are parsed as XHTML content; spine
# items with any other media type are skipped with a warning.
XHTML_MEDIA_TYPES = {"application/xhtml+xml", "text/html"}

# Lower-case substrings looked for in a manifest item's id, href and
# properties to decide whether the item is boilerplate.
BOILERPLATE_HINTS = {
    "copyright",
    "cover",
    "footer",
    "header",
    "license",
    "nav",
    "table-of-contents",
    "titlepage",
    "toc",
    "transcriber",
}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class EpubPackage:
    """Immutable snapshot of what was read from an EPUB package document."""

    # Path of the package (OPF) file inside the archive; "" when it could not
    # be determined.
    rootfile_path: str
    # Bibliographic metadata extracted from the package document.
    metadata: SourceMetadata
    # Manifest items keyed by item id; each item is a dict with the keys
    # "id", "href", "media_type" and "properties".
    manifest: dict[str, dict[str, str]]
    # Manifest item ids in spine (reading) order.
    spine: list[str]
    # Link labels from the navigation document, keyed by target path.
    nav_labels: dict[str, str]
    # Diagnostics collected while loading the package.
    diagnostics: list[Diagnostic]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ExtractedSegment:
    """Pair of one normalized segment and the diagnostics raised producing it."""

    # The Markdown segment built from a single spine document.
    segment: NormalizedMarkdownSegment
    # Diagnostics recorded while reading and converting that document.
    diagnostics: list[Diagnostic]
|
||||
|
||||
|
||||
class Epub3ReadAdapter:
    """Adapter that turns EPUB3 packages into normalized Markitect Markdown."""

    def __init__(self) -> None:
        # The descriptor provides the adapter id and version used in match
        # results and in the normalization cache key.
        self.descriptor = epub3_adapter_descriptor()

    def can_read(self, request: SourceAdapterMatchRequest) -> SourceAdapterMatch:
        """Score how well this adapter fits the requested asset.

        An EPUB media type matches with confidence 100, a ``.epub`` extension
        with confidence 80; anything else is reported as unsupported.
        """
        asset = request.asset
        if asset.media_type == "application/epub+zip":
            confidence, reason = 100, "media_type"
        elif asset.extension == ".epub":
            confidence, reason = 80, "extension"
        else:
            confidence, reason = 0, "unsupported"
        return SourceAdapterMatch(
            adapter_id=self.descriptor.id,
            matched=confidence > 0,
            confidence=confidence,
            reason=reason,
        )

    def inspect(self, request: SourceInspectRequest) -> SourceInspectResult:
        """Load the package and report its metadata and diagnostics.

        No spine content is converted; only the package-level data is read.
        """
        package = _load_package(request.asset)
        if package:
            diagnostics = package.diagnostics
            metadata = package.metadata
        else:
            diagnostics = [_malformed(request.asset, "Unable to read EPUB package.")]
            metadata = SourceMetadata()

        failed = has_error(diagnostics)
        quality = NormalizationQuality(
            lossiness="unknown" if failed else "low",
            confidence=0.0 if failed else 0.9,
            warnings=_warning_count(diagnostics),
        )
        return SourceInspectResult(
            asset=request.asset,
            adapter=_adapter_info(request.options),
            metadata=metadata,
            capabilities=["read"],
            quality=quality,
            diagnostics=diagnostics,
            valid=not failed,
        )

    def read(self, request: SourceReadRequest) -> SourceReadResult:
        """Convert the EPUB's spine documents into one Markdown document.

        Returns an invalid result (no document) when the package cannot be
        loaded, the archive cannot be read, or no spine item yields Markdown.
        """
        asset = request.asset
        package = _load_package(asset)
        if package is None:
            return SourceReadResult(
                diagnostics=[_malformed(asset, "Unable to read EPUB package.")],
                valid=False,
            )
        if has_error(package.diagnostics):
            return SourceReadResult(diagnostics=package.diagnostics, valid=False)

        # Work on a copy so the package's own diagnostics list is not mutated.
        diagnostics = list(package.diagnostics)
        skip_boilerplate = bool(request.options.get("skip_boilerplate", True))
        try:
            with zipfile.ZipFile(Path(asset.path or asset.uri)) as archive:
                extracted = self._collect_spine_segments(
                    archive,
                    asset,
                    package,
                    diagnostics,
                    skip_boilerplate=skip_boilerplate,
                )
        except (OSError, zipfile.BadZipFile) as exc:
            unreadable = _malformed(asset, "EPUB is not a readable ZIP package.", {"error": str(exc)})
            return SourceReadResult(diagnostics=[unreadable], valid=False)

        # Keep only segments with visible text, but keep every diagnostic.
        segments = []
        for entry in extracted:
            if entry.segment.markdown.strip():
                segments.append(entry.segment)
            diagnostics.extend(entry.diagnostics)
        if not segments:
            diagnostics.append(_malformed(asset, "EPUB did not produce any Markdown segments."))
            return SourceReadResult(diagnostics=diagnostics, valid=False)

        failed = has_error(diagnostics)
        warning_total = _warning_count(diagnostics)
        skipped_total = sum(
            1 for diagnostic in diagnostics if diagnostic.code == "source.epub3.skipped_boilerplate"
        )
        quality = NormalizationQuality(
            lossiness="low" if warning_total else "none",
            confidence=0.0 if failed else 0.9,
            skipped_items=skipped_total,
            warnings=warning_total,
            metadata={"extraction": "epub3-stdlib-xhtml"},
        )
        provenance = SourceProvenance(
            source_uri=asset.uri,
            source_path=asset.path,
            digest=asset.digest,
            metadata={"rootfile": package.rootfile_path},
        )
        cache_key = normalization_cache_key(
            asset=asset,
            adapter_id=self.descriptor.id,
            adapter_version=self.descriptor.version,
            options=request.options,
        )
        document = NormalizedMarkdownDocument(
            document_id=_document_id(asset, package.metadata),
            asset=asset,
            metadata=package.metadata,
            markdown="\n\n".join(segment.markdown.strip() for segment in segments),
            segments=segments,
            quality=quality,
            diagnostics=diagnostics,
            provenance=[provenance],
            adapter=_adapter_info(request.options),
            cache_key=cache_key,
        )
        # NOTE(review): the result-level diagnostics list is empty on this
        # path; the collected diagnostics travel on the document only.
        # Confirm that callers expect this.
        return SourceReadResult(document=document, diagnostics=[], valid=not failed)

    def _collect_spine_segments(
        self,
        archive: zipfile.ZipFile,
        asset: SourceAsset,
        package: EpubPackage,
        diagnostics: list[Diagnostic],
        *,
        skip_boilerplate: bool,
    ) -> list[ExtractedSegment]:
        """Extract one segment per usable spine item, in spine order.

        Spine items that are missing from the manifest, have a non-XHTML media
        type, or look like boilerplate are skipped; a warning for each is
        appended to ``diagnostics``.
        """
        extracted: list[ExtractedSegment] = []
        for item_id in package.spine:
            item = package.manifest.get(item_id)
            if item is None:
                diagnostics.append(
                    _warning(
                        asset,
                        "source.epub3.missing_spine_item",
                        f"Spine item `{item_id}` is missing from the manifest.",
                        details={"idref": item_id},
                    )
                )
                continue

            media_type = item.get("media_type", "")
            href = item.get("href", "")
            package_path = _resolve_package_path(package.rootfile_path, href)
            if media_type not in XHTML_MEDIA_TYPES:
                diagnostics.append(
                    _warning(
                        asset,
                        "source.epub3.unsupported_media",
                        f"Skipped unsupported spine media type `{media_type}`.",
                        details={"href": href, "media_type": media_type},
                    )
                )
                continue
            if skip_boilerplate and _is_boilerplate(item):
                diagnostics.append(
                    _warning(
                        asset,
                        "source.epub3.skipped_boilerplate",
                        f"Skipped boilerplate spine item `{href}`.",
                        details={"href": href},
                    )
                )
                continue

            # Labels may be keyed by the raw href or by the resolved path.
            nav_label = package.nav_labels.get(href) or package.nav_labels.get(package_path)
            extracted.append(
                _extract_segment(
                    archive,
                    asset,
                    package_path,
                    href,
                    # Skipped items do not consume a segment position.
                    order=len(extracted),
                    nav_label=nav_label,
                )
            )
        return extracted
|
||||
|
||||
|
||||
def _load_package(asset: SourceAsset) -> EpubPackage | None:
    """Open the EPUB archive and read its package-level information.

    Every path visible here returns an ``EpubPackage``. Failures are reported
    through the package's ``diagnostics`` while the remaining fields are left
    empty.
    """
    collected: list[Diagnostic] = []

    def partial(rootfile: str, problems: list[Diagnostic]) -> EpubPackage:
        # Package shell that carries diagnostics but no content.
        return EpubPackage(rootfile, SourceMetadata(), {}, [], {}, problems)

    try:
        with zipfile.ZipFile(Path(asset.path or asset.uri)) as archive:
            rootfile_path = _read_container(archive, asset, collected)
            if not rootfile_path:
                return partial("", collected)

            package_xml = _read_xml(archive, rootfile_path, asset, collected)
            if package_xml is None:
                return partial(rootfile_path, collected)

            metadata = _extract_metadata(package_xml)
            manifest = _extract_manifest(package_xml)
            spine = _extract_spine(package_xml)
            nav_labels = _extract_nav_labels(archive, rootfile_path, manifest, asset, collected)
            if not spine:
                collected.append(_malformed(asset, "EPUB package does not declare a spine."))
            return EpubPackage(rootfile_path, metadata, manifest, spine, nav_labels, collected)
    except (OSError, zipfile.BadZipFile) as exc:
        unreadable = _malformed(asset, "EPUB is not a readable ZIP package.", {"error": str(exc)})
        return partial("", [unreadable])
|
||||
|
||||
|
||||
def _read_container(
    archive: zipfile.ZipFile,
    asset: SourceAsset,
    diagnostics: list[Diagnostic],
) -> str | None:
    """Return the package document path declared in META-INF/container.xml.

    Args:
        archive: Open EPUB archive.
        asset: Asset being read; used to attribute diagnostics.
        diagnostics: List that receives an error for every failure.

    Returns:
        The ``full-path`` of the first ``rootfile`` element that has one, or
        ``None`` when the container is missing, malformed, or declares no
        rootfile.
    """
    container_path = "META-INF/container.xml"
    container = _read_xml(archive, container_path, asset, diagnostics)
    if container is None:
        # _read_xml has already recorded why the entry could not be used.
        # Only claim the file is missing when it really is absent; a present
        # but malformed container must not be reported as missing.
        if container_path not in archive.namelist():
            diagnostics.append(_malformed(asset, "EPUB is missing META-INF/container.xml."))
        return None

    for element in container.iter():
        if _local_name(element.tag) != "rootfile":
            continue
        full_path = element.attrib.get("full-path")
        if full_path:
            return full_path

    diagnostics.append(_malformed(asset, "EPUB container does not declare a rootfile."))
    return None
|
||||
|
||||
|
||||
def _read_xml(
    archive: zipfile.ZipFile,
    path: str,
    asset: SourceAsset,
    diagnostics: list[Diagnostic],
) -> ET.Element | None:
    """Parse one archive entry as XML.

    Returns the root element, or ``None`` after appending an error diagnostic
    when the entry is absent from the archive or is not well-formed XML.
    """
    try:
        root = ET.fromstring(archive.read(path))
    except KeyError:
        # ZipFile.read raises KeyError for a name that is not in the archive.
        problem = _malformed(asset, f"EPUB package entry `{path}` is missing.", {"path": path})
    except ET.ParseError as exc:
        problem = _malformed(
            asset,
            f"EPUB XML entry `{path}` is malformed.",
            {"path": path, "error": str(exc)},
        )
    else:
        return root
    diagnostics.append(problem)
    return None
|
||||
|
||||
|
||||
def _extract_metadata(package_xml: ET.Element) -> SourceMetadata:
    """Collect bibliographic metadata from a parsed package document.

    Elements are matched by local name anywhere in the tree. The first
    non-empty ``title``, ``language``, ``rights``, ``publisher`` and ``date``
    win; every ``creator`` is kept; identifiers are keyed by their ``id``
    attribute, or by a generated ``identifier-N`` key when they have none.
    ``meta`` elements are stored in ``raw`` under their ``property`` (or
    ``name``) attribute, taking the element text or, when there is no text,
    the ``content`` attribute.

    Args:
        package_xml: Root element of the package document.

    Returns:
        A ``SourceMetadata`` populated from the package document.
    """
    raw: dict[str, Any] = {}
    titles: list[str] = []
    creators: list[str] = []
    identifiers: dict[str, str] = {}
    language = None
    rights = None
    publisher = None
    publication_date = None

    for element in package_xml.iter():
        name = _local_name(element.tag)
        text = _clean_text("".join(element.itertext()))

        if name == "meta":
            # Handled before the empty-text check: attribute-style metas
            # (`<meta name="cover" content="..."/>`) have no text at all and
            # would otherwise never reach the `content` fallback.
            key = element.attrib.get("property") or element.attrib.get("name")
            value = text or element.attrib.get("content")
            if key and value:
                raw[key] = value
            continue

        if not text:
            continue

        if name == "title":
            titles.append(text)
        elif name == "creator":
            creators.append(text)
        elif name == "language" and language is None:
            language = text
        elif name == "rights" and rights is None:
            rights = text
        elif name == "publisher" and publisher is None:
            publisher = text
        elif name == "date" and publication_date is None:
            publication_date = text
        elif name == "identifier":
            identifier_key = element.attrib.get("id") or f"identifier-{len(identifiers) + 1}"
            identifiers[identifier_key] = text

    return SourceMetadata(
        title=titles[0] if titles else None,
        creators=creators,
        language=language,
        rights=rights,
        publication_date=publication_date,
        publisher=publisher,
        identifiers=identifiers,
        raw=raw,
    )
|
||||
|
||||
|
||||
def _extract_manifest(package_xml: ET.Element) -> dict[str, dict[str, str]]:
    """Index the package's ``item`` elements by their ``id`` attribute.

    Items without an ``id`` or ``href`` are ignored. When an id repeats, the
    later item replaces the earlier one.
    """
    manifest: dict[str, dict[str, str]] = {}
    item_nodes = (node for node in package_xml.iter() if _local_name(node.tag) == "item")
    for node in item_nodes:
        attrs = node.attrib
        item_id, href = attrs.get("id"), attrs.get("href")
        if item_id and href:
            manifest[item_id] = {
                "id": item_id,
                "href": href,
                "media_type": attrs.get("media-type", ""),
                "properties": attrs.get("properties", ""),
            }
    return manifest
|
||||
|
||||
|
||||
def _extract_spine(package_xml: ET.Element) -> list[str]:
    """Return the ``idref`` values of the first ``spine`` element, in order.

    Only direct ``itemref`` children with a non-empty ``idref`` are included.
    An empty list is returned when the package has no ``spine`` element.
    """
    for element in package_xml.iter():
        if _local_name(element.tag) != "spine":
            continue
        # Only the first spine encountered is consulted.
        return [
            child.attrib["idref"]
            for child in element
            if _local_name(child.tag) == "itemref" and child.attrib.get("idref")
        ]
    return []
|
||||
|
||||
|
||||
def _extract_nav_labels(
    archive: zipfile.ZipFile,
    rootfile_path: str,
    manifest: dict[str, dict[str, str]],
    asset: SourceAsset,
    diagnostics: list[Diagnostic],
) -> dict[str, str]:
    """Map navigation-document link targets to their link labels.

    The navigation document is the first manifest item whose ``properties``
    contain the ``nav`` token. Each link is recorded under two keys: the href
    as written (fragment removed) and the archive path it resolves to.

    Args:
        archive: Open EPUB archive.
        rootfile_path: Archive path of the package document.
        manifest: Manifest items keyed by id.
        asset: Asset being read; used to attribute diagnostics.
        diagnostics: List that receives errors raised while reading the
            navigation document.

    Returns:
        A mapping from target path to label; empty when there is no
        navigation document or it cannot be parsed.
    """
    nav_item = next(
        (
            item
            for item in manifest.values()
            if "nav" in item.get("properties", "").split()
        ),
        None,
    )
    if nav_item is None:
        return {}

    nav_path = _resolve_package_path(rootfile_path, nav_item["href"])
    nav_xml = _read_xml(archive, nav_path, asset, diagnostics)
    if nav_xml is None:
        return {}

    labels: dict[str, str] = {}
    for element in nav_xml.iter():
        if _local_name(element.tag) != "a":
            continue
        href = element.attrib.get("href")
        label = _clean_text("".join(element.itertext()))
        if not href or not label:
            continue
        target = href.split("#", 1)[0]
        if not target:
            # Fragment-only link: it points into the navigation document
            # itself, so there is no content file to label.
            continue
        # Links inside the navigation document are relative to that document,
        # not to the package document, so resolve against nav_path.
        resolved = _resolve_package_path(nav_path, target)
        # First label wins: later links to the same file (sub-sections,
        # landmarks, page lists) must not replace the table-of-contents entry.
        labels.setdefault(target, label)
        labels.setdefault(resolved, label)
    return labels
|
||||
|
||||
|
||||
def _extract_segment(
    archive: zipfile.ZipFile,
    asset: SourceAsset,
    package_path: str,
    href: str,
    *,
    order: int,
    nav_label: str | None,
) -> ExtractedSegment:
    """Convert one spine document into a Markdown segment.

    When the document cannot be parsed, an empty segment with a positional id
    is returned together with the diagnostics explaining the failure. The
    segment heading is the first heading in the document, falling back to
    ``nav_label``.
    """
    problems: list[Diagnostic] = []
    root = _read_xml(archive, package_path, asset, problems)
    if root is None:
        placeholder = NormalizedMarkdownSegment(
            segment_id=f"seg-{order + 1:04d}",
            order=order,
            markdown="",
        )
        return ExtractedSegment(placeholder, problems)

    # Work on <body> when there is one, otherwise on the whole document.
    body = _first_descendant(root, "body")
    content = root if body is None else body

    anchors = _anchors(content)
    markdown = "\n\n".join(block for block in _element_blocks(content) if block.strip()).strip()
    heading, heading_level = _first_heading(content)
    if not heading:
        heading = nav_label

    provenance = SourceProvenance(
        source_uri=asset.uri,
        source_path=asset.path,
        source_href=href,
        package_path=package_path,
        anchor=anchors[0] if anchors else None,
        section=heading,
    )
    segment = NormalizedMarkdownSegment(
        segment_id=_segment_id(anchors, order),
        order=order,
        markdown=markdown,
        heading=heading,
        heading_level=heading_level,
        anchors=anchors,
        provenance=[provenance],
        metadata={"package_path": package_path, "href": href},
    )
    return ExtractedSegment(segment, problems)
|
||||
|
||||
|
||||
def _element_blocks(element: ET.Element, *, include_loose_text: bool = True) -> list[str]:
    """Render the children of ``element`` as a list of Markdown blocks.

    Headings become ``#``-prefixed lines, paragraphs plain text, list items
    ``- `` lines and block quotes ``> `` lines. Container elements are
    descended into; ``script``, ``style`` and ``head`` are ignored. Any other
    element contributes the blocks of its children followed by its own direct
    text.

    Args:
        element: Element whose children are rendered.
        include_loose_text: When true, text placed directly inside
            ``element`` (before its first child and after each child) is
            emitted as its own block, in document order. Previously such text
            was dropped, so ``<div>text</div>`` produced nothing.

    Returns:
        Markdown blocks in the order they were produced.
    """
    blocks: list[str] = []

    def add_loose(text: str | None) -> None:
        # Emit stray text that is not wrapped in a block-level child.
        if not include_loose_text:
            return
        cleaned = _clean_text(text or "")
        if cleaned:
            blocks.append(cleaned)

    add_loose(element.text)
    for child in element:
        name = _local_name(child.tag)
        if name in {"script", "style", "head"}:
            pass  # non-content elements; only their tail text may matter
        elif name in {"h1", "h2", "h3", "h4", "h5", "h6"}:
            text = _inline_text(child)
            if text:
                # The digit in the tag name is the Markdown heading depth.
                blocks.append(f"{'#' * int(name[1])} {text}")
        elif name == "p":
            text = _inline_text(child)
            if text:
                blocks.append(text)
        elif name in {"ul", "ol"}:
            for item in child:
                if _local_name(item.tag) == "li":
                    text = _inline_text(item)
                    if text:
                        blocks.append(f"- {text}")
        elif name == "blockquote":
            text = _inline_text(child)
            if text:
                blocks.append("\n".join(f"> {line}" for line in text.splitlines()))
        elif name in {"section", "article", "div", "main", "body"}:
            blocks.extend(_element_blocks(child))
        elif name in {"br", "hr"}:
            pass
        else:
            # Unknown element: nested blocks first, then its own direct text.
            # Loose text is disabled for the recursion because _direct_text
            # already covers the element's text and its children's tails.
            blocks.extend(_element_blocks(child, include_loose_text=False))
            text = _direct_text(child)
            if text:
                blocks.append(text)
        add_loose(child.tail)
    return blocks
|
||||
|
||||
|
||||
def _inline_text(element: ET.Element) -> str:
    """Return all text inside ``element``, space-joined and cleaned."""
    fragments = element.itertext()
    return _clean_text(" ".join(fragments))
|
||||
|
||||
|
||||
def _direct_text(element: ET.Element) -> str:
    """Return the text owned by ``element`` itself, excluding child content.

    That is the text before the first child plus the tail text after each
    direct child, space-joined and cleaned.
    """
    pieces = [element.text or ""]
    pieces.extend(child.tail or "" for child in element)
    return _clean_text(" ".join(pieces))
|
||||
|
||||
|
||||
def _first_heading(element: ET.Element) -> tuple[str | None, int | None]:
    """Find the first non-empty ``h1``-``h6`` in document order.

    Returns ``(text, level)`` for that heading, or ``(None, None)`` when the
    tree contains no heading with text.
    """
    heading_tags = ("h1", "h2", "h3", "h4", "h5", "h6")
    for node in element.iter():
        tag = _local_name(node.tag)
        if tag not in heading_tags:
            continue
        text = _inline_text(node)
        if text:
            return text, int(tag[1])
    return None, None
|
||||
|
||||
|
||||
def _first_descendant(element: ET.Element, local_name: str) -> ET.Element | None:
    """Return the first element with the given local name, or ``None``.

    The search is in document order and includes ``element`` itself.
    """
    matches = (node for node in element.iter() if _local_name(node.tag) == local_name)
    return next(matches, None)
|
||||
|
||||
|
||||
def _anchors(element: ET.Element) -> list[str]:
|
||||
anchors: list[str] = []
|
||||
for descendant in element.iter():
|
||||
value = descendant.attrib.get("id") or descendant.attrib.get("name")
|
||||
if value and value not in anchors:
|
||||
anchors.append(value)
|
||||
return anchors
|
||||
|
||||
|
||||
def _is_boilerplate(item: dict[str, str]) -> bool:
    """Tell whether a manifest item looks like boilerplate.

    The item's id, href and properties are lower-cased and searched for any
    of the ``BOILERPLATE_HINTS`` as a plain substring.
    """
    fields = (item.get(key, "") for key in ("id", "href", "properties"))
    haystack = " ".join(fields).lower()
    for hint in BOILERPLATE_HINTS:
        if hint in haystack:
            return True
    return False
|
||||
|
||||
|
||||
def _resolve_package_path(rootfile_path: str, href: str) -> str:
|
||||
base = posixpath.dirname(rootfile_path)
|
||||
return posixpath.normpath(posixpath.join(base, href))
|
||||
|
||||
|
||||
def _segment_id(anchors: list[str], order: int) -> str:
|
||||
if anchors:
|
||||
slug = re.sub(r"[^a-z0-9._-]+", "-", anchors[0].lower()).strip("-")
|
||||
if slug:
|
||||
return slug
|
||||
return f"seg-{order + 1:04d}"
|
||||
|
||||
|
||||
def _document_id(asset: SourceAsset, metadata: SourceMetadata) -> str:
|
||||
identifier = next(iter(metadata.identifiers.values()), None)
|
||||
if identifier:
|
||||
return f"source.epub3:{identifier}"
|
||||
if asset.digest:
|
||||
return f"source.epub3:{asset.digest.removeprefix('sha256:')}"
|
||||
return f"source.epub3:{asset.name or asset.uri}"
|
||||
|
||||
|
||||
def _adapter_info(options: dict[str, Any]) -> dict[str, Any]:
|
||||
return {
|
||||
"id": "source.epub3",
|
||||
"version": "1",
|
||||
"options": options,
|
||||
}
|
||||
|
||||
|
||||
def _warning(
    asset: SourceAsset,
    code: str,
    message: str,
    *,
    details: dict[str, Any] | None = None,
) -> Diagnostic:
    """Create a warning diagnostic for ``asset``.

    The source location is set only when the asset has a path; ``details``
    defaults to an empty dict.
    """
    location = None
    if asset.path:
        location = SourceLocation(path=asset.path)
    return Diagnostic(
        severity="warning",
        code=code,
        message=message,
        source=location,
        details=details or {},
    )
|
||||
|
||||
|
||||
def _malformed(
    asset: SourceAsset,
    message: str,
    details: dict[str, Any] | None = None,
) -> Diagnostic:
    """Create an error diagnostic with code ``source.malformed`` for ``asset``.

    The source location is set only when the asset has a path; ``details``
    defaults to an empty dict.
    """
    location = None
    if asset.path:
        location = SourceLocation(path=asset.path)
    return Diagnostic(
        severity="error",
        code="source.malformed",
        message=message,
        source=location,
        details=details or {},
    )
|
||||
|
||||
|
||||
def _warning_count(diagnostics: list[Diagnostic]) -> int:
|
||||
return sum(1 for diagnostic in diagnostics if diagnostic.severity == "warning")
|
||||
|
||||
|
||||
def _local_name(tag: str) -> str:
|
||||
return tag.rsplit("}", 1)[-1] if "}" in tag else tag
|
||||
|
||||
|
||||
def _clean_text(text: str) -> str:
|
||||
cleaned = re.sub(r"\s+", " ", text).strip()
|
||||
return re.sub(r"\s+([.,;:!?])", r"\1", cleaned)
|
||||
Reference in New Issue
Block a user