Files
markitect-main/markitect/infospace/cli.py
tegwick 2d45425b25 feat(infospace): add L3 relation graph with VSM-aware triplets (S2.8)
Implements the L3 relation graph layer — a directed graph of (Subject,
Predicate, Object) triplets annotated with VSM channel codes and feedback
roles. Triplets are authored as markdown files under output/relations/,
parsed into RelationMeta dataclasses, and analysed with networkx.

New modules:
- markitect/infospace/relation_models.py — RelationMeta dataclass +
  RELATION_TYPES controlled vocabulary (15 relation classes → VSM codes)
- markitect/infospace/relation_parser.py — parse_relation_file() and
  parse_relations_directory()

New schema: examples/infospace-with-history/schemas/relation-schema-v1.0.md
  — file naming convention, required sections, controlled vocabulary table

15 seed relation files covering the three core WoN feedback loops:
  - Capital Accumulation loop (positive reinforcement, S1/S3)
  - Market Price Balancing loop (negative feedback, S2/S3)
  - Market Extent mutual dependency (S1/S2)
  Plus structural relations: wages regulation, rent residual, price
  decomposition, invisible hand coordination

CLI: markitect infospace relations [--entity SLUG] [--vsm FILTER]
     [--loops] [--stats]
  - Builds directed graph from parsed files
  - Detects feedback loops via nx.simple_cycles()
  - 6 loops found from 15 seed relations (3 intended + 3 emergent)
  - --stats aggregates by VSM system code (strips parentheticals)

Config: InfospaceConfig gains relations_dir (default output/relations)
infospace.yaml: schemas.relation references relation-schema-v1.0.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-23 06:04:28 +01:00

843 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CLI commands for infospace lifecycle management.
Provides ``markitect infospace`` subcommands for initialising,
inspecting, and evaluating infospaces.
"""
from __future__ import annotations
from pathlib import Path
from typing import Optional
import click
from markitect.infospace.config import (
DisciplineBinding,
InfospaceConfig,
SchemaRegistry,
TopicConfig,
find_infospace_config,
load_infospace_config,
save_infospace_config,
)
from markitect.infospace.entity_parser import parse_entity_directory
from markitect.infospace.state import build_state
def _load_config_or_exit(config_path: Optional[str] = None) -> tuple:
    """Resolve and load infospace.yaml, or exit with an error.

    Args:
        config_path: Explicit path to the configuration file. When empty or
            ``None`` the path is obtained from ``find_infospace_config()``.

    Returns:
        A ``(config, path)`` tuple: the object returned by
        ``load_infospace_config`` and the path it was loaded from.

    Raises:
        SystemExit: With status 1 when no configuration can be located, or
            when an explicitly given path does not exist.
    """
    if config_path:
        p = Path(config_path)
        # An explicit path bypasses discovery, so validate it here; otherwise
        # a typo in --config would reach the loader unchecked instead of
        # producing the same kind of error message as the discovery branch.
        if not p.exists():
            click.echo(f"Error: Config file not found: {p}", err=True)
            raise SystemExit(1)
    else:
        p = find_infospace_config()
        if p is None:
            click.echo("Error: No infospace.yaml found. Run 'markitect infospace init' first.", err=True)
            raise SystemExit(1)
    cfg = load_infospace_config(p)
    return cfg, p
@click.group(name="infospace")
def infospace_commands():
    """Manage infospaces — create, inspect, evaluate."""
    # Container group only: the subcommands in this module attach themselves
    # via ``@infospace_commands.command()``. The docstring above doubles as
    # the group's --help text, so it is kept short.
# ── init ─────────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--topic", required=True, help="Topic name for the infospace.")
@click.option("--domain", default="", help="Knowledge domain.")
@click.option("--sources", default="", help="Path to source material directory.")
@click.option("--discipline", multiple=True, help="Discipline name (repeatable).")
@click.option("--output", "-o", default="infospace.yaml", help="Output config file path.")
def init(topic: str, domain: str, sources: str, discipline: tuple, output: str):
    """Initialise a new infospace configuration file."""
    # NOTE: the docstring is the command's --help text; details live here.
    target = Path(output)

    # Refuse to overwrite an existing configuration file.
    if target.exists():
        click.echo(f"Error: {target} already exists.", err=True)
        raise SystemExit(1)

    # One binding per repeated --discipline flag, in the order given.
    bindings = [DisciplineBinding(name=discipline_name) for discipline_name in discipline]
    topic_cfg = TopicConfig(name=topic, domain=domain, sources=sources)
    new_config = InfospaceConfig(topic=topic_cfg, disciplines=bindings)

    save_infospace_config(new_config, target)
    click.echo(f"Created {target}")
# ── status ───────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def status(config_path: Optional[str]):
    """Show infospace status — entity count, domains, evaluation state."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    base_dir = cfg_path.parent

    # Entities are optional: a missing directory simply means "none yet".
    entities_dir = base_dir / cfg.entities_dir
    parsed_entities = parse_entity_directory(entities_dir) if entities_dir.is_dir() else []

    # The most recent snapshot is the last entry of history.yaml, if any.
    latest = None
    history_file = base_dir / cfg.metrics_dir / "history.yaml"
    if history_file.is_file():
        from markitect.infospace.evaluation_io import read_history

        recorded = read_history(history_file)
        if recorded:
            latest = recorded[-1]

    state = build_state(cfg, entities=parsed_entities, snapshot=latest)

    click.echo(f"Infospace: {state.topic_name}")
    if cfg.topic.domain:
        click.echo(f"Domain: {cfg.topic.domain}")
    click.echo(f"Entities: {state.entity_count}")
    if state.domains:
        click.echo(f"Domains: {', '.join(state.domains)}")
    if cfg.disciplines:
        discipline_names = ", ".join(binding.name for binding in cfg.disciplines)
        click.echo(f"Disciplines: {discipline_names}")

    if not state.has_evaluations:
        click.echo("Evaluations: none")
    else:
        click.echo(f"Last evaluated: {state.latest_snapshot.created_at.isoformat()}")
# ── entities ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
    "--sort-by", "sort_key",
    type=click.Choice(["slug", "domain", "words"]),
    default="slug",
    help="Sort entities by field.",
)
def entities(config_path: Optional[str], sort_key: str):
    """List entities with metadata summary."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    entities_dir = cfg_path.parent / cfg.entities_dir
    if not entities_dir.is_dir():
        click.echo("No entities directory found.")
        return

    parsed = parse_entity_directory(entities_dir)
    if not parsed:
        click.echo("No entities found.")
        return

    # --sort-by value -> (key function, descending?). Anything that is not
    # "domain" or "words" falls back to alphabetical slug order.
    orderings = {
        "domain": (lambda e: (e.domain or "", e.slug), False),
        "words": (lambda e: e.total_word_count, True),
    }
    by_slug = (lambda e: e.slug, False)
    key_fn, descending = orderings.get(sort_key, by_slug)
    rows = sorted(parsed, key=key_fn, reverse=descending)

    # Fixed-width table: slug, domain ('-' when unset), total word count.
    click.echo(f"{'Slug':<40} {'Domain':<20} {'Words':>6}")
    click.echo("-" * 68)
    for row in rows:
        domain_cell = row.domain or '-'
        click.echo(f"{row.slug:<40} {domain_cell:<20} {row.total_word_count:>6}")
    click.echo(f"\nTotal: {len(rows)} entities")
# ── evaluate ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default="openrouter", help="LLM provider (openrouter, openai, etc.).")
@click.option("--model", default=None, help="LLM model name.")
@click.option("--entity", "entity_slug", default=None, help="Evaluate a single entity by slug.")
@click.option("--chapter", default=None, help="Evaluate entities from a specific chapter.")
def evaluate(config_path, provider, model, entity_slug, chapter):
    """Evaluate entities using LLM-based quality assessment."""
    # Flow: load entities -> narrow by --entity / --chapter -> on an
    # unfiltered run, drop entities that already have an evaluation file ->
    # hand the rest to run_entity_evaluation and print its summary.
    # Exits with status 1 when the entities directory or the requested
    # entity is missing.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent
    entities_dir = root / cfg.entities_dir
    if not entities_dir.is_dir():
        click.echo("Error: No entities directory found.", err=True)
        raise SystemExit(1)
    entity_list = parse_entity_directory(entities_dir)
    if not entity_list:
        click.echo("No entities to evaluate.")
        return

    # Filter: --entity takes precedence over --chapter.
    if entity_slug:
        entity_list = [e for e in entity_list if e.slug == entity_slug]
        if not entity_list:
            click.echo(f"Error: Entity '{entity_slug}' not found.", err=True)
            raise SystemExit(1)
    elif chapter:
        # `or ""` keeps entities without a source chapter from raising on
        # the containment test; they simply never match.
        entity_list = [e for e in entity_list if chapter in (e.source_chapter or "")]
        if not entity_list:
            click.echo(f"No entities found for chapter '{chapter}'.")
            return

    # Skip entities that already have evaluation files (incremental resume).
    # Only applies to unfiltered runs; an explicit --entity/--chapter request
    # is always passed through.
    from markitect.infospace.evaluate import run_entity_evaluation
    output_dir = root / cfg.evaluations_dir
    if not entity_slug and not chapter and output_dir.is_dir():
        already_evaluated = {p.stem for p in output_dir.glob("*.md")}
        pending = [e for e in entity_list if e.slug not in already_evaluated]
        # Count what was really dropped: the directory may hold evaluation
        # files for entities that no longer exist.
        skipped_count = len(entity_list) - len(pending)
        entity_list = pending
        if not entity_list:
            click.echo("All entities already evaluated. Nothing to do.")
            return
        if skipped_count:
            click.echo(f"Skipping {skipped_count} already-evaluated entities.")

    # Create adapter
    from markitect.llm import create_adapter
    from markitect.prompts.execution.models import RunConfig
    adapter = create_adapter(provider, model=model)
    run_config = RunConfig(model_name=model, temperature=0.3, max_tokens=2000)

    # Progress callback: one line per finished entity.
    def on_progress(done, total, result):
        outcome = result.status.upper()
        click.echo(f" [{done}/{total}] {result.key}: {outcome}")

    click.echo(f"Evaluating {len(entity_list)} entities via {provider}...")
    summary = run_entity_evaluation(
        config=cfg,
        entities=entity_list,
        adapter=adapter,
        run_config=run_config,
        output_dir=output_dir,
        progress_callback=on_progress,
    )
    click.echo(f"\nDone: {summary.succeeded} succeeded, {summary.failed} failed, {summary.skipped} skipped")
    if summary.total_tokens > 0:
        click.echo(f"Tokens used: {summary.total_tokens}")
# ── eval-summary ──────────────────────────────────────────────────────
@infospace_commands.command(name="eval-summary")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--update-metrics", is_flag=True, default=False,
              help="Merge per_entity_mean into metrics.yaml for viability checks.")
def eval_summary(config_path: Optional[str], update_metrics: bool):
    """Show aggregate statistics from per-entity evaluation files."""
    # Reads every *.md file in the evaluations directory, prints the mean of
    # the overall score and of each named dimension plus the overall range,
    # and with --update-metrics stores the overall mean in metrics.yaml
    # under the key "per_entity_mean".
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent
    evaluations_dir = root / cfg.evaluations_dir
    if not evaluations_dir.is_dir():
        click.echo("No evaluations directory found. Run 'markitect infospace evaluate' first.")
        return
    from markitect.infospace.evaluation_io import read_entity_evaluation
    eval_files = sorted(evaluations_dir.glob("*.md"))
    if not eval_files:
        click.echo("No evaluation files found.")
        return

    overall_scores: list = []
    dim_scores: dict = {}
    failed: list = []
    for ef in eval_files:
        # Best effort: one unreadable file must not abort the whole summary.
        # The reader's exception types are not visible here, hence the broad
        # catch; the failure is reported below rather than dropped.
        try:
            ev = read_entity_evaluation(ef)
            overall_scores.append(ev.overall_score)
            for s in ev.scores:
                dim_scores.setdefault(s.name, []).append(s.value)
        except Exception as exc:
            failed.append((ef.stem, str(exc)))

    # Say which files were skipped and why (on stderr, so the summary on
    # stdout stays clean) — also when nothing at all could be read.
    for stem, reason in failed:
        click.echo(f"Warning: could not read evaluation '{stem}': {reason}", err=True)

    n = len(overall_scores)
    if n == 0:
        click.echo("No evaluations could be read.")
        return
    mean_overall = sum(overall_scores) / n
    click.echo(f"Evaluation summary — {n} entities evaluated")
    if failed:
        click.echo(f" (failed to read: {len(failed)})")
    click.echo()
    click.echo(f" {'Dimension':<30} {'Mean':>6}")
    click.echo(" " + "-" * 38)
    click.echo(f" {'overall':<30} {mean_overall:>6.3f}")
    for dim, vals in sorted(dim_scores.items()):
        click.echo(f" {dim:<30} {sum(vals)/len(vals):>6.3f}")
    score_min = min(overall_scores)
    score_max = max(overall_scores)
    click.echo()
    click.echo(f" Range: {score_min:.2f} {score_max:.2f}")

    if update_metrics:
        from markitect.infospace.history import read_metrics_file, write_metrics_file
        metrics_file = root / cfg.metrics_dir / "metrics.yaml"
        existing = read_metrics_file(metrics_file)
        # Stored with 6 decimals; the confirmation line shows 4.
        existing["per_entity_mean"] = round(mean_overall, 6)
        write_metrics_file(existing, metrics_file)
        click.echo(f"\nUpdated metrics.yaml: per_entity_mean = {mean_overall:.4f}")
# ── relations ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--entity", "entity_slug", default=None,
              help="Show only relations involving this entity slug.")
@click.option("--vsm", "vsm_filter", default=None,
              help="Show only relations whose VSM channel contains this string (e.g. S2, S3).")
@click.option("--loops", "loops_only", is_flag=True, default=False,
              help="Show only feedback loops (cycles in the relation graph).")
@click.option("--stats", "stats_only", is_flag=True, default=False,
              help="Show aggregate statistics only, no individual relations.")
def relations(config_path: Optional[str], entity_slug: Optional[str],
              vsm_filter: Optional[str], loops_only: bool, stats_only: bool):
    """Show the L3 relation graph — triplets, feedback loops, and VSM channels."""
    # Output order: header with counts, then either the statistics tables
    # (--stats), or the feedback loops followed (unless --loops) by the
    # relation table narrowed by --entity / --vsm.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    import re as _re
    from collections import Counter

    from markitect.infospace.relation_parser import parse_relations_directory

    def _vsm_code(channel: str) -> str:
        """Strip parenthetical description, returning just the system code (e.g. 'S3 → S1')."""
        return _re.sub(r'\s*\(.*', '', channel).strip() or channel

    def _clip(text: str, width: int) -> str:
        """Shorten *text* to *width* columns, marking the cut with '..'."""
        return text[:width - 2] + ".." if len(text) > width else text

    relations_dir = root / cfg.relations_dir
    if not relations_dir.is_dir():
        click.echo("No relations directory found. Create output/relations/ and add relation files.")
        return
    all_relations = parse_relations_directory(relations_dir)
    if not all_relations:
        click.echo("No relation files found in " + str(relations_dir))
        return

    # networkx is optional: without it the relation table and statistics
    # still work, only cycle detection is unavailable. Say so instead of
    # silently reporting zero loops.
    try:
        import networkx as nx
    except ImportError:
        nx = None
        click.echo(
            "Warning: networkx is not installed; feedback loop detection is unavailable.",
            err=True,
        )

    # Build directed graph for cycle detection.
    # NOTE: a DiGraph keeps a single edge per (subject, object) pair, so a
    # second relation between the same two entities overwrites the edge
    # attributes of the first. This does not affect which cycles exist.
    G = None
    loops: list = []
    if nx is not None:
        G = nx.DiGraph()
        for r in all_relations:
            G.add_edge(r.subject_slug, r.object_slug,
                       predicate=r.predicate,
                       relation_type=r.relation_type,
                       vsm_channel=r.vsm_channel,
                       slug=r.slug)
        # Find feedback loops. Kept best-effort, but a failure is reported
        # rather than swallowed.
        try:
            loops = list(nx.simple_cycles(G))
        except Exception as exc:
            click.echo(f"Warning: feedback loop detection failed: {exc}", err=True)
            loops = []

    # Stats summary. Counter keeps first-seen order for ties, matching a
    # stable sort by descending count.
    n = len(all_relations)
    vsm_counts = Counter(_vsm_code(r.vsm_channel) for r in all_relations)
    type_counts = Counter(r.relation_type for r in all_relations)

    click.echo(f"Relation graph — {n} relations")
    if G is not None:
        click.echo(f" Entities in graph: {G.number_of_nodes()}")
        click.echo(f" Feedback loops: {len(loops)}")
    click.echo()

    if stats_only:
        click.echo("Relation types:")
        for rt, count in type_counts.most_common():
            click.echo(f" {rt:<25} {count:>4}")
        click.echo()
        click.echo("VSM channels:")
        for ch, count in vsm_counts.most_common():
            click.echo(f" {ch:<20} {count:>4}")
        return

    # Feedback loops section
    if loops:
        click.echo(f"Feedback loops ({len(loops)}):")
        for i, cycle in enumerate(loops, 1):
            # Repeat the first node at the end so the loop reads as closed,
            # with a visible separator between the slugs.
            path = " → ".join(str(node) for node in [*cycle, cycle[0]])
            click.echo(f" Loop {i}: {path}")
        click.echo()
    elif loops_only:
        click.echo("No feedback loops detected in current relation set.")
    if loops_only:
        return

    # Filter relations
    filtered = all_relations
    if entity_slug:
        filtered = [r for r in filtered
                    if entity_slug in (r.subject_slug, r.object_slug)]
        if not filtered:
            click.echo(f"No relations found involving '{entity_slug}'.")
            return
    if vsm_filter:
        filtered = [r for r in filtered if vsm_filter in r.vsm_channel]
        if not filtered:
            click.echo(f"No relations with VSM channel containing '{vsm_filter}'.")
            return

    # Display relations; over-long cells are clipped to their column width.
    click.echo(f"{'Subject':<35} {'Predicate':<30} {'Object':<35} {'VSM'}")
    click.echo("-" * 110)
    for r in filtered:
        subj = _clip(r.subject, 35)
        pred = _clip(r.predicate, 30)
        obj = _clip(r.object, 35)
        click.echo(f"{subj:<35} {pred:<30} {obj:<35} {r.vsm_channel}")
# ── viability ────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def viability(config_path: Optional[str]):
    """Show viability dashboard — threshold checks and pass/fail."""

    def _describe_bounds(threshold) -> str:
        # Render whichever of min/max is set, e.g. "min=0.5, max=0.9".
        parts = []
        if threshold.min is not None:
            parts.append(f"min={threshold.min}")
        if threshold.max is not None:
            parts.append(f"max={threshold.max}")
        return ", ".join(parts)

    cfg, cfg_path = _load_config_or_exit(config_path)
    if not cfg.viability:
        click.echo("No viability thresholds configured in infospace.yaml.")
        return

    # Try to load latest metrics; only int/float entries of a top-level
    # mapping are used, everything else in the file is ignored.
    numeric_metrics: dict = {}
    metrics_path = cfg_path.parent / cfg.metrics_dir / "metrics.yaml"
    if metrics_path.is_file():
        import yaml

        loaded = yaml.safe_load(metrics_path.read_text(encoding="utf-8"))
        if isinstance(loaded, dict):
            for key, value in loaded.items():
                if isinstance(value, (int, float)):
                    numeric_metrics[key] = float(value)

    state = build_state(cfg, metrics=numeric_metrics or None)

    if not state.viability_results:
        # Nothing to judge yet: list the configured thresholds instead.
        click.echo("No metrics available. Run evaluations first.")
        click.echo("\nConfigured thresholds:")
        for metric_name, threshold in cfg.viability.items():
            click.echo(f" {metric_name}: {_describe_bounds(threshold)}")
        return

    click.echo(f"{'Metric':<30} {'Value':>8} {'Threshold':>15} {'Status':>8}")
    click.echo("-" * 63)
    for result in state.viability_results:
        bounds_cell = _describe_bounds(result.threshold)
        verdict = "PASS" if result.passed else "FAIL"
        click.echo(
            f"{result.metric:<30} {result.value:>8.4f} {bounds_cell:>15} {verdict:>8}"
        )
    click.echo()

    overall = "YES" if state.is_viable else "NO"
    click.echo(
        f"Viable: {overall} ({state.viability_pass_count}/{state.viability_total_count} thresholds met)"
    )
# ── check ───────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
    "--concern", "concerns", multiple=True,
    type=click.Choice(["redundancy", "coverage", "coherence", "consistency", "granularity"]),
    help="Run specific concern(s). Omit to run all five.",
)
@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
def check(config_path: Optional[str], concerns: tuple, as_json: bool):
    """Run collection-level quality checks (C1C5)."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent
    entities_dir = root / cfg.entities_dir
    if not entities_dir.is_dir():
        click.echo("Error: No entities directory found.", err=True)
        raise SystemExit(1)
    entity_list = parse_entity_directory(entities_dir)
    if not entity_list:
        click.echo("No entities to check.")
        return

    from markitect.infospace.checks import run_all_checks

    # No --concern flags means "run everything", signalled by checks=None.
    selected = list(concerns) if concerns else None
    report = run_all_checks(entities=entity_list, checks=selected)

    if as_json:
        import json

        click.echo(json.dumps(report.to_dict(), indent=2))
    else:
        click.echo(f"Collection checks — {len(entity_list)} entities\n")
        for concern_name, concern_data in report.to_dict().items():
            label = concern_data.get("concern", concern_name.upper())
            click.echo(f" {label}{concern_name}")
            for field, value in concern_data.items():
                # "concern" is already shown in the heading line.
                if field != "concern":
                    click.echo(f" {field}: {value}")
            click.echo()

    # Without metrics there is nothing to summarise or record.
    m = report.metrics()
    if not m:
        return

    # Show summary metrics (human-readable mode only).
    if not as_json:
        click.echo("Metrics summary:")
        for metric_name, metric_value in sorted(m.items()):
            click.echo(f" {metric_name}: {metric_value:.4f}")

    # Record to history in both output modes; only announce it in text mode
    # so the JSON output stays machine-readable.
    from markitect.infospace.history import record_check_results

    snap = record_check_results(report, cfg, root, entity_count=len(entity_list))
    if not as_json:
        click.echo(f"\nRecorded snapshot {snap.snapshot_id}")
# ── history ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--metric", default=None, help="Show trend for a specific metric.")
@click.option("--json", "as_json", is_flag=True, help="Output as JSON.")
def history(config_path: Optional[str], metric: Optional[str], as_json: bool):
    """Show metrics history — snapshots over time."""
    cfg, cfg_path = _load_config_or_exit(config_path)

    from markitect.infospace.history import get_history, metric_trend

    snapshots = get_history(cfg, cfg_path.parent)
    if not snapshots:
        click.echo("No history found. Run 'markitect infospace check' first.")
        return

    # --metric: show one metric's trend and stop.
    if metric:
        trend = metric_trend(snapshots, metric)
        if not trend:
            click.echo(f"No data for metric '{metric}'.")
            return
        if as_json:
            import json

            click.echo(json.dumps(trend, indent=2))
            return
        click.echo(f"Trend: {metric}\n")
        for point in trend:
            # The date is cut to 19 characters before printing.
            click.echo(f" {point['date'][:19]} {point['value']:.4f}")
        return

    # Full history as JSON; default=str stringifies values json cannot encode.
    if as_json:
        import json

        serialised = [snapshot.to_dict() for snapshot in snapshots]
        click.echo(json.dumps(serialised, indent=2, default=str))
        return

    # Full history as a numbered table.
    click.echo(f"History: {len(snapshots)} snapshot(s)\n")
    click.echo(f"{'#':<4} {'Date':<20} {'Entities':>8} {'Metrics':>8}")
    click.echo("-" * 42)
    for index, snapshot in enumerate(snapshots, start=1):
        timestamp = snapshot.created_at.isoformat()[:19]
        metric_count = len(snapshot.collection_metrics)
        click.echo(f"{index:<4} {timestamp:<20} {snapshot.entity_count:>8} {metric_count:>8}")
@infospace_commands.command(name="history-diff")
@click.argument("date_a")
@click.argument("date_b")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def history_diff(date_a: str, date_b: str, config_path: Optional[str]):
    """Compare two history snapshots by date (YYYY-MM-DD)."""
    cfg, cfg_path = _load_config_or_exit(config_path)

    from markitect.infospace.history import find_snapshot_by_date, get_history
    from markitect.infospace.evaluation_io import diff_snapshots

    snapshots = get_history(cfg, cfg_path.parent)
    if len(snapshots) < 2:
        click.echo("Need at least two snapshots to diff.")
        return

    # Resolve both dates first, then report the first one that has no match.
    older = find_snapshot_by_date(snapshots, date_a)
    newer = find_snapshot_by_date(snapshots, date_b)
    for requested, found in ((date_a, older), (date_b, newer)):
        if found is None:
            click.echo(f"No snapshot found near '{requested}'.")
            return

    # Two different dates may still land on the same snapshot.
    if older.snapshot_id == newer.snapshot_id:
        click.echo("Both dates resolve to the same snapshot.")
        return

    click.echo(diff_snapshots(older, newer).summary())
# ── bind-discipline ─────────────────────────────────────────────────
@infospace_commands.command(name="bind-discipline")
@click.argument("discipline_path")
@click.option("--name", required=True, help="Name for the discipline.")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def bind_discipline_cmd(discipline_path: str, name: str, config_path: Optional[str]):
    """Bind a discipline infospace to the current infospace."""
    cfg, cfg_path = _load_config_or_exit(config_path)

    from markitect.infospace.composition import bind_discipline

    outcome = bind_discipline(cfg, name=name, path=discipline_path, root=cfg_path.parent)
    if outcome.error:
        click.echo(f"Error: {outcome.error}", err=True)
        raise SystemExit(1)

    # Persist updated config — only reached when the binding succeeded.
    save_infospace_config(cfg, cfg_path)

    click.echo(f"Bound discipline '{name}' from {discipline_path}")
    click.echo(f" Entities: {outcome.entity_count}")
    if outcome.has_config:
        # Viability is only printed when the status reports has_config.
        verdict = "YES" if outcome.is_viable else "NO"
        click.echo(f" Viable: {verdict}")
# ── disciplines ─────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def disciplines(config_path: Optional[str]):
    """List bound disciplines and their viability status."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    if not cfg.disciplines:
        click.echo("No disciplines bound.")
        return

    from markitect.infospace.composition import check_discipline_status

    base_dir = cfg_path.parent
    click.echo(f"{'Name':<30} {'Entities':>8} {'Viable':>8} {'Path'}")
    click.echo("-" * 70)
    for binding in cfg.disciplines:
        report = check_discipline_status(binding, base_dir)

        # Three-state column: viable, not viable, or "?" when the status
        # reports neither is_viable nor has_config.
        if report.is_viable:
            viable_cell = "YES"
        elif report.has_config:
            viable_cell = "NO"
        else:
            viable_cell = "?"

        click.echo(
            f"{report.name:<30} {report.entity_count:>8} {viable_cell:>8} {report.path}"
        )
        if report.error:
            click.echo(f" Error: {report.error}")
# ── process ─────────────────────────────────────────────────────
@infospace_commands.command()
@click.argument("glob_pattern", default=None, required=False)
@click.option("--all", "process_all", is_flag=True, help="Process all source files.")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default=None, help="LLM provider (openrouter, openai, etc.).")
@click.option("--model", default=None, help="LLM model name.")
@click.option(
    "--check-after-each",
    is_flag=True,
    help="Run collection checks (C1C5) after each source file.",
)
@click.option("--no-commit", is_flag=True, help="Skip git commits.")
def process(
    glob_pattern: Optional[str],
    process_all: bool,
    config_path: Optional[str],
    provider: Optional[str],
    model: Optional[str],
    check_after_each: bool,
    no_commit: bool,
):
    """Process source files through the pipeline defined in infospace.yaml.

    GLOB_PATTERN is matched against the sources directory declared in
    infospace.yaml (default ``*.md``). Use ``--all`` to process every
    source file.

    \b
    Examples:
      # Process chapters 1-3 from book 1
      markitect infospace process "book-1-chapter-0[1-3].md" --provider openrouter
      # Process all source files and check metrics after each
      markitect infospace process --all --provider openrouter --check-after-each
      # Dry run — load existing outputs only, no LLM calls
      markitect infospace process --all
    """
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    # A pipeline with at least one stage is mandatory.
    if not (cfg.pipeline and cfg.pipeline.stages):
        click.echo(
            "Error: No pipeline stages defined in infospace.yaml.\n"
            "Add a 'pipeline.stages' section with at least one stage.",
            err=True,
        )
        raise SystemExit(1)

    # Resolve sources directory: falls back to the config's own directory
    # when topic.sources is empty.
    sources_dir = root
    if cfg.topic.sources:
        sources_dir = root / cfg.topic.sources
    if not sources_dir.is_dir():
        click.echo(
            f"Error: Sources directory not found: {sources_dir}\n"
            f"Set 'topic.sources' in infospace.yaml.",
            err=True,
        )
        raise SystemExit(1)

    # Collect source files. --all always uses "*.md" and ignores GLOB_PATTERN.
    effective_pattern = "*.md" if process_all else (glob_pattern or "*.md")
    source_files = sorted(sources_dir.glob(effective_pattern))
    if not source_files:
        if process_all:
            click.echo(f"No source files found in {sources_dir}")
        else:
            click.echo(
                f"No files matched: {effective_pattern}\n"
                f"Sources directory: {sources_dir}"
            )
        return
    click.echo(f"Found {len(source_files)} source file(s) in {sources_dir.name}/")

    # Create LLM adapter. The model is resolved once here and reused for the
    # pipeline below; without a provider the pipeline gets empty strings.
    adapter = None
    pipeline_model = ""
    if provider:
        from markitect.llm import create_adapter

        provider_defaults = {"openrouter": "arcee-ai/trinity-large-preview:free"}
        resolved_model = model or provider_defaults.get(provider)
        adapter = create_adapter(provider, model=resolved_model)
        click.echo(f"LLM: {provider} ({resolved_model or 'default'})")
        pipeline_model = resolved_model or ""
    else:
        click.echo("No LLM provider — will use existing outputs only (manual mode).")

    # Run pipeline
    from markitect.infospace.pipeline import SourcePipeline

    pipeline = SourcePipeline(
        cfg, root,
        adapter=adapter,
        provider=provider or "",
        model=pipeline_model,
        no_commit=no_commit,
    )

    total = len(source_files)
    completed = 0
    for position, source_file in enumerate(source_files, start=1):
        click.echo(f"\n[{position}/{total}] {source_file.name}")
        if pipeline.process_source(source_file):
            completed += 1
        # Runs after every file, whether or not it processed successfully.
        if check_after_each:
            pipeline.run_collection_check()
    click.echo(f"\nDone: {completed}/{total} source file(s) fully processed.")
# ── stale-mappings ──────────────────────────────────────────────────
@infospace_commands.command(name="stale-mappings")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def stale_mappings(config_path: Optional[str]):
    """Check for stale mappings due to discipline changes."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    if not cfg.disciplines:
        click.echo("No disciplines bound — no mappings to check.")
        return

    from markitect.infospace.composition import find_stale_mappings

    base_dir = cfg_path.parent
    # Mapping references are optional; None is passed through when the
    # references file is absent or not a mapping.
    references = _load_mapping_references(cfg, base_dir)
    stale_entries = find_stale_mappings(cfg, base_dir, mapping_references=references)
    if not stale_entries:
        click.echo("No stale mappings detected.")
        return

    click.echo(f"Found {len(stale_entries)} stale mapping(s):\n")
    for entry in stale_entries:
        click.echo(f" {entry.entity_slug} -> {entry.discipline_entity}")
        click.echo(f" {entry.reason}")
def _load_mapping_references(
    cfg: InfospaceConfig, root: Path
) -> Optional[dict]:
    """Load ``mapping-references.yaml`` from the metrics directory, if usable.

    Args:
        cfg: Configuration whose ``metrics_dir`` locates the file under *root*.
        root: Directory the configured paths are resolved against.

    Returns:
        The parsed YAML content when the file exists and its top level is a
        mapping; ``None`` otherwise.
    """
    references_path = root / cfg.metrics_dir / "mapping-references.yaml"
    if references_path.is_file():
        # Imported lazily: only needed when the file is actually present.
        import yaml

        parsed = yaml.safe_load(references_path.read_text(encoding="utf-8"))
        return parsed if isinstance(parsed, dict) else None
    return None