Workplan dependencies and prio for text research lab workplans

This commit is contained in:
2026-05-04 00:12:07 +02:00
parent 4fc891c076
commit 6f0facd744
18 changed files with 1644 additions and 1 deletions

View File

@@ -0,0 +1,15 @@
"""Query and extraction helpers for parsed Markdown documents."""
from markitect_tool.query.engine import (
InvalidQueryError,
QueryMatch,
extract_document,
query_document,
)
# Public API of this package: the names re-exported from
# markitect_tool.query.engine by the import above.
__all__ = [
"InvalidQueryError",
"QueryMatch",
"extract_document",
"query_document",
]

View File

@@ -0,0 +1,242 @@
"""Small selector engine for structured Markdown documents."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from markitect_tool.contract import collect_metrics
from markitect_tool.core import ContentBlock, Document, Heading, Section
class InvalidQueryError(ValueError):
    """Signal that a selector string is malformed or cannot be evaluated.

    Subclasses ``ValueError`` so callers that already handle bad input
    values also catch invalid selectors.
    """
@dataclass(frozen=True)
class QueryMatch:
    """One match returned by a selector.

    Instances are immutable (frozen dataclass).
    """

    # Category of the matched item, e.g. "document", "frontmatter",
    # "metrics", "heading", "section" or "block".
    kind: str
    # Location of the match inside the document, e.g. "$.headings[0]".
    path: str
    # Matched payload; may legitimately be None (e.g. a null mapping value).
    value: Any
    # Optional textual rendering of the match.
    text: str | None = None
    # Optional source line of the match.
    line: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the match as a plain dictionary.

        ``kind``, ``path`` and ``value`` are always present, even when
        ``value`` is ``None``; a null value is a real result and must not be
        confused with a missing key. The optional ``text`` and ``line``
        entries are only included when they are set.

        Returns:
            A dict with keys in the order kind, path, value, text, line.
        """
        data: dict[str, Any] = {
            "kind": self.kind,
            "path": self.path,
            "value": self.value,
        }
        # Only the optional fields are dropped when unset.
        if self.text is not None:
            data["text"] = self.text
        if self.line is not None:
            data["line"] = self.line
        return data
@dataclass(frozen=True)
class _Selector:
    """Parsed form of a selector string."""

    # First dotted segment of the selector, e.g. "frontmatter" or "sections".
    target: str
    # Remaining dotted segments after the target (may be empty).
    path: list[str]
    # Filters from the bracket part; keys ending in "~" come from "~=".
    filters: dict[str, str]
def query_document(document: Document, selector: str) -> list[QueryMatch]:
"""Query a parsed document with a small Markitect selector."""
parsed = _parse_selector(selector)
if parsed.target in {"document", "$", "."}:
return [QueryMatch(kind="document", path="$", value=document.to_dict())]
if parsed.target == "frontmatter":
return _query_mapping(document.frontmatter, parsed.path, "frontmatter", "$.frontmatter")
if parsed.target == "headings":
return _query_headings(document.headings, parsed.filters)
if parsed.target == "sections":
return _query_sections(document.sections, parsed.filters)
if parsed.target == "blocks":
return _query_blocks(document.blocks, parsed.filters)
if parsed.target == "metrics":
return _query_mapping(collect_metrics(document).to_dict(), parsed.path, "metrics", "$.metrics")
raise InvalidQueryError(f"Unsupported selector target `{parsed.target}`")
def extract_document(document: Document, selector: str) -> list[str]:
    """Return the textual content of every match of ``selector``.

    For each match the explicit ``text`` wins. Otherwise a string value is
    used as is, and an int/float/bool value is converted with ``str``.
    Matches with any other kind of value contribute nothing.

    Args:
        document: Parsed document to search.
        selector: Selector string, as accepted by ``query_document``.

    Returns:
        The extracted strings in match order.
    """
    pieces: list[str] = []
    for hit in query_document(document, selector):
        if hit.text is not None:
            pieces.append(hit.text)
            continue
        payload = hit.value
        if isinstance(payload, str):
            pieces.append(payload)
        elif isinstance(payload, (int, float, bool)):
            pieces.append(str(payload))
    return pieces
def _parse_selector(selector: str) -> _Selector:
    """Split a selector string into target, path and filters.

    Accepted shape: ``target.path.segments[key=value, key~=value]``. The
    bracket part is optional. When present it must close the selector, and
    only one ``[``/``]`` pair is allowed. A selector without any dotted
    segment (for example ``.``) resolves to the ``document`` target.

    Raises:
        InvalidQueryError: If the selector is blank or its bracket syntax is
            malformed.
    """
    text = selector.strip()
    if not text:
        raise InvalidQueryError("Selector cannot be empty")

    if "[" in text or "]" in text:
        well_formed = (
            text.endswith("]") and text.count("[") == 1 and text.count("]") == 1
        )
        if not well_formed:
            raise InvalidQueryError(f"Invalid selector filter syntax `{selector}`")
        # Drop the closing "]" and cut at the single "[".
        head, _, filter_body = text[:-1].partition("[")
        filters = _parse_filters(filter_body)
    else:
        head = text
        filters = {}

    segments = [segment for segment in head.split(".") if segment]
    if segments:
        target, *rest = segments
        return _Selector(target=target, path=rest, filters=filters)
    return _Selector(target="document", path=[], filters=filters)
def _parse_filters(raw_filter: str) -> dict[str, str]:
    """Parse the inside of a selector's ``[...]`` into a filter mapping.

    Filters are comma separated ``key=value`` or ``key~=value`` pairs. A
    ``~=`` filter is stored under the key with a trailing ``~`` (for example
    ``title~``). Surrounding quotes are removed from values. Blank parts
    (such as a trailing comma) are skipped.

    The operator is taken from the FIRST ``=`` in each part, so a value may
    itself contain ``=`` or ``~=`` (``title=a~=b`` yields key ``title`` and
    value ``a~=b``).

    NOTE: parts are split on every comma, so values containing a comma are
    not supported, even when quoted.

    Args:
        raw_filter: Text between the selector's brackets.

    Returns:
        Mapping of filter key to expected value.

    Raises:
        InvalidQueryError: If a part has no ``=`` or has an empty key
            (including a bare ``~=value``).
    """
    filters: dict[str, str] = {}
    for raw_part in raw_filter.split(","):
        part = raw_part.strip()
        if not part:
            continue
        # Split on the first "=" only; anything after it belongs to the value.
        raw_key, separator, raw_value = part.partition("=")
        if not separator:
            raise InvalidQueryError(f"Invalid filter `{part}`")
        raw_key = raw_key.strip()
        fuzzy = raw_key.endswith("~")
        key = raw_key[:-1].strip() if fuzzy else raw_key
        # Validate the key BEFORE re-attaching "~", otherwise "~=x" would
        # slip through with the meaningless key "~".
        if not key:
            raise InvalidQueryError(f"Invalid filter `{part}`")
        if fuzzy:
            key = f"{key}~"
        filters[key] = _strip_quotes(raw_value.strip())
    return filters
def _query_mapping(
    mapping: dict[str, Any],
    path: list[str],
    kind: str,
    root_path: str,
) -> list[QueryMatch]:
    """Look up a dotted path inside a nested mapping.

    Args:
        mapping: Root mapping to search.
        path: Keys to follow, outermost first. Empty means "the mapping
            itself".
        kind: ``kind`` recorded on the resulting match.
        root_path: Path string of ``mapping``; followed keys are appended to
            it, separated by dots.

    Returns:
        A single-element list with the value found, or an empty list when
        a key is missing or an intermediate value is not a dict. With an
        empty ``path`` the match carries the whole mapping and no text.
    """
    if not path:
        return [QueryMatch(kind=kind, path=root_path, value=mapping)]

    node: Any = mapping
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return []
        node = node[key]

    full_path = ".".join([root_path, *path])
    return [QueryMatch(kind=kind, path=full_path, value=node, text=_text_value(node))]
def _query_headings(headings: list[Heading], filters: dict[str, str]) -> list[QueryMatch]:
    """Return a match for every heading that satisfies ``filters``.

    Each match's path uses the heading's index in ``headings``, and its text
    is the heading rendered as a Markdown heading line.
    """
    return [
        QueryMatch(
            kind="heading",
            path=f"$.headings[{position}]",
            value=item.to_dict(),
            text=f"{'#' * item.level} {item.text}",
            line=item.line,
        )
        for position, item in enumerate(headings)
        if _match_heading(item, filters)
    ]
def _query_sections(sections: list[Section], filters: dict[str, str]) -> list[QueryMatch]:
    """Return a match for every section that satisfies ``filters``.

    Each match's path uses the section's index in ``sections``, its text is
    the section rendered as Markdown, and its line is the line of the
    section's heading.
    """
    return [
        QueryMatch(
            kind="section",
            path=f"$.sections[{position}]",
            value=item.to_dict(),
            text=_section_markdown(item),
            line=item.heading.line,
        )
        for position, item in enumerate(sections)
        if _match_section(item, filters)
    ]
def _query_blocks(blocks: list[ContentBlock], filters: dict[str, str]) -> list[QueryMatch]:
    """Return a match for every content block that satisfies ``filters``.

    Each match's path uses the block's index in ``blocks``, its text is the
    block's own text, and its line is the block's starting line.
    """
    return [
        QueryMatch(
            kind="block",
            path=f"$.blocks[{position}]",
            value=item.to_dict(),
            text=item.text,
            line=item.line_start,
        )
        for position, item in enumerate(blocks)
        if _match_block(item, filters)
    ]
def _match_heading(heading: Heading, filters: dict[str, str]) -> bool:
    """Tell whether ``heading`` satisfies every filter.

    Supported keys:

    * ``level``: compared against the heading level as a string.
    * ``text`` / ``heading`` / ``title``: exact heading text.
    * ``text~`` / ``heading~`` / ``title~``: case-insensitive substring of
      the heading text.

    Keys not listed above are ignored.
    """
    exact_keys = ("text", "heading", "title")
    fuzzy_keys = ("text~", "heading~", "title~")
    for name, wanted in filters.items():
        if name == "level":
            if str(heading.level) != wanted:
                return False
        elif name in exact_keys:
            if heading.text != wanted:
                return False
        elif name in fuzzy_keys:
            if wanted.lower() not in heading.text.lower():
                return False
    return True
def _match_section(section: Section, filters: dict[str, str]) -> bool:
    """Tell whether ``section`` satisfies every filter.

    Supported keys:

    * ``level``: compared against the section heading's level as a string.
    * ``heading`` / ``title`` / ``text``: exact heading text.
    * ``heading~`` / ``title~`` / ``text~``: case-insensitive substring of
      the heading text.
    * ``contains``: case-sensitive substring of the section body.
    * ``contains~``: case-insensitive substring of the section body.

    The section body is the newline-joined text of all blocks that have
    text. Keys not listed above are ignored.
    """
    body = "\n".join(item.text for item in section.blocks if item.text)

    def passes(name: str, wanted: str) -> bool:
        # One filter at a time; unknown keys never reject a section.
        if name == "level":
            return str(section.heading.level) == wanted
        if name in ("heading", "title", "text"):
            return section.heading.text == wanted
        if name in ("heading~", "title~", "text~"):
            return wanted.lower() in section.heading.text.lower()
        if name == "contains":
            return wanted in body
        if name == "contains~":
            return wanted.lower() in body.lower()
        return True

    return all(passes(name, wanted) for name, wanted in filters.items())
def _match_block(block: ContentBlock, filters: dict[str, str]) -> bool:
    """Tell whether ``block`` satisfies every filter.

    Supported keys:

    * ``type``: exact block type.
    * ``contains``: case-sensitive substring of the block text.
    * ``contains~``: case-insensitive substring of the block text.

    Keys not listed above are ignored. A block whose text is ``None`` or
    empty is treated as having empty text, so ``contains`` filters simply do
    not match instead of raising ``TypeError``.
    """
    # Other helpers in this module already skip blocks with falsy text;
    # normalise here so the substring checks are safe for such blocks too.
    text = block.text or ""
    for key, expected in filters.items():
        if key == "type" and block.type != expected:
            return False
        if key == "contains" and expected not in text:
            return False
        if key == "contains~" and expected.lower() not in text.lower():
            return False
    return True
def _section_markdown(section: Section) -> str:
    """Render a section as Markdown text.

    The heading line comes first, followed by each block that has text,
    with one blank line between consecutive parts. Leading and trailing
    whitespace of the final result is stripped.
    """
    heading_line = f"{'#' * section.heading.level} {section.heading.text}"
    bodies = (item.text for item in section.blocks if item.text)
    return "\n\n".join([heading_line, *bodies]).strip()
def _strip_quotes(value: str) -> str:
    """Remove one pair of matching surrounding quotes from ``value``.

    Only a leading and trailing quote of the same kind (both ``"`` or both
    ``'``) are removed. Any other input is returned unchanged.
    """
    if len(value) < 2:
        return value
    opening, closing = value[0], value[-1]
    if opening in ('"', "'") and opening == closing:
        return value[1:-1]
    return value
def _text_value(value: Any) -> str | None:
    """Return a textual form of a scalar value, or ``None`` for anything else.

    Strings are returned unchanged; int, float and bool values are converted
    with ``str``. Every other value (including ``None`` and containers)
    yields ``None``.
    """
    if isinstance(value, str):
        return value
    return str(value) if isinstance(value, (int, float, bool)) else None