Files
markitect-main/markitect/infospace/cli.py
tegwick c594bc3a38 feat(infospace): add process command for batch source file processing
- Extend PipelineStage with name, output_dir, output_macro,
  split_entities, and macros fields for declarative pipeline config
- Add SourcePipeline class (pipeline.py) using simple @{macro}
  substitution — no SQLite dependency, skip-if-exists per stage,
  LLM retry on rate limits, git commit per source
- Add `markitect infospace process [GLOB_PATTERN]` CLI command with
  --all, --provider, --model, --check-after-each, --no-commit flags
- Update infospace.yaml with output_dir, output_macro, split_entities,
  and macros for each pipeline stage in the WoN example

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-19 13:29:32 +01:00

636 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CLI commands for infospace lifecycle management.
Provides ``markitect infospace`` subcommands for initialising,
inspecting, and evaluating infospaces.
"""
from __future__ import annotations
from pathlib import Path
from typing import Optional
import click
from markitect.infospace.config import (
DisciplineBinding,
InfospaceConfig,
SchemaRegistry,
TopicConfig,
find_infospace_config,
load_infospace_config,
save_infospace_config,
)
from markitect.infospace.entity_parser import parse_entity_directory
from markitect.infospace.state import build_state
def _load_config_or_exit(config_path: Optional[str] = None) -> tuple:
"""Resolve and load infospace.yaml, or exit with an error."""
if config_path:
p = Path(config_path)
else:
p = find_infospace_config()
if p is None:
click.echo("Error: No infospace.yaml found. Run 'markitect infospace init' first.", err=True)
raise SystemExit(1)
cfg = load_infospace_config(p)
return cfg, p
@click.group(name="infospace")
def infospace_commands():
"""Manage infospaces — create, inspect, evaluate."""
pass
# ── init ─────────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--topic", required=True, help="Topic name for the infospace.")
@click.option("--domain", default="", help="Knowledge domain.")
@click.option("--sources", default="", help="Path to source material directory.")
@click.option("--discipline", multiple=True, help="Discipline name (repeatable).")
@click.option("--output", "-o", default="infospace.yaml", help="Output config file path.")
def init(topic: str, domain: str, sources: str, discipline: tuple, output: str):
"""Initialise a new infospace configuration file."""
out_path = Path(output)
if out_path.exists():
click.echo(f"Error: {out_path} already exists.", err=True)
raise SystemExit(1)
disciplines = [DisciplineBinding(name=d) for d in discipline]
config = InfospaceConfig(
topic=TopicConfig(name=topic, domain=domain, sources=sources),
disciplines=disciplines,
)
save_infospace_config(config, out_path)
click.echo(f"Created {out_path}")
# ── status ───────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def status(config_path: Optional[str]):
"""Show infospace status — entity count, domains, evaluation state."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
# Parse entities
entities_dir = root / cfg.entities_dir
entities = []
if entities_dir.is_dir():
entities = parse_entity_directory(entities_dir)
# Load latest snapshot if available
snapshot = None
history_path = root / cfg.metrics_dir / "history.yaml"
if history_path.is_file():
from markitect.infospace.evaluation_io import read_history
history = read_history(history_path)
if history:
snapshot = history[-1]
state = build_state(cfg, entities=entities, snapshot=snapshot)
click.echo(f"Infospace: {state.topic_name}")
if cfg.topic.domain:
click.echo(f"Domain: {cfg.topic.domain}")
click.echo(f"Entities: {state.entity_count}")
if state.domains:
click.echo(f"Domains: {', '.join(state.domains)}")
if cfg.disciplines:
names = [d.name for d in cfg.disciplines]
click.echo(f"Disciplines: {', '.join(names)}")
if state.has_evaluations:
click.echo(f"Last evaluated: {state.latest_snapshot.created_at.isoformat()}")
else:
click.echo("Evaluations: none")
# ── entities ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
"--sort-by", "sort_key",
type=click.Choice(["slug", "domain", "words"]),
default="slug",
help="Sort entities by field.",
)
def entities(config_path: Optional[str], sort_key: str):
"""List entities with metadata summary."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
entities_dir = root / cfg.entities_dir
if not entities_dir.is_dir():
click.echo("No entities directory found.")
return
entity_list = parse_entity_directory(entities_dir)
if not entity_list:
click.echo("No entities found.")
return
# Sort
if sort_key == "domain":
entity_list.sort(key=lambda e: (e.domain or "", e.slug))
elif sort_key == "words":
entity_list.sort(key=lambda e: e.total_word_count, reverse=True)
else:
entity_list.sort(key=lambda e: e.slug)
# Format as table
click.echo(f"{'Slug':<40} {'Domain':<20} {'Words':>6}")
click.echo("-" * 68)
for e in entity_list:
click.echo(f"{e.slug:<40} {(e.domain or '-'):<20} {e.total_word_count:>6}")
click.echo(f"\nTotal: {len(entity_list)} entities")
# ── evaluate ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default="openrouter", help="LLM provider (openrouter, openai, etc.).")
@click.option("--model", default=None, help="LLM model name.")
@click.option("--entity", "entity_slug", default=None, help="Evaluate a single entity by slug.")
@click.option("--chapter", default=None, help="Evaluate entities from a specific chapter.")
def evaluate(config_path, provider, model, entity_slug, chapter):
"""Evaluate entities using LLM-based quality assessment."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
entities_dir = root / cfg.entities_dir
if not entities_dir.is_dir():
click.echo("Error: No entities directory found.", err=True)
raise SystemExit(1)
entity_list = parse_entity_directory(entities_dir)
if not entity_list:
click.echo("No entities to evaluate.")
return
# Filter
if entity_slug:
entity_list = [e for e in entity_list if e.slug == entity_slug]
if not entity_list:
click.echo(f"Error: Entity '{entity_slug}' not found.", err=True)
raise SystemExit(1)
elif chapter:
entity_list = [e for e in entity_list if chapter in e.source_chapter]
if not entity_list:
click.echo(f"No entities found for chapter '{chapter}'.")
return
# Create adapter
from markitect.llm import create_adapter
from markitect.prompts.execution.models import RunConfig
adapter = create_adapter(provider, model=model)
run_config = RunConfig(model_name=model or "default", temperature=0.3, max_tokens=2000)
# Progress callback
def on_progress(done, total, result):
status = result.status.upper()
click.echo(f" [{done}/{total}] {result.key}: {status}")
click.echo(f"Evaluating {len(entity_list)} entities via {provider}...")
from markitect.infospace.evaluate import run_entity_evaluation
output_dir = root / cfg.evaluations_dir
summary = run_entity_evaluation(
config=cfg,
entities=entity_list,
adapter=adapter,
run_config=run_config,
output_dir=output_dir,
progress_callback=on_progress,
)
click.echo(f"\nDone: {summary.succeeded} succeeded, {summary.failed} failed, {summary.skipped} skipped")
if summary.total_tokens > 0:
click.echo(f"Tokens used: {summary.total_tokens}")
# ── viability ────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def viability(config_path: Optional[str]):
"""Show viability dashboard — threshold checks and pass/fail."""
cfg, cfg_path = _load_config_or_exit(config_path)
if not cfg.viability:
click.echo("No viability thresholds configured in infospace.yaml.")
return
# Try to load latest metrics
root = cfg_path.parent
metrics: dict = {}
metrics_file = root / cfg.metrics_dir / "metrics.yaml"
if metrics_file.is_file():
import yaml
raw = yaml.safe_load(metrics_file.read_text(encoding="utf-8"))
if isinstance(raw, dict):
metrics = {k: float(v) for k, v in raw.items() if isinstance(v, (int, float))}
state = build_state(cfg, metrics=metrics if metrics else None)
if not state.viability_results:
click.echo("No metrics available. Run evaluations first.")
click.echo("\nConfigured thresholds:")
for name, t in cfg.viability.items():
bounds = []
if t.min is not None:
bounds.append(f"min={t.min}")
if t.max is not None:
bounds.append(f"max={t.max}")
click.echo(f" {name}: {', '.join(bounds)}")
return
click.echo(f"{'Metric':<30} {'Value':>8} {'Threshold':>15} {'Status':>8}")
click.echo("-" * 63)
for r in state.viability_results:
bounds = []
if r.threshold.min is not None:
bounds.append(f"min={r.threshold.min}")
if r.threshold.max is not None:
bounds.append(f"max={r.threshold.max}")
status_str = "PASS" if r.passed else "FAIL"
click.echo(
f"{r.metric:<30} {r.value:>8.4f} {', '.join(bounds):>15} {status_str:>8}"
)
click.echo()
if state.is_viable:
click.echo(f"Viable: YES ({state.viability_pass_count}/{state.viability_total_count} thresholds met)")
else:
click.echo(f"Viable: NO ({state.viability_pass_count}/{state.viability_total_count} thresholds met)")
# ── check ───────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
"--concern", "concerns", multiple=True,
type=click.Choice(["redundancy", "coverage", "coherence", "consistency", "granularity"]),
help="Run specific concern(s). Omit to run all five.",
)
@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
def check(config_path: Optional[str], concerns: tuple, as_json: bool):
"""Run collection-level quality checks (C1C5)."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
entities_dir = root / cfg.entities_dir
if not entities_dir.is_dir():
click.echo("Error: No entities directory found.", err=True)
raise SystemExit(1)
entity_list = parse_entity_directory(entities_dir)
if not entity_list:
click.echo("No entities to check.")
return
from markitect.infospace.checks import run_all_checks
checks_list = list(concerns) if concerns else None
report = run_all_checks(
entities=entity_list,
checks=checks_list,
)
if as_json:
import json
click.echo(json.dumps(report.to_dict(), indent=2))
else:
click.echo(f"Collection checks — {len(entity_list)} entities\n")
d = report.to_dict()
for concern_name, concern_data in d.items():
label = concern_data.get("concern", concern_name.upper())
click.echo(f" {label}{concern_name}")
for k, v in concern_data.items():
if k == "concern":
continue
click.echo(f" {k}: {v}")
click.echo()
# Show summary metrics
m = report.metrics()
if m and not as_json:
click.echo("Metrics summary:")
for k, v in sorted(m.items()):
click.echo(f" {k}: {v:.4f}")
# Record to history
if m:
from markitect.infospace.history import record_check_results
snap = record_check_results(report, cfg, root, entity_count=len(entity_list))
if not as_json:
click.echo(f"\nRecorded snapshot {snap.snapshot_id}")
# ── history ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--metric", default=None, help="Show trend for a specific metric.")
@click.option("--json", "as_json", is_flag=True, help="Output as JSON.")
def history(config_path: Optional[str], metric: Optional[str], as_json: bool):
"""Show metrics history — snapshots over time."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.history import get_history, metric_trend
snapshots = get_history(cfg, root)
if not snapshots:
click.echo("No history found. Run 'markitect infospace check' first.")
return
if metric:
trend = metric_trend(snapshots, metric)
if not trend:
click.echo(f"No data for metric '{metric}'.")
return
if as_json:
import json
click.echo(json.dumps(trend, indent=2))
else:
click.echo(f"Trend: {metric}\n")
for entry in trend:
click.echo(f" {entry['date'][:19]} {entry['value']:.4f}")
return
if as_json:
import json
click.echo(json.dumps([s.to_dict() for s in snapshots], indent=2, default=str))
return
click.echo(f"History: {len(snapshots)} snapshot(s)\n")
click.echo(f"{'#':<4} {'Date':<20} {'Entities':>8} {'Metrics':>8}")
click.echo("-" * 42)
for i, snap in enumerate(snapshots, 1):
date_str = snap.created_at.isoformat()[:19]
n_metrics = len(snap.collection_metrics)
click.echo(f"{i:<4} {date_str:<20} {snap.entity_count:>8} {n_metrics:>8}")
@infospace_commands.command(name="history-diff")
@click.argument("date_a")
@click.argument("date_b")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def history_diff(date_a: str, date_b: str, config_path: Optional[str]):
"""Compare two history snapshots by date (YYYY-MM-DD)."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.history import find_snapshot_by_date, get_history
from markitect.infospace.evaluation_io import diff_snapshots
snapshots = get_history(cfg, root)
if len(snapshots) < 2:
click.echo("Need at least two snapshots to diff.")
return
snap_a = find_snapshot_by_date(snapshots, date_a)
snap_b = find_snapshot_by_date(snapshots, date_b)
if snap_a is None:
click.echo(f"No snapshot found near '{date_a}'.")
return
if snap_b is None:
click.echo(f"No snapshot found near '{date_b}'.")
return
if snap_a.snapshot_id == snap_b.snapshot_id:
click.echo("Both dates resolve to the same snapshot.")
return
diff = diff_snapshots(snap_a, snap_b)
click.echo(diff.summary())
# ── bind-discipline ─────────────────────────────────────────────────
@infospace_commands.command(name="bind-discipline")
@click.argument("discipline_path")
@click.option("--name", required=True, help="Name for the discipline.")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def bind_discipline_cmd(discipline_path: str, name: str, config_path: Optional[str]):
"""Bind a discipline infospace to the current infospace."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.composition import bind_discipline
status = bind_discipline(cfg, name=name, path=discipline_path, root=root)
if status.error:
click.echo(f"Error: {status.error}", err=True)
raise SystemExit(1)
# Persist updated config
save_infospace_config(cfg, cfg_path)
click.echo(f"Bound discipline '{name}' from {discipline_path}")
click.echo(f" Entities: {status.entity_count}")
if status.has_config:
viable_str = "YES" if status.is_viable else "NO"
click.echo(f" Viable: {viable_str}")
# ── disciplines ─────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def disciplines(config_path: Optional[str]):
"""List bound disciplines and their viability status."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
if not cfg.disciplines:
click.echo("No disciplines bound.")
return
from markitect.infospace.composition import check_discipline_status
click.echo(f"{'Name':<30} {'Entities':>8} {'Viable':>8} {'Path'}")
click.echo("-" * 70)
for binding in cfg.disciplines:
status = check_discipline_status(binding, root)
viable_str = "YES" if status.is_viable else ("NO" if status.has_config else "?")
click.echo(
f"{status.name:<30} {status.entity_count:>8} {viable_str:>8} {status.path}"
)
if status.error:
click.echo(f" Error: {status.error}")
# ── process ─────────────────────────────────────────────────────
@infospace_commands.command()
@click.argument("glob_pattern", default=None, required=False)
@click.option("--all", "process_all", is_flag=True, help="Process all source files.")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default=None, help="LLM provider (openrouter, openai, etc.).")
@click.option("--model", default=None, help="LLM model name.")
@click.option(
"--check-after-each",
is_flag=True,
help="Run collection checks (C1C5) after each source file.",
)
@click.option("--no-commit", is_flag=True, help="Skip git commits.")
def process(
glob_pattern: Optional[str],
process_all: bool,
config_path: Optional[str],
provider: Optional[str],
model: Optional[str],
check_after_each: bool,
no_commit: bool,
):
"""Process source files through the pipeline defined in infospace.yaml.
GLOB_PATTERN is matched against the sources directory declared in
infospace.yaml (default ``*.md``). Use ``--all`` to process every
source file.
\b
Examples:
# Process chapters 1-3 from book 1
markitect infospace process "book-1-chapter-0[1-3].md" --provider openrouter
# Process all source files and check metrics after each
markitect infospace process --all --provider openrouter --check-after-each
# Dry run — load existing outputs only, no LLM calls
markitect infospace process --all
"""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
if not cfg.pipeline or not cfg.pipeline.stages:
click.echo(
"Error: No pipeline stages defined in infospace.yaml.\n"
"Add a 'pipeline.stages' section with at least one stage.",
err=True,
)
raise SystemExit(1)
# Resolve sources directory
sources_dir = root / cfg.topic.sources if cfg.topic.sources else root
if not sources_dir.is_dir():
click.echo(
f"Error: Sources directory not found: {sources_dir}\n"
f"Set 'topic.sources' in infospace.yaml.",
err=True,
)
raise SystemExit(1)
# Collect source files
if process_all:
source_files = sorted(sources_dir.glob("*.md"))
else:
pattern = glob_pattern or "*.md"
source_files = sorted(sources_dir.glob(pattern))
if not source_files:
if process_all:
click.echo(f"No source files found in {sources_dir}")
else:
click.echo(
f"No files matched: {glob_pattern or '*.md'}\n"
f"Sources directory: {sources_dir}"
)
return
click.echo(f"Found {len(source_files)} source file(s) in {sources_dir.name}/")
# Create LLM adapter
adapter = None
if provider:
from markitect.llm import create_adapter
_PROVIDER_DEFAULTS = {"openrouter": "arcee-ai/trinity-large-preview:free"}
resolved_model = model or _PROVIDER_DEFAULTS.get(provider)
adapter = create_adapter(provider, model=resolved_model)
click.echo(f"LLM: {provider} ({resolved_model or 'default'})")
else:
click.echo("No LLM provider — will use existing outputs only (manual mode).")
# Run pipeline
from markitect.infospace.pipeline import SourcePipeline
pipeline = SourcePipeline(cfg, root, adapter=adapter, no_commit=no_commit)
total = len(source_files)
completed = 0
for i, source_file in enumerate(source_files, 1):
click.echo(f"\n[{i}/{total}] {source_file.name}")
success = pipeline.process_source(source_file)
if success:
completed += 1
if check_after_each:
pipeline.run_collection_check()
click.echo(f"\nDone: {completed}/{total} source file(s) fully processed.")
# ── stale-mappings ──────────────────────────────────────────────────
@infospace_commands.command(name="stale-mappings")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def stale_mappings(config_path: Optional[str]):
"""Check for stale mappings due to discipline changes."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
if not cfg.disciplines:
click.echo("No disciplines bound — no mappings to check.")
return
from markitect.infospace.composition import find_stale_mappings
# Try to load mapping references from output
mapping_refs = _load_mapping_references(cfg, root)
stale = find_stale_mappings(cfg, root, mapping_references=mapping_refs)
if not stale:
click.echo("No stale mappings detected.")
return
click.echo(f"Found {len(stale)} stale mapping(s):\n")
for s in stale:
click.echo(f" {s.entity_slug} -> {s.discipline_entity}")
click.echo(f" {s.reason}")
def _load_mapping_references(
cfg: InfospaceConfig, root: Path
) -> Optional[dict]:
"""Try to load mapping references from YAML file in output dir."""
mapping_file = root / cfg.metrics_dir / "mapping-references.yaml"
if not mapping_file.is_file():
return None
import yaml
data = yaml.safe_load(mapping_file.read_text(encoding="utf-8"))
if isinstance(data, dict):
return data
return None