Files
marki-docx/src/markidocx/importer.py
Bernd Worsch 1f3dddf7d6 feat: WP-0001 + WP-0002 complete — LEVEL1 core + service interfaces
WP-0001 (Foundation & LEVEL1 Core):
- manifest model (FR-100), MD→DOCX builder (FR-200), DOCX→MD importer
  (FR-300/400), template family registry (FR-600), drift detector (FR-700),
  CLI wiring, pre-commit config, CI skeleton, regression harness

WP-0002 (Service Interfaces & Workflow Orchestration):
- REST service via FastAPI (FR-900): /health, /version, /capabilities,
  /templates, /styles, /validate, /build, /import, /compare,
  /templates/register, /workflows/{name}, /evidence/{run_id}
- Evidence & report store (FR-1400): JSON-backed, per-run, retrievable
  through all interfaces, classification (pass/warnings/failed)
- Composite workflow orchestration (FR-1300): single-file-roundtrip,
  multi-file-roundtrip, release-regression, family-switch-build
- MCP server via FastMCP (FR-1000): all tools + resources
- CLI additions: `markidocx serve`, `markidocx workflow`, `markidocx mcp`
- Interface parity tests: CLI / REST / MCP produce equivalent results

135 tests passing, ruff + mypy clean.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-16 07:46:31 +00:00

219 lines
6.5 KiB
Python

"""DOCX→Markdown importer for markidocx (FR-300, FR-400)."""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from pathlib import Path
from docx import Document
from docx.document import Document as DocxDocument
from docx.table import Table
from docx.text.paragraph import Paragraph
from markidocx.manifest import Manifest
# Style-name patterns used to classify paragraphs; all are case-insensitive.
# Matches exactly "Heading <digits>"; group 1 captures the numeric level.
HEADING_STYLE_RE = re.compile(r"^Heading (\d+)$", re.IGNORECASE)
# Matches any style name that starts with "List Bullet" (suffixed names too).
LIST_BULLET_RE = re.compile(r"^List Bullet", re.IGNORECASE)
# Matches any style name that starts with "List Number" (suffixed names too).
LIST_NUMBER_RE = re.compile(r"^List Number", re.IGNORECASE)
@dataclass
class ImportResult:
    """Outcome of one DOCX→Markdown import run.

    Returned by ``import_document`` for both successful and failed imports;
    failures are reported through the fields rather than by raising.
    """

    # False when the DOCX was missing or could not be opened.
    success: bool
    # Markdown files written by the import (empty on failure).
    output_files: list[Path]
    # How the content was mapped back to files.
    mapping_status: str  # "redistributed" | "merged" | "failed"
    # Human-readable notes collected during the import.
    warnings: list[str] = field(default_factory=list)
def import_document(manifest: Manifest, docx_path: Path) -> ImportResult:
    """Convert *docx_path* to Markdown and write it back for the project.

    Outcomes, in the order they are tried:

    * The DOCX does not exist or cannot be opened: nothing is written and a
      result with ``success=False`` / ``mapping_status="failed"`` is returned.
    * The manifest has exactly one source: the whole Markdown text is written
      to that source's path (``"redistributed"``).
    * The number of H1-delimited sections equals the number of sources: each
      section is written to the source at the same position
      (``"redistributed"``).
    * Anything else: the whole text goes to ``imported_merged.md`` inside
      ``manifest.output_dir`` and a warning is recorded (``"merged"``).
    """

    def failed(reason: str) -> ImportResult:
        # Both early-exit paths share this shape; only the message differs.
        return ImportResult(
            success=False,
            output_files=[],
            mapping_status="failed",
            warnings=[reason],
        )

    if not docx_path.exists():
        return failed(f"DOCX file not found: {docx_path}")

    try:
        document = Document(str(docx_path))
    except Exception as exc:
        return failed(f"Could not open DOCX: {exc}")

    notes: list[str] = []
    markdown = _docx_to_markdown(document, notes)

    # The output directory is created on every successful conversion, even
    # when the text ends up being written to the source files instead.
    manifest.output_dir.mkdir(parents=True, exist_ok=True)

    sources = manifest.sources

    # Attempt redistribution to source files (FR-305, FR-405)
    if len(sources) == 1:
        target = sources[0].path
        target.write_text(markdown, encoding="utf-8")
        return ImportResult(
            success=True,
            output_files=[target],
            mapping_status="redistributed",
            warnings=notes,
        )

    # Multi-file: one H1 section per source, matched by position.
    sections = _split_by_h1(markdown)
    if len(sections) == len(sources):
        written: list[Path] = []
        for source, body in zip(sources, sections, strict=True):
            source.path.write_text(body, encoding="utf-8")
            written.append(source.path)
        return ImportResult(
            success=True,
            output_files=written,
            mapping_status="redistributed",
            warnings=notes,
        )

    # Fallback: merged single output (FR-406)
    notes.append(
        f"Could not redistribute to {len(sources)} source files "
        f"(found {len(sections)} H1 sections); writing merged output"
    )
    merged_file = manifest.output_dir / "imported_merged.md"
    merged_file.write_text(markdown, encoding="utf-8")
    return ImportResult(
        success=True,
        output_files=[merged_file],
        mapping_status="merged",
        warnings=notes,
    )
# ---------------------------------------------------------------------------
# DOCX → Markdown conversion
# ---------------------------------------------------------------------------
def _docx_to_markdown(doc: DocxDocument, warnings: list[str]) -> str:
    """Convert a python-docx Document to a Markdown string.

    Body-level paragraphs and tables are rendered in document order and
    joined with one blank line between chunks.

    Args:
        doc: The opened document to convert.
        warnings: Collector passed through to the paragraph converter.

    Returns:
        The Markdown text, without a trailing newline. Empty when the body
        yields no renderable content.
    """
    chunks: list[str] = []
    # Walk python-docx's block-level items
    for block in _iter_blocks(doc):
        if isinstance(block, Paragraph):
            md = _paragraph_to_md(block, warnings)
        elif isinstance(block, Table):
            md = _table_to_md(block)
        else:
            continue
        # Skip None *and* empty strings: an empty chunk (e.g. a table with no
        # rows) would otherwise be joined in and leave a run of blank lines.
        if md:
            chunks.append(md)
    return "\n\n".join(chunks)
def _iter_blocks(doc: DocxDocument):
    """Yield the body's paragraphs and tables, in document order.

    Only direct children of the body element are visited. ``p`` elements are
    wrapped as ``Paragraph`` and ``tbl`` elements as ``Table``, each with
    *doc* as parent; every other child element is skipped.
    """
    wrappers = {"p": Paragraph, "tbl": Table}
    for element in doc.element.body:
        qualified = element.tag
        # Tags look like "{namespace}local"; keep only the local part.
        local_name = qualified.split("}")[-1] if "}" in qualified else qualified
        wrapper = wrappers.get(local_name)
        if wrapper is not None:
            yield wrapper(element, doc)
def _paragraph_to_md(para: Paragraph, warnings: list[str]) -> str | None:
    """Convert a paragraph to a Markdown chunk, or ``None`` to skip it.

    The paragraph's style name selects the rendering:

    * ``Heading <n>`` becomes an ATX heading, with the level clamped to 1–6.
    * ``List Bullet…`` becomes ``- text``.
    * ``List Number…`` becomes ``1. text``.
    * Anything else is rendered run by run so inline formatting is kept.

    Headings and list items use the paragraph's plain stripped text.

    Args:
        para: The paragraph to convert.
        warnings: Collector for conversion notes; a message is appended when
            a heading level has to be clamped.

    Returns:
        The Markdown text, or ``None`` when the paragraph has no visible text.
    """
    # Fall back to "Normal" when there is no style or the style has no name,
    # so the regex matches below never receive None.
    style_name = (para.style.name if para.style else None) or "Normal"
    text = para.text.strip()

    # Skip empty paragraphs *before* the style dispatch: an empty heading
    # would otherwise emit a bare "# ", which _split_by_h1 treats as a
    # section boundary, and an empty list item would emit a dangling marker.
    if not text:
        return None

    # Headings
    heading = HEADING_STYLE_RE.match(style_name)
    if heading:
        requested = int(heading.group(1))
        # Markdown ATX headings only exist for levels 1 through 6.
        level = min(max(requested, 1), 6)
        if level != requested:
            warnings.append(
                f"Heading level {requested} is outside Markdown's 1-6 range; "
                f"using level {level}"
            )
        return f"{'#' * level} {text}"

    # Lists
    if LIST_BULLET_RE.match(style_name):
        return f"- {text}"
    if LIST_NUMBER_RE.match(style_name):
        return f"1. {text}"

    # Normal text — preserve inline markup
    return _runs_to_md(para)
def _runs_to_md(para: Paragraph) -> str:
    """Convert paragraph runs to Markdown with inline formatting.

    Each non-empty run is classified as bold+italic (``***``), bold (``**``),
    italic (``*``), code (a font name containing "Courier", rendered with
    backticks) or plain. Two adjustments keep the output valid Markdown:

    * Adjacent runs with the same formatting are merged first, so two bold
      runs ``a`` and ``b`` give ``**ab**`` instead of ``**a****b**``.
    * Leading and trailing whitespace is kept outside the markers
      (``**bold** `` rather than ``**bold **``), because an emphasis
      delimiter next to whitespace is not recognised as opening or closing
      emphasis. A formatted span that is only whitespace is emitted as is.

    Args:
        para: Paragraph whose ``runs`` are rendered.

    Returns:
        The concatenated Markdown; empty when there are no non-empty runs.
    """
    # Pass 1: classify each run and coalesce neighbours sharing a marker.
    spans: list[tuple[str, str]] = []  # (marker, text)
    for run in para.runs:
        text = run.text
        if not text:
            continue
        if run.bold and run.italic:
            marker = "***"
        elif run.bold:
            marker = "**"
        elif run.italic:
            marker = "*"
        elif run.font.name and "Courier" in run.font.name:
            marker = "`"
        else:
            marker = ""
        if spans and spans[-1][0] == marker:
            spans[-1] = (marker, spans[-1][1] + text)
        else:
            spans.append((marker, text))

    # Pass 2: wrap only the non-whitespace core of each formatted span.
    parts: list[str] = []
    for marker, text in spans:
        core = text.strip()
        if not marker or not core:
            parts.append(text)
            continue
        lead = text[: len(text) - len(text.lstrip())]
        trail = text[len(text.rstrip()):]
        parts.append(f"{lead}{marker}{core}{marker}{trail}")
    return "".join(parts)
def _table_to_md(table: Table) -> str:
    """Convert a DOCX table to a GFM Markdown table.

    The first row becomes the header row. Shorter rows are padded with empty
    cells up to the widest row. In each cell the text is stripped, ``|`` is
    escaped as ``\\|``, and newlines are replaced with ``<br>`` because a
    pipe-table row must stay on one line.

    Args:
        table: The table to render; its ``rows`` and each row's ``cells``
            are read.

    Returns:
        The table as newline-joined lines, or an empty string when the table
        has no rows or no cells.
    """
    rows = table.rows
    if not rows:
        return ""

    grid = [
        [
            cell.text.strip().replace("|", "\\|").replace("\n", "<br>")
            for cell in row.cells
        ]
        for row in rows
    ]

    # Normalise column count
    num_cols = max(len(cells) for cells in grid)
    if num_cols == 0:
        # Rows without any cells cannot form a table.
        return ""
    grid = [cells + [""] * (num_cols - len(cells)) for cells in grid]

    separator = "| " + " | ".join(["---"] * num_cols) + " |"
    lines = ["| " + " | ".join(cells) + " |" for cells in grid]
    # The separator goes directly after the header row.
    lines.insert(1, separator)
    return "\n".join(lines)
def _split_by_h1(md_text: str) -> list[str]:
    """Split Markdown text into sections at H1 boundaries.

    The text is cut into chunks on blank lines. Every chunk that begins with
    ``"# "`` opens a new section, except when it is the very first chunk.
    Chunks before the first H1 form a section of their own, and an empty
    input produces a single empty section.
    """
    groups: list[list[str]] = []
    for block in md_text.split("\n\n"):
        opens_section = block.startswith("# ")
        if groups and not opens_section:
            groups[-1].append(block)
        else:
            # First chunk of the text, or an H1 chunk: start a new section.
            groups.append([block])
    return ["\n\n".join(group) for group in groups]