Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
New graph_export.py module supporting the `markitect infospace graph`
command added in the previous commit.
- build_entity_graph(): constructs node/edge graph from L2 classifications
and L3 relation triplets, with feedback loop detection via networkx
- apply_filters(): subgraph filters by entity type, VSM system, ego
neighbourhood, feedback-loops-only, and classified-only
- to_mermaid(): Mermaid flowchart export
- Uses "-- label -->" syntax for all edges (robust with parentheses);
"== label ==>" thick arrows for feedback loop edges
- markdown_fence=True wraps output in ```mermaid block (VS Code / GitHub)
- color_by="type" or "vsm" with distinct palettes for each
- to_dot(): Graphviz DOT export with fillcolor per type/VSM system
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
375 lines
13 KiB
Python
375 lines
13 KiB
Python
"""
|
|
Entity-relation graph export for infospace visualisation.
|
|
|
|
Builds a graph from L2 entity classifications and L3 relation triplets,
|
|
then exports it as Mermaid or DOT format for rendering.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
|
|
from .classification import EntityClassification
|
|
from .relation_models import RelationMeta
|
|
|
|
|
|
# ── Color palettes ────────────────────────────────────────────────────────────
|
|
|
|
# By entity type
|
|
_TYPE_COLORS: Dict[str, str] = {
|
|
"Element": "#4A90D9",
|
|
"Process": "#27AE60",
|
|
"Relation": "#F39C12",
|
|
"Principle": "#8E44AD",
|
|
"Institution": "#E74C3C",
|
|
"Unknown": "#95A5A6",
|
|
}
|
|
|
|
# By VSM system
|
|
_VSM_COLORS: Dict[str, str] = {
|
|
"S1": "#2ECC71",
|
|
"S2": "#3498DB",
|
|
"S3": "#E67E22",
|
|
"S3*": "#E74C3C",
|
|
"S4": "#9B59B6",
|
|
"S5": "#1ABC9C",
|
|
}
|
|
|
|
|
|
# ── Graph data structure ──────────────────────────────────────────────────────
|
|
|
|
class EntityGraph:
    """Directed multigraph held in plain dictionaries.

    Entities are the nodes and relations are the edges.  Several relations
    between the same ordered pair of entities are all kept.
    """

    def __init__(self) -> None:
        # Node metadata keyed by slug; each value is a dict with the keys
        # "title", "entity_type" and "vsm_system".
        self.nodes: Dict[str, dict] = {}
        # Edge records keyed by (subject_slug, object_slug).  The value is a
        # list so that parallel edges between the same pair are preserved.
        self.edges: Dict[Tuple[str, str], List[dict]] = {}
        # Slugs of the nodes that take part in a feedback loop.
        self.feedback_slugs: Set[str] = set()

    def add_node(
        self,
        slug: str,
        title: str,
        entity_type: str,
        vsm_system: str,
    ) -> None:
        """Store the metadata for *slug*, replacing any earlier entry."""
        metadata = dict(
            title=title,
            entity_type=entity_type,
            vsm_system=vsm_system,
        )
        self.nodes[slug] = metadata

    def add_edge(
        self,
        subject_slug: str,
        object_slug: str,
        predicate: str,
        relation_type: str,
        vsm_channel: str,
        is_feedback: bool = False,
    ) -> None:
        """Append one relation record to the edge list of the ordered pair
        (*subject_slug*, *object_slug*), creating the list on first use."""
        record = dict(
            predicate=predicate,
            relation_type=relation_type,
            vsm_channel=vsm_channel,
            is_feedback=is_feedback,
        )
        self.edges.setdefault((subject_slug, object_slug), []).append(record)

    def subgraph_for_slugs(self, slugs: Set[str]) -> "EntityGraph":
        """Build a new EntityGraph restricted to *slugs*.

        A node is copied when its slug is in *slugs* and known to this graph;
        an edge is copied only when BOTH of its endpoints are in *slugs*.
        The feedback set is narrowed to the same slugs.
        """
        sub = EntityGraph()
        sub.feedback_slugs = self.feedback_slugs & slugs

        for slug in slugs:
            if slug not in self.nodes:
                continue  # requested slug does not exist in this graph
            meta = self.nodes[slug]
            sub.add_node(slug, meta["title"], meta["entity_type"], meta["vsm_system"])

        for (src, dst), records in self.edges.items():
            if src not in slugs or dst not in slugs:
                continue  # an endpoint falls outside the selection
            for rec in records:
                sub.add_edge(
                    src,
                    dst,
                    rec["predicate"],
                    rec["relation_type"],
                    rec["vsm_channel"],
                    rec["is_feedback"],
                )

        return sub
|
|
|
|
|
|
# ── Builder ───────────────────────────────────────────────────────────────────
|
|
|
|
def build_entity_graph(
    classifications: List[EntityClassification],
    relations: List[RelationMeta],
    feedback_cycles: Optional[List[List[str]]] = None,
) -> EntityGraph:
    """Build an EntityGraph from L2 classifications and L3 relations.

    Every classification becomes a node whose title is derived from its slug
    (underscores replaced by spaces, then title-cased).  Relation endpoints
    that have no classification are added as placeholder nodes with entity
    type ``"Unknown"`` and an empty VSM system.  Every relation becomes one
    edge; it is flagged ``is_feedback`` when its (subject, object) pair is a
    consecutive step of one of the supplied cycles.

    Args:
        classifications: All loaded EntityClassification objects.
        relations: All loaded RelationMeta objects.
        feedback_cycles: Optional list of cycles (each a list of slugs)
            returned by networkx.simple_cycles.

    Returns:
        An :class:`EntityGraph` ready for export.
    """
    g = EntityGraph()

    # One pass over the cycles collects both the participating slugs and the
    # directed (from, to) steps that make up each loop.
    feedback_slugs: Set[str] = set()
    feedback_edge_pairs: Set[Tuple[str, str]] = set()
    for cycle in feedback_cycles or ():
        feedback_slugs.update(cycle)
        # Pair each slug with its successor; the rotated copy makes the last
        # slug wrap around to the first, closing the loop.
        feedback_edge_pairs.update(zip(cycle, cycle[1:] + cycle[:1]))
    g.feedback_slugs = feedback_slugs

    # Nodes that have a classification.
    for c in classifications:
        title = c.entity_slug.replace("_", " ").title()
        g.add_node(c.entity_slug, title, c.entity_type, c.vsm_system)

    # Relation endpoints without a classification become "Unknown" placeholders
    # so that no edge refers to a missing node.
    for r in relations:
        for slug in (r.subject_slug, r.object_slug):
            if slug not in g.nodes:
                title = slug.replace("_", " ").title()
                g.add_node(slug, title, "Unknown", "")

    # One edge per relation.
    for r in relations:
        g.add_edge(
            r.subject_slug,
            r.object_slug,
            r.predicate,
            r.relation_type,
            r.vsm_channel,
            is_feedback=(r.subject_slug, r.object_slug) in feedback_edge_pairs,
        )

    return g
|
|
|
|
|
|
# ── Filters ───────────────────────────────────────────────────────────────────
|
|
|
|
def apply_filters(
    g: EntityGraph,
    filter_type: Optional[str] = None,
    filter_vsm: Optional[str] = None,
    filter_entity: Optional[str] = None,
    loops_only: bool = False,
    classified_only: bool = True,
    classified_slugs: Optional[Set[str]] = None,
) -> EntityGraph:
    """Apply filters to an EntityGraph, returning a subgraph.

    Filtering strategy (each step narrows the candidate set):
    - Start with all node slugs in the graph.
    - If classified_only AND classified_slugs is given: keep only slugs in
      classified_slugs.  Without classified_slugs this step does nothing.
    - If filter_type: keep only nodes with matching entity_type
      (case-insensitive).
    - If filter_vsm: keep only nodes with matching vsm_system
      (case-insensitive).
    - If loops_only: keep only nodes participating in feedback loops.
    - If filter_entity: keep only that entity plus its direct neighbors
      (via edges in either direction).

    The result then contains the candidates that are an endpoint of at least
    one edge whose two endpoints are both candidates.  filter_entity itself
    is always included.  If no such edge exists, all candidates are returned
    so that a type/VSM filter still shows its (unconnected) nodes.

    Args:
        g: The graph to filter.
        filter_type: Entity type to keep, or None/"" for no type filter.
        filter_vsm: VSM system to keep, or None/"" for no VSM filter.
        filter_entity: Slug at the centre of an ego subgraph, or None.
        loops_only: Restrict to nodes in ``g.feedback_slugs``.
        classified_only: Enable the classified_slugs restriction.
        classified_slugs: Slugs regarded as classified.

    Returns:
        The filtered subgraph; an empty EntityGraph when filter_entity is
        given but is not a node of *g*.
    """
    candidate_slugs: Set[str] = set(g.nodes)

    if classified_only and classified_slugs is not None:
        candidate_slugs &= classified_slugs

    if filter_type:
        wanted_type = filter_type.lower()
        # "or ''" keeps a node whose entity_type is None from raising; such a
        # node simply never matches.
        candidate_slugs = {
            s for s in candidate_slugs
            if (g.nodes[s]["entity_type"] or "").lower() == wanted_type
        }

    if filter_vsm:
        wanted_vsm = filter_vsm.lower()
        candidate_slugs = {
            s for s in candidate_slugs
            if (g.nodes[s]["vsm_system"] or "").lower() == wanted_vsm
        }

    if loops_only:
        candidate_slugs &= g.feedback_slugs

    if filter_entity:
        if filter_entity not in g.nodes:
            return EntityGraph()
        # Ego subgraph: the entity plus every node one edge away from it.
        neighbors: Set[str] = {filter_entity}
        for (subj, obj) in g.edges:
            if subj == filter_entity:
                neighbors.add(obj)
            elif obj == filter_entity:
                neighbors.add(subj)
        candidate_slugs &= neighbors

    # Keep only candidates that are connected to another candidate, so the
    # result has no dangling edges.
    final_slugs: Set[str] = set()
    for (subj, obj) in g.edges:
        if subj in candidate_slugs and obj in candidate_slugs:
            final_slugs.add(subj)
            final_slugs.add(obj)

    # The ego entity is always shown, even if the other filters removed it or
    # it has no surviving edges.
    if filter_entity and filter_entity in g.nodes:
        final_slugs.add(filter_entity)

    # No surviving edges at all: fall back to showing the bare candidates.
    if not final_slugs and candidate_slugs:
        final_slugs = candidate_slugs

    return g.subgraph_for_slugs(final_slugs)
|
|
|
|
|
|
# ── Mermaid export ────────────────────────────────────────────────────────────
|
|
|
|
def _mermaid_safe_id(slug: str) -> str:
|
|
"""Return a Mermaid-safe node ID (replace special chars)."""
|
|
return slug.replace("-", "_").replace("*", "s")
|
|
|
|
|
|
def to_mermaid(
    g: EntityGraph,
    color_by: str = "type",
    markdown_fence: bool = True,
) -> str:
    """Export an EntityGraph to Mermaid flowchart format.

    The output has three sections separated by blank lines: one node line per
    entity (sorted by slug), one edge line per relation (sorted by endpoint
    pair), and the ``classDef`` lines that give the node classes their colour.

    Args:
        g: The graph to render.
        color_by: ``"type"`` or ``"vsm"`` — determines node colour scheme.
            Any value other than ``"vsm"`` selects the entity-type palette.
        markdown_fence: Wrap output in a ```mermaid fenced code block
            (required for VS Code and GitHub markdown preview).

    Returns:
        A Mermaid diagram string, optionally wrapped in a markdown fence.
    """
    by_vsm = color_by == "vsm"
    lines: List[str] = ["graph LR"]

    # Nodes
    for slug, attrs in sorted(g.nodes.items()):
        mid = _mermaid_safe_id(slug)
        title = attrs["title"]
        etype = attrs["entity_type"]
        vsm = attrs["vsm_system"]

        label_parts = [title]
        if etype and etype != "Unknown":
            label_parts.append(f"{etype}")
        if vsm:
            label_parts.append(vsm)
        # The label sits inside ["..."]; a raw double quote would end it
        # early, so it is written as Mermaid's entity code instead.
        label = "\\n".join(label_parts).replace('"', "#quot;")

        # The class must name one of the classDef lines emitted below, which
        # depend on color_by.
        if by_vsm:
            css_class = vsm.lower().replace("*", "s") if vsm in _VSM_COLORS else "unknown"
        else:
            css_class = etype.lower().replace("*", "s") if etype else "unknown"
        lines.append(f' {mid}["{label}"]:::{css_class}')

    lines.append("")

    # Edges — always use "-- label -->" syntax (robust with parentheses/special chars).
    # Feedback loop edges use "== label ==>" (thick arrow) to distinguish them visually.
    for (subj, obj), edge_list in sorted(g.edges.items()):
        smid = _mermaid_safe_id(subj)
        omid = _mermaid_safe_id(obj)
        for e in edge_list:
            pred = e["predicate"]
            if e["is_feedback"]:
                lines.append(f" {smid} == {pred} ==> {omid}")
            else:
                lines.append(f" {smid} -- {pred} --> {omid}")

    lines.append("")

    # Class definitions
    if by_vsm:
        for vsm, color in _VSM_COLORS.items():
            css_id = vsm.lower().replace("*", "s")
            lines.append(f" classDef {css_id} fill:{color},color:#fff")
        # Nodes whose VSM system is missing or not in the palette.
        lines.append(" classDef unknown fill:#95A5A6,color:#fff")
    else:
        for etype, color in _TYPE_COLORS.items():
            css_id = etype.lower()
            lines.append(f" classDef {css_id} fill:{color},color:#fff")

    body = "\n".join(lines) + "\n"
    if markdown_fence:
        return f"```mermaid\n{body}```\n"
    return body
|
|
|
|
|
|
# ── DOT export ────────────────────────────────────────────────────────────────
|
|
|
|
def _dot_safe_id(slug: str) -> str:
|
|
"""Return a DOT-safe node ID."""
|
|
return '"' + slug.replace('"', '\\"') + '"'
|
|
|
|
|
|
def to_dot(
    g: EntityGraph,
    color_by: str = "type",
) -> str:
    """Export an EntityGraph to Graphviz DOT format.

    Nodes are emitted sorted by slug and edges sorted by endpoint pair.
    Feedback-loop edges are drawn dashed, all other edges solid.

    Args:
        g: The graph to render.
        color_by: ``"type"`` or ``"vsm"`` — determines node fill colour.
            Any value other than ``"vsm"`` selects the entity-type palette;
            values missing from the chosen palette fall back to grey.

    Returns:
        A DOT digraph string.
    """
    lines: List[str] = [
        "digraph entity_graph {",
        " rankdir=LR;",
        " node [style=filled, fontname=Helvetica, fontsize=10];",
        " edge [fontname=Helvetica, fontsize=9];",
        "",
    ]

    for slug, attrs in sorted(g.nodes.items()):
        nid = _dot_safe_id(slug)
        title = attrs["title"]
        etype = attrs["entity_type"]
        vsm = attrs["vsm_system"]

        if color_by == "vsm":
            color = _VSM_COLORS.get(vsm, "#95A5A6")
        else:
            color = _TYPE_COLORS.get(etype, "#95A5A6")

        label_parts = [title]
        if etype and etype != "Unknown":
            label_parts.append(f"({etype})")
        if vsm:
            label_parts.append(vsm)
        # Escape double quotes so the label cannot terminate the quoted DOT
        # attribute early (the same treatment the edge labels get below).
        label = "\\n".join(label_parts).replace('"', '\\"')

        lines.append(f' {nid} [label="{label}", fillcolor="{color}", fontcolor="white"];')

    lines.append("")

    for (subj, obj), edge_list in sorted(g.edges.items()):
        sid = _dot_safe_id(subj)
        oid = _dot_safe_id(obj)
        for e in edge_list:
            pred = e["predicate"].replace('"', '\\"')
            style = "dashed" if e["is_feedback"] else "solid"
            lines.append(f' {sid} -> {oid} [label="{pred}", style={style}];')

    lines.append("}")
    return "\n".join(lines) + "\n"
|