SQLite-backed local snapshot store

This commit is contained in:
2026-05-04 08:56:41 +02:00
parent 0d1ad21a9f
commit 36ff4cedab
7 changed files with 926 additions and 5 deletions

View File

@@ -32,6 +32,13 @@ from markitect_tool.backend.interfaces import (
QueryAdapter,
SnapshotBackend,
)
from markitect_tool.backend.local_store import (
DEFAULT_LOCAL_INDEX_PATH,
LOCAL_INDEX_SCHEMA_VERSION,
LocalIndexBuildResult,
LocalSnapshotStore,
local_index_path_for,
)
__all__ = [
"BACKEND_CAPABILITIES",
@@ -60,4 +67,9 @@ __all__ = [
"ProcessorResultStore",
"QueryAdapter",
"SnapshotBackend",
"DEFAULT_LOCAL_INDEX_PATH",
"LOCAL_INDEX_SCHEMA_VERSION",
"LocalIndexBuildResult",
"LocalSnapshotStore",
"local_index_path_for",
]

View File

@@ -0,0 +1,510 @@
"""Local SQLite snapshot and metadata store."""
from __future__ import annotations

import json
import sqlite3
from contextlib import closing
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from markitect_tool.backend.engine import (
    EMPTY_PARSE_OPTIONS_HASH,
    PARSER_ID,
    PARSER_VERSION,
    DependencyEdge,
    ProvenanceEnvelope,
    snapshot_identity_for_file,
)
from markitect_tool.backend.planning import SnapshotState, plan_snapshot_refresh
from markitect_tool.cache import scan_markdown_files
from markitect_tool.contract import collect_metrics
from markitect_tool.core import parse_markdown_file
# Default location of the SQLite index file. It is a relative path;
# ``local_index_path_for`` joins relative index paths onto a root directory.
DEFAULT_LOCAL_INDEX_PATH = ".markitect/cache/index.sqlite3"
# Value written under the ``schema_version`` key of the ``meta`` table each
# time ``LocalSnapshotStore.initialize`` runs.
LOCAL_INDEX_SCHEMA_VERSION = "1"
@dataclass(frozen=True)
class LocalIndexBuildResult:
    """Outcome report for one build or refresh pass over the local index.

    ``parsed``, ``indexed``, ``metadata_updated`` and ``deleted`` each list
    the paths recorded for that action; all four default to empty lists.
    """

    index_path: str
    root: str
    paths: list[str]
    planned: dict[str, Any]
    parsed: list[str] = field(default_factory=list)
    indexed: list[str] = field(default_factory=list)
    metadata_updated: list[str] = field(default_factory=list)
    deleted: list[str] = field(default_factory=list)

    @property
    def dirty(self) -> bool:
        """Return True when at least one of the four action lists is non-empty."""
        action_lists = (
            self.parsed,
            self.indexed,
            self.metadata_updated,
            self.deleted,
        )
        return any(action_lists)

    def to_dict(self) -> dict[str, Any]:
        """Return the dataclass fields as a dict, plus ``dirty`` and ``counts``.

        ``counts`` maps each action name to the length of its path list.
        """
        payload = asdict(self)
        payload["dirty"] = self.dirty
        counts: dict[str, int] = {}
        for action in ("parsed", "indexed", "metadata_updated", "deleted"):
            counts[action] = len(getattr(self, action))
        payload["counts"] = counts
        return payload
class LocalSnapshotStore:
    """SQLite-backed local snapshot store for parsed Markdown documents.

    Each method that touches the database opens its own connection to
    ``self.path``, commits on success (rolling back if an exception
    escapes), and closes the connection before returning.
    """

    def __init__(self, path: str | Path = DEFAULT_LOCAL_INDEX_PATH) -> None:
        """Remember the index file location; nothing is opened or created here."""
        self.path = Path(path)

    def initialize(self) -> None:
        """Create the local index schema if needed and record its version.

        The parent directory of the index file is created when missing.
        The ``schema_version`` row in ``meta`` is inserted or overwritten
        with ``LOCAL_INDEX_SCHEMA_VERSION``.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # ``with conn`` only commits/rolls back; ``closing`` is what actually
        # releases the connection (and the file handle behind it).
        with closing(self._connect()) as conn, conn:
            _create_schema(conn)
            conn.execute(
                """
                insert into meta(key, value) values('schema_version', ?)
                on conflict(key) do update set value = excluded.value
                """,
                (LOCAL_INDEX_SCHEMA_VERSION,),
            )

    def load_state(self) -> list[SnapshotState]:
        """Load cheap refresh-planning state without loading document JSON.

        Returns:
            One ``SnapshotState`` per row of ``sources``, ordered by path,
            each carrying the dependency edges stored for that path. An
            empty list when the index file does not exist.
        """
        if not self.path.exists():
            return []
        with closing(self._connect()) as conn, conn:
            _create_schema(conn)
            rows = conn.execute(
                """
                select path, size, mtime_ns, content_hash, snapshot_id, parser,
                       parser_version, parse_options_hash, contract_hash, indexed
                from sources
                order by path
                """
            ).fetchall()
            dependencies = _load_dependencies(conn)
        # Rows were fully fetched above, so they stay usable after close.
        return [
            SnapshotState(
                path=row["path"],
                size=row["size"],
                mtime_ns=row["mtime_ns"],
                content_hash=row["content_hash"],
                snapshot_id=row["snapshot_id"],
                parser=row["parser"],
                parser_version=row["parser_version"],
                parse_options_hash=row["parse_options_hash"],
                contract_hash=row["contract_hash"],
                indexed=bool(row["indexed"]),
                dependencies=dependencies.get(row["path"], []),
            )
            for row in rows
        ]

    def put_file(
        self,
        path: str | Path,
        *,
        root: str | Path = ".",
        parse_options: dict[str, Any] | None = None,
        contract_hash: str | None = None,
    ) -> SnapshotState:
        """Parse and persist one Markdown file.

        The ``sources`` row for the file is inserted or, when the path is
        already present, overwritten; heading, section and block rows for
        the path are replaced.

        Args:
            path: File to parse.
            root: Directory the stored path is made relative to; a file
                outside ``root`` is stored under its absolute POSIX path.
            parse_options: Forwarded to ``snapshot_identity_for_file``.
            contract_hash: Forwarded to ``snapshot_identity_for_file``.

        Returns:
            The ``SnapshotState`` describing the stored row (``indexed`` is
            always True; no dependencies are attached).
        """
        self.initialize()
        file_path = Path(path)
        root_path = Path(root).resolve()
        relative_path = _relative(file_path, root_path)
        identity = snapshot_identity_for_file(
            file_path,
            parse_options=parse_options,
            contract_hash=contract_hash,
        )
        # NOTE(review): parse_options feed the identity hash only; the parser
        # itself is invoked without them - confirm this is intended.
        document = parse_markdown_file(file_path)
        # Serialize once and reuse for both the JSON column and derived rows.
        document_data = document.to_dict()
        metrics = collect_metrics(document).to_dict()
        stat = file_path.stat()
        now = datetime.now(timezone.utc).isoformat()
        provenance = ProvenanceEnvelope(
            operation="local_snapshot_store.put_file",
            snapshot_id=identity.snapshot_id,
            source_path=relative_path,
            content_hash=identity.content_hash,
            backend_id="local-sqlite",
        )
        with closing(self._connect()) as conn, conn:
            _create_schema(conn)
            conn.execute(
                """
                insert into sources(
                    path, abs_path, size, mtime_ns, content_hash, snapshot_id,
                    parser, parser_version, parse_options_hash, contract_hash,
                    indexed, document_json, frontmatter_json, metrics_json,
                    provenance_json, updated_at
                ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?)
                on conflict(path) do update set
                    abs_path = excluded.abs_path,
                    size = excluded.size,
                    mtime_ns = excluded.mtime_ns,
                    content_hash = excluded.content_hash,
                    snapshot_id = excluded.snapshot_id,
                    parser = excluded.parser,
                    parser_version = excluded.parser_version,
                    parse_options_hash = excluded.parse_options_hash,
                    contract_hash = excluded.contract_hash,
                    indexed = excluded.indexed,
                    document_json = excluded.document_json,
                    frontmatter_json = excluded.frontmatter_json,
                    metrics_json = excluded.metrics_json,
                    provenance_json = excluded.provenance_json,
                    updated_at = excluded.updated_at
                """,
                (
                    relative_path,
                    str(file_path.resolve()),
                    stat.st_size,
                    stat.st_mtime_ns,
                    identity.content_hash,
                    identity.snapshot_id,
                    identity.parser,
                    identity.parser_version,
                    identity.parse_options_hash,
                    identity.contract_hash,
                    _json(document_data),
                    _json(document.frontmatter),
                    _json(metrics),
                    _json(provenance.to_dict()),
                    now,
                ),
            )
            _replace_document_units(
                conn, relative_path, identity.snapshot_id, document_data
            )
        return SnapshotState(
            path=relative_path,
            size=stat.st_size,
            mtime_ns=stat.st_mtime_ns,
            content_hash=identity.content_hash,
            snapshot_id=identity.snapshot_id,
            parser=identity.parser,
            parser_version=identity.parser_version,
            parse_options_hash=identity.parse_options_hash,
            contract_hash=identity.contract_hash,
            indexed=True,
        )

    def update_metadata(self, path: str, *, root: str | Path = ".") -> None:
        """Refresh the stored size, mtime and ``updated_at`` for one path.

        Intended for files whose content hash is unchanged; this method does
        not check the hash itself. ``path`` is the stored (relative) path and
        is joined onto ``root`` to stat the file.
        """
        file_path = Path(root) / path
        stat = file_path.stat()
        with closing(self._connect()) as conn, conn:
            _create_schema(conn)
            conn.execute(
                "update sources set size = ?, mtime_ns = ?, updated_at = ? where path = ?",
                (stat.st_size, stat.st_mtime_ns, datetime.now(timezone.utc).isoformat(), path),
            )

    def delete_path(self, path: str) -> None:
        """Delete one indexed source and derived rows.

        Does nothing when the index file does not exist.
        """
        if not self.path.exists():
            return
        with closing(self._connect()) as conn, conn:
            _create_schema(conn)
            conn.execute("delete from blocks where path = ?", (path,))
            conn.execute("delete from sections where path = ?", (path,))
            conn.execute("delete from headings where path = ?", (path,))
            conn.execute("delete from dependencies where path = ?", (path,))
            conn.execute("delete from sources where path = ?", (path,))

    def get_document(self, path: str) -> dict[str, Any]:
        """Return stored document JSON for a relative source path.

        Raises:
            KeyError: No row exists for ``path``, or the index file itself
                does not exist.
        """
        # A missing index holds no documents. Answer directly instead of
        # letting sqlite3.connect create an empty database as a side effect
        # of a read (or fail with OperationalError if the directory is
        # missing); this mirrors the guard in load_state and delete_path.
        if not self.path.exists():
            raise KeyError(f"No indexed document `{path}`")
        with closing(self._connect()) as conn, conn:
            _create_schema(conn)
            row = conn.execute(
                "select document_json from sources where path = ?",
                (path,),
            ).fetchone()
        if row is None:
            raise KeyError(f"No indexed document `{path}`")
        return json.loads(row["document_json"])

    def build(
        self,
        paths: list[str | Path],
        *,
        root: str | Path = ".",
        recursive: bool = True,
        parse_options: dict[str, Any] | None = None,
        contract_hash: str | None = None,
        verify_hashes: bool = True,
    ) -> LocalIndexBuildResult:
        """Incrementally build or refresh the local index.

        A refresh plan is computed from the stored state and then applied
        entry by entry: ``delete`` removes the path, ``parse``/``index``
        re-parses and stores the file, ``metadata`` only refreshes size and
        mtime. The first matching action wins for each entry.

        Returns:
            A ``LocalIndexBuildResult`` listing the paths touched per action
            together with the serialized plan.
        """
        self.initialize()
        root_path = Path(root).resolve()
        plan = plan_snapshot_refresh(
            paths,
            previous=self.load_state(),
            root=root_path,
            recursive=recursive,
            parse_options=parse_options,
            contract_hash=contract_hash,
            verify_hashes=verify_hashes,
        )
        # Map stored (relative) paths back to the files found on disk.
        current_files = {
            _relative(path, root_path): path
            for path in scan_markdown_files(paths, recursive=recursive)
        }
        parsed: list[str] = []
        indexed: list[str] = []
        metadata_updated: list[str] = []
        deleted: list[str] = []
        for entry in plan.entries:
            if "delete" in entry.actions:
                self.delete_path(entry.path)
                deleted.append(entry.path)
                continue
            if "parse" in entry.actions or "index" in entry.actions:
                file_path = current_files.get(entry.path)
                if file_path is None:
                    # Planned but not found by the scan: skipped silently.
                    continue
                self.put_file(
                    file_path,
                    root=root_path,
                    parse_options=parse_options,
                    contract_hash=contract_hash,
                )
                if "parse" in entry.actions:
                    parsed.append(entry.path)
                if "index" in entry.actions:
                    indexed.append(entry.path)
                continue
            if "metadata" in entry.actions:
                self.update_metadata(entry.path, root=root_path)
                metadata_updated.append(entry.path)
        return LocalIndexBuildResult(
            index_path=str(self.path),
            root=str(root_path),
            paths=[str(path) for path in paths],
            planned=plan.to_dict(),
            parsed=parsed,
            indexed=indexed,
            metadata_updated=metadata_updated,
            deleted=deleted,
        )

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection with ``sqlite3.Row`` rows and foreign keys on.

        The caller owns the returned connection and must close it.
        """
        conn = sqlite3.connect(self.path)
        conn.row_factory = sqlite3.Row
        conn.execute("pragma foreign_keys = on")
        return conn
def local_index_path_for(root: str | Path, index_path: str | Path | None = None) -> Path:
    """Resolve where the SQLite index lives for *root*.

    A falsy ``index_path`` selects ``DEFAULT_LOCAL_INDEX_PATH``. An absolute
    index path is returned unchanged; a relative one is joined onto ``root``.
    """
    candidate = Path(index_path if index_path else DEFAULT_LOCAL_INDEX_PATH)
    return candidate if candidate.is_absolute() else Path(root) / candidate
def _create_schema(conn: sqlite3.Connection) -> None:
conn.executescript(
"""
create table if not exists meta(
key text primary key,
value text not null
);
create table if not exists sources(
path text primary key,
abs_path text not null,
size integer not null,
mtime_ns integer not null,
content_hash text not null,
snapshot_id text not null unique,
parser text not null,
parser_version text not null,
parse_options_hash text not null,
contract_hash text,
indexed integer not null default 1,
document_json text not null,
frontmatter_json text not null,
metrics_json text not null,
provenance_json text not null,
updated_at text not null
);
create table if not exists headings(
snapshot_id text not null,
path text not null,
idx integer not null,
level integer not null,
text text not null,
line integer not null,
primary key(snapshot_id, idx)
);
create table if not exists sections(
snapshot_id text not null,
path text not null,
idx integer not null,
heading_text text not null,
heading_level integer not null,
line integer not null,
text text not null,
line_start integer,
line_end integer,
primary key(snapshot_id, idx)
);
create table if not exists blocks(
snapshot_id text not null,
path text not null,
idx integer not null,
type text not null,
text text not null,
line_start integer,
line_end integer,
heading_level integer,
primary key(snapshot_id, idx)
);
create table if not exists dependencies(
path text not null,
source_id text not null,
target text not null,
kind text not null,
target_snapshot_id text,
metadata_json text not null default '{}'
);
create index if not exists idx_sources_content_hash on sources(content_hash);
create index if not exists idx_sources_snapshot_id on sources(snapshot_id);
create index if not exists idx_sources_parser on sources(parser, parser_version);
create index if not exists idx_headings_path on headings(path);
create index if not exists idx_sections_path on sections(path);
create index if not exists idx_blocks_path on blocks(path);
create index if not exists idx_dependencies_target on dependencies(target);
"""
)
def _replace_document_units(
conn: sqlite3.Connection,
path: str,
snapshot_id: str,
document: dict[str, Any],
) -> None:
conn.execute("delete from blocks where path = ?", (path,))
conn.execute("delete from sections where path = ?", (path,))
conn.execute("delete from headings where path = ?", (path,))
for idx, heading in enumerate(document.get("headings", [])):
conn.execute(
"""
insert into headings(snapshot_id, path, idx, level, text, line)
values (?, ?, ?, ?, ?, ?)
""",
(
snapshot_id,
path,
idx,
int(heading["level"]),
str(heading["text"]),
int(heading["line"]),
),
)
for idx, section in enumerate(document.get("sections", [])):
heading = section["heading"]
text = "\n\n".join(str(block.get("text", "")) for block in section.get("blocks", []))
line_start = _first_present(block.get("line_start") for block in section.get("blocks", []))
line_end = _last_present(block.get("line_end") for block in section.get("blocks", []))
conn.execute(
"""
insert into sections(
snapshot_id, path, idx, heading_text, heading_level, line,
text, line_start, line_end
) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
snapshot_id,
path,
idx,
str(heading["text"]),
int(heading["level"]),
int(heading["line"]),
text,
line_start,
line_end,
),
)
for idx, block in enumerate(document.get("blocks", [])):
conn.execute(
"""
insert into blocks(
snapshot_id, path, idx, type, text, line_start, line_end, heading_level
) values (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
snapshot_id,
path,
idx,
str(block["type"]),
str(block.get("text", "")),
block.get("line_start"),
block.get("line_end"),
block.get("heading_level"),
),
)
def _load_dependencies(conn: sqlite3.Connection) -> dict[str, list[DependencyEdge]]:
rows = conn.execute(
"""
select path, source_id, target, kind, target_snapshot_id, metadata_json
from dependencies
order by path, source_id, target
"""
).fetchall()
dependencies: dict[str, list[DependencyEdge]] = {}
for row in rows:
dependencies.setdefault(row["path"], []).append(
DependencyEdge(
source_id=row["source_id"],
target=row["target"],
kind=row["kind"],
target_snapshot_id=row["target_snapshot_id"],
metadata=json.loads(row["metadata_json"] or "{}"),
)
)
return dependencies
def _relative(path: Path, root: Path) -> str:
resolved = path.resolve()
try:
return resolved.relative_to(root).as_posix()
except ValueError:
return resolved.as_posix()
def _json(data: Any) -> str:
return json.dumps(data, sort_keys=True, ensure_ascii=False)
def _first_present(values: Any) -> int | None:
for value in values:
if value is not None:
return int(value)
return None
def _last_present(values: Any) -> int | None:
found: int | None = None
for value in values:
if value is not None:
found = int(value)
return found