generated from coulomb/repo-seed
feat(source): add pdf read adapter
This commit is contained in:
242
tests/test_pdf_adapter.py
Normal file
242
tests/test_pdf_adapter.py
Normal file
@@ -0,0 +1,242 @@
|
||||
from pathlib import Path
|
||||
|
||||
from markitect_tool.source import (
|
||||
SourceAdapterMatchRequest,
|
||||
SourceAdapterRegistry,
|
||||
SourceAsset,
|
||||
SourceInspectRequest,
|
||||
SourceReadRequest,
|
||||
discover_source_adapters,
|
||||
inspect_source,
|
||||
normalize_source,
|
||||
)
|
||||
|
||||
from markitect_filter.adapters import pdf_adapter_descriptor
|
||||
|
||||
|
||||
class FakeEntryPoint:
    """Minimal stand-in for an entry point that exposes the PDF adapter."""

    # Name under which this fake entry point presents itself.
    name = "pdf"

    def load(self):
        """Return the ``pdf_adapter_descriptor`` callable itself (not its result)."""
        factory = pdf_adapter_descriptor
        return factory
|
||||
|
||||
|
||||
def test_pdf_descriptor_matches_contract():
    """Check the identity, formats, safety flags and defaults of the descriptor."""
    pdf = pdf_adapter_descriptor()

    # Identity and supported formats.
    assert pdf.id == "source.pdf"
    assert pdf.operations == ["read"]
    assert pdf.media_types == ["application/pdf"]
    assert pdf.extensions == [".pdf"]

    # Safety flags must be the literal ``False``, not merely falsy.
    for flag in ("network", "external_process"):
        assert pdf.safety[flag] is False

    page_break_option = pdf.option_schema["properties"]["include_page_breaks"]
    assert page_break_option["default"] is False
    assert pdf.metadata["dependency_profile"] == "stdlib"
|
||||
|
||||
|
||||
def test_pdf_adapter_matches_pdf_assets(tmp_path: Path):
    """An asset declared as ``application/pdf`` is matched with confidence 100."""
    fixture = SourceAsset.from_path(
        _write_pdf(tmp_path),
        media_type="application/pdf",
    )
    reader = pdf_adapter_descriptor().instantiate()

    outcome = reader.can_read(SourceAdapterMatchRequest(asset=fixture))

    assert outcome.matched
    assert outcome.confidence == 100
|
||||
|
||||
|
||||
def test_pdf_adapter_inspects_metadata(tmp_path: Path):
    """Inspecting the fixture surfaces its Info fields and page statistics."""
    fixture = SourceAsset.from_path(
        _write_pdf(tmp_path),
        media_type="application/pdf",
    )
    reader = pdf_adapter_descriptor().instantiate()

    report = reader.inspect(SourceInspectRequest(asset=fixture))

    assert report.is_valid

    # Values below mirror the Info dictionary written by ``_write_pdf``.
    info = report.metadata
    assert info.title == "PDF Fixture"
    assert info.creators == ["Ada Lovelace"]
    assert info.publication_date == "D:20260514093000Z"
    assert info.raw["subject"] == "Source Adapter Test"

    quality = report.quality
    assert quality.lossiness == "medium"
    assert quality.metadata["page_count"] == 2
    assert quality.metadata["pages_with_text"] == 2
|
||||
|
||||
|
||||
def test_pdf_adapter_normalizes_pages_to_markdown(tmp_path: Path):
    """Reading the fixture yields one Markdown segment per page."""
    fixture = SourceAsset.from_path(
        _write_pdf(tmp_path),
        media_type="application/pdf",
    )
    reader = pdf_adapter_descriptor().instantiate()

    outcome = reader.read(SourceReadRequest(asset=fixture))

    assert outcome.is_valid
    assert outcome.document is not None

    document = outcome.document
    assert document.document_id == "source.pdf:pdf-fixture"
    assert document.markdown == "Hello PDF\nSecond line\n\nPage two text."

    segment_ids = [segment.segment_id for segment in document.segments]
    assert segment_ids == ["page-0001", "page-0002"]

    # The page reference in provenance is compared as a string.
    assert document.segments[0].provenance[0].page == "1"
    assert document.quality.lossiness == "low"
    assert document.quality.metadata["page_coverage"] == 1.0
|
||||
|
||||
|
||||
def test_pdf_adapter_applies_page_range_and_page_markers(tmp_path: Path):
    """Page range "2" plus page breaks yields only page two, with a marker."""
    fixture = SourceAsset.from_path(
        _write_pdf(tmp_path),
        media_type="application/pdf",
    )
    reader = pdf_adapter_descriptor().instantiate()
    read_options = {"page_range": "2", "include_page_breaks": True}

    outcome = reader.read(SourceReadRequest(asset=fixture, options=read_options))

    assert outcome.is_valid
    assert outcome.document is not None

    document = outcome.document
    assert document.markdown == "<!-- page: 2 -->\n\nPage two text."
    # Segment metadata carries the page as an integer here.
    assert document.segments[0].metadata["page"] == 2
|
||||
|
||||
|
||||
def test_markitect_api_can_use_pdf_registry(tmp_path: Path):
    """``inspect_source`` and ``normalize_source`` work with a PDF-only registry."""
    fixture_path = _write_pdf(tmp_path)
    pdf_only = SourceAdapterRegistry([pdf_adapter_descriptor()])

    inspection = inspect_source(fixture_path, registry=pdf_only)
    normalization = normalize_source(fixture_path, registry=pdf_only)

    assert inspection.is_valid
    assert inspection.metadata.title == "PDF Fixture"

    assert normalization.is_valid
    assert normalization.document is not None
    second_segment = normalization.document.segments[1]
    assert second_segment.markdown == "Page two text."
|
||||
|
||||
|
||||
def test_pdf_adapter_reports_malformed_pdf(tmp_path: Path):
    """A file that is not a PDF is rejected with ``source.malformed``."""
    broken = tmp_path / "broken.pdf"
    broken.write_bytes(b"not a pdf")
    fixture = SourceAsset.from_path(broken, media_type="application/pdf")
    reader = pdf_adapter_descriptor().instantiate()

    outcome = reader.read(SourceReadRequest(asset=fixture))

    assert not outcome.is_valid
    first_diagnostic = outcome.diagnostics[0]
    assert first_diagnostic.code == "source.malformed"
|
||||
|
||||
|
||||
def test_pdf_adapter_reports_encrypted_pdf(tmp_path: Path):
    """A fixture with an ``/Encrypt`` entry is rejected as ``source.pdf.encrypted``."""
    fixture = SourceAsset.from_path(
        _write_pdf(tmp_path, encrypted=True),
        media_type="application/pdf",
    )
    reader = pdf_adapter_descriptor().instantiate()

    outcome = reader.read(SourceReadRequest(asset=fixture))

    assert not outcome.is_valid
    first_diagnostic = outcome.diagnostics[0]
    assert first_diagnostic.code == "source.pdf.encrypted"
|
||||
|
||||
|
||||
def test_pdf_entry_point_discovery_shape():
    """Discovery over a fake entry point registers the adapter as ``source.pdf``."""
    entry_points = [FakeEntryPoint()]

    discovered = discover_source_adapters(entry_points)

    pdf_entry = discovered.get("source.pdf")
    assert pdf_entry.name == "PDF"
|
||||
|
||||
|
||||
def _write_pdf(tmp_path: Path, *, encrypted: bool = False) -> Path:
    """Write a small hand-assembled two-page PDF and return its path.

    The file holds a catalog (object 1), a page tree (2), a page object and
    content stream per page (3-6), a Helvetica font (7) and an Info
    dictionary (8).  With ``encrypted=True`` an encryption dictionary
    (object 9) is added and referenced from the trailer via ``/Encrypt``;
    the streams themselves are still written as plain text.

    Args:
        tmp_path: Directory in which the file is created.
        encrypted: When true, write ``encrypted.pdf`` with an ``/Encrypt``
            trailer entry; otherwise write ``fixture.pdf``.

    Returns:
        Path of the written PDF file.
    """
    target = tmp_path / ("encrypted.pdf" if encrypted else "fixture.pdf")
    page_texts = [
        ["Hello PDF", "Second line"],
        ["Page two text."],
    ]

    bodies: dict[int, bytes] = {}
    kids: list[str] = []

    # Ids 1 and 2 are used by the catalog and page tree below, so every page
    # takes a (page object, content stream) id pair starting at 3.
    for index, lines in enumerate(page_texts):
        page_id = 3 + 2 * index
        content_id = page_id + 1
        kids.append(f"{page_id} 0 R")

        stream = _page_stream(lines)
        bodies[page_id] = (
            "<< /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] "
            f"/Resources << /Font << /F1 7 0 R >> >> /Contents {content_id} 0 R >>"
        ).encode("ascii")
        bodies[content_id] = b"".join(
            [
                b"<< /Length ",
                str(len(stream)).encode("ascii"),
                b" >>\nstream\n",
                stream,
                b"\nendstream",
            ]
        )

    bodies[1] = b"<< /Type /Catalog /Pages 2 0 R >>"
    bodies[2] = (
        f"<< /Type /Pages /Kids [{' '.join(kids)}] /Count {len(kids)} >>"
    ).encode("ascii")
    bodies[7] = b"<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>"
    bodies[8] = (
        b"<< /Title (PDF Fixture) /Author (Ada Lovelace) "
        b"/Subject (Source Adapter Test) /Keywords (markitect pdf) "
        b"/Producer (markitect-filter tests) /CreationDate (D:20260514093000Z) >>"
    )
    if encrypted:
        bodies[9] = b"<< /Filter /Standard /V 1 /R 2 >>"

    # Serialise the objects in ascending id order, recording the byte offset
    # of each one for the cross-reference table.
    buffer = bytearray(b"%PDF-1.4\n%\xe2\xe3\xcf\xd3\n")
    offsets: dict[int, int] = {0: 0}
    for object_id in sorted(bodies):
        offsets[object_id] = len(buffer)
        buffer += f"{object_id} 0 obj\n".encode("ascii")
        buffer += bodies[object_id]
        buffer += b"\nendobj\n"

    xref_offset = len(buffer)
    size = max(bodies) + 1

    # Cross-reference table: the free-list head for object 0, then one
    # in-use entry per object id.
    xref_rows = [f"xref\n0 {size}\n", "0000000000 65535 f \n"]
    xref_rows.extend(
        f"{offsets.get(object_id, 0):010d} 00000 n \n"
        for object_id in range(1, size)
    )
    buffer += "".join(xref_rows).encode("ascii")

    trailer_entries = f"/Size {size} /Root 1 0 R /Info 8 0 R"
    if encrypted:
        trailer_entries += " /Encrypt 9 0 R"
    buffer += f"trailer\n<< {trailer_entries} >>\n".encode("ascii")
    buffer += f"startxref\n{xref_offset}\n%%EOF\n".encode("ascii")

    target.write_bytes(bytes(buffer))
    return target
|
||||
|
||||
|
||||
def _page_stream(lines: list[str]) -> bytes:
    """Build the ASCII content stream that shows *lines* as one text object.

    The stream opens a text object, selects font ``/F1`` at size 12, moves to
    (72, 720) and emits one ``Tj`` operator per line, with a ``T*`` operator
    between consecutive lines.

    Args:
        lines: Text lines to show; each is escaped with ``_pdf_literal``.

    Returns:
        The newline-joined operators, encoded as ASCII.
    """
    operators = ["BT", "/F1 12 Tf", "72 720 Td"]
    shows = [f"({_pdf_literal(text)}) Tj" for text in lines]
    if shows:
        # Joining on "\nT*\n" puts a T* between lines but not before the first.
        operators.append("\nT*\n".join(shows))
    operators.append("ET")
    return "\n".join(operators).encode("ascii")
|
||||
|
||||
|
||||
def _pdf_literal(text: str) -> str:
|
||||
return text.replace("\\", "\\\\").replace("(", "\\(").replace(")", "\\)")
|
||||
Reference in New Issue
Block a user