diff --git a/examples/infospace-with-history/docs/advanced-usage.md b/examples/infospace-with-history/docs/advanced-usage.md index cff5d36e..a4813f6a 100644 --- a/examples/infospace-with-history/docs/advanced-usage.md +++ b/examples/infospace-with-history/docs/advanced-usage.md @@ -171,6 +171,57 @@ you need to look at, rather than a bare ratio. --- +## 5. Systematic processing of long texts + +For long source material (books, multi-chapter specifications, corpora), the +pipeline can produce a clean chapter-by-chapter git history on its own if +you let it. The pattern: + +```bash +# Process all sources in canonical order, eval and classify per chapter, +# snapshot metrics after each chapter. +markitect infospace process --all \ + --provider openrouter \ + --eval-after-source \ + --classify-after-source \ + --check-after-each +``` + +What you get: + +- **One commit per source file**, not per batch run. The commit message body + lists counts by bucket (`entities: +23`, `evaluations: +23`, + `classifications: +23`) derived from the actual staged diff, so `git log` + reads like the story of the infospace growing. +- **Chapter-atomic commits.** `--eval-after-source` and + `--classify-after-source` evaluate and classify *only the new entities* + from the just-processed source before the commit lands, so each commit is + a self-contained chapter snapshot. +- **Metrics-per-chapter trail.** `--check-after-each` appends a snapshot to + `output/metrics/history.yaml` after every chapter, so `markitect infospace + history` later shows the metric trajectory rather than just start/end. + +**Cost tradeoff.** `--eval-after-source` pays LLM latency per chapter rather +than amortising it across one bulk batch. It's worth it when you care about +the git history or want early quality signal, not when you're bulk-backfilling +a known-good corpus. + +**Triage during the run.** While processing, use `markitect infospace +chapters` in another shell to see per-source entity/eval/classify counts and +mean scores — handy for spotting chapters that under-extracted or evaluated +poorly. + +``` +$ markitect infospace chapters +source entities evaluated classified mean_score +------------------- -------- --------- ---------- ---------- +book-1-chapter-01 96 96 79 4.22 +book-1-chapter-02 16 16 10 4.06 +… +``` + +--- + ## See also - `METRICS-METHODOLOGY.md` — how each metric is computed. diff --git a/markitect/infospace/cli.py b/markitect/infospace/cli.py index 5c7062c6..1c6f7f6c 100644 --- a/markitect/infospace/cli.py +++ b/markitect/infospace/cli.py @@ -7,8 +7,9 @@ inspecting, and evaluating infospaces. from __future__ import annotations +import re from pathlib import Path -from typing import Optional +from typing import Dict, Optional import click @@ -228,6 +229,134 @@ def _entities_by_type(cfg, root: "Path", entity_list: list) -> None: click.echo(f"\nTotal: {total} entities") +# ── chapters (per-source triage view) ──────────────────────────────── + + +@infospace_commands.command() +@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") +@click.option( + "--format", "output_format", + type=click.Choice(["text", "json"]), + default="text", + help="Output format.", +) +def chapters(config_path: Optional[str], output_format: str): + """List source files in canonical order with per-source stats. + + For each source file in the sources directory, reports entity count, + mean per-entity score (if evaluated), classification coverage, and + processing status. Useful for triaging long-text infospaces. + """ + cfg, cfg_path = _load_config_or_exit(config_path) + root = cfg_path.parent + + sources_dir = root / cfg.topic.sources if cfg.topic.sources else root + if not sources_dir.is_dir(): + click.echo(f"No sources directory at {sources_dir}.", err=True) + raise SystemExit(1) + + source_files = sorted(sources_dir.glob("*.md")) + if not source_files: + click.echo(f"No source files in {sources_dir}.", err=True) + raise SystemExit(1) + + entities_dir = root / cfg.entities_dir + entity_list = ( + parse_entity_directory(entities_dir) if entities_dir.is_dir() else [] + ) + + # Build a source_id → [entities] map using the source_chapter field. + # Matching is lenient: entities with a source_chapter substring-equal + # to a normalized form of the source stem count as belonging to it. + def _chapter_keys(source_id: str) -> list: + """Return strings an entity's source_chapter might contain.""" + keys = [source_id, source_id.replace("-", " ")] + m = re.match(r"book-(\d+)-chapter-(\d+)", source_id) + if m: + book, chap = m.group(1), m.group(2) + roman = {"1": "I", "2": "II", "3": "III", "4": "IV", "5": "V"} + if book in roman: + keys.append(f"Book {roman[book]}, Chapter {int(chap)}") + keys.append(f"Book {roman[book]} Chapter {int(chap)}") + return keys + + # Precompute evaluation scores and classification slugs once. + evals_dir = root / cfg.evaluations_dir + cls_dir = root / cfg.classifications_dir + eval_scores: Dict[str, float] = {} + if evals_dir.is_dir(): + from markitect.infospace.evaluation_io import read_entity_evaluation + for ev_path in evals_dir.glob("*.md"): + try: + ev = read_entity_evaluation(ev_path) + if ev.overall_score is not None: + eval_scores[ev_path.stem] = ev.overall_score + except Exception: + continue + classified_slugs = ( + {p.stem for p in cls_dir.glob("*.md")} if cls_dir.is_dir() else set() + ) + + rows = [] + for source_file in source_files: + source_id = source_file.stem + keys = _chapter_keys(source_id) + matched = [ + e for e in entity_list + if any(k.lower() in (e.source_chapter or "").lower() for k in keys) + ] + slugs = {e.slug for e in matched} + evaluated = slugs & set(eval_scores) + classified = slugs & classified_slugs + mean = ( + sum(eval_scores[s] for s in evaluated) / len(evaluated) + if evaluated else None + ) + rows.append({ + "source_id": source_id, + "entities": len(matched), + "evaluated": len(evaluated), + "classified": len(classified), + "mean_score": round(mean, 2) if mean is not None else None, + }) + + if output_format == "json": + import json + click.echo(json.dumps(rows, indent=2)) + return + + # Text: aligned table. + headers = ("source", "entities", "evaluated", "classified", "mean_score") + widths = [ + max(len(h), max((len(str(r[h.replace(' ', '_')])) if h != "source" + else len(r["source_id"])) + for r in rows)) if rows else len(h) + for h in headers + ] + fmt = " ".join(f"{{:<{w}}}" for w in widths) + click.echo(fmt.format(*headers)) + click.echo(fmt.format(*("-" * w for w in widths))) + for r in rows: + click.echo(fmt.format( + r["source_id"], + r["entities"], + r["evaluated"], + r["classified"], + "-" if r["mean_score"] is None else f"{r['mean_score']:.2f}", + )) + totals = { + "entities": sum(r["entities"] for r in rows), + "evaluated": sum(r["evaluated"] for r in rows), + "classified": sum(r["classified"] for r in rows), + } + click.echo( + f"\n{len(rows)} source file(s); " + f"{totals['entities']} entities, " + f"{totals['evaluated']} evaluated, " + f"{totals['classified']} classified." + ) + + # ── entity (single lookup) ─────────────────────────────────────────── @@ -1167,6 +1296,18 @@ def disciplines(config_path: Optional[str]): help="Run collection checks (C1–C5) after each source file.", ) @click.option("--no-commit", is_flag=True, help="Skip git commits.") +@click.option( + "--eval-after-source", + is_flag=True, + help="After each source's stages succeed, evaluate just the newly-" + "added entities so the per-source commit is self-contained.", +) +@click.option( + "--classify-after-source", + is_flag=True, + help="After each source's stages succeed, classify just the newly-" + "added entities so the per-source commit is self-contained.", +) def process( glob_pattern: Optional[str], process_all: bool, @@ -1175,6 +1316,8 @@ def process( model: Optional[str], check_after_each: bool, no_commit: bool, + eval_after_source: bool, + classify_after_source: bool, ): """Process source files through the pipeline defined in infospace.yaml. @@ -1248,12 +1391,22 @@ def process( # Run pipeline from markitect.infospace.pipeline import SourcePipeline + if (eval_after_source or classify_after_source) and adapter is None: + click.echo( + "Error: --eval-after-source / --classify-after-source require " + "--provider (they call the LLM).", + err=True, + ) + raise SystemExit(1) + pipeline = SourcePipeline( cfg, root, adapter=adapter, provider=provider or "", model=(model or _PROVIDER_DEFAULTS.get(provider or "", "")) if provider else "", no_commit=no_commit, + eval_after_source=eval_after_source, + classify_after_source=classify_after_source, ) total = len(source_files) diff --git a/markitect/infospace/pipeline.py b/markitect/infospace/pipeline.py index 030ce264..173da866 100644 --- a/markitect/infospace/pipeline.py +++ b/markitect/infospace/pipeline.py @@ -62,6 +62,8 @@ class SourcePipeline: provider: str = "", model: str = "", no_commit: bool = False, + eval_after_source: bool = False, + classify_after_source: bool = False, ) -> None: self.config = config self.root = root @@ -69,6 +71,8 @@ class SourcePipeline: self.provider = provider self.model = model self.no_commit = no_commit + self.eval_after_source = eval_after_source + self.classify_after_source = classify_after_source # ── Public API ──────────────────────────────────────────────────── @@ -110,6 +114,12 @@ class SourcePipeline: stage_outputs: Dict[str, str] = {} stage_logs: List[Dict[str, Any]] = [] + # Snapshot entity slugs before any stage runs so we can identify + # which entities were newly produced by this source. Used to scope + # --eval-after-source / --classify-after-source to only the new + # entities. + pre_entity_slugs = self._current_entity_slugs() + print(f"\nProcessing: {source_id}") print("=" * 60) @@ -133,6 +143,14 @@ class SourcePipeline: print(f"\n {source_id}: all stages complete.") self._write_processing_log(source_id, stage_logs, success=True) + + # Per-source follow-ups: evaluate and/or classify just the new + # entities this source produced, so the next commit contains a + # fully-processed chapter. + new_slugs = self._current_entity_slugs() - pre_entity_slugs + if new_slugs and (self.eval_after_source or self.classify_after_source): + self._run_per_source_followups(new_slugs) + if not self.no_commit: self._git_commit(source_id) @@ -636,7 +654,13 @@ class SourcePipeline: # ── Git Integration ─────────────────────────────────────────────── def _git_commit(self, source_id: str) -> None: - """Stage all output changes and commit them for *source_id*.""" + """Stage all output changes and commit them for *source_id*. + + The commit message body summarises what actually changed — counts + of entities / evaluations / classifications / analyses added — so + ``git log`` reads like the chapter-by-chapter story of the + infospace growing, not a wall of identical messages. + """ output_dir = self.root / "output" try: subprocess.run( @@ -645,11 +669,11 @@ class SourcePipeline: check=True, capture_output=True, ) + body = self._compose_commit_body(source_id) result = subprocess.run( [ "git", "commit", "-m", - f"infospace: process {source_id}\n\n" - f"Extract entities, map to VSM, and synthesize analysis.", + f"infospace: process {source_id}\n\n{body}", ], cwd=str(self.root), capture_output=True, @@ -666,3 +690,146 @@ class SourcePipeline: except subprocess.CalledProcessError as e: stderr = e.stderr.decode() if isinstance(e.stderr, bytes) else (e.stderr or "") print(f" Warning: Git error: {stderr.strip()}") + + # ── Per-source helpers ──────────────────────────────────────────── + + def _current_entity_slugs(self) -> set: + """Return the set of entity file stems currently on disk.""" + entities_dir = self.root / self.config.entities_dir + if not entities_dir.is_dir(): + return set() + return {p.stem for p in entities_dir.glob("*.md")} + + def _run_per_source_followups(self, new_slugs: set) -> None: + """Run per-source evaluation and/or classification on *new_slugs*. + + Called after a source's pipeline stages succeed, before the git + commit, so each chapter's commit contains the full set of + artefacts derived from it. + """ + from markitect.infospace.entity_parser import parse_entity_directory + + entities_dir = self.root / self.config.entities_dir + all_entities = parse_entity_directory(entities_dir) + new_entities = [e for e in all_entities if e.slug in new_slugs] + if not new_entities: + return + + if self.adapter is None: + print( + " Skipping per-source eval/classify: no LLM adapter " + "configured (run with --provider)." + ) + return + + from markitect.prompts.execution.models import RunConfig + + run_config = RunConfig( + model_name=self.model or None, temperature=0.3, max_tokens=2000 + ) + + if self.eval_after_source: + from markitect.infospace.evaluate import run_entity_evaluation + + print(f" Evaluating {len(new_entities)} new entity/entities…") + try: + run_entity_evaluation( + config=self.config, + entities=new_entities, + adapter=self.adapter, + run_config=run_config, + output_dir=self.root / self.config.evaluations_dir, + ) + except Exception as exc: + print(f" Warning: per-source evaluation failed: {exc}") + + if self.classify_after_source: + from markitect.infospace.classifier import run_entity_classification + + print(f" Classifying {len(new_entities)} new entity/entities…") + try: + run_entity_classification( + config=self.config, + entities=new_entities, + adapter=self.adapter, + run_config=run_config, + output_dir=self.root / self.config.classifications_dir, + ) + except Exception as exc: + print(f" Warning: per-source classification failed: {exc}") + + def _compose_commit_body(self, source_id: str) -> str: + """Summarise staged output changes into a commit-message body. + + Counts added files per output subdirectory (entities, evaluations, + classifications, analyses, mappings…) and produces one line per + bucket that actually saw additions. Modified/deleted files are + noted separately for auditability. + """ + default = "Extract entities, map to VSM, and synthesize analysis." + try: + result = subprocess.run( + ["git", "diff", "--cached", "--name-status", "--", "output"], + cwd=str(self.root), + check=True, + capture_output=True, + text=True, + ) + except subprocess.CalledProcessError: + return default + + added_by_bucket: Dict[str, int] = {} + modified = 0 + deleted = 0 + for line in result.stdout.splitlines(): + parts = line.split("\t") + if len(parts) < 2: + continue + status = parts[0] + path = parts[-1] + if status.startswith("A"): + bucket = self._bucket_for(path) + if bucket: + added_by_bucket[bucket] = added_by_bucket.get(bucket, 0) + 1 + elif status.startswith("M"): + modified += 1 + elif status.startswith("D"): + deleted += 1 + + if not added_by_bucket and not modified and not deleted: + return default + + # Emit buckets in a deterministic, reader-friendly order. + order = ["entities", "mappings", "analyses", "evaluations", + "classifications", "metrics", "logs", "other"] + lines: List[str] = [] + for bucket in order: + n = added_by_bucket.get(bucket, 0) + if n: + lines.append(f"- {bucket}: +{n}") + if modified: + lines.append(f"- modified: {modified}") + if deleted: + lines.append(f"- deleted: {deleted}") + return "\n".join(lines) if lines else default + + def _bucket_for(self, path: str) -> Optional[str]: + """Map an ``output/...`` path to a commit-summary bucket name.""" + # Use configured directory basenames where possible so non-default + # layouts still bucket correctly. + buckets = { + Path(self.config.entities_dir).name: "entities", + Path(self.config.evaluations_dir).name: "evaluations", + Path(self.config.classifications_dir).name: "classifications", + } + parts = Path(path).parts + if len(parts) < 2 or parts[0] != "output": + return None + sub = parts[1] + if sub in buckets: + return buckets[sub] + # Heuristic fallback for common additional output subdirectories. + known = {"mappings", "analyses", "metrics", "logs"} + if sub in known: + return sub + return "other" diff --git a/tests/unit/infospace/test_cli.py b/tests/unit/infospace/test_cli.py index b077b9b9..bd9c3636 100644 --- a/tests/unit/infospace/test_cli.py +++ b/tests/unit/infospace/test_cli.py @@ -223,3 +223,129 @@ class TestViabilityCommand: ) assert result.exit_code == 0 assert "No viability thresholds" in result.output + + +# ── chapters (per-source triage view) ──────────────────────────────── + + +class TestChaptersCommand: + @pytest.fixture + def chapters_dir(self, tmp_path): + """Infospace with 2 source files and matching entities.""" + config_yaml = """\ +topic: + name: "WoN" + domain: "Economics" + sources: artifacts/sources +""" + (tmp_path / "infospace.yaml").write_text(config_yaml) + + sources = tmp_path / "artifacts" / "sources" + sources.mkdir(parents=True) + (sources / "book-1-chapter-01.md").write_text("# Chapter 1\n\nText.\n") + (sources / "book-1-chapter-02.md").write_text("# Chapter 2\n\nText.\n") + + entities = tmp_path / "output" / "entities" + entities.mkdir(parents=True) + (entities / "alpha.md").write_text( + "# Alpha\n\n## Definition\n\nX.\n\n" + "## Source Chapter\n\nBook I, Chapter 1\n" + ) + (entities / "beta.md").write_text( + "# Beta\n\n## Definition\n\nY.\n\n" + "## Source Chapter\n\nBook I, Chapter 2\n" + ) + (entities / "gamma.md").write_text( + "# Gamma\n\n## Definition\n\nZ.\n\n" + "## Source Chapter\n\nBook I, Chapter 2\n" + ) + return tmp_path + + def test_lists_sources_with_counts(self, runner, chapters_dir): + result = runner.invoke( + infospace_commands, + ["chapters", "--config", str(chapters_dir / "infospace.yaml")], + ) + assert result.exit_code == 0 + assert "book-1-chapter-01" in result.output + assert "book-1-chapter-02" in result.output + # ch 1 -> 1 entity, ch 2 -> 2 entities + assert "2 source file(s); 3 entities" in result.output + + def test_json_format(self, runner, chapters_dir): + result = runner.invoke( + infospace_commands, + ["chapters", "--config", str(chapters_dir / "infospace.yaml"), + "--format", "json"], + ) + assert result.exit_code == 0 + import json + rows = json.loads(result.output) + by_id = {r["source_id"]: r for r in rows} + assert by_id["book-1-chapter-01"]["entities"] == 1 + assert by_id["book-1-chapter-02"]["entities"] == 2 + + def test_no_sources_dir(self, runner, tmp_path): + (tmp_path / "infospace.yaml").write_text( + "topic:\n name: X\n sources: missing\n" + ) + result = runner.invoke( + infospace_commands, + ["chapters", "--config", str(tmp_path / "infospace.yaml")], + ) + assert result.exit_code == 1 + + +# ── process: eval-after-source / classify-after-source flags ───────── + + +class TestProcessAfterSourceFlags: + def test_flags_registered_in_help(self, runner): + result = runner.invoke(infospace_commands, ["process", "--help"]) + assert result.exit_code == 0 + assert "--eval-after-source" in result.output + assert "--classify-after-source" in result.output + + def test_flags_require_provider(self, runner, tmp_path): + (tmp_path / "infospace.yaml").write_text( + "topic:\n name: X\n sources: sources\n" + "pipeline:\n stages:\n - template: extract-entities\n" + ) + sources = tmp_path / "sources" + sources.mkdir() + (sources / "s1.md").write_text("source") + result = runner.invoke( + infospace_commands, + ["process", "--all", + "--config", str(tmp_path / "infospace.yaml"), + "--eval-after-source"], + ) + assert result.exit_code == 1 + assert "require --provider" in result.output + + +# ── pipeline: commit body composition ──────────────────────────────── + + +class TestCommitBodyComposition: + def test_bucket_for(self, tmp_path): + from markitect.infospace.config import InfospaceConfig, TopicConfig + from markitect.infospace.pipeline import SourcePipeline + cfg = InfospaceConfig(topic=TopicConfig(name="T", domain="D")) + p = SourcePipeline(cfg, tmp_path) + assert p._bucket_for("output/entities/x.md") == "entities" + assert p._bucket_for("output/evaluations/x.md") == "evaluations" + assert p._bucket_for("output/classifications/x.md") == "classifications" + assert p._bucket_for("output/mappings/x.md") == "mappings" + assert p._bucket_for("output/notes/x.md") == "other" + assert p._bucket_for("README.md") is None # not under output/ + + def test_compose_body_uses_default_on_no_diff(self, tmp_path): + """When git diff fails or returns empty, fall back to the default blurb.""" + from markitect.infospace.config import InfospaceConfig, TopicConfig + from markitect.infospace.pipeline import SourcePipeline + cfg = InfospaceConfig(topic=TopicConfig(name="T", domain="D")) + # Not a git repo, so `git diff --cached` will raise CalledProcessError. + p = SourcePipeline(cfg, tmp_path) + body = p._compose_commit_body("some-source") + assert "Extract entities" in body