""" CLI commands for infospace lifecycle management. Provides ``markitect infospace`` subcommands for initialising, inspecting, and evaluating infospaces. """ from __future__ import annotations from pathlib import Path from typing import Optional import click from markitect.infospace.config import ( DisciplineBinding, InfospaceConfig, SchemaRegistry, TopicConfig, find_infospace_config, load_infospace_config, save_infospace_config, ) from markitect.infospace.entity_parser import parse_entity_directory from markitect.infospace.state import build_state def _load_config_or_exit(config_path: Optional[str] = None) -> tuple: """Resolve and load infospace.yaml, or exit with an error.""" if config_path: p = Path(config_path) else: p = find_infospace_config() if p is None: click.echo("Error: No infospace.yaml found. Run 'markitect infospace init' first.", err=True) raise SystemExit(1) cfg = load_infospace_config(p) return cfg, p @click.group(name="infospace") def infospace_commands(): """Manage infospaces — create, inspect, evaluate.""" pass # ── init ───────────────────────────────────────────────────────────── @infospace_commands.command() @click.option("--topic", required=True, help="Topic name for the infospace.") @click.option("--domain", default="", help="Knowledge domain.") @click.option("--sources", default="", help="Path to source material directory.") @click.option("--discipline", multiple=True, help="Discipline name (repeatable).") @click.option("--output", "-o", default="infospace.yaml", help="Output config file path.") def init(topic: str, domain: str, sources: str, discipline: tuple, output: str): """Initialise a new infospace configuration file.""" out_path = Path(output) if out_path.exists(): click.echo(f"Error: {out_path} already exists.", err=True) raise SystemExit(1) disciplines = [DisciplineBinding(name=d) for d in discipline] config = InfospaceConfig( topic=TopicConfig(name=topic, domain=domain, sources=sources), disciplines=disciplines, ) save_infospace_config(config, out_path) click.echo(f"Created {out_path}") # ── status ─────────────────────────────────────────────────────────── @infospace_commands.command() @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") def status(config_path: Optional[str]): """Show infospace status — entity count, domains, evaluation state.""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent # Parse entities entities_dir = root / cfg.entities_dir entities = [] if entities_dir.is_dir(): entities = parse_entity_directory(entities_dir) # Load latest snapshot if available snapshot = None history_path = root / cfg.metrics_dir / "history.yaml" if history_path.is_file(): from markitect.infospace.evaluation_io import read_history history = read_history(history_path) if history: snapshot = history[-1] state = build_state(cfg, entities=entities, snapshot=snapshot) click.echo(f"Infospace: {state.topic_name}") if cfg.topic.domain: click.echo(f"Domain: {cfg.topic.domain}") click.echo(f"Entities: {state.entity_count}") if state.domains: click.echo(f"Domains: {', '.join(state.domains)}") if cfg.disciplines: names = [d.name for d in cfg.disciplines] click.echo(f"Disciplines: {', '.join(names)}") if state.has_evaluations: click.echo(f"Last evaluated: {state.latest_snapshot.created_at.isoformat()}") else: click.echo("Evaluations: none") # ── entities ───────────────────────────────────────────────────────── @infospace_commands.command() @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") @click.option( "--sort-by", "sort_key", type=click.Choice(["slug", "domain", "words"]), default="slug", help="Sort entities by field.", ) def entities(config_path: Optional[str], sort_key: str): """List entities with metadata summary.""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent entities_dir = root / cfg.entities_dir if not entities_dir.is_dir(): click.echo("No entities directory found.") return entity_list = parse_entity_directory(entities_dir) if not entity_list: click.echo("No entities found.") return # Sort if sort_key == "domain": entity_list.sort(key=lambda e: (e.domain or "", e.slug)) elif sort_key == "words": entity_list.sort(key=lambda e: e.total_word_count, reverse=True) else: entity_list.sort(key=lambda e: e.slug) # Format as table click.echo(f"{'Slug':<40} {'Domain':<20} {'Words':>6}") click.echo("-" * 68) for e in entity_list: click.echo(f"{e.slug:<40} {(e.domain or '-'):<20} {e.total_word_count:>6}") click.echo(f"\nTotal: {len(entity_list)} entities") # ── evaluate ───────────────────────────────────────────────────────── @infospace_commands.command() @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") @click.option("--provider", default="openrouter", help="LLM provider (openrouter, openai, etc.).") @click.option("--model", default=None, help="LLM model name.") @click.option("--entity", "entity_slug", default=None, help="Evaluate a single entity by slug.") @click.option("--chapter", default=None, help="Evaluate entities from a specific chapter.") def evaluate(config_path, provider, model, entity_slug, chapter): """Evaluate entities using LLM-based quality assessment.""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent entities_dir = root / cfg.entities_dir if not entities_dir.is_dir(): click.echo("Error: No entities directory found.", err=True) raise SystemExit(1) entity_list = parse_entity_directory(entities_dir) if not entity_list: click.echo("No entities to evaluate.") return # Filter if entity_slug: entity_list = [e for e in entity_list if e.slug == entity_slug] if not entity_list: click.echo(f"Error: Entity '{entity_slug}' not found.", err=True) raise SystemExit(1) elif chapter: entity_list = [e for e in entity_list if chapter in e.source_chapter] if not entity_list: click.echo(f"No entities found for chapter '{chapter}'.") return # Skip entities that already have evaluation files (incremental resume) from markitect.infospace.evaluate import run_entity_evaluation output_dir = root / cfg.evaluations_dir if not entity_slug and not chapter and output_dir.is_dir(): previous_digests = { p.stem: "" # non-empty sentinel → triggers skip in BatchEvaluator for p in output_dir.glob("*.md") } entity_list = [e for e in entity_list if e.slug not in previous_digests] if not entity_list: click.echo("All entities already evaluated. Nothing to do.") return if previous_digests: click.echo(f"Skipping {len(previous_digests)} already-evaluated entities.") # Create adapter from markitect.llm import create_adapter from markitect.prompts.execution.models import RunConfig adapter = create_adapter(provider, model=model) run_config = RunConfig(model_name=model, temperature=0.3, max_tokens=2000) # Progress callback def on_progress(done, total, result): status = result.status.upper() click.echo(f" [{done}/{total}] {result.key}: {status}") click.echo(f"Evaluating {len(entity_list)} entities via {provider}...") summary = run_entity_evaluation( config=cfg, entities=entity_list, adapter=adapter, run_config=run_config, output_dir=output_dir, progress_callback=on_progress, ) click.echo(f"\nDone: {summary.succeeded} succeeded, {summary.failed} failed, {summary.skipped} skipped") if summary.total_tokens > 0: click.echo(f"Tokens used: {summary.total_tokens}") # ── eval-summary ────────────────────────────────────────────────────── @infospace_commands.command(name="eval-summary") @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") @click.option("--update-metrics", is_flag=True, default=False, help="Merge per_entity_mean into metrics.yaml for viability checks.") def eval_summary(config_path: Optional[str], update_metrics: bool): """Show aggregate statistics from per-entity evaluation files.""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent evaluations_dir = root / cfg.evaluations_dir if not evaluations_dir.is_dir(): click.echo("No evaluations directory found. Run 'markitect infospace evaluate' first.") return from markitect.infospace.evaluation_io import read_entity_evaluation eval_files = sorted(evaluations_dir.glob("*.md")) if not eval_files: click.echo("No evaluation files found.") return overall_scores: list = [] dim_scores: dict = {} failed: list = [] for ef in eval_files: try: ev = read_entity_evaluation(ef) overall_scores.append(ev.overall_score) for s in ev.scores: dim_scores.setdefault(s.name, []).append(s.value) except Exception as exc: failed.append((ef.stem, str(exc))) n = len(overall_scores) if n == 0: click.echo("No evaluations could be read.") return mean_overall = sum(overall_scores) / n click.echo(f"Evaluation summary — {n} entities evaluated") if failed: click.echo(f" (failed to read: {len(failed)})") click.echo() click.echo(f" {'Dimension':<30} {'Mean':>6}") click.echo(" " + "-" * 38) click.echo(f" {'overall':<30} {mean_overall:>6.3f}") for dim, vals in sorted(dim_scores.items()): click.echo(f" {dim:<30} {sum(vals)/len(vals):>6.3f}") score_min = min(overall_scores) score_max = max(overall_scores) click.echo() click.echo(f" Range: {score_min:.2f} – {score_max:.2f}") if update_metrics: from markitect.infospace.history import read_metrics_file, write_metrics_file metrics_file = root / cfg.metrics_dir / "metrics.yaml" existing = read_metrics_file(metrics_file) existing["per_entity_mean"] = round(mean_overall, 6) write_metrics_file(existing, metrics_file) click.echo(f"\nUpdated metrics.yaml: per_entity_mean = {mean_overall:.4f}") # ── relations ───────────────────────────────────────────────────────── @infospace_commands.command() @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") @click.option("--entity", "entity_slug", default=None, help="Show only relations involving this entity slug.") @click.option("--vsm", "vsm_filter", default=None, help="Show only relations whose VSM channel contains this string (e.g. S2, S3).") @click.option("--loops", "loops_only", is_flag=True, default=False, help="Show only feedback loops (cycles in the relation graph).") @click.option("--stats", "stats_only", is_flag=True, default=False, help="Show aggregate statistics only, no individual relations.") def relations(config_path: Optional[str], entity_slug: Optional[str], vsm_filter: Optional[str], loops_only: bool, stats_only: bool): """Show the L3 relation graph — triplets, feedback loops, and VSM channels.""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent from markitect.infospace.relation_parser import parse_relations_directory relations_dir = root / cfg.relations_dir if not relations_dir.is_dir(): click.echo("No relations directory found. Create output/relations/ and add relation files.") return all_relations = parse_relations_directory(relations_dir) if not all_relations: click.echo("No relation files found in " + str(relations_dir)) return # Build directed graph for cycle detection try: import networkx as nx G = nx.DiGraph() for r in all_relations: G.add_edge(r.subject_slug, r.object_slug, predicate=r.predicate, relation_type=r.relation_type, vsm_channel=r.vsm_channel, slug=r.slug) except ImportError: G = None # Find feedback loops loops = [] if G is not None: try: loops = list(nx.simple_cycles(G)) except Exception: loops = [] # Stats summary import re as _re def _vsm_code(channel: str) -> str: """Strip parenthetical description, returning just the system code (e.g. 'S3 → S1').""" return _re.sub(r'\s*\(.*', '', channel).strip() or channel n = len(all_relations) vsm_counts: dict = {} type_counts: dict = {} for r in all_relations: vsm_counts[_vsm_code(r.vsm_channel)] = vsm_counts.get(_vsm_code(r.vsm_channel), 0) + 1 type_counts[r.relation_type] = type_counts.get(r.relation_type, 0) + 1 click.echo(f"Relation graph — {n} relations") if G is not None: click.echo(f" Entities in graph: {G.number_of_nodes()}") click.echo(f" Feedback loops: {len(loops)}") click.echo() if stats_only: click.echo("Relation types:") for rt, count in sorted(type_counts.items(), key=lambda x: -x[1]): click.echo(f" {rt:<25} {count:>4}") click.echo() click.echo("VSM channels:") for ch, count in sorted(vsm_counts.items(), key=lambda x: -x[1]): click.echo(f" {ch:<20} {count:>4}") return # Feedback loops section if loops or loops_only: if loops: click.echo(f"Feedback loops ({len(loops)}):") for i, cycle in enumerate(loops, 1): click.echo(f" Loop {i}: {' → '.join(cycle)} → {cycle[0]}") click.echo() elif loops_only: click.echo("No feedback loops detected in current relation set.") return if loops_only: return # Filter relations filtered = all_relations if entity_slug: filtered = [r for r in filtered if entity_slug in (r.subject_slug, r.object_slug)] if not filtered: click.echo(f"No relations found involving '{entity_slug}'.") return if vsm_filter: filtered = [r for r in filtered if vsm_filter in r.vsm_channel] if not filtered: click.echo(f"No relations with VSM channel containing '{vsm_filter}'.") return # Display relations click.echo(f"{'Subject':<35} {'Predicate':<30} {'Object':<35} {'VSM'}") click.echo("-" * 110) for r in filtered: subj = r.subject[:33] + ".." if len(r.subject) > 35 else r.subject obj = r.object[:33] + ".." if len(r.object) > 35 else r.object pred = r.predicate[:28] + ".." if len(r.predicate) > 30 else r.predicate click.echo(f"{subj:<35} {pred:<30} {obj:<35} {r.vsm_channel}") # ── classify ───────────────────────────────────────────────────────── @infospace_commands.command() @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") @click.option("--entity", "entity_slug", default=None, help="Classify a single entity by slug.") @click.option("--provider", default="openrouter", help="LLM provider (openrouter, gemini, openai, …).") @click.option("--model", default=None, help="Model name override.") def classify(config_path: Optional[str], entity_slug: Optional[str], provider: str, model: Optional[str]): """Classify entities with Entity Type and VSM System (L2).""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent from markitect.infospace.classifier import run_entity_classification from markitect.llm import create_adapter from markitect.prompts.execution.models import RunConfig entity_list = parse_entity_directory(root / cfg.entities_dir) if not entity_list: click.echo("No entities found in " + str(root / cfg.entities_dir), err=True) return output_dir = root / cfg.classifications_dir if entity_slug: entity_list = [e for e in entity_list if e.slug == entity_slug] if not entity_list: click.echo(f"Entity '{entity_slug}' not found.", err=True) return else: # Incremental skip — entities already classified are omitted if output_dir.is_dir(): done_slugs = {p.stem for p in output_dir.glob("*.md")} before = len(entity_list) entity_list = [e for e in entity_list if e.slug not in done_slugs] skipped = before - len(entity_list) if skipped: click.echo(f"Skipping {skipped} already-classified entities.") if not entity_list: click.echo("All entities already classified. Nothing to do.") return click.echo(f"Classifying {len(entity_list)} entities …") output_dir.mkdir(parents=True, exist_ok=True) adapter = create_adapter(provider, model=model) run_config = RunConfig(model_name=model, temperature=0.1, max_tokens=2000) def _progress(done: int, total: int, result) -> None: if result.status == "success": click.echo(f" [{done}/{total}] {result.key}") else: click.echo(f" [{done}/{total}] {result.key} — FAILED: {result.error}") summary = run_entity_classification( config=cfg, entities=entity_list, adapter=adapter, run_config=run_config, output_dir=output_dir, progress_callback=_progress, ) click.echo(f"\nDone: {summary.succeeded} classified, {summary.failed} failed.") # ── classify-summary ────────────────────────────────────────────────── @infospace_commands.command(name="classify-summary") @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") @click.option("--update-metrics", "update_metrics", is_flag=True, default=False, help="Write type_distribution metrics to metrics.yaml.") def classify_summary(config_path: Optional[str], update_metrics: bool): """Show type × VSM distribution across all classified entities (L2).""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent from markitect.infospace.classification import ENTITY_TYPES, VSM_SYSTEMS from markitect.infospace.classification_io import read_classifications_directory cls_dir = root / cfg.classifications_dir if not cls_dir.is_dir(): click.echo("No classifications directory found. Run 'classify' first.") return all_cls = read_classifications_directory(cls_dir) if not all_cls: click.echo("No classification files found.") return n = len(all_cls) type_counts: dict = {} vsm_counts: dict = {} matrix: dict = {} # (entity_type, vsm_system) → count for c in all_cls: type_counts[c.entity_type] = type_counts.get(c.entity_type, 0) + 1 vsm_counts[c.vsm_system] = vsm_counts.get(c.vsm_system, 0) + 1 key = (c.entity_type, c.vsm_system) matrix[key] = matrix.get(key, 0) + 1 click.echo(f"Classification summary — {n} entities\n") click.echo("Entity types:") for t, count in sorted(type_counts.items(), key=lambda x: -x[1]): pct = 100 * count / n if n else 0.0 click.echo(f" {t:<15} {count:>4} ({pct:.1f}%)") click.echo() vsm_order = ["S1", "S2", "S3", "S3*", "S4", "S5"] click.echo("VSM systems:") for v in vsm_order: if v in vsm_counts: count = vsm_counts[v] pct = 100 * count / n if n else 0.0 click.echo(f" {v:<6} {count:>4} ({pct:.1f}%)") click.echo() # Type × VSM matrix header = f"{'':15}" + "".join(f"{v:>7}" for v in vsm_order) sep = "-" * (15 + 7 * len(vsm_order)) click.echo(header) click.echo(sep) for t in ENTITY_TYPES: row = f"{t:<15}" for v in vsm_order: c = matrix.get((t, v), 0) row += f"{c if c else '.':>7}" click.echo(row) click.echo() filled_cells = len(matrix) total_cells = len(ENTITY_TYPES) * len(vsm_order) click.echo(f"Matrix fill: {filled_cells}/{total_cells} cells occupied") click.echo() if update_metrics: import math from markitect.infospace.history import read_metrics_file, write_metrics_file metrics_dir = root / cfg.metrics_dir metrics_dir.mkdir(parents=True, exist_ok=True) # Type entropy type_entropy = 0.0 for count in type_counts.values(): p = count / n if p > 0: type_entropy -= p * math.log2(p) existing = read_metrics_file(metrics_dir / "metrics.yaml") new_metrics = { "type_distribution": type_counts, "vsm_type_matrix_cells": filled_cells, "type_entropy": round(type_entropy, 4), } merged = {**existing, **new_metrics} write_metrics_file(merged, metrics_dir / "metrics.yaml") click.echo( f"Updated metrics.yaml: type_entropy={type_entropy:.4f}, " f"vsm_type_matrix_cells={filled_cells}" ) # ── viability ──────────────────────────────────────────────────────── @infospace_commands.command() @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") def viability(config_path: Optional[str]): """Show viability dashboard — threshold checks and pass/fail.""" cfg, cfg_path = _load_config_or_exit(config_path) if not cfg.viability: click.echo("No viability thresholds configured in infospace.yaml.") return # Try to load latest metrics root = cfg_path.parent metrics: dict = {} metrics_file = root / cfg.metrics_dir / "metrics.yaml" if metrics_file.is_file(): import yaml raw = yaml.safe_load(metrics_file.read_text(encoding="utf-8")) if isinstance(raw, dict): metrics = {k: float(v) for k, v in raw.items() if isinstance(v, (int, float))} state = build_state(cfg, metrics=metrics if metrics else None) if not state.viability_results: click.echo("No metrics available. Run evaluations first.") click.echo("\nConfigured thresholds:") for name, t in cfg.viability.items(): bounds = [] if t.min is not None: bounds.append(f"min={t.min}") if t.max is not None: bounds.append(f"max={t.max}") click.echo(f" {name}: {', '.join(bounds)}") return click.echo(f"{'Metric':<30} {'Value':>8} {'Threshold':>15} {'Status':>8}") click.echo("-" * 63) for r in state.viability_results: bounds = [] if r.threshold.min is not None: bounds.append(f"min={r.threshold.min}") if r.threshold.max is not None: bounds.append(f"max={r.threshold.max}") status_str = "PASS" if r.passed else "FAIL" click.echo( f"{r.metric:<30} {r.value:>8.4f} {', '.join(bounds):>15} {status_str:>8}" ) click.echo() if state.is_viable: click.echo(f"Viable: YES ({state.viability_pass_count}/{state.viability_total_count} thresholds met)") else: click.echo(f"Viable: NO ({state.viability_pass_count}/{state.viability_total_count} thresholds met)") # ── check ─────────────────────────────────────────────────────────── @infospace_commands.command() @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") @click.option( "--concern", "concerns", multiple=True, type=click.Choice(["redundancy", "coverage", "coherence", "consistency", "granularity"]), help="Run specific concern(s). Omit to run all five.", ) @click.option("--json", "as_json", is_flag=True, help="Output results as JSON.") def check(config_path: Optional[str], concerns: tuple, as_json: bool): """Run collection-level quality checks (C1–C5).""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent entities_dir = root / cfg.entities_dir if not entities_dir.is_dir(): click.echo("Error: No entities directory found.", err=True) raise SystemExit(1) entity_list = parse_entity_directory(entities_dir) if not entity_list: click.echo("No entities to check.") return from markitect.infospace.checks import run_all_checks checks_list = list(concerns) if concerns else None report = run_all_checks( entities=entity_list, checks=checks_list, ) if as_json: import json click.echo(json.dumps(report.to_dict(), indent=2)) else: click.echo(f"Collection checks — {len(entity_list)} entities\n") d = report.to_dict() for concern_name, concern_data in d.items(): label = concern_data.get("concern", concern_name.upper()) click.echo(f" {label} — {concern_name}") for k, v in concern_data.items(): if k == "concern": continue click.echo(f" {k}: {v}") click.echo() # Show summary metrics m = report.metrics() if m and not as_json: click.echo("Metrics summary:") for k, v in sorted(m.items()): click.echo(f" {k}: {v:.4f}") # Record to history if m: from markitect.infospace.history import record_check_results snap = record_check_results(report, cfg, root, entity_count=len(entity_list)) if not as_json: click.echo(f"\nRecorded snapshot {snap.snapshot_id}") # ── history ───────────────────────────────────────────────────────── @infospace_commands.command() @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") @click.option("--metric", default=None, help="Show trend for a specific metric.") @click.option("--json", "as_json", is_flag=True, help="Output as JSON.") def history(config_path: Optional[str], metric: Optional[str], as_json: bool): """Show metrics history — snapshots over time.""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent from markitect.infospace.history import get_history, metric_trend snapshots = get_history(cfg, root) if not snapshots: click.echo("No history found. Run 'markitect infospace check' first.") return if metric: trend = metric_trend(snapshots, metric) if not trend: click.echo(f"No data for metric '{metric}'.") return if as_json: import json click.echo(json.dumps(trend, indent=2)) else: click.echo(f"Trend: {metric}\n") for entry in trend: click.echo(f" {entry['date'][:19]} {entry['value']:.4f}") return if as_json: import json click.echo(json.dumps([s.to_dict() for s in snapshots], indent=2, default=str)) return click.echo(f"History: {len(snapshots)} snapshot(s)\n") click.echo(f"{'#':<4} {'Date':<20} {'Entities':>8} {'Metrics':>8}") click.echo("-" * 42) for i, snap in enumerate(snapshots, 1): date_str = snap.created_at.isoformat()[:19] n_metrics = len(snap.collection_metrics) click.echo(f"{i:<4} {date_str:<20} {snap.entity_count:>8} {n_metrics:>8}") @infospace_commands.command(name="history-diff") @click.argument("date_a") @click.argument("date_b") @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") def history_diff(date_a: str, date_b: str, config_path: Optional[str]): """Compare two history snapshots by date (YYYY-MM-DD).""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent from markitect.infospace.history import find_snapshot_by_date, get_history from markitect.infospace.evaluation_io import diff_snapshots snapshots = get_history(cfg, root) if len(snapshots) < 2: click.echo("Need at least two snapshots to diff.") return snap_a = find_snapshot_by_date(snapshots, date_a) snap_b = find_snapshot_by_date(snapshots, date_b) if snap_a is None: click.echo(f"No snapshot found near '{date_a}'.") return if snap_b is None: click.echo(f"No snapshot found near '{date_b}'.") return if snap_a.snapshot_id == snap_b.snapshot_id: click.echo("Both dates resolve to the same snapshot.") return diff = diff_snapshots(snap_a, snap_b) click.echo(diff.summary()) # ── bind-discipline ───────────────────────────────────────────────── @infospace_commands.command(name="bind-discipline") @click.argument("discipline_path") @click.option("--name", required=True, help="Name for the discipline.") @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") def bind_discipline_cmd(discipline_path: str, name: str, config_path: Optional[str]): """Bind a discipline infospace to the current infospace.""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent from markitect.infospace.composition import bind_discipline status = bind_discipline(cfg, name=name, path=discipline_path, root=root) if status.error: click.echo(f"Error: {status.error}", err=True) raise SystemExit(1) # Persist updated config save_infospace_config(cfg, cfg_path) click.echo(f"Bound discipline '{name}' from {discipline_path}") click.echo(f" Entities: {status.entity_count}") if status.has_config: viable_str = "YES" if status.is_viable else "NO" click.echo(f" Viable: {viable_str}") # ── disciplines ───────────────────────────────────────────────────── @infospace_commands.command() @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") def disciplines(config_path: Optional[str]): """List bound disciplines and their viability status.""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent if not cfg.disciplines: click.echo("No disciplines bound.") return from markitect.infospace.composition import check_discipline_status click.echo(f"{'Name':<30} {'Entities':>8} {'Viable':>8} {'Path'}") click.echo("-" * 70) for binding in cfg.disciplines: status = check_discipline_status(binding, root) viable_str = "YES" if status.is_viable else ("NO" if status.has_config else "?") click.echo( f"{status.name:<30} {status.entity_count:>8} {viable_str:>8} {status.path}" ) if status.error: click.echo(f" Error: {status.error}") # ── process ───────────────────────────────────────────────────── @infospace_commands.command() @click.argument("glob_pattern", default=None, required=False) @click.option("--all", "process_all", is_flag=True, help="Process all source files.") @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") @click.option("--provider", default=None, help="LLM provider (openrouter, openai, etc.).") @click.option("--model", default=None, help="LLM model name.") @click.option( "--check-after-each", is_flag=True, help="Run collection checks (C1–C5) after each source file.", ) @click.option("--no-commit", is_flag=True, help="Skip git commits.") def process( glob_pattern: Optional[str], process_all: bool, config_path: Optional[str], provider: Optional[str], model: Optional[str], check_after_each: bool, no_commit: bool, ): """Process source files through the pipeline defined in infospace.yaml. GLOB_PATTERN is matched against the sources directory declared in infospace.yaml (default ``*.md``). Use ``--all`` to process every source file. \b Examples: # Process chapters 1-3 from book 1 markitect infospace process "book-1-chapter-0[1-3].md" --provider openrouter # Process all source files and check metrics after each markitect infospace process --all --provider openrouter --check-after-each # Dry run — load existing outputs only, no LLM calls markitect infospace process --all """ cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent if not cfg.pipeline or not cfg.pipeline.stages: click.echo( "Error: No pipeline stages defined in infospace.yaml.\n" "Add a 'pipeline.stages' section with at least one stage.", err=True, ) raise SystemExit(1) # Resolve sources directory sources_dir = root / cfg.topic.sources if cfg.topic.sources else root if not sources_dir.is_dir(): click.echo( f"Error: Sources directory not found: {sources_dir}\n" f"Set 'topic.sources' in infospace.yaml.", err=True, ) raise SystemExit(1) # Collect source files if process_all: source_files = sorted(sources_dir.glob("*.md")) else: pattern = glob_pattern or "*.md" source_files = sorted(sources_dir.glob(pattern)) if not source_files: if process_all: click.echo(f"No source files found in {sources_dir}") else: click.echo( f"No files matched: {glob_pattern or '*.md'}\n" f"Sources directory: {sources_dir}" ) return click.echo(f"Found {len(source_files)} source file(s) in {sources_dir.name}/") # Create LLM adapter adapter = None if provider: from markitect.llm import create_adapter _PROVIDER_DEFAULTS = {"openrouter": "arcee-ai/trinity-large-preview:free"} resolved_model = model or _PROVIDER_DEFAULTS.get(provider) adapter = create_adapter(provider, model=resolved_model) click.echo(f"LLM: {provider} ({resolved_model or 'default'})") else: click.echo("No LLM provider — will use existing outputs only (manual mode).") # Run pipeline from markitect.infospace.pipeline import SourcePipeline pipeline = SourcePipeline( cfg, root, adapter=adapter, provider=provider or "", model=(model or _PROVIDER_DEFAULTS.get(provider or "", "")) if provider else "", no_commit=no_commit, ) total = len(source_files) completed = 0 for i, source_file in enumerate(source_files, 1): click.echo(f"\n[{i}/{total}] {source_file.name}") success = pipeline.process_source(source_file) if success: completed += 1 if check_after_each: pipeline.run_collection_check() click.echo(f"\nDone: {completed}/{total} source file(s) fully processed.") # ── stale-mappings ────────────────────────────────────────────────── @infospace_commands.command(name="stale-mappings") @click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") def stale_mappings(config_path: Optional[str]): """Check for stale mappings due to discipline changes.""" cfg, cfg_path = _load_config_or_exit(config_path) root = cfg_path.parent if not cfg.disciplines: click.echo("No disciplines bound — no mappings to check.") return from markitect.infospace.composition import find_stale_mappings # Try to load mapping references from output mapping_refs = _load_mapping_references(cfg, root) stale = find_stale_mappings(cfg, root, mapping_references=mapping_refs) if not stale: click.echo("No stale mappings detected.") return click.echo(f"Found {len(stale)} stale mapping(s):\n") for s in stale: click.echo(f" {s.entity_slug} -> {s.discipline_entity}") click.echo(f" {s.reason}") def _load_mapping_references( cfg: InfospaceConfig, root: Path ) -> Optional[dict]: """Try to load mapping references from YAML file in output dir.""" mapping_file = root / cfg.metrics_dir / "mapping-references.yaml" if not mapping_file.is_file(): return None import yaml data = yaml.safe_load(mapping_file.read_text(encoding="utf-8")) if isinstance(data, dict): return data return None