Files
markitect-tool/tests/test_backend_refresh_planning.py

164 lines
4.9 KiB
Python

import os
from pathlib import Path
from click.testing import CliRunner
from markitect_tool.backend import (
DependencyEdge,
SnapshotState,
load_snapshot_state_file,
plan_snapshot_refresh,
)
from markitect_tool.cli import main
def test_refresh_plan_marks_all_files_new_without_previous_state(tmp_path: Path):
source = tmp_path / "doc.md"
source.write_text("# Doc\n", encoding="utf-8")
plan = plan_snapshot_refresh([tmp_path], root=tmp_path)
assert plan.dirty
assert plan.needs_hash == ["doc.md"]
assert plan.needs_parse == ["doc.md"]
assert plan.needs_index == ["doc.md"]
def test_refresh_plan_uses_cheap_metadata_for_unchanged_file(tmp_path: Path):
source = tmp_path / "doc.md"
source.write_text("# Doc\n", encoding="utf-8")
stat = source.stat()
previous = SnapshotState(
path="doc.md",
size=stat.st_size,
mtime_ns=stat.st_mtime_ns,
content_hash="sha256:known",
snapshot_id="snapshot:known",
)
plan = plan_snapshot_refresh([tmp_path], previous=[previous], root=tmp_path)
assert not plan.dirty
assert plan.unchanged == ["doc.md"]
assert plan.needs_hash == []
def test_refresh_plan_can_hash_metadata_changed_file_and_skip_parse_if_content_same(tmp_path: Path):
source = tmp_path / "doc.md"
source.write_text("# Doc\n", encoding="utf-8")
stat = source.stat()
content_hash = _hash_file(source)
previous = SnapshotState(
path="doc.md",
size=stat.st_size,
mtime_ns=stat.st_mtime_ns,
content_hash=content_hash,
snapshot_id="snapshot:known",
)
os.utime(source, ns=(stat.st_atime_ns + 1_000_000_000, stat.st_mtime_ns + 1_000_000_000))
plan = plan_snapshot_refresh(
[tmp_path],
previous=[previous],
root=tmp_path,
verify_hashes=True,
)
assert plan.needs_hash == ["doc.md"]
assert plan.needs_metadata_update == ["doc.md"]
assert plan.needs_parse == []
assert plan.needs_index == []
def test_refresh_plan_invalidates_transitive_dependents(tmp_path: Path):
source = tmp_path / "source.md"
dependent = tmp_path / "dependent.md"
transitive = tmp_path / "transitive.md"
source.write_text("# Source changed\n", encoding="utf-8")
dependent.write_text("# Dependent\n", encoding="utf-8")
transitive.write_text("# Transitive\n", encoding="utf-8")
source_stat = source.stat()
dependent_stat = dependent.stat()
transitive_stat = transitive.stat()
previous = [
SnapshotState(
path="source.md",
size=1,
mtime_ns=1,
content_hash="sha256:old",
snapshot_id="snapshot:source",
),
SnapshotState(
path="dependent.md",
size=dependent_stat.st_size,
mtime_ns=dependent_stat.st_mtime_ns,
content_hash=_hash_file(dependent),
snapshot_id="snapshot:dependent",
dependencies=[
DependencyEdge(source_id="snapshot:dependent", target="source.md", kind="reference")
],
),
SnapshotState(
path="transitive.md",
size=transitive_stat.st_size,
mtime_ns=transitive_stat.st_mtime_ns,
content_hash=_hash_file(transitive),
snapshot_id="snapshot:transitive",
dependencies=[
DependencyEdge(source_id="snapshot:transitive", target="dependent.md", kind="reference")
],
),
]
plan = plan_snapshot_refresh([tmp_path], previous=previous, root=tmp_path)
assert plan.needs_parse == ["source.md"]
assert plan.invalidated == ["dependent.md", "transitive.md"]
entries = {entry.path: entry for entry in plan.entries}
assert entries["dependent.md"].invalidated_by == ["source.md"]
assert entries["transitive.md"].invalidated_by == ["dependent.md"]
assert source_stat.st_size != 1
def test_snapshot_state_file_and_cli_refresh_plan(tmp_path: Path):
source = tmp_path / "doc.md"
state_file = tmp_path / "state.yaml"
source.write_text("# Doc\n", encoding="utf-8")
stat = source.stat()
state_file.write_text(
f"""snapshots:
- path: doc.md
size: {stat.st_size}
mtime_ns: {stat.st_mtime_ns}
content_hash: {_hash_file(source)}
snapshot_id: snapshot:known
""",
encoding="utf-8",
)
states = load_snapshot_state_file(state_file)
result = CliRunner().invoke(
main,
[
"backend",
"refresh-plan",
str(tmp_path),
"--root",
str(tmp_path),
"--state",
str(state_file),
],
)
assert states[0].path == "doc.md"
assert result.exit_code == 0
assert "clean" in result.output
assert "unchanged: 1" in result.output
def _hash_file(path: Path) -> str:
import hashlib
return "sha256:" + hashlib.sha256(path.read_bytes()).hexdigest()