Add optional JSONPath query/extract support, FTS5 section/block search, and the `mkt cache query` and `search` commands. The local SQLite backend now supports parsed snapshot persistence, incremental refresh, cached querying, and ranked full-text search.

This commit is contained in:
2026-05-04 10:32:06 +02:00
parent 36ff4cedab
commit 0015c8a385
11 changed files with 540 additions and 22 deletions

View File

@@ -36,6 +36,7 @@ from markitect_tool.backend.local_store import (
DEFAULT_LOCAL_INDEX_PATH,
LOCAL_INDEX_SCHEMA_VERSION,
LocalIndexBuildResult,
LocalSearchResult,
LocalSnapshotStore,
local_index_path_for,
)
@@ -70,6 +71,7 @@ __all__ = [
"DEFAULT_LOCAL_INDEX_PATH",
"LOCAL_INDEX_SCHEMA_VERSION",
"LocalIndexBuildResult",
"LocalSearchResult",
"LocalSnapshotStore",
"local_index_path_for",
]

View File

@@ -56,6 +56,24 @@ class LocalIndexBuildResult:
return data
@dataclass(frozen=True)
class LocalSearchResult:
"""One FTS search match from the local index."""
path: str
snapshot_id: str
unit_kind: str
unit_index: int
heading: str | None
text: str
rank: float
line_start: int | None = None
line_end: int | None = None
def to_dict(self) -> dict[str, Any]:
return {key: value for key, value in asdict(self).items() if value is not None}
class LocalSnapshotStore:
"""SQLite-backed local snapshot store for parsed Markdown documents."""
@@ -217,6 +235,7 @@ class LocalSnapshotStore:
return
with self._connect() as conn:
_create_schema(conn)
conn.execute("delete from search_units where path = ?", (path,))
conn.execute("delete from blocks where path = ?", (path,))
conn.execute("delete from sections where path = ?", (path,))
conn.execute("delete from headings where path = ?", (path,))
@@ -236,6 +255,45 @@ class LocalSnapshotStore:
raise KeyError(f"No indexed document `{path}`")
return json.loads(row["document_json"])
def search(self, query: str, *, limit: int = 20) -> list[LocalSearchResult]:
    """Search indexed section and block text with SQLite FTS5.

    Args:
        query: FTS5 match expression; must contain non-whitespace text.
        limit: Maximum number of rows to return. Must not be negative.

    Returns:
        Matches ordered by ascending ``bm25`` rank, or an empty list when
        the index file does not exist.

    Raises:
        ValueError: If ``query`` is blank, ``limit`` is negative, or SQLite
            raises an operational error while running the match.
    """
    if not query.strip():
        raise ValueError("Search query cannot be empty")
    if limit < 0:
        # SQLite treats a negative LIMIT as "no upper bound", which would
        # silently return every match instead of honouring the cap.
        raise ValueError("Search limit cannot be negative")
    if not self.path.exists():
        return []
    with self._connect() as conn:
        # Make sure the FTS table exists before matching against it.
        _create_schema(conn)
        try:
            rows = conn.execute(
                """
                select s.path, s.snapshot_id, s.unit_kind, s.unit_index,
                       s.heading, s.text, s.line_start, s.line_end,
                       bm25(search_units) as rank
                from search_units s
                where search_units match ?
                order by rank
                limit ?
                """,
                (query, limit),
            ).fetchall()
        except sqlite3.OperationalError as exc:
            raise ValueError(f"Invalid FTS query `{query}`: {exc}") from exc
    return [
        LocalSearchResult(
            path=row["path"],
            snapshot_id=row["snapshot_id"],
            unit_kind=row["unit_kind"],
            unit_index=row["unit_index"],
            heading=row["heading"],
            text=row["text"],
            line_start=row["line_start"],
            line_end=row["line_end"],
            rank=float(row["rank"]),
        )
        for row in rows
    ]
def build(
self,
paths: list[str | Path],
@@ -382,6 +440,16 @@ def _create_schema(conn: sqlite3.Connection) -> None:
target_snapshot_id text,
metadata_json text not null default '{}'
);
create virtual table if not exists search_units using fts5(
path unindexed,
snapshot_id unindexed,
unit_kind unindexed,
unit_index unindexed,
heading,
text,
line_start unindexed,
line_end unindexed
);
create index if not exists idx_sources_content_hash on sources(content_hash);
create index if not exists idx_sources_snapshot_id on sources(snapshot_id);
create index if not exists idx_sources_parser on sources(parser, parser_version);
@@ -402,6 +470,7 @@ def _replace_document_units(
conn.execute("delete from blocks where path = ?", (path,))
conn.execute("delete from sections where path = ?", (path,))
conn.execute("delete from headings where path = ?", (path,))
conn.execute("delete from search_units where path = ?", (path,))
for idx, heading in enumerate(document.get("headings", [])):
conn.execute(
"""
@@ -441,6 +510,22 @@ def _replace_document_units(
line_end,
),
)
conn.execute(
"""
insert into search_units(
path, snapshot_id, unit_kind, unit_index, heading, text, line_start, line_end
) values (?, ?, 'section', ?, ?, ?, ?, ?)
""",
(
path,
snapshot_id,
idx,
str(heading["text"]),
text,
line_start,
line_end,
),
)
for idx, block in enumerate(document.get("blocks", [])):
conn.execute(
"""
@@ -459,6 +544,22 @@ def _replace_document_units(
block.get("heading_level"),
),
)
conn.execute(
"""
insert into search_units(
path, snapshot_id, unit_kind, unit_index, heading, text, line_start, line_end
) values (?, ?, 'block', ?, ?, ?, ?, ?)
""",
(
path,
snapshot_id,
idx,
None,
str(block.get("text", "")),
block.get("line_start"),
block.get("line_end"),
),
)
def _load_dependencies(conn: sqlite3.Connection) -> dict[str, list[DependencyEdge]]:

View File

@@ -29,7 +29,7 @@ from markitect_tool.content_class import (
ContentClassResolutionError,
load_content_class_file,
)
from markitect_tool.core import parse_markdown_file
from markitect_tool.core import Document, parse_markdown_file
from markitect_tool.contract import (
ContractLoaderError,
check_markdown_file,
@@ -52,7 +52,13 @@ from markitect_tool.generation import (
from markitect_tool.literate import tangle_markdown, weave_markdown, write_tangle_files
from markitect_tool.ops import IncludeError, compose_files, resolve_includes, transform_markdown
from markitect_tool.processor import ProcessorContext, run_fenced_processors
from markitect_tool.query import InvalidQueryError, extract_document, query_document
from markitect_tool.query import (
InvalidQueryError,
extract_document,
extract_document_jsonpath,
query_document,
query_document_jsonpath,
)
from markitect_tool.reference import (
ReferenceContext,
ReferenceResolutionError,
@@ -162,6 +168,13 @@ def metrics(file: Path, output_format: str) -> None:
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("selector")
@click.option(
"--engine",
type=click.Choice(["selector", "jsonpath"], case_sensitive=False),
default="selector",
show_default=True,
help="Query engine to use.",
)
@click.option(
"--format",
"output_format",
@@ -169,16 +182,21 @@ def metrics(file: Path, output_format: str) -> None:
default="json",
show_default=True,
)
def query(file: Path, selector: str, output_format: str) -> None:
def query(file: Path, selector: str, engine: str, output_format: str) -> None:
"""Query structured Markdown content with a small selector."""
document = parse_markdown_file(file)
try:
matches = query_document(document, selector)
matches = (
query_document_jsonpath(document, selector)
if engine == "jsonpath"
else query_document(document, selector)
)
except InvalidQueryError as exc:
raise click.ClickException(str(exc)) from exc
data = {
"selector": selector,
"engine": engine,
"document_path": str(file),
"count": len(matches),
"matches": [match.to_dict() for match in matches],
@@ -189,6 +207,13 @@ def query(file: Path, selector: str, output_format: str) -> None:
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("selector")
@click.option(
"--engine",
type=click.Choice(["selector", "jsonpath"], case_sensitive=False),
default="selector",
show_default=True,
help="Query engine to use.",
)
@click.option(
"--format",
"output_format",
@@ -196,16 +221,21 @@ def query(file: Path, selector: str, output_format: str) -> None:
default="text",
show_default=True,
)
def extract(file: Path, selector: str, output_format: str) -> None:
def extract(file: Path, selector: str, engine: str, output_format: str) -> None:
"""Extract text or Markdown content from structured Markdown."""
document = parse_markdown_file(file)
try:
items = extract_document(document, selector)
items = (
extract_document_jsonpath(document, selector)
if engine == "jsonpath"
else extract_document(document, selector)
)
except InvalidQueryError as exc:
raise click.ClickException(str(exc)) from exc
data = {
"selector": selector,
"engine": engine,
"document_path": str(file),
"count": len(items),
"items": items,
@@ -976,6 +1006,124 @@ def cache_index(
_emit_local_index_data(result.to_dict(), output_format)
@cache.command("query")
@click.argument("selector")
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."),
    show_default=True,
    help="Root used for the default local index path.",
)
@click.option(
    "--index-path",
    type=click.Path(dir_okay=False, path_type=Path),
    help="SQLite index path. Defaults to .markitect/cache/index.sqlite3 under root.",
)
@click.option(
    "--path",
    "paths",
    multiple=True,
    help="Restrict query to one or more indexed relative paths.",
)
@click.option(
    "--engine",
    type=click.Choice(["selector", "jsonpath"], case_sensitive=False),
    default="selector",
    show_default=True,
    help="Query engine to use.",
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="json",
    show_default=True,
)
def cache_query(
    selector: str,
    root: Path,
    index_path: Path | None,
    paths: tuple[str, ...],
    engine: str,
    output_format: str,
) -> None:
    """Run a selector or JSONPath query over indexed document snapshots."""
    # Resolve the index location once; it is used for the store and the report.
    resolved_index_path = local_index_path_for(root, index_path)
    store = LocalSnapshotStore(resolved_index_path)
    # Without explicit --path filters, query every document known to the index.
    indexed_paths = sorted(paths or [state.path for state in store.load_state()])
    run_query = query_document_jsonpath if engine == "jsonpath" else query_document
    all_matches: list[dict] = []
    try:
        for indexed_path in indexed_paths:
            document = Document.from_dict(store.get_document(indexed_path))
            for match in run_query(document, selector):
                item = match.to_dict()
                # Record which indexed document each match came from.
                item["source_path"] = indexed_path
                all_matches.append(item)
    except KeyError as exc:
        # str(KeyError("msg")) renders as "'msg'" (the repr of the argument),
        # so use the raw argument to keep the CLI error free of stray quotes.
        message = str(exc.args[0]) if exc.args else str(exc)
        raise click.ClickException(message) from exc
    except InvalidQueryError as exc:
        raise click.ClickException(str(exc)) from exc

    data = {
        "selector": selector,
        "engine": engine,
        "index_path": str(resolved_index_path),
        "count": len(all_matches),
        "matches": all_matches,
    }
    _emit_query(data, output_format)
@main.command()
@click.argument("text")
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."),
    show_default=True,
    help="Root used for the default local index path.",
)
@click.option(
    "--index-path",
    type=click.Path(dir_okay=False, path_type=Path),
    help="SQLite index path. Defaults to .markitect/cache/index.sqlite3 under root.",
)
@click.option(
    "--limit",
    type=int,
    default=20,
    show_default=True,
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def search(
    text: str,
    root: Path,
    index_path: Path | None,
    limit: int,
    output_format: str,
) -> None:
    """Search the local SQLite index with FTS5."""
    # The store reports blank or malformed queries as ValueError; surface
    # those as a regular CLI error instead of a traceback.
    try:
        hits = LocalSnapshotStore(local_index_path_for(root, index_path)).search(
            text, limit=limit
        )
    except ValueError as exc:
        raise click.ClickException(str(exc)) from exc

    payload = dict(
        query=text,
        index_path=str(local_index_path_for(root, index_path)),
        count=len(hits),
        matches=[hit.to_dict() for hit in hits],
    )
    _emit_search_results(payload, output_format)
@main.group()
def template() -> None:
"""Render and inspect deterministic Markdown templates."""
@@ -1392,6 +1540,26 @@ def _emit_local_index_data(data: dict, output_format: str) -> None:
click.echo(f"- {value}")
def _emit_search_results(data: dict, output_format: str) -> None:
    """Print search results as JSON, YAML, or a compact text listing.

    Any ``output_format`` other than ``"json"`` or ``"yaml"`` produces the
    text listing: a count line, then one line per match and an optional
    whitespace-collapsed preview capped at 160 characters.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return

    click.echo(f"{data['count']} match(es)")
    for hit in data["matches"]:
        # Only show a line suffix when a truthy start line was recorded.
        location = f":{hit['line_start']}" if hit.get("line_start") else ""
        heading_label = f" [{hit['heading']}]" if hit.get("heading") else ""
        click.echo(
            f"- {hit['path']}{location} {hit['unit_kind']}#{hit['unit_index']}{heading_label}"
        )
        # Collapse all runs of whitespace so the preview stays on one line.
        snippet = " ".join(str(hit.get("text", "")).split())
        if snippet:
            click.echo(f" {snippet[:160]}")
def _emit_reference_result(data: dict, output_format: str) -> None:
if output_format == "json":
click.echo(json.dumps(data, indent=2, ensure_ascii=False))

View File

@@ -17,6 +17,10 @@ class Heading:
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Heading":
    """Build a heading from a mapping with ``level``, ``text`` and ``line`` keys.

    ``level`` and ``line`` are coerced with ``int`` and ``text`` with ``str``;
    a missing key raises ``KeyError``.
    """
    level = int(data["level"])
    text = str(data["text"])
    line = int(data["line"])
    return cls(level=level, text=text, line=line)
@dataclass(frozen=True)
class ContentBlock:
@@ -32,6 +36,16 @@ class ContentBlock:
data = asdict(self)
return {key: value for key, value in data.items() if value is not None}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ContentBlock":
    """Build a content block from its dict form.

    ``type`` is required. ``text`` defaults to an empty string, and the
    line/heading-level fields stay ``None`` when absent or ``None``.
    """

    def optional_int(key: str) -> int | None:
        # Absent keys and explicit None both map to None.
        if data.get(key) is None:
            return None
        return int(data[key])

    return cls(
        type=str(data["type"]),
        text=str(data.get("text", "")),
        line_start=optional_int("line_start"),
        line_end=optional_int("line_end"),
        heading_level=optional_int("heading_level"),
    )
@dataclass(frozen=True)
class Section:
@@ -46,6 +60,13 @@ class Section:
"blocks": [block.to_dict() for block in self.blocks],
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Section":
    """Build a section from a mapping with a ``heading`` and optional ``blocks``."""
    heading = Heading.from_dict(data["heading"])
    blocks = [ContentBlock.from_dict(raw_block) for raw_block in data.get("blocks", [])]
    return cls(heading=heading, blocks=blocks)
@dataclass(frozen=True)
class Document:
@@ -70,3 +91,15 @@ class Document:
"tokens": self.tokens,
}
return {key: value for key, value in data.items() if value is not None}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Document":
    """Build a document from its dict form.

    Every key is optional: ``source_path`` stays ``None`` when absent or
    ``None``, ``body`` defaults to an empty string, and the collection
    fields default to empty containers.
    """
    raw_source_path = data.get("source_path")
    source_path = None if raw_source_path is None else str(raw_source_path)
    frontmatter = dict(data.get("frontmatter", {}))
    body = str(data.get("body", ""))
    blocks = [ContentBlock.from_dict(raw) for raw in data.get("blocks", [])]
    headings = [Heading.from_dict(raw) for raw in data.get("headings", [])]
    sections = [Section.from_dict(raw) for raw in data.get("sections", [])]
    tokens = list(data.get("tokens", []))
    return cls(
        source_path=source_path,
        frontmatter=frontmatter,
        body=body,
        blocks=blocks,
        headings=headings,
        sections=sections,
        tokens=tokens,
    )

View File

@@ -4,12 +4,16 @@ from markitect_tool.query.engine import (
InvalidQueryError,
QueryMatch,
extract_document,
extract_document_jsonpath,
query_document,
query_document_jsonpath,
)
__all__ = [
"InvalidQueryError",
"QueryMatch",
"extract_document",
"extract_document_jsonpath",
"query_document",
"query_document_jsonpath",
]

View File

@@ -60,6 +60,42 @@ def query_document(document: Document, selector: str) -> list[QueryMatch]:
raise InvalidQueryError(f"Unsupported selector target `{parsed.target}`")
def query_document_jsonpath(document: Document, expression: str) -> list[QueryMatch]:
    """Run a JSONPath expression against ``document.to_dict()``.

    The ``jsonpath-ng`` import is deferred to call time so the core selector
    engine does not require it. Install ``markitect-tool[query]`` to enable
    JSONPath queries.

    Raises:
        InvalidQueryError: If ``jsonpath-ng`` cannot be imported or the
            expression fails to parse.
    """
    try:
        from jsonpath_ng.ext import parse as parse_jsonpath
    except ImportError as exc:  # pragma: no cover - branch depends on env deps
        raise InvalidQueryError(
            "JSONPath queries require the optional `jsonpath-ng` dependency. "
            "Install `markitect-tool[query]`."
        ) from exc

    try:
        compiled = parse_jsonpath(expression)
    except Exception as exc:  # jsonpath-ng raises parser-specific exceptions
        raise InvalidQueryError(f"Invalid JSONPath expression `{expression}`: {exc}") from exc

    def to_match(found: Any) -> QueryMatch:
        # NOTE(review): str(full_path) may not start with a dot, which would
        # give paths like `$headings.[0]` - confirm the intended format.
        rendered_path = "$" + str(found.full_path)
        payload = found.value
        return QueryMatch(
            kind=_jsonpath_kind(rendered_path, payload),
            path=rendered_path,
            value=payload,
            text=_text_value(payload),
            line=_jsonpath_line(payload),
        )

    return [to_match(found) for found in compiled.find(document.to_dict())]
def extract_document(document: Document, selector: str) -> list[str]:
"""Extract text content from query matches."""
@@ -74,6 +110,16 @@ def extract_document(document: Document, selector: str) -> list[str]:
return extracted
def extract_document_jsonpath(document: Document, expression: str) -> list[str]:
    """Return the text of every JSONPath match that has a textual form.

    Matches whose ``text`` is ``None`` are skipped; match order is kept.
    """
    return [
        found.text
        for found in query_document_jsonpath(document, expression)
        if found.text is not None
    ]
def _parse_selector(selector: str) -> _Selector:
raw = selector.strip()
if not raw:
@@ -240,3 +286,25 @@ def _text_value(value: Any) -> str | None:
if isinstance(value, int | float | bool):
return str(value)
return None
def _jsonpath_kind(path: str, value: Any) -> str:
if ".frontmatter" in path:
return "frontmatter"
if ".headings" in path:
return "heading" if isinstance(value, dict) else "heading_value"
if ".sections" in path:
return "section" if isinstance(value, dict) else "section_value"
if ".blocks" in path:
return "block" if isinstance(value, dict) else "block_value"
if ".tokens" in path:
return "token" if isinstance(value, dict) else "token_value"
return "jsonpath"
def _jsonpath_line(value: Any) -> int | None:
if isinstance(value, dict):
raw_line = value.get("line") or value.get("line_start")
if isinstance(raw_line, int):
return raw_line
return None