"""Structural drift detection for markidocx (FR-700).""" from __future__ import annotations import re from dataclasses import dataclass, field from markidocx.errors import OutputState HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE) LIST_ITEM_RE = re.compile(r"^(\s*[-*+]|\s*\d+\.)\s+(.+)$", re.MULTILINE) TABLE_ROW_RE = re.compile(r"^\|.+\|$", re.MULTILINE) FOOTNOTE_RE = re.compile(r"\[\^[^\]]+\]") LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)") @dataclass class DriftReport: has_drift: bool preserved: list[str] = field(default_factory=list) degraded: list[str] = field(default_factory=list) broken: list[str] = field(default_factory=list) unsupported: list[str] = field(default_factory=list) output_state: OutputState = OutputState.FINAL def compare(original: str, reimported: str) -> DriftReport: """Compare *original* Markdown against *reimported* Markdown. Classifies each structural element as: - preserved: identical in both - degraded: present but modified - broken: present in original, missing in reimported - unsupported: construct not supported by the round-trip Returns a DriftReport. """ preserved: list[str] = [] degraded: list[str] = [] broken: list[str] = [] unsupported: list[str] = [] # --- Headings (FR-501) --- orig_headings = _extract_headings(original) reim_headings = _extract_headings(reimported) _compare_sets("heading", orig_headings, reim_headings, preserved, degraded, broken) # --- Lists (FR-502) --- orig_lists = _extract_list_items(original) reim_lists = _extract_list_items(reimported) _compare_sets("list_item", orig_lists, reim_lists, preserved, degraded, broken) # --- Tables (FR-503) --- orig_tables = _count_tables(original) reim_tables = _count_tables(reimported) if orig_tables == reim_tables: if orig_tables > 0: preserved.append(f"tables:{orig_tables}") elif reim_tables < orig_tables: broken.append(f"tables:missing {orig_tables - reim_tables} of {orig_tables}") else: degraded.append(f"tables:count changed {orig_tables}→{reim_tables}") # --- Footnotes (FR-504) --- orig_fn = set(FOOTNOTE_RE.findall(original)) reim_fn = set(FOOTNOTE_RE.findall(reimported)) for fn in orig_fn: if fn in reim_fn: preserved.append(f"footnote:{fn}") else: broken.append(f"footnote:{fn}") # --- Links (FR-506) --- orig_links = {m.group(0) for m in LINK_RE.finditer(original)} reim_links = {m.group(0) for m in LINK_RE.finditer(reimported)} for link in orig_links: if link in reim_links: preserved.append(f"link:{link[:40]}") else: degraded.append(f"link:lost {link[:40]}") # --- Cross-references (FR-531, FR-540) --- _compare_xrefs(original, reimported, preserved, degraded, broken) # --- Figures (FR-532, FR-541) --- _compare_figures(original, reimported, preserved, degraded, broken) # --- Diagram source blocks (FR-534) --- _compare_diagram_blocks(original, reimported, preserved, degraded, broken) # --- Citations & Bibliography (FR-535, FR-542) --- from markidocx.bibliography import compare_citations compare_citations(original, reimported, preserved, degraded, broken) has_drift = bool(degraded or broken) output_state = ( OutputState.FINAL if not has_drift else (OutputState.DEGRADED if not broken else OutputState.PARTIAL) ) return DriftReport( has_drift=has_drift, preserved=preserved, degraded=degraded, broken=broken, unsupported=unsupported, output_state=output_state, ) def _extract_headings(text: str) -> list[str]: return [f"{'#' * len(m.group(1))} {m.group(2).strip()}" for m in HEADING_RE.finditer(text)] def _extract_list_items(text: str) -> list[str]: return [m.group(2).strip() for m in LIST_ITEM_RE.finditer(text)] def _count_tables(text: str) -> int: rows = TABLE_ROW_RE.findall(text) if not rows: return 0 # Count separator rows as table boundaries sep_re = re.compile(r"^\|[-| :]+\|$") count = sum(1 for r in rows if sep_re.match(r)) return count def _compare_figures( original: str, reimported: str, preserved: list[str], degraded: list[str], broken: list[str], ) -> None: """Compare figure labels and captions (FR-532, FR-541).""" from markidocx.figures import extract_figure_captions, extract_figure_labels orig_labels = extract_figure_labels(original) reim_labels = extract_figure_labels(reimported) for label in orig_labels: if label in reim_labels: preserved.append(f"figure-label:{label}") else: broken.append(f"figure-label:missing '{label}'") orig_captions = extract_figure_captions(original) reim_captions = extract_figure_captions(reimported) orig_set = set(orig_captions) reim_set = set(reim_captions) for caption in orig_set: if caption in reim_set: preserved.append(f"figure-caption:{caption[:40]}") else: degraded.append(f"figure-caption:lost '{caption[:40]}'") def _compare_xrefs( original: str, reimported: str, preserved: list[str], degraded: list[str], broken: list[str], ) -> None: """Compare cross-reference anchors and links (FR-531, FR-540).""" from markidocx.xref import extract_anchors, extract_xref_links orig_anchors = extract_anchors(original) reim_anchors = extract_anchors(reimported) for anchor in orig_anchors: if anchor in reim_anchors: preserved.append(f"xref-anchor:{anchor}") else: broken.append(f"xref-anchor:missing '{anchor}'") orig_xrefs = extract_xref_links(original) reim_xrefs = extract_xref_links(reimported) for link_text, anchor in orig_xrefs: if (link_text, anchor) in reim_xrefs: preserved.append(f"xref-link:[{link_text}][{anchor}]") elif anchor not in reim_anchors: broken.append(f"xref-link:broken-target [{link_text}][{anchor}]") else: degraded.append(f"xref-link:degraded [{link_text}][{anchor}]") _FENCED_BLOCK_RE = re.compile(r"```(\w+)\n(.*?)```", re.DOTALL) def _extract_fenced_blocks(text: str) -> list[tuple[str, str]]: """Extract all fenced code blocks as (language, source) pairs.""" return [(m.group(1).strip().lower(), m.group(2).rstrip()) for m in _FENCED_BLOCK_RE.finditer(text)] def _compare_diagram_blocks( original: str, reimported: str, preserved: list[str], degraded: list[str], broken: list[str], ) -> None: """Compare diagram fenced blocks for source-content drift (FR-534).""" from markidocx.diagrams import DIAGRAM_TYPES orig_blocks = [(lang, src) for lang, src in _extract_fenced_blocks(original) if lang in DIAGRAM_TYPES] reim_blocks = [(lang, src) for lang, src in _extract_fenced_blocks(reimported) if lang in DIAGRAM_TYPES] for i, (lang, src) in enumerate(orig_blocks): if i < len(reim_blocks): reim_lang, reim_src = reim_blocks[i] if lang == reim_lang and src == reim_src: preserved.append(f"diagram:{lang}[{i}]") else: degraded.append(f"diagram:{lang}[{i}]:source-mutated") else: broken.append(f"diagram:{lang}[{i}]:missing") def _compare_sets( kind: str, orig: list[str], reim: list[str], preserved: list[str], degraded: list[str], broken: list[str], ) -> None: orig_counts: dict[str, int] = {} for item in orig: orig_counts[item] = orig_counts.get(item, 0) + 1 reim_counts: dict[str, int] = {} for item in reim: reim_counts[item] = reim_counts.get(item, 0) + 1 for item, count in orig_counts.items(): reim_count = reim_counts.get(item, 0) if reim_count >= count: preserved.append(f"{kind}:{item[:60]}") elif reim_count > 0: degraded.append(f"{kind}:partial '{item[:60]}' ({reim_count}/{count})") else: broken.append(f"{kind}:missing '{item[:60]}'")