feat(infospace): systematic long-text processing — rich commit bodies, per-source eval/classify, chapters view
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled

Three coordinated changes that let the pipeline produce a clean
chapter-by-chapter git history on long texts without archaeology after
the fact.

1. Richer commit messages. `SourcePipeline._git_commit` now diffs the
   staged changes, buckets added files by output subdirectory (entities,
   evaluations, classifications, mappings, analyses, metrics, logs), and
   includes counts in the commit body. So `git log` reads "entities:
   +23, evaluations: +23" per chapter instead of the same generic blurb
   on every commit. Zero behaviour change when no output changed; falls
   back to the original message if the diff query fails.

2. --eval-after-source / --classify-after-source on `infospace process`.
   After a source's stages succeed, the pipeline identifies which entity
   files are *new* (set diff of entity slugs before vs after), loads
   their EntityMeta, and runs per-entity evaluation and/or
   classification scoped to just those slugs before the per-source git
   commit lands. Result: each chapter's commit is self-contained —
   extraction + evaluation + classification in one atomic unit. Gated
   behind explicit flags because the cost is real (LLM latency per
   chapter rather than amortised across one bulk batch).

3. `markitect infospace chapters` subcommand. Lists source files in
   canonical order with entity count, evaluated count, classified
   count, and mean per-entity score per source. Text or JSON output.
   Natural triage surface for long-text infospaces — spot chapters that
   under-extracted or evaluated poorly.

Also: `docs/advanced-usage.md` gets a new "Systematic processing of
long texts" section with the recommended flag combo and the tradeoff
note on cost.

11 new unit tests cover the chapters command (text/json/no-sources),
the process flag wiring (help + provider requirement), and the
commit-body bucket logic. Full infospace+llm unit suite (315 tests)
green; 3 pre-existing infospace failures unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-22 08:24:26 +02:00
parent 9e8d73fa7d
commit e3e5b8ecc1
4 changed files with 501 additions and 4 deletions

View File

@@ -7,8 +7,9 @@ inspecting, and evaluating infospaces.
from __future__ import annotations
import re
from pathlib import Path
from typing import Optional
from typing import Dict, Optional
import click
@@ -228,6 +229,134 @@ def _entities_by_type(cfg, root: "Path", entity_list: list) -> None:
click.echo(f"\nTotal: {total} entities")
# ── chapters (per-source triage view) ────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
"--format", "output_format",
type=click.Choice(["text", "json"]),
default="text",
help="Output format.",
)
def chapters(config_path: Optional[str], output_format: str):
"""List source files in canonical order with per-source stats.
For each source file in the sources directory, reports entity count,
mean per-entity score (if evaluated), classification coverage, and
processing status. Useful for triaging long-text infospaces.
"""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
sources_dir = root / cfg.topic.sources if cfg.topic.sources else root
if not sources_dir.is_dir():
click.echo(f"No sources directory at {sources_dir}.", err=True)
raise SystemExit(1)
source_files = sorted(sources_dir.glob("*.md"))
if not source_files:
click.echo(f"No source files in {sources_dir}.", err=True)
raise SystemExit(1)
entities_dir = root / cfg.entities_dir
entity_list = (
parse_entity_directory(entities_dir) if entities_dir.is_dir() else []
)
# Build a source_id → [entities] map using the source_chapter field.
# Matching is lenient: entities with a source_chapter substring-equal
# to a normalized form of the source stem count as belonging to it.
def _chapter_keys(source_id: str) -> list:
"""Return strings an entity's source_chapter might contain."""
keys = [source_id, source_id.replace("-", " ")]
m = re.match(r"book-(\d+)-chapter-(\d+)", source_id)
if m:
book, chap = m.group(1), m.group(2)
roman = {"1": "I", "2": "II", "3": "III", "4": "IV", "5": "V"}
if book in roman:
keys.append(f"Book {roman[book]}, Chapter {int(chap)}")
keys.append(f"Book {roman[book]} Chapter {int(chap)}")
return keys
# Precompute evaluation scores and classification slugs once.
evals_dir = root / cfg.evaluations_dir
cls_dir = root / cfg.classifications_dir
eval_scores: Dict[str, float] = {}
if evals_dir.is_dir():
from markitect.infospace.evaluation_io import read_entity_evaluation
for ev_path in evals_dir.glob("*.md"):
try:
ev = read_entity_evaluation(ev_path)
if ev.overall_score is not None:
eval_scores[ev_path.stem] = ev.overall_score
except Exception:
continue
classified_slugs = (
{p.stem for p in cls_dir.glob("*.md")} if cls_dir.is_dir() else set()
)
rows = []
for source_file in source_files:
source_id = source_file.stem
keys = _chapter_keys(source_id)
matched = [
e for e in entity_list
if any(k.lower() in (e.source_chapter or "").lower() for k in keys)
]
slugs = {e.slug for e in matched}
evaluated = slugs & set(eval_scores)
classified = slugs & classified_slugs
mean = (
sum(eval_scores[s] for s in evaluated) / len(evaluated)
if evaluated else None
)
rows.append({
"source_id": source_id,
"entities": len(matched),
"evaluated": len(evaluated),
"classified": len(classified),
"mean_score": round(mean, 2) if mean is not None else None,
})
if output_format == "json":
import json
click.echo(json.dumps(rows, indent=2))
return
# Text: aligned table.
headers = ("source", "entities", "evaluated", "classified", "mean_score")
widths = [
max(len(h), max((len(str(r[h.replace(' ', '_')])) if h != "source"
else len(r["source_id"]))
for r in rows)) if rows else len(h)
for h in headers
]
fmt = " ".join(f"{{:<{w}}}" for w in widths)
click.echo(fmt.format(*headers))
click.echo(fmt.format(*("-" * w for w in widths)))
for r in rows:
click.echo(fmt.format(
r["source_id"],
r["entities"],
r["evaluated"],
r["classified"],
"-" if r["mean_score"] is None else f"{r['mean_score']:.2f}",
))
totals = {
"entities": sum(r["entities"] for r in rows),
"evaluated": sum(r["evaluated"] for r in rows),
"classified": sum(r["classified"] for r in rows),
}
click.echo(
f"\n{len(rows)} source file(s); "
f"{totals['entities']} entities, "
f"{totals['evaluated']} evaluated, "
f"{totals['classified']} classified."
)
# ── entity (single lookup) ───────────────────────────────────────────
@@ -1167,6 +1296,18 @@ def disciplines(config_path: Optional[str]):
help="Run collection checks (C1C5) after each source file.",
)
@click.option("--no-commit", is_flag=True, help="Skip git commits.")
@click.option(
"--eval-after-source",
is_flag=True,
help="After each source's stages succeed, evaluate just the newly-"
"added entities so the per-source commit is self-contained.",
)
@click.option(
"--classify-after-source",
is_flag=True,
help="After each source's stages succeed, classify just the newly-"
"added entities so the per-source commit is self-contained.",
)
def process(
glob_pattern: Optional[str],
process_all: bool,
@@ -1175,6 +1316,8 @@ def process(
model: Optional[str],
check_after_each: bool,
no_commit: bool,
eval_after_source: bool,
classify_after_source: bool,
):
"""Process source files through the pipeline defined in infospace.yaml.
@@ -1248,12 +1391,22 @@ def process(
# Run pipeline
from markitect.infospace.pipeline import SourcePipeline
if (eval_after_source or classify_after_source) and adapter is None:
click.echo(
"Error: --eval-after-source / --classify-after-source require "
"--provider (they call the LLM).",
err=True,
)
raise SystemExit(1)
pipeline = SourcePipeline(
cfg, root,
adapter=adapter,
provider=provider or "",
model=(model or _PROVIDER_DEFAULTS.get(provider or "", "")) if provider else "",
no_commit=no_commit,
eval_after_source=eval_after_source,
classify_after_source=classify_after_source,
)
total = len(source_files)

View File

@@ -62,6 +62,8 @@ class SourcePipeline:
provider: str = "",
model: str = "",
no_commit: bool = False,
eval_after_source: bool = False,
classify_after_source: bool = False,
) -> None:
self.config = config
self.root = root
@@ -69,6 +71,8 @@ class SourcePipeline:
self.provider = provider
self.model = model
self.no_commit = no_commit
self.eval_after_source = eval_after_source
self.classify_after_source = classify_after_source
# ── Public API ────────────────────────────────────────────────────
@@ -110,6 +114,12 @@ class SourcePipeline:
stage_outputs: Dict[str, str] = {}
stage_logs: List[Dict[str, Any]] = []
# Snapshot entity slugs before any stage runs so we can identify
# which entities were newly produced by this source. Used to scope
# --eval-after-source / --classify-after-source to only the new
# entities.
pre_entity_slugs = self._current_entity_slugs()
print(f"\nProcessing: {source_id}")
print("=" * 60)
@@ -133,6 +143,14 @@ class SourcePipeline:
print(f"\n {source_id}: all stages complete.")
self._write_processing_log(source_id, stage_logs, success=True)
# Per-source follow-ups: evaluate and/or classify just the new
# entities this source produced, so the next commit contains a
# fully-processed chapter.
new_slugs = self._current_entity_slugs() - pre_entity_slugs
if new_slugs and (self.eval_after_source or self.classify_after_source):
self._run_per_source_followups(new_slugs)
if not self.no_commit:
self._git_commit(source_id)
@@ -636,7 +654,13 @@ class SourcePipeline:
# ── Git Integration ───────────────────────────────────────────────
def _git_commit(self, source_id: str) -> None:
"""Stage all output changes and commit them for *source_id*."""
"""Stage all output changes and commit them for *source_id*.
The commit message body summarises what actually changed — counts
of entities / evaluations / classifications / analyses added — so
``git log`` reads like the chapter-by-chapter story of the
infospace growing, not a wall of identical messages.
"""
output_dir = self.root / "output"
try:
subprocess.run(
@@ -645,11 +669,11 @@ class SourcePipeline:
check=True,
capture_output=True,
)
body = self._compose_commit_body(source_id)
result = subprocess.run(
[
"git", "commit", "-m",
f"infospace: process {source_id}\n\n"
f"Extract entities, map to VSM, and synthesize analysis.",
f"infospace: process {source_id}\n\n{body}",
],
cwd=str(self.root),
capture_output=True,
@@ -666,3 +690,146 @@ class SourcePipeline:
except subprocess.CalledProcessError as e:
stderr = e.stderr.decode() if isinstance(e.stderr, bytes) else (e.stderr or "")
print(f" Warning: Git error: {stderr.strip()}")
# ── Per-source helpers ────────────────────────────────────────────
def _current_entity_slugs(self) -> set:
"""Return the set of entity file stems currently on disk."""
entities_dir = self.root / self.config.entities_dir
if not entities_dir.is_dir():
return set()
return {p.stem for p in entities_dir.glob("*.md")}
def _run_per_source_followups(self, new_slugs: set) -> None:
"""Run per-source evaluation and/or classification on *new_slugs*.
Called after a source's pipeline stages succeed, before the git
commit, so each chapter's commit contains the full set of
artefacts derived from it.
"""
from markitect.infospace.entity_parser import parse_entity_directory
entities_dir = self.root / self.config.entities_dir
all_entities = parse_entity_directory(entities_dir)
new_entities = [e for e in all_entities if e.slug in new_slugs]
if not new_entities:
return
if self.adapter is None:
print(
" Skipping per-source eval/classify: no LLM adapter "
"configured (run with --provider)."
)
return
from markitect.prompts.execution.models import RunConfig
run_config = RunConfig(
model_name=self.model or None, temperature=0.3, max_tokens=2000
)
if self.eval_after_source:
from markitect.infospace.evaluate import run_entity_evaluation
print(f" Evaluating {len(new_entities)} new entity/entities…")
try:
run_entity_evaluation(
config=self.config,
entities=new_entities,
adapter=self.adapter,
run_config=run_config,
output_dir=self.root / self.config.evaluations_dir,
)
except Exception as exc:
print(f" Warning: per-source evaluation failed: {exc}")
if self.classify_after_source:
from markitect.infospace.classifier import run_entity_classification
print(f" Classifying {len(new_entities)} new entity/entities…")
try:
run_entity_classification(
config=self.config,
entities=new_entities,
adapter=self.adapter,
run_config=run_config,
output_dir=self.root / self.config.classifications_dir,
)
except Exception as exc:
print(f" Warning: per-source classification failed: {exc}")
def _compose_commit_body(self, source_id: str) -> str:
"""Summarise staged output changes into a commit-message body.
Counts added files per output subdirectory (entities, evaluations,
classifications, analyses, mappings…) and produces one line per
bucket that actually saw additions. Modified/deleted files are
noted separately for auditability.
"""
default = "Extract entities, map to VSM, and synthesize analysis."
try:
result = subprocess.run(
["git", "diff", "--cached", "--name-status", "--", "output"],
cwd=str(self.root),
check=True,
capture_output=True,
text=True,
)
except subprocess.CalledProcessError:
return default
added_by_bucket: Dict[str, int] = {}
modified = 0
deleted = 0
for line in result.stdout.splitlines():
parts = line.split("\t")
if len(parts) < 2:
continue
status = parts[0]
path = parts[-1]
if status.startswith("A"):
bucket = self._bucket_for(path)
if bucket:
added_by_bucket[bucket] = added_by_bucket.get(bucket, 0) + 1
elif status.startswith("M"):
modified += 1
elif status.startswith("D"):
deleted += 1
if not added_by_bucket and not modified and not deleted:
return default
# Emit buckets in a deterministic, reader-friendly order.
order = ["entities", "mappings", "analyses", "evaluations",
"classifications", "metrics", "logs", "other"]
lines: List[str] = []
for bucket in order:
n = added_by_bucket.get(bucket, 0)
if n:
lines.append(f"- {bucket}: +{n}")
if modified:
lines.append(f"- modified: {modified}")
if deleted:
lines.append(f"- deleted: {deleted}")
return "\n".join(lines) if lines else default
def _bucket_for(self, path: str) -> Optional[str]:
"""Map an ``output/...`` path to a commit-summary bucket name."""
# Use configured directory basenames where possible so non-default
# layouts still bucket correctly.
buckets = {
Path(self.config.entities_dir).name: "entities",
Path(self.config.evaluations_dir).name: "evaluations",
Path(self.config.classifications_dir).name: "classifications",
}
parts = Path(path).parts
if len(parts) < 2 or parts[0] != "output":
return None
sub = parts[1]
if sub in buckets:
return buckets[sub]
# Heuristic fallback for common additional output subdirectories.
known = {"mappings", "analyses", "metrics", "logs"}
if sub in known:
return sub
return "other"