""" Entity-relation graph export for infospace visualisation. Builds a graph from L2 entity classifications and L3 relation triplets, then exports it as Mermaid or DOT format for rendering. """ from __future__ import annotations from typing import Dict, List, Optional, Set, Tuple from .classification import EntityClassification from .relation_models import RelationMeta # ── Color palettes ──────────────────────────────────────────────────────────── # By entity type _TYPE_COLORS: Dict[str, str] = { "Element": "#4A90D9", "Process": "#27AE60", "Relation": "#F39C12", "Principle": "#8E44AD", "Institution": "#E74C3C", "Unknown": "#95A5A6", } # By VSM system _VSM_COLORS: Dict[str, str] = { "S1": "#2ECC71", "S2": "#3498DB", "S3": "#E67E22", "S3*": "#E74C3C", "S4": "#9B59B6", "S5": "#1ABC9C", } # ── Graph data structure ────────────────────────────────────────────────────── class EntityGraph: """In-memory graph of entities (nodes) and relations (edges).""" def __init__(self) -> None: # slug → dict of metadata self.nodes: Dict[str, dict] = {} # (subject_slug, object_slug) → list of edge dicts (multi-edges allowed) self.edges: Dict[Tuple[str, str], List[dict]] = {} # slugs participating in a feedback loop self.feedback_slugs: Set[str] = set() def add_node( self, slug: str, title: str, entity_type: str, vsm_system: str, ) -> None: self.nodes[slug] = { "title": title, "entity_type": entity_type, "vsm_system": vsm_system, } def add_edge( self, subject_slug: str, object_slug: str, predicate: str, relation_type: str, vsm_channel: str, is_feedback: bool = False, ) -> None: key = (subject_slug, object_slug) if key not in self.edges: self.edges[key] = [] self.edges[key].append({ "predicate": predicate, "relation_type": relation_type, "vsm_channel": vsm_channel, "is_feedback": is_feedback, }) def subgraph_for_slugs(self, slugs: Set[str]) -> "EntityGraph": """Return a new EntityGraph containing only nodes in *slugs* and edges where BOTH endpoints are in *slugs*.""" g = EntityGraph() g.feedback_slugs = self.feedback_slugs & slugs for s in slugs: if s in self.nodes: n = self.nodes[s] g.add_node(s, n["title"], n["entity_type"], n["vsm_system"]) for (subj, obj), edge_list in self.edges.items(): if subj in slugs and obj in slugs: for e in edge_list: g.add_edge(subj, obj, e["predicate"], e["relation_type"], e["vsm_channel"], e["is_feedback"]) return g # ── Builder ─────────────────────────────────────────────────────────────────── def build_entity_graph( classifications: List[EntityClassification], relations: List[RelationMeta], feedback_cycles: Optional[List[List[str]]] = None, ) -> EntityGraph: """Build an EntityGraph from L2 classifications and L3 relations. Args: classifications: All loaded EntityClassification objects. relations: All loaded RelationMeta objects. feedback_cycles: Optional list of cycles (each a list of slugs) returned by networkx.simple_cycles. Returns: An :class:`EntityGraph` ready for export. """ g = EntityGraph() # Collect slugs in feedback loops feedback_slugs: Set[str] = set() if feedback_cycles: for cycle in feedback_cycles: feedback_slugs.update(cycle) g.feedback_slugs = feedback_slugs # Classify slugs for fast lookup cls_map: Dict[str, EntityClassification] = {c.entity_slug: c for c in classifications} # Add nodes from classifications for c in classifications: title = c.entity_slug.replace("_", " ").title() g.add_node(c.entity_slug, title, c.entity_type, c.vsm_system) # Add edge-referenced nodes that might not be classified for r in relations: for slug in (r.subject_slug, r.object_slug): if slug not in g.nodes: # unclassified node — add with Unknown type title = slug.replace("_", " ").title() g.add_node(slug, title, "Unknown", "") # Collect feedback edge pairs feedback_edge_pairs: Set[Tuple[str, str]] = set() if feedback_cycles: for cycle in feedback_cycles: for i in range(len(cycle)): feedback_edge_pairs.add((cycle[i], cycle[(i + 1) % len(cycle)])) # Add edges from relations for r in relations: is_feedback = (r.subject_slug, r.object_slug) in feedback_edge_pairs g.add_edge( r.subject_slug, r.object_slug, r.predicate, r.relation_type, r.vsm_channel, is_feedback=is_feedback, ) return g # ── Filters ─────────────────────────────────────────────────────────────────── def apply_filters( g: EntityGraph, filter_type: Optional[str] = None, filter_vsm: Optional[str] = None, filter_entity: Optional[str] = None, loops_only: bool = False, classified_only: bool = True, classified_slugs: Optional[Set[str]] = None, ) -> EntityGraph: """Apply filters to an EntityGraph, returning a subgraph. Filtering strategy: - Start with all node slugs in the graph. - Remove unclassified if classified_only (Unknown type = unclassified placeholder). - If filter_type: keep only nodes with matching entity_type. - If filter_vsm: keep only nodes with matching vsm_system. - If filter_entity: keep only that entity plus its direct neighbors (via edges). - If loops_only: keep only nodes participating in feedback loops. - Finally include both endpoints of every edge that survives the filter. """ # Start with classified-only filter candidate_slugs: Set[str] = set(g.nodes.keys()) if classified_only and classified_slugs is not None: candidate_slugs &= classified_slugs if filter_type: candidate_slugs = { s for s in candidate_slugs if g.nodes[s]["entity_type"].lower() == filter_type.lower() } if filter_vsm: candidate_slugs = { s for s in candidate_slugs if g.nodes[s]["vsm_system"].lower() == filter_vsm.lower() } if loops_only: candidate_slugs &= g.feedback_slugs if filter_entity: if filter_entity not in g.nodes: return EntityGraph() # Ego subgraph: entity + direct neighbors neighbors: Set[str] = {filter_entity} for (subj, obj) in g.edges: if subj == filter_entity: neighbors.add(obj) elif obj == filter_entity: neighbors.add(subj) candidate_slugs &= neighbors # Expand candidate_slugs to include all edge endpoints that survive # (so we don't create dangling edges) final_slugs: Set[str] = set() for (subj, obj) in g.edges: if subj in candidate_slugs and obj in candidate_slugs: final_slugs.add(subj) final_slugs.add(obj) # If filter_entity used, keep at minimum the entity itself even if no edges if filter_entity and filter_entity in g.nodes: final_slugs.add(filter_entity) # If we removed all edges but still have type/vsm filter nodes, show those if not final_slugs and candidate_slugs: final_slugs = candidate_slugs return g.subgraph_for_slugs(final_slugs) # ── Mermaid export ──────────────────────────────────────────────────────────── def _mermaid_safe_id(slug: str) -> str: """Return a Mermaid-safe node ID (replace special chars).""" return slug.replace("-", "_").replace("*", "s") def to_mermaid( g: EntityGraph, color_by: str = "type", markdown_fence: bool = True, ) -> str: """Export an EntityGraph to Mermaid flowchart format. Args: g: The graph to render. color_by: ``"type"`` or ``"vsm"`` — determines node colour scheme. markdown_fence: Wrap output in a ```mermaid fenced code block (required for VS Code and GitHub markdown preview). Returns: A Mermaid diagram string, optionally wrapped in a markdown fence. """ lines: List[str] = ["graph LR"] # Nodes for slug, attrs in sorted(g.nodes.items()): mid = _mermaid_safe_id(slug) title = attrs["title"] etype = attrs["entity_type"] vsm = attrs["vsm_system"] label_parts = [title] if etype and etype != "Unknown": label_parts.append(f"{etype}") if vsm: label_parts.append(vsm) label = "\\n".join(label_parts) css_class = etype.lower().replace("*", "s") if etype else "unknown" lines.append(f' {mid}["{label}"]:::{css_class}') lines.append("") # Edges — always use "-- label -->" syntax (robust with parentheses/special chars). # Feedback loop edges use "== label ==>" (thick arrow) to distinguish them visually. for (subj, obj), edge_list in sorted(g.edges.items()): smid = _mermaid_safe_id(subj) omid = _mermaid_safe_id(obj) for e in edge_list: pred = e["predicate"] if e["is_feedback"]: lines.append(f" {smid} == {pred} ==> {omid}") else: lines.append(f" {smid} -- {pred} --> {omid}") lines.append("") # Class definitions if color_by == "vsm": for vsm, color in _VSM_COLORS.items(): css_id = vsm.lower().replace("*", "s") lines.append(f" classDef {css_id} fill:{color},color:#fff") lines.append(" classDef unknown fill:#95A5A6,color:#fff") else: for etype, color in _TYPE_COLORS.items(): css_id = etype.lower() lines.append(f" classDef {css_id} fill:{color},color:#fff") body = "\n".join(lines) + "\n" if markdown_fence: return f"```mermaid\n{body}```\n" return body # ── DOT export ──────────────────────────────────────────────────────────────── def _dot_safe_id(slug: str) -> str: """Return a DOT-safe node ID.""" return '"' + slug.replace('"', '\\"') + '"' def to_dot( g: EntityGraph, color_by: str = "type", ) -> str: """Export an EntityGraph to Graphviz DOT format. Args: g: The graph to render. color_by: ``"type"`` or ``"vsm"`` — determines node fill colour. Returns: A DOT digraph string. """ lines: List[str] = [ "digraph entity_graph {", " rankdir=LR;", " node [style=filled, fontname=Helvetica, fontsize=10];", " edge [fontname=Helvetica, fontsize=9];", "", ] for slug, attrs in sorted(g.nodes.items()): nid = _dot_safe_id(slug) title = attrs["title"] etype = attrs["entity_type"] vsm = attrs["vsm_system"] if color_by == "vsm": color = _VSM_COLORS.get(vsm, "#95A5A6") else: color = _TYPE_COLORS.get(etype, "#95A5A6") label_parts = [title] if etype and etype != "Unknown": label_parts.append(f"({etype})") if vsm: label_parts.append(vsm) label = "\\n".join(label_parts) lines.append(f' {nid} [label="{label}", fillcolor="{color}", fontcolor="white"];') lines.append("") for (subj, obj), edge_list in sorted(g.edges.items()): sid = _dot_safe_id(subj) oid = _dot_safe_id(obj) for e in edge_list: pred = e["predicate"].replace('"', '\\"') style = "dashed" if e["is_feedback"] else "solid" lines.append(f' {sid} -> {oid} [label="{pred}", style={style}];') lines.append("}") return "\n".join(lines) + "\n"