Files
markitect-main/markitect/infospace/cli.py
tegwick d44a4cd3df feat(infospace,llm): agent ergonomics — entity lookup, model fallback, better errors
- `markitect infospace entity <name>`: single-entity lookup tolerating
  hyphens/underscores/case, with substring matching, ambiguity listing,
  and near-match hints. Prints slug, source path, domain, chapter, word
  count, VSM system, overall score, evaluator, and evaluation file path.
- `markitect infospace evaluate --model-fallback <model>`: if any
  entities fail with a rate-limit error, retry just those with a fresh
  adapter on the fallback model (different free-tier models have
  separate quota buckets).
- `markitect llm-check`: advisory when `OPENROUTER_API_KEY` is set but
  not used by the resolved provider; targeted hint when OpenRouter
  returns 401 (almost always a stale env key).
- `build_state`: raises `TypeError` with actionable message if passed a
  path instead of an `InfospaceConfig` — prior failure mode was a
  confusing `AttributeError` deep in the stack.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 01:07:25 +02:00

1436 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CLI commands for infospace lifecycle management.
Provides ``markitect infospace`` subcommands for initialising,
inspecting, and evaluating infospaces.
"""
from __future__ import annotations
from pathlib import Path
from typing import Optional
import click
from markitect.infospace.config import (
DisciplineBinding,
InfospaceConfig,
SchemaRegistry,
TopicConfig,
find_infospace_config,
load_infospace_config,
save_infospace_config,
)
from markitect.infospace.entity_parser import parse_entity_directory
from markitect.infospace.state import build_state
def _load_config_or_exit(config_path: Optional[str] = None) -> tuple:
    """Locate and load the infospace config, aborting the CLI when none is found.

    An explicit ``config_path`` is used as given; otherwise the location comes
    from ``find_infospace_config()``.

    Returns:
        A ``(config, path)`` pair: the loaded configuration and the path it
        was loaded from.

    Raises:
        SystemExit: With code 1 when no path was supplied and discovery
            returned ``None``.
    """
    resolved = Path(config_path) if config_path else find_infospace_config()
    if resolved is None:
        click.echo(
            "Error: No infospace.yaml found. Run 'markitect infospace init' first.",
            err=True,
        )
        raise SystemExit(1)
    return load_infospace_config(resolved), resolved
# Root click group: the subcommands in this module register themselves on it
# via ``@infospace_commands.command(...)``.
@click.group(name="infospace")
def infospace_commands():
    """Manage infospaces — create, inspect, evaluate."""
# ── init ─────────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--topic", required=True, help="Topic name for the infospace.")
@click.option("--domain", default="", help="Knowledge domain.")
@click.option("--sources", default="", help="Path to source material directory.")
@click.option("--discipline", multiple=True, help="Discipline name (repeatable).")
@click.option("--output", "-o", default="infospace.yaml", help="Output config file path.")
def init(topic: str, domain: str, sources: str, discipline: tuple, output: str):
    """Initialise a new infospace configuration file."""
    # Never overwrite an existing config: report on stderr and exit with 1.
    destination = Path(output)
    if destination.exists():
        click.echo(f"Error: {destination} already exists.", err=True)
        raise SystemExit(1)

    # --discipline is repeatable; each occurrence becomes one binding.
    bindings = [DisciplineBinding(name=label) for label in discipline]
    topic_cfg = TopicConfig(name=topic, domain=domain, sources=sources)
    save_infospace_config(
        InfospaceConfig(topic=topic_cfg, disciplines=bindings),
        destination,
    )
    click.echo(f"Created {destination}")
# ── status ───────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def status(config_path: Optional[str]):
    """Show infospace status — entity count, domains, evaluation state."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    base = cfg_path.parent

    # A missing entities directory is not an error here; it means "no entities".
    entity_root = base / cfg.entities_dir
    parsed = parse_entity_directory(entity_root) if entity_root.is_dir() else []

    # The last entry of history.yaml, when present, is used as the snapshot.
    latest = None
    history_file = base / cfg.metrics_dir / "history.yaml"
    if history_file.is_file():
        from markitect.infospace.evaluation_io import read_history

        records = read_history(history_file)
        if records:
            latest = records[-1]

    state = build_state(cfg, entities=parsed, snapshot=latest)

    click.echo(f"Infospace: {state.topic_name}")
    if cfg.topic.domain:
        click.echo(f"Domain: {cfg.topic.domain}")
    click.echo(f"Entities: {state.entity_count}")
    if state.domains:
        click.echo(f"Domains: {', '.join(state.domains)}")
    if cfg.disciplines:
        click.echo(f"Disciplines: {', '.join(d.name for d in cfg.disciplines)}")
    if not state.has_evaluations:
        click.echo("Evaluations: none")
    else:
        click.echo(f"Last evaluated: {state.latest_snapshot.created_at.isoformat()}")
# ── entities ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
    "--sort-by", "sort_key",
    type=click.Choice(["slug", "domain", "words"]),
    default="slug",
    help="Sort entities by field.",
)
@click.option("--by-type", "by_type", is_flag=True, default=False,
              help="Group entities by L2 entity type.")
def entities(config_path: Optional[str], sort_key: str, by_type: bool):
    """List entities with metadata summary."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    entities_dir = root / cfg.entities_dir
    if not entities_dir.is_dir():
        click.echo("No entities directory found.")
        return

    rows = parse_entity_directory(entities_dir)
    if not rows:
        click.echo("No entities found.")
        return

    # --by-type switches to the grouped view and ignores --sort-by.
    if by_type:
        _entities_by_type(cfg, root, rows)
        return

    # sort_key -> (key function, descending?); any other value sorts by slug.
    orderings = {
        "domain": (lambda e: (e.domain or "", e.slug), False),
        "words": (lambda e: e.total_word_count, True),
    }
    key_fn, descending = orderings.get(sort_key, (lambda e: e.slug, False))
    rows.sort(key=key_fn, reverse=descending)

    # Fixed-width table: slug, domain ('-' when unset), word count.
    click.echo(f"{'Slug':<40} {'Domain':<20} {'Words':>6}")
    click.echo("-" * 68)
    for row in rows:
        click.echo(f"{row.slug:<40} {(row.domain or '-'):<20} {row.total_word_count:>6}")
    click.echo(f"\nTotal: {len(rows)} entities")
def _entities_by_type(cfg, root: "Path", entity_list: list) -> None:
    """Print entities grouped by L2 entity type.

    Entities without a classification, or whose classified type is not one of
    ``ENTITY_TYPES``, are listed under "Unclassified". Each row shows slug,
    domain (truncated to 18 characters), VSM system and, when an evaluation
    could be read, the overall score. Relation-type entities that carry a
    ``links_mechanism`` get two extra lines for their endpoints and mechanism.

    Args:
        cfg: Loaded infospace config; ``classifications_dir`` and
            ``evaluations_dir`` are resolved relative to ``root``.
        root: Directory that the config's relative paths are resolved against.
        entity_list: Parsed entities to display.
    """
    from markitect.infospace.classification import ENTITY_TYPES
    from markitect.infospace.classification_io import read_classifications_directory
    from markitect.infospace.evaluation_io import read_entity_evaluation

    # Load classifications, keyed by entity slug.
    cls_map: dict = {}
    cls_dir = root / cfg.classifications_dir
    if cls_dir.is_dir():
        for c in read_classifications_directory(cls_dir):
            cls_map[c.entity_slug] = c

    # Load evaluation scores (best-effort): slug -> overall_score.
    eval_scores: dict = {}
    eval_dir = root / cfg.evaluations_dir
    if eval_dir.is_dir():
        for ef in eval_dir.glob("*.md"):
            try:
                ev = read_entity_evaluation(ef)
                eval_scores[ev.entity_slug] = ev.overall_score
            except Exception:
                # Deliberately tolerant: an unreadable evaluation only means
                # the entity is listed without a score.
                continue

    # Build index: entity_type -> list of (entity, classification-or-None).
    entity_index = {t: [] for t in ENTITY_TYPES}
    entity_index["Unclassified"] = []
    for e in entity_list:
        cls = cls_map.get(e.slug)
        if cls is None:
            entity_index["Unclassified"].append((e, None))
        else:
            bucket = cls.entity_type if cls.entity_type in entity_index else "Unclassified"
            entity_index[bucket].append((e, cls))

    # Print each non-empty type group, known types first.
    type_order = list(ENTITY_TYPES) + ["Unclassified"]
    total = 0
    for etype in type_order:
        group = entity_index.get(etype, [])
        if not group:
            continue
        click.echo(f"\n=== {etype} ({len(group)} entities) ===")
        group.sort(key=lambda pair: pair[0].slug)
        for e, cls in group:
            # Guard against a missing VSM system: applying the '<4' format
            # spec to None would raise TypeError.
            vsm = (cls.vsm_system or "") if cls else ""
            domain = (e.domain or "-")[:18]
            score = eval_scores.get(e.slug)
            score_str = f" \u2605{score:.1f}" if score is not None else ""
            slug_col = f"{e.slug:<40}"
            click.echo(f" {slug_col} {domain:<18} {vsm:<4}{score_str}")
            if cls and cls.entity_type == "Relation" and cls.links_mechanism:
                subj = cls.links_subject or cls.links_subject_slug or "?"
                obj = cls.links_object or cls.links_object_slug or "?"
                click.echo(f" \u2192 links: {subj} \u2194 {obj}")
                # Keep the mechanism to at most 80 characters.
                mech = cls.links_mechanism
                if len(mech) > 80:
                    mech = mech[:77] + "..."
                click.echo(f" \u2192 mechanism: {mech}")
        total += len(group)
    click.echo(f"\nTotal: {total} entities")
# ── entity (single lookup) ───────────────────────────────────────────
@infospace_commands.command()
@click.argument("name")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def entity(name: str, config_path: Optional[str]):
    """Look up one entity by name, tolerating case / hyphens / underscores.
    Prints slug, source path, domain, chapter, word count, overall score,
    VSM system (if classified), and evaluation-file path.
    """
    # Resolution order:
    #   1. raw slug equal to the normalised input,
    #   2. exactly one slug whose *normalised* form equals the normalised input,
    #   3. exactly one slug whose normalised form contains the input.
    # Anything else is reported on stderr (ambiguous list or near matches)
    # and exits with code 1.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent
    entities_dir = root / cfg.entities_dir
    if not entities_dir.is_dir():
        click.echo("No entities directory found.", err=True)
        raise SystemExit(1)
    entity_list = parse_entity_directory(entities_dir)
    if not entity_list:
        click.echo("No entities found.", err=True)
        raise SystemExit(1)

    # Normalize: lowercase, underscores.
    def norm(s: str) -> str:
        return s.lower().replace("-", "_").replace(" ", "_")

    target = norm(name)
    normalized = [(norm(e.slug), e) for e in entity_list]

    by_slug = {e.slug: e for e in entity_list}
    match = by_slug.get(target)
    if match is None:
        # Compare normalised forms on both sides so a slug that differs from
        # the input only by case or hyphenation counts as an exact hit
        # instead of being reported as ambiguous by the substring fallback.
        exact = [e for key, e in normalized if key == target]
        if len(exact) == 1:
            match = exact[0]

    if match is None:
        # Substring fallback for partial input.
        candidates = [e for key, e in normalized if target in key]
        if len(candidates) == 1:
            match = candidates[0]
        elif len(candidates) > 1:
            click.echo(f"Ambiguous — '{name}' matches multiple entities:", err=True)
            for c in sorted(candidates, key=lambda e: e.slug)[:10]:
                click.echo(f" {c.slug}", err=True)
            if len(candidates) > 10:
                click.echo(f" … and {len(candidates) - 10} more", err=True)
            raise SystemExit(1)
        else:
            click.echo(f"No entity matching '{name}'.", err=True)
            # Hint with up to five slugs containing the first word of the input.
            stem = target.split("_", 1)[0]
            near = sorted(e.slug for key, e in normalized if stem in key)[:5]
            if near:
                click.echo(f" Near matches: {', '.join(near)}", err=True)
            raise SystemExit(1)

    # Load score + classification (best-effort): a missing or unreadable file
    # just leaves the corresponding fields unset.
    score: Optional[float] = None
    evaluator: Optional[str] = None
    eval_file = root / cfg.evaluations_dir / f"{match.slug}.md"
    if eval_file.is_file():
        try:
            from markitect.infospace.evaluation_io import read_entity_evaluation
            ev = read_entity_evaluation(eval_file)
            score = ev.overall_score
            evaluator = ev.evaluator
        except Exception:
            pass
    vsm: Optional[str] = None
    cls_file = root / cfg.classifications_dir / f"{match.slug}.md"
    if cls_file.is_file():
        try:
            from markitect.infospace.classification_io import read_entity_classification
            cls = read_entity_classification(cls_file)
            vsm = cls.vsm_system
        except Exception:
            pass

    # Output — one field per line so it's easy to grep or pipe.
    click.echo(f"slug: {match.slug}")
    click.echo(f"source_path: {match.source_path}")
    click.echo(f"domain: {match.domain or '-'}")
    click.echo(f"chapter: {match.source_chapter or '-'}")
    click.echo(f"word_count: {match.total_word_count}")
    click.echo(f"vsm_system: {vsm or '-'}")
    if score is not None:
        click.echo(f"overall_score: {score:.2f}")
        click.echo(f"evaluator: {evaluator or '-'}")
        click.echo(f"evaluation: {eval_file}")
    else:
        click.echo("evaluation: (not yet evaluated)")
# ── evaluate ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default="openrouter", help="LLM provider (openrouter, openai, etc.).")
@click.option("--model", default=None, help="LLM model name.")
@click.option("--entity", "entity_slug", default=None, help="Evaluate a single entity by slug.")
@click.option("--chapter", default=None, help="Evaluate entities from a specific chapter.")
@click.option("--force", is_flag=True, default=False,
              help="Re-evaluate entities whose evaluation file already exists.")
@click.option("--model-fallback", "model_fallback", default=None,
              help="If the primary model hits a rate limit (429), retry the "
                   "failed entities once with this model. Useful on free tiers "
                   "where models have separate quota buckets (e.g. "
                   "gemini-2.5-flash → gemini-2.5-flash-lite).")
def evaluate(config_path, provider, model, entity_slug, chapter, force, model_fallback):
    """Evaluate entities using LLM-based quality assessment."""
    # Flow: select entities (--entity wins over --chapter), drop those that
    # already have an evaluation file unless --force, run the evaluation, then
    # optionally retry rate-limited failures once on --model-fallback.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent
    entities_dir = root / cfg.entities_dir
    if not entities_dir.is_dir():
        click.echo("Error: No entities directory found.", err=True)
        raise SystemExit(1)
    entity_list = parse_entity_directory(entities_dir)
    if not entity_list:
        click.echo("No entities to evaluate.")
        return

    # Filter. Accept hyphenated input for --entity by normalizing to the
    # underscore slug format produced by parse_entity_directory.
    if entity_slug:
        normalized = entity_slug.replace("-", "_")
        matches = [e for e in entity_list if e.slug == normalized]
        if not matches:
            # Build a short "did you mean…" list from entities sharing a stem.
            stem = normalized.split("_", 1)[0]
            near = sorted(e.slug for e in entity_list if e.slug.startswith(stem))[:5]
            msg = f"Error: Entity '{entity_slug}' not found."
            if near:
                msg += f" Did you mean: {', '.join(near)} ?"
            click.echo(msg, err=True)
            raise SystemExit(1)
        entity_list = matches
    elif chapter:
        # An entity without a chapter never matches; `or ""` avoids the
        # TypeError that `chapter in None` would raise.
        entity_list = [e for e in entity_list if chapter in (e.source_chapter or "")]
        if not entity_list:
            click.echo(f"No entities found for chapter '{chapter}'.")
            return

    # Skip entities that already have evaluation files (incremental resume).
    # Applies uniformly to full-pass, --entity, and --chapter runs unless
    # --force is set.
    from markitect.infospace.evaluate import run_entity_evaluation
    output_dir = root / cfg.evaluations_dir
    if not force and output_dir.is_dir():
        existing = {p.stem for p in output_dir.glob("*.md")}
        before = len(entity_list)
        entity_list = [e for e in entity_list if e.slug not in existing]
        skipped = before - len(entity_list)
        if not entity_list:
            click.echo("All selected entities already evaluated. "
                       "Re-run with --force to overwrite.")
            return
        if skipped:
            click.echo(f"Skipping {skipped} already-evaluated entities. "
                       "Use --force to re-evaluate.")

    # Create adapter
    from markitect.llm import create_adapter
    from markitect.prompts.execution.models import RunConfig
    adapter = create_adapter(provider, model=model)
    run_config = RunConfig(model_name=model, temperature=0.3, max_tokens=2000)

    # Progress callback — surface error detail so agents don't have to
    # drop into Python to see whether an ERROR was 429, 503, or auth.
    def on_progress(done, total, result):
        status = result.status.upper()
        if status == "ERROR" and result.error:
            click.echo(f" [{done}/{total}] {result.key}: ERROR — {result.error}")
        else:
            click.echo(f" [{done}/{total}] {result.key}: {status}")

    click.echo(f"Evaluating {len(entity_list)} entities via {provider}...")
    summary = run_entity_evaluation(
        config=cfg,
        entities=entity_list,
        adapter=adapter,
        run_config=run_config,
        output_dir=output_dir,
        progress_callback=on_progress,
    )

    # Model fallback: if any entities failed with a rate-limit-looking
    # error and the user opted in with --model-fallback, retry them once
    # with a fresh adapter on the fallback model. Different free-tier
    # models have separate quota buckets, so this often succeeds when
    # the primary is exhausted.
    if model_fallback and summary.failed > 0:
        rate_limited = [
            r for r in summary.results
            if r.status == "error"
            and r.error
            and ("429" in r.error or "rate" in r.error.lower())
        ]
        if rate_limited:
            retry_slugs = {r.key for r in rate_limited}
            retry_entities = [e for e in entity_list if e.slug in retry_slugs]
            click.echo(
                f"\n{len(retry_entities)} rate-limited entities — "
                f"retrying with --model-fallback {model_fallback}..."
            )
            fb_adapter = create_adapter(provider, model=model_fallback)
            fb_run_config = RunConfig(
                model_name=model_fallback, temperature=0.3, max_tokens=2000
            )
            fb_summary = run_entity_evaluation(
                config=cfg,
                entities=retry_entities,
                adapter=fb_adapter,
                run_config=fb_run_config,
                output_dir=output_dir,
                progress_callback=on_progress,
            )
            # Fold the retry into the primary summary: retried entities stop
            # counting as failures unless the fallback failed as well.
            summary.succeeded += fb_summary.succeeded
            summary.failed = (summary.failed - len(retry_entities)) + fb_summary.failed
            summary.total_prompt_tokens += fb_summary.total_prompt_tokens
            summary.total_completion_tokens += fb_summary.total_completion_tokens

    click.echo(f"\nDone: {summary.succeeded} succeeded, {summary.failed} failed, {summary.skipped} skipped")
    if summary.total_tokens > 0:
        click.echo(f"Tokens used: {summary.total_tokens}")
# ── eval-summary ──────────────────────────────────────────────────────
@infospace_commands.command(name="eval-summary")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--update-metrics", is_flag=True, default=False,
              help="Merge per_entity_mean into metrics.yaml for viability checks.")
def eval_summary(config_path: Optional[str], update_metrics: bool):
    """Show aggregate statistics from per-entity evaluation files."""
    # Reads every *.md under the evaluations directory, prints the mean
    # overall score, the mean per dimension and the overall min/max. With
    # --update-metrics the overall mean is merged into metrics.yaml as
    # "per_entity_mean".
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent
    evaluations_dir = root / cfg.evaluations_dir
    if not evaluations_dir.is_dir():
        click.echo("No evaluations directory found. Run 'markitect infospace evaluate' first.")
        return

    from markitect.infospace.evaluation_io import read_entity_evaluation

    eval_files = sorted(evaluations_dir.glob("*.md"))
    if not eval_files:
        click.echo("No evaluation files found.")
        return

    overall_scores: list = []
    dim_scores: dict = {}
    failed: list = []
    for ef in eval_files:
        # Extract everything before recording anything, so a file that fails
        # halfway is counted only as failed, never as partially evaluated.
        try:
            ev = read_entity_evaluation(ef)
            overall = ev.overall_score
            per_dimension = [(s.name, s.value) for s in ev.scores]
        except Exception as exc:
            failed.append((ef.stem, str(exc)))
            continue
        overall_scores.append(overall)
        for dim_name, dim_value in per_dimension:
            dim_scores.setdefault(dim_name, []).append(dim_value)

    n = len(overall_scores)
    if n == 0:
        click.echo("No evaluations could be read.")
        return

    mean_overall = sum(overall_scores) / n
    click.echo(f"Evaluation summary — {n} entities evaluated")
    if failed:
        click.echo(f" (failed to read: {len(failed)})")
    click.echo()
    click.echo(f" {'Dimension':<30} {'Mean':>6}")
    click.echo(" " + "-" * 38)
    click.echo(f" {'overall':<30} {mean_overall:>6.3f}")
    for dim, vals in sorted(dim_scores.items()):
        click.echo(f" {dim:<30} {sum(vals)/len(vals):>6.3f}")

    score_min = min(overall_scores)
    score_max = max(overall_scores)
    click.echo()
    # Explicit separator so the two bounds cannot be misread as one value.
    click.echo(f" Range: {score_min:.2f} – {score_max:.2f}")

    if update_metrics:
        from markitect.infospace.history import read_metrics_file, write_metrics_file

        metrics_file = root / cfg.metrics_dir / "metrics.yaml"
        # Ensure the metrics directory exists before writing, as
        # classify-summary does.
        metrics_file.parent.mkdir(parents=True, exist_ok=True)
        existing = read_metrics_file(metrics_file)
        existing["per_entity_mean"] = round(mean_overall, 6)
        write_metrics_file(existing, metrics_file)
        click.echo(f"\nUpdated metrics.yaml: per_entity_mean = {mean_overall:.4f}")
# ── relations ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--entity", "entity_slug", default=None,
              help="Show only relations involving this entity slug.")
@click.option("--vsm", "vsm_filter", default=None,
              help="Show only relations whose VSM channel contains this string (e.g. S2, S3).")
@click.option("--loops", "loops_only", is_flag=True, default=False,
              help="Show only feedback loops (cycles in the relation graph).")
@click.option("--stats", "stats_only", is_flag=True, default=False,
              help="Show aggregate statistics only, no individual relations.")
def relations(config_path: Optional[str], entity_slug: Optional[str],
              vsm_filter: Optional[str], loops_only: bool, stats_only: bool):
    """Show the L3 relation graph — triplets, feedback loops, and VSM channels."""
    # Output sections, in order: header counts; with --stats the type/channel
    # tallies (then stop); feedback loops (then stop with --loops); finally
    # the relation table, optionally narrowed by --entity and --vsm.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    from markitect.infospace.relation_parser import parse_relations_directory

    relations_dir = root / cfg.relations_dir
    if not relations_dir.is_dir():
        click.echo("No relations directory found. Create output/relations/ and add relation files.")
        return
    all_relations = parse_relations_directory(relations_dir)
    if not all_relations:
        click.echo("No relation files found in " + str(relations_dir))
        return

    # Build directed graph for cycle detection. networkx is optional: without
    # it the graph-derived lines are skipped and no loops are reported.
    try:
        import networkx as nx
        G = nx.DiGraph()
        for r in all_relations:
            G.add_edge(r.subject_slug, r.object_slug,
                       predicate=r.predicate,
                       relation_type=r.relation_type,
                       vsm_channel=r.vsm_channel,
                       slug=r.slug)
    except ImportError:
        G = None

    # Find feedback loops (best-effort: any failure yields "no loops").
    loops = []
    if G is not None:
        try:
            loops = list(nx.simple_cycles(G))
        except Exception:
            loops = []

    # Stats summary
    import re as _re

    def _vsm_code(channel: str) -> str:
        """Strip parenthetical description, returning just the system code (e.g. 'S3 → S1')."""
        return _re.sub(r'\s*\(.*', '', channel).strip() or channel

    n = len(all_relations)
    vsm_counts: dict = {}
    type_counts: dict = {}
    for r in all_relations:
        code = _vsm_code(r.vsm_channel)  # computed once per relation
        vsm_counts[code] = vsm_counts.get(code, 0) + 1
        type_counts[r.relation_type] = type_counts.get(r.relation_type, 0) + 1

    click.echo(f"Relation graph — {n} relations")
    if G is not None:
        click.echo(f" Entities in graph: {G.number_of_nodes()}")
        click.echo(f" Feedback loops: {len(loops)}")
    click.echo()

    if stats_only:
        click.echo("Relation types:")
        for rt, count in sorted(type_counts.items(), key=lambda x: -x[1]):
            click.echo(f" {rt:<25} {count:>4}")
        click.echo()
        click.echo("VSM channels:")
        for ch, count in sorted(vsm_counts.items(), key=lambda x: -x[1]):
            click.echo(f" {ch:<20} {count:>4}")
        return

    # Feedback loops section
    if loops or loops_only:
        if loops:
            click.echo(f"Feedback loops ({len(loops)}):")
            for i, cycle in enumerate(loops, 1):
                # Separate the nodes with arrows and close the cycle on its
                # first node; without a separator the slugs ran together
                # into one unreadable token.
                click.echo(f" Loop {i}: {' → '.join(cycle)} → {cycle[0]}")
            click.echo()
        elif loops_only:
            click.echo("No feedback loops detected in current relation set.")
            return
    if loops_only:
        return

    # Filter relations
    filtered = all_relations
    if entity_slug:
        filtered = [r for r in filtered
                    if entity_slug in (r.subject_slug, r.object_slug)]
        if not filtered:
            click.echo(f"No relations found involving '{entity_slug}'.")
            return
    if vsm_filter:
        filtered = [r for r in filtered if vsm_filter in r.vsm_channel]
        if not filtered:
            click.echo(f"No relations with VSM channel containing '{vsm_filter}'.")
            return

    # Display relations; over-long cells are cut and marked with "..".
    click.echo(f"{'Subject':<35} {'Predicate':<30} {'Object':<35} {'VSM'}")
    click.echo("-" * 110)
    for r in filtered:
        subj = r.subject[:33] + ".." if len(r.subject) > 35 else r.subject
        obj = r.object[:33] + ".." if len(r.object) > 35 else r.object
        pred = r.predicate[:28] + ".." if len(r.predicate) > 30 else r.predicate
        click.echo(f"{subj:<35} {pred:<30} {obj:<35} {r.vsm_channel}")
# ── classify ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--entity", "entity_slug", default=None,
              help="Classify a single entity by slug.")
@click.option("--provider", default="openrouter",
              help="LLM provider (openrouter, gemini, openai, …).")
@click.option("--model", default=None, help="Model name override.")
@click.option("--rpm", default=0, type=int,
              help="Max requests per minute (0 = unlimited). Use 10 for Gemini free tier.")
def classify(config_path: Optional[str], entity_slug: Optional[str],
             provider: str, model: Optional[str], rpm: int):
    """Classify entities with Entity Type and VSM System (L2)."""
    # With --entity only that entity is classified (even if already done);
    # otherwise entities that already have a classification file are skipped.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    from markitect.infospace.classifier import run_entity_classification
    from markitect.llm import create_adapter
    from markitect.prompts.execution.models import RunConfig

    entity_list = parse_entity_directory(root / cfg.entities_dir)
    if not entity_list:
        click.echo("No entities found in " + str(root / cfg.entities_dir), err=True)
        return

    output_dir = root / cfg.classifications_dir
    if entity_slug:
        # Accept the slug as typed or with hyphens mapped to underscores,
        # matching what `evaluate --entity` tolerates.
        wanted = {entity_slug, entity_slug.replace("-", "_")}
        entity_list = [e for e in entity_list if e.slug in wanted]
        if not entity_list:
            click.echo(f"Entity '{entity_slug}' not found.", err=True)
            return
    else:
        # Incremental skip — entities already classified are omitted
        if output_dir.is_dir():
            done_slugs = {p.stem for p in output_dir.glob("*.md")}
            before = len(entity_list)
            entity_list = [e for e in entity_list if e.slug not in done_slugs]
            skipped = before - len(entity_list)
            if skipped:
                click.echo(f"Skipping {skipped} already-classified entities.")
        if not entity_list:
            click.echo("All entities already classified. Nothing to do.")
            return

    # --rpm is turned into a fixed delay between requests (0 = no delay).
    delay = (60.0 / rpm) if rpm > 0 else 0.0
    click.echo(f"Classifying {len(entity_list)} entities …" +
               (f" (rate: {rpm} RPM, {delay:.1f}s delay)" if delay else ""))
    output_dir.mkdir(parents=True, exist_ok=True)

    adapter = create_adapter(provider, model=model)
    run_config = RunConfig(model_name=model, temperature=0.1, max_tokens=2000)

    def _progress(done: int, total: int, result) -> None:
        if result.status == "success":
            click.echo(f" [{done}/{total}] {result.key}")
        else:
            click.echo(f" [{done}/{total}] {result.key} — FAILED: {result.error}")

    summary = run_entity_classification(
        config=cfg,
        entities=entity_list,
        adapter=adapter,
        run_config=run_config,
        output_dir=output_dir,
        progress_callback=_progress,
        delay_seconds=delay,
    )
    click.echo(f"\nDone: {summary.succeeded} classified, {summary.failed} failed.")
# ── classify-summary ──────────────────────────────────────────────────
@infospace_commands.command(name="classify-summary")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--update-metrics", "update_metrics", is_flag=True, default=False,
              help="Write type_distribution metrics to metrics.yaml.")
def classify_summary(config_path: Optional[str], update_metrics: bool):
    """Show type × VSM distribution across all classified entities (L2)."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    # NOTE(review): VSM_SYSTEMS is imported but not referenced below; the
    # display order comes from the local `vsm_order` list.
    from markitect.infospace.classification import ENTITY_TYPES, VSM_SYSTEMS
    from markitect.infospace.classification_io import read_classifications_directory

    cls_dir = root / cfg.classifications_dir
    if not cls_dir.is_dir():
        click.echo("No classifications directory found. Run 'classify' first.")
        return
    all_cls = read_classifications_directory(cls_dir)
    if not all_cls:
        click.echo("No classification files found.")
        return

    # Tally per type, per VSM system, and per (type, system) pair.
    n = len(all_cls)
    type_counts: dict = {}
    vsm_counts: dict = {}
    matrix: dict = {}
    for item in all_cls:
        pair = (item.entity_type, item.vsm_system)
        type_counts[item.entity_type] = type_counts.get(item.entity_type, 0) + 1
        vsm_counts[item.vsm_system] = vsm_counts.get(item.vsm_system, 0) + 1
        matrix[pair] = matrix.get(pair, 0) + 1

    def _share(count: int) -> float:
        # Percentage of all classified entities.
        return 100 * count / n if n else 0.0

    click.echo(f"Classification summary — {n} entities\n")

    click.echo("Entity types:")
    for type_name, count in sorted(type_counts.items(), key=lambda kv: -kv[1]):
        click.echo(f" {type_name:<15} {count:>4} ({_share(count):.1f}%)")
    click.echo()

    # Only systems named here are listed, and always in this order.
    vsm_order = ["S1", "S2", "S3", "S3*", "S4", "S5"]
    click.echo("VSM systems:")
    for system in vsm_order:
        if system not in vsm_counts:
            continue
        count = vsm_counts[system]
        click.echo(f" {system:<6} {count:>4} ({_share(count):.1f}%)")
    click.echo()

    # Type × VSM matrix: one row per known entity type, '.' for empty cells.
    click.echo(f"{'':15}" + "".join(f"{system:>7}" for system in vsm_order))
    click.echo("-" * (15 + 7 * len(vsm_order)))
    for type_name in ENTITY_TYPES:
        cells = [matrix.get((type_name, system), 0) for system in vsm_order]
        click.echo(
            f"{type_name:<15}"
            + "".join(f"{cell if cell else '.':>7}" for cell in cells)
        )
    click.echo()

    filled_cells = len(matrix)
    total_cells = len(ENTITY_TYPES) * len(vsm_order)
    click.echo(f"Matrix fill: {filled_cells}/{total_cells} cells occupied")
    click.echo()

    if not update_metrics:
        return

    import math
    from markitect.infospace.history import read_metrics_file, write_metrics_file

    metrics_dir = root / cfg.metrics_dir
    metrics_dir.mkdir(parents=True, exist_ok=True)
    metrics_path = metrics_dir / "metrics.yaml"

    # Type entropy (base 2) of the entity-type distribution.
    type_entropy = 0.0
    for count in type_counts.values():
        prob = count / n
        if prob > 0:
            type_entropy -= prob * math.log2(prob)

    # Keys computed here override same-named keys already in the file.
    merged = {
        **read_metrics_file(metrics_path),
        "type_distribution": type_counts,
        "vsm_type_matrix_cells": filled_cells,
        "type_entropy": round(type_entropy, 4),
    }
    write_metrics_file(merged, metrics_path)
    click.echo(
        f"Updated metrics.yaml: type_entropy={type_entropy:.4f}, "
        f"vsm_type_matrix_cells={filled_cells}"
    )
# ── classify-links ────────────────────────────────────────────────────
@infospace_commands.command(name="classify-links")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default="openrouter",
              help="LLM provider (openrouter, gemini, openai, …).")
@click.option("--model", default=None, help="Model name override.")
def classify_links(config_path: Optional[str], provider: str, model: Optional[str]):
    """Capture relation endpoint data (subject, object, mechanism) for Relation-type entities."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    # NOTE(review): ENTITY_TYPES is imported but not referenced in this command.
    from markitect.infospace.classification import ENTITY_TYPES
    from markitect.infospace.classification_io import read_classifications_directory
    from markitect.infospace.classifier import run_relation_link_capture
    from markitect.llm import create_adapter
    from markitect.prompts.execution.models import RunConfig

    cls_dir = root / cfg.classifications_dir
    if not cls_dir.is_dir():
        click.echo("No classifications directory found. Run 'classify' first.", err=True)
        raise SystemExit(1)

    all_cls = read_classifications_directory(cls_dir)
    cls_map = {c.entity_slug: c for c in all_cls}

    # Work list: Relation-type classifications with no links_mechanism yet.
    pending = [
        c.entity_slug
        for c in all_cls
        if c.entity_type == "Relation" and not c.links_mechanism
    ]
    if not pending:
        click.echo("All Relation-type entities already have endpoint data. Nothing to do.")
        return

    # Pair each pending slug with its parsed entity; slugs with no entity
    # behind them are counted for the warning and left out.
    entity_map = {e.slug: e for e in parse_entity_directory(root / cfg.entities_dir)}
    relation_entities = []
    orphaned = []
    for slug in pending:
        if slug in entity_map:
            relation_entities.append(entity_map[slug])
        else:
            orphaned.append(slug)
    if orphaned:
        click.echo(f"Warning: {len(orphaned)} Relation-type slugs not found in "
                   f"entities directory and will be skipped.")
    if not relation_entities:
        click.echo("No Relation-type entities found to enrich.")
        return

    click.echo(f"Capturing relation links for {len(relation_entities)} Relation-type entities …")
    adapter = create_adapter(provider, model=model)
    run_config = RunConfig(model_name=model, temperature=0.1, max_tokens=512)

    def _progress(done: int, total: int, result) -> None:
        # One line per finished entity; failures carry the error text.
        if result.status != "success":
            click.echo(f" [{done}/{total}] {result.key} — FAILED: {result.error}")
        else:
            click.echo(f" [{done}/{total}] {result.key}")

    # Results are written back into the classifications directory.
    summary = run_relation_link_capture(
        config=cfg,
        relation_entities=relation_entities,
        classifications=cls_map,
        adapter=adapter,
        run_config=run_config,
        output_dir=cls_dir,
        progress_callback=_progress,
    )
    click.echo(f"\nDone: {summary.succeeded} enriched, {summary.failed} failed.")
# ── viability ────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def viability(config_path: Optional[str]):
    """Show viability dashboard — threshold checks and pass/fail."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    if not cfg.viability:
        click.echo("No viability thresholds configured in infospace.yaml.")
        return

    def _bounds(threshold) -> str:
        # Render whichever of min/max is set, e.g. "min=0.6, max=0.9".
        parts = []
        if threshold.min is not None:
            parts.append(f"min={threshold.min}")
        if threshold.max is not None:
            parts.append(f"max={threshold.max}")
        return ", ".join(parts)

    # Try to load latest metrics; only numeric top-level values are kept.
    metrics: dict = {}
    metrics_file = cfg_path.parent / cfg.metrics_dir / "metrics.yaml"
    if metrics_file.is_file():
        import yaml

        raw = yaml.safe_load(metrics_file.read_text(encoding="utf-8"))
        if isinstance(raw, dict):
            metrics = {
                key: float(value)
                for key, value in raw.items()
                if isinstance(value, (int, float))
            }

    state = build_state(cfg, metrics=metrics or None)

    # Without results there is nothing to grade: list the thresholds instead.
    if not state.viability_results:
        click.echo("No metrics available. Run evaluations first.")
        click.echo("\nConfigured thresholds:")
        for name, threshold in cfg.viability.items():
            click.echo(f" {name}: {_bounds(threshold)}")
        return

    click.echo(f"{'Metric':<30} {'Value':>8} {'Threshold':>15} {'Status':>8}")
    click.echo("-" * 63)
    for result in state.viability_results:
        status_str = "PASS" if result.passed else "FAIL"
        click.echo(
            f"{result.metric:<30} {result.value:>8.4f} "
            f"{_bounds(result.threshold):>15} {status_str:>8}"
        )
    click.echo()

    verdict = "YES" if state.is_viable else "NO"
    click.echo(
        f"Viable: {verdict} "
        f"({state.viability_pass_count}/{state.viability_total_count} thresholds met)"
    )
# ── check ───────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
    "--concern", "concerns", multiple=True,
    type=click.Choice(["redundancy", "coverage", "coherence", "consistency", "granularity"]),
    help="Run specific concern(s). Omit to run all five.",
)
@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
def check(config_path: Optional[str], concerns: tuple, as_json: bool):
    """Run collection-level quality checks (C1–C5)."""
    # Parameters:
    #   config_path: explicit path to infospace.yaml, or None to let
    #       _load_config_or_exit resolve it.
    #   concerns: names selected via --concern; empty when the option is omitted.
    #   as_json: emit the report as JSON and suppress all human-readable text.
    # Exits with status 1 when the entities directory does not exist.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    entities_dir = root / cfg.entities_dir
    if not entities_dir.is_dir():
        click.echo("Error: No entities directory found.", err=True)
        raise SystemExit(1)

    entity_list = parse_entity_directory(entities_dir)
    if not entity_list:
        click.echo("No entities to check.")
        return

    from markitect.infospace.checks import run_all_checks

    # No --concern selection is forwarded as None rather than an empty list.
    report = run_all_checks(
        entities=entity_list,
        checks=list(concerns) if concerns else None,
    )

    if as_json:
        import json

        click.echo(json.dumps(report.to_dict(), indent=2))
    else:
        click.echo(f"Collection checks — {len(entity_list)} entities\n")
        for concern_name, concern_data in report.to_dict().items():
            label = concern_data.get("concern", concern_name.upper())
            # Separate the label from the concern name so the heading is
            # readable instead of the two running together.
            click.echo(f" {label} — {concern_name}")
            for key, value in concern_data.items():
                if key == "concern":
                    continue
                click.echo(f" {key}: {value}")
            click.echo()

    metrics = report.metrics()
    if not metrics:
        return

    if not as_json:
        click.echo("Metrics summary:")
        for key, value in sorted(metrics.items()):
            click.echo(f" {key}: {value:.4f}")

    # Record to history whenever metrics were produced, in both output modes;
    # only the confirmation line is suppressed for JSON output.
    from markitect.infospace.history import record_check_results

    snap = record_check_results(report, cfg, root, entity_count=len(entity_list))
    if not as_json:
        click.echo(f"\nRecorded snapshot {snap.snapshot_id}")
# ── history ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--metric", default=None, help="Show trend for a specific metric.")
@click.option("--json", "as_json", is_flag=True, help="Output as JSON.")
def history(config_path: Optional[str], metric: Optional[str], as_json: bool):
    """Show metrics history — snapshots over time."""
    cfg, cfg_path = _load_config_or_exit(config_path)

    from markitect.infospace.history import get_history, metric_trend
    import json

    snapshots = get_history(cfg, cfg_path.parent)
    if not snapshots:
        click.echo("No history found. Run 'markitect infospace check' first.")
        return

    # --metric switches the command to a single-metric trend view.
    if metric:
        points = metric_trend(snapshots, metric)
        if not points:
            click.echo(f"No data for metric '{metric}'.")
        elif as_json:
            click.echo(json.dumps(points, indent=2))
        else:
            click.echo(f"Trend: {metric}\n")
            for point in points:
                # The date is cut to 19 characters before printing.
                click.echo(f" {point['date'][:19]} {point['value']:.4f}")
        return

    if as_json:
        serialised = [snapshot.to_dict() for snapshot in snapshots]
        # default=str stringifies any value json cannot encode natively.
        click.echo(json.dumps(serialised, indent=2, default=str))
        return

    # Plain-text table: one row per snapshot, numbered from 1.
    click.echo(f"History: {len(snapshots)} snapshot(s)\n")
    click.echo(f"{'#':<4} {'Date':<20} {'Entities':>8} {'Metrics':>8}")
    click.echo("-" * 42)
    for index, snapshot in enumerate(snapshots, start=1):
        stamp = snapshot.created_at.isoformat()[:19]
        metric_count = len(snapshot.collection_metrics)
        click.echo(f"{index:<4} {stamp:<20} {snapshot.entity_count:>8} {metric_count:>8}")
@infospace_commands.command(name="history-diff")
@click.argument("date_a")
@click.argument("date_b")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def history_diff(date_a: str, date_b: str, config_path: Optional[str]):
    """Compare two history snapshots by date (YYYY-MM-DD)."""
    cfg, cfg_path = _load_config_or_exit(config_path)

    from markitect.infospace.history import find_snapshot_by_date, get_history
    from markitect.infospace.evaluation_io import diff_snapshots

    snapshots = get_history(cfg, cfg_path.parent)
    if len(snapshots) < 2:
        click.echo("Need at least two snapshots to diff.")
        return

    # Resolve both dates up front, then report the first one that has no match.
    resolved = [
        (requested, find_snapshot_by_date(snapshots, requested))
        for requested in (date_a, date_b)
    ]
    for requested, found in resolved:
        if found is None:
            click.echo(f"No snapshot found near '{requested}'.")
            return

    (_, older), (_, newer) = resolved
    if older.snapshot_id == newer.snapshot_id:
        # Both dates resolved to one snapshot, so there is nothing to diff.
        click.echo("Both dates resolve to the same snapshot.")
        return

    click.echo(diff_snapshots(older, newer).summary())
# ── bind-discipline ─────────────────────────────────────────────────
@infospace_commands.command(name="bind-discipline")
@click.argument("discipline_path")
@click.option("--name", required=True, help="Name for the discipline.")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def bind_discipline_cmd(discipline_path: str, name: str, config_path: Optional[str]):
    """Bind a discipline infospace to the current infospace."""
    cfg, cfg_path = _load_config_or_exit(config_path)

    from markitect.infospace.composition import bind_discipline

    outcome = bind_discipline(
        cfg, name=name, path=discipline_path, root=cfg_path.parent
    )
    if outcome.error:
        click.echo(f"Error: {outcome.error}", err=True)
        raise SystemExit(1)

    # The config is written back to disk only after a successful bind.
    save_infospace_config(cfg, cfg_path)

    click.echo(f"Bound discipline '{name}' from {discipline_path}")
    click.echo(f" Entities: {outcome.entity_count}")
    if not outcome.has_config:
        # The viability line is printed only when has_config is set.
        return
    if outcome.is_viable:
        verdict = "YES"
    else:
        verdict = "NO"
    click.echo(f" Viable: {verdict}")
# ── disciplines ─────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def disciplines(config_path: Optional[str]):
    """List bound disciplines and their viability status."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    base_dir = cfg_path.parent

    if not cfg.disciplines:
        click.echo("No disciplines bound.")
        return

    from markitect.infospace.composition import check_discipline_status

    click.echo(f"{'Name':<30} {'Entities':>8} {'Viable':>8} {'Path'}")
    click.echo("-" * 70)
    for bound in cfg.disciplines:
        info = check_discipline_status(bound, base_dir)

        # Three-state column: viable, not viable, or "?" when has_config is unset.
        if info.is_viable:
            verdict = "YES"
        elif info.has_config:
            verdict = "NO"
        else:
            verdict = "?"

        click.echo(f"{info.name:<30} {info.entity_count:>8} {verdict:>8} {info.path}")
        if info.error:
            click.echo(f" Error: {info.error}")
# ── process ─────────────────────────────────────────────────────
@infospace_commands.command()
@click.argument("glob_pattern", default=None, required=False)
@click.option("--all", "process_all", is_flag=True, help="Process all source files.")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default=None, help="LLM provider (openrouter, openai, etc.).")
@click.option("--model", default=None, help="LLM model name.")
@click.option(
    "--check-after-each",
    is_flag=True,
    help="Run collection checks (C1–C5) after each source file.",
)
@click.option("--no-commit", is_flag=True, help="Skip git commits.")
def process(
    glob_pattern: Optional[str],
    process_all: bool,
    config_path: Optional[str],
    provider: Optional[str],
    model: Optional[str],
    check_after_each: bool,
    no_commit: bool,
):
    """Process source files through the pipeline defined in infospace.yaml.

    GLOB_PATTERN is matched against the sources directory declared in
    infospace.yaml (default ``*.md``). Use ``--all`` to process every
    source file.

    \b
    Examples:
      # Process chapters 1-3 from book 1
      markitect infospace process "book-1-chapter-0[1-3].md" --provider openrouter
      # Process all source files and check metrics after each
      markitect infospace process --all --provider openrouter --check-after-each
      # Dry run — load existing outputs only, no LLM calls
      markitect infospace process --all
    """
    # Exits with status 1 when no pipeline stages are configured, when the
    # sources directory is missing, or when the glob pattern is invalid.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    if not cfg.pipeline or not cfg.pipeline.stages:
        click.echo(
            "Error: No pipeline stages defined in infospace.yaml.\n"
            "Add a 'pipeline.stages' section with at least one stage.",
            err=True,
        )
        raise SystemExit(1)

    # Resolve sources directory (falls back to the infospace root).
    sources_dir = root / cfg.topic.sources if cfg.topic.sources else root
    if not sources_dir.is_dir():
        click.echo(
            f"Error: Sources directory not found: {sources_dir}\n"
            f"Set 'topic.sources' in infospace.yaml.",
            err=True,
        )
        raise SystemExit(1)

    # Collect source files. --all always wins over an explicit pattern; say so
    # on stderr instead of silently discarding what the user typed.
    if process_all:
        if glob_pattern:
            click.echo(
                f"Note: --all given; ignoring pattern '{glob_pattern}'.",
                err=True,
            )
        pattern = "*.md"
    else:
        pattern = glob_pattern or "*.md"

    try:
        # Path.glob rejects absolute and malformed patterns; depending on the
        # Python version the error surfaces at call time or on first iteration,
        # so the sorted() consumption is kept inside the try.
        source_files = sorted(sources_dir.glob(pattern))
    except (ValueError, NotImplementedError) as exc:
        click.echo(f"Error: Invalid glob pattern '{pattern}': {exc}", err=True)
        raise SystemExit(1) from exc

    if not source_files:
        if process_all:
            click.echo(f"No source files found in {sources_dir}")
        else:
            click.echo(
                f"No files matched: {pattern}\n"
                f"Sources directory: {sources_dir}"
            )
        return

    click.echo(f"Found {len(source_files)} source file(s) in {sources_dir.name}/")

    # Create LLM adapter. The model name is resolved exactly once here so the
    # pipeline below does not depend on a variable bound only in this branch.
    adapter = None
    pipeline_model = ""
    if provider:
        from markitect.llm import create_adapter

        provider_defaults = {"openrouter": "arcee-ai/trinity-large-preview:free"}
        resolved_model = model or provider_defaults.get(provider)
        adapter = create_adapter(provider, model=resolved_model)
        click.echo(f"LLM: {provider} ({resolved_model or 'default'})")
        pipeline_model = resolved_model or ""
    else:
        # Without a provider, --model is ignored and no adapter is created.
        click.echo("No LLM provider — will use existing outputs only (manual mode).")

    # Run pipeline
    from markitect.infospace.pipeline import SourcePipeline

    pipeline = SourcePipeline(
        cfg, root,
        adapter=adapter,
        provider=provider or "",
        model=pipeline_model,
        no_commit=no_commit,
    )

    total = len(source_files)
    completed = 0
    for i, source_file in enumerate(source_files, 1):
        click.echo(f"\n[{i}/{total}] {source_file.name}")
        if pipeline.process_source(source_file):
            completed += 1
        # The collection check runs after every file, successful or not.
        if check_after_each:
            pipeline.run_collection_check()

    click.echo(f"\nDone: {completed}/{total} source file(s) fully processed.")
# ── stale-mappings ──────────────────────────────────────────────────
@infospace_commands.command(name="stale-mappings")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def stale_mappings(config_path: Optional[str]):
    """Check for stale mappings due to discipline changes."""
    cfg, cfg_path = _load_config_or_exit(config_path)
    base_dir = cfg_path.parent

    if not cfg.disciplines:
        click.echo("No disciplines bound — no mappings to check.")
        return

    from markitect.infospace.composition import find_stale_mappings

    # Mapping references are optional; the loader returns None when no
    # usable file is present.
    references = _load_mapping_references(cfg, base_dir)
    outdated = find_stale_mappings(cfg, base_dir, mapping_references=references)

    if outdated:
        click.echo(f"Found {len(outdated)} stale mapping(s):\n")
        for item in outdated:
            click.echo(f" {item.entity_slug} -> {item.discipline_entity}")
            click.echo(f" {item.reason}")
    else:
        click.echo("No stale mappings detected.")
# ── graph ──────────────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
    "--format", "output_format",
    type=click.Choice(["mermaid", "dot"]),
    default="mermaid",
    show_default=True,
    help="Output format.",
)
@click.option(
    "--color-by",
    type=click.Choice(["type", "vsm"]),
    default="type",
    show_default=True,
    help="Color nodes by entity type or VSM system.",
)
@click.option("--type", "filter_type", default=None,
              help="Show only entities with this entity type (e.g. Relation, Process).")
@click.option("--vsm", "filter_vsm", default=None,
              help="Show only entities with this VSM system (e.g. S1, S3).")
@click.option("--entity", "filter_entity", default=None,
              help="Show neighborhood of a specific entity slug.")
@click.option("--loops", "loops_only", is_flag=True, default=False,
              help="Show only the feedback loop subgraph.")
@click.option("--output", "-o", default=None,
              help="Write to file instead of stdout.")
@click.option("--classified-only/--all-entities", "classified_only",
              default=True, show_default=True,
              help="Only include classified entities (default: true).")
def graph(
    config_path: Optional[str],
    output_format: str,
    color_by: str,
    filter_type: Optional[str],
    filter_vsm: Optional[str],
    filter_entity: Optional[str],
    loops_only: bool,
    output: Optional[str],
    classified_only: bool,
):
    """Render the entity-relation graph as Mermaid or DOT."""
    # Loads classifications and relations from the configured directories,
    # builds the graph, applies the CLI filters, and writes the rendering to
    # --output or stdout. Diagnostics about optional features go to stderr so
    # that piped stdout stays a clean graph document.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    from markitect.infospace.classification_io import read_classifications_directory
    from markitect.infospace.relation_parser import parse_relations_directory
    from markitect.infospace.graph_export import (
        apply_filters,
        build_entity_graph,
        to_dot,
        to_mermaid,
    )

    # Load classifications (a missing directory simply yields none).
    cls_dir = root / cfg.classifications_dir
    classifications = []
    if cls_dir.is_dir():
        classifications = read_classifications_directory(cls_dir)
    classified_slugs = {c.entity_slug for c in classifications}

    # Load relations (a missing directory simply yields none).
    relations_dir = root / cfg.relations_dir
    relations = []
    if relations_dir.is_dir():
        relations = parse_relations_directory(relations_dir)

    if not classifications and not relations:
        click.echo("No classifications or relations found. Run 'classify' and add relation files.")
        return

    # Detect feedback loops via networkx. The dependency is optional, but its
    # absence is reported rather than swallowed: otherwise --loops yields an
    # unexplained empty result.
    feedback_cycles = []
    if relations:
        try:
            import networkx as nx
        except ImportError:
            click.echo(
                "Warning: networkx is not installed — feedback loops will not be detected.",
                err=True,
            )
        else:
            digraph = nx.DiGraph()
            for relation in relations:
                digraph.add_edge(relation.subject_slug, relation.object_slug)
            feedback_cycles = list(nx.simple_cycles(digraph))

    # Build graph
    g = build_entity_graph(classifications, relations, feedback_cycles)

    # Apply filters
    filtered = apply_filters(
        g,
        filter_type=filter_type,
        filter_vsm=filter_vsm,
        filter_entity=filter_entity,
        loops_only=loops_only,
        classified_only=classified_only,
        classified_slugs=classified_slugs,
    )
    if not filtered.nodes:
        click.echo("No nodes match the given filters.")
        return

    # Export
    if output_format == "dot":
        result = to_dot(filtered, color_by=color_by)
    else:
        result = to_mermaid(filtered, color_by=color_by)

    if output:
        out_path = Path(output)
        out_path.write_text(result, encoding="utf-8")
        edge_count = sum(len(targets) for targets in filtered.edges.values())
        click.echo(
            f"Wrote {output_format} graph ({len(filtered.nodes)} nodes, "
            f"{edge_count} edges) to {out_path}"
        )
    else:
        # nl=False: emit the rendering without appending an extra newline.
        click.echo(result, nl=False)
def _load_mapping_references(
    cfg: InfospaceConfig, root: Path
) -> Optional[dict]:
    """Load ``mapping-references.yaml`` from the metrics directory, if usable.

    Returns the parsed content when the file exists and its top-level YAML
    value is a mapping; returns ``None`` when the file is absent or holds
    anything else.
    """
    candidate = root / cfg.metrics_dir / "mapping-references.yaml"
    if candidate.is_file():
        import yaml

        parsed = yaml.safe_load(candidate.read_text(encoding="utf-8"))
        # Anything other than a mapping is treated as "no references".
        return parsed if isinstance(parsed, dict) else None
    return None