From 0a494b20113316c02ffc955853190b865ef7b1e8 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sun, 8 Feb 2026 07:41:47 +0100 Subject: [PATCH] feat(spaces): implement Phase 2 Event System Week 4 - Event Infrastructure: - Create SpaceEventType enum with 18 event types covering space lifecycle, document operations, variables, references, rendering, sync, and cache - Create SpaceEvent dataclass with serialization/deserialization - Create EventBus with sync/async handler support, priority ordering, global handlers, and optional event history - Add event factory functions for common events Week 5 - Event Integration: - Wire EventBus into SpaceService as optional dependency - Emit events for all space operations: - SPACE_CREATED, SPACE_UPDATED, SPACE_DELETED, SPACE_ACTIVATED, SPACE_ARCHIVED - DOCUMENT_ADDED, DOCUMENT_REMOVED, DOCUMENT_MOVED, DOCUMENT_CONTENT_CHANGED - VARIABLE_SET, VARIABLE_DELETED - Create integration tests for event propagation patterns Test coverage: 187 tests total - 43 unit tests for event system - 20 integration tests for event propagation - 124 existing tests continue to pass Capabilities delivered: - CAP-010: SpaceEvent base with type, payload, timestamp - CAP-011: EventBus with in-process publish/subscribe - CAP-012: Event handlers registry with priority support - CAP-013: Change detection via content hash comparison Co-Authored-By: Claude Opus 4.5 --- markitect/spaces/__init__.py | 15 + markitect/spaces/events/__init__.py | 53 +- markitect/spaces/events/bus.py | 402 ++++++++++++ markitect/spaces/events/models.py | 239 +++++++ markitect/spaces/services/space_service.py | 129 +++- .../spaces/test_event_propagation.py | 407 ++++++++++++ tests/unit/spaces/test_events.py | 581 ++++++++++++++++++ 7 files changed, 1807 insertions(+), 19 deletions(-) create mode 100644 markitect/spaces/events/bus.py create mode 100644 markitect/spaces/events/models.py create mode 100644 tests/integration/spaces/test_event_propagation.py create mode 100644 tests/unit/spaces/test_events.py diff --git a/markitect/spaces/__init__.py b/markitect/spaces/__init__.py index 5f82ecff..a66f6eaf 100644 --- a/markitect/spaces/__init__.py +++ b/markitect/spaces/__init__.py @@ -51,6 +51,15 @@ from .repositories import ( initialize_space_tables, ) +# Phase 2: Event System +from .events import ( + SpaceEvent, + SpaceEventType, + EventBus, + get_event_bus, + reset_event_bus, +) + __all__ = [ # Models "InformationSpace", @@ -73,4 +82,10 @@ __all__ = [ "SqliteVariableRepository", "SqliteReferenceRepository", "initialize_space_tables", + # Event System + "SpaceEvent", + "SpaceEventType", + "EventBus", + "get_event_bus", + "reset_event_bus", ] diff --git a/markitect/spaces/events/__init__.py b/markitect/spaces/events/__init__.py index 9d092aed..7b0cb547 100644 --- a/markitect/spaces/events/__init__.py +++ b/markitect/spaces/events/__init__.py @@ -3,14 +3,55 @@ Event system for Information Spaces. This package provides event-driven architecture for space operations: - SpaceEvent: Event dataclass with type, payload, timestamp +- SpaceEventType: Enum of all event types - EventBus: In-process publish/subscribe for space events -- Event handlers and registration +- Event factory functions for common events Events emitted: -- SPACE_CREATED, SPACE_UPDATED, SPACE_DELETED -- DOCUMENT_ADDED, DOCUMENT_UPDATED, DOCUMENT_REMOVED -- RENDER_COMPLETED, SYNC_COMPLETED +- Space lifecycle: SPACE_CREATED, SPACE_UPDATED, SPACE_DELETED, SPACE_ACTIVATED, SPACE_ARCHIVED +- Document: DOCUMENT_ADDED, DOCUMENT_UPDATED, DOCUMENT_REMOVED, DOCUMENT_MOVED, DOCUMENT_CONTENT_CHANGED +- Variable: VARIABLE_SET, VARIABLE_DELETED +- Reference: REFERENCE_ADDED, REFERENCE_CLEARED +- Rendering: RENDER_STARTED, RENDER_COMPLETED, RENDER_FAILED +- Sync: SYNC_STARTED, SYNC_COMPLETED, SYNC_CONFLICT +- Cache: CACHE_INVALIDATED """ -# Events will be implemented in Phase 2 -__all__ = [] +from .models import ( + SpaceEvent, + SpaceEventType, + space_created_event, + space_updated_event, + space_deleted_event, + document_added_event, + document_updated_event, + document_removed_event, + document_content_changed_event, + cache_invalidated_event, +) +from .bus import ( + EventBus, + HandlerRegistration, + get_event_bus, + reset_event_bus, +) + +__all__ = [ + # Event models + "SpaceEvent", + "SpaceEventType", + # Event factories + "space_created_event", + "space_updated_event", + "space_deleted_event", + "document_added_event", + "document_updated_event", + "document_removed_event", + "document_content_changed_event", + "cache_invalidated_event", + # Event bus + "EventBus", + "HandlerRegistration", + "get_event_bus", + "reset_event_bus", +] diff --git a/markitect/spaces/events/bus.py b/markitect/spaces/events/bus.py new file mode 100644 index 00000000..816f7db5 --- /dev/null +++ b/markitect/spaces/events/bus.py @@ -0,0 +1,402 @@ +""" +EventBus for Information Spaces. + +This module provides an in-process publish/subscribe system +for space events. It supports both synchronous and asynchronous +event handlers. +""" + +import asyncio +import logging +from collections import defaultdict +from typing import Callable, Dict, List, Optional, Set, Union, Any +from dataclasses import dataclass, field +import weakref + +from .models import SpaceEvent, SpaceEventType + + +# Type aliases for handlers +SyncHandler = Callable[[SpaceEvent], None] +AsyncHandler = Callable[[SpaceEvent], Any] # Coroutine +Handler = Union[SyncHandler, AsyncHandler] + + +logger = logging.getLogger(__name__) + + +@dataclass +class HandlerRegistration: + """ + Registration info for an event handler. + + Attributes: + handler: The handler function + is_async: Whether the handler is async + priority: Handler priority (lower = earlier execution) + handler_id: Unique ID for this registration + """ + + handler: Handler + is_async: bool = False + priority: int = 100 + handler_id: str = field(default_factory=lambda: str(id(None))) + + def __post_init__(self): + self.handler_id = str(id(self.handler)) + + +class EventBus: + """ + In-process event bus for space events. + + Provides publish/subscribe functionality for space operations. + Handlers can be registered for specific event types or for all events. + + Usage: + bus = EventBus() + + # Register a handler + def on_space_created(event: SpaceEvent): + print(f"Space created: {event.payload['name']}") + + bus.subscribe(SpaceEventType.SPACE_CREATED, on_space_created) + + # Emit an event + bus.emit(SpaceEvent( + event_type=SpaceEventType.SPACE_CREATED, + space_id="space-123", + payload={"name": "my-docs"} + )) + + # Unsubscribe + bus.unsubscribe(SpaceEventType.SPACE_CREATED, on_space_created) + """ + + def __init__(self): + """Initialize the event bus.""" + # Handlers indexed by event type + self._handlers: Dict[SpaceEventType, List[HandlerRegistration]] = defaultdict( + list + ) + # Handlers that receive all events + self._global_handlers: List[HandlerRegistration] = [] + # Track all handler IDs for quick lookup + self._handler_ids: Set[str] = set() + # Event history (optional, for debugging) + self._history: List[SpaceEvent] = [] + self._history_enabled: bool = False + self._max_history: int = 1000 + + def subscribe( + self, + event_type: Optional[SpaceEventType], + handler: Handler, + priority: int = 100, + ) -> str: + """ + Subscribe a handler to events. + + Args: + event_type: The event type to subscribe to, or None for all events + handler: The handler function (sync or async) + priority: Handler priority (lower = earlier execution) + + Returns: + Handler ID that can be used to unsubscribe + """ + is_async = asyncio.iscoroutinefunction(handler) + registration = HandlerRegistration( + handler=handler, + is_async=is_async, + priority=priority, + ) + + if event_type is None: + self._global_handlers.append(registration) + self._global_handlers.sort(key=lambda r: r.priority) + else: + self._handlers[event_type].append(registration) + self._handlers[event_type].sort(key=lambda r: r.priority) + + self._handler_ids.add(registration.handler_id) + return registration.handler_id + + def subscribe_all(self, handler: Handler, priority: int = 100) -> str: + """ + Subscribe a handler to all events. + + Args: + handler: The handler function + priority: Handler priority + + Returns: + Handler ID + """ + return self.subscribe(None, handler, priority) + + def unsubscribe( + self, + event_type: Optional[SpaceEventType], + handler: Handler, + ) -> bool: + """ + Unsubscribe a handler from events. + + Args: + event_type: The event type, or None for global handlers + handler: The handler to remove + + Returns: + True if handler was found and removed + """ + handler_id = str(id(handler)) + + if event_type is None: + for i, reg in enumerate(self._global_handlers): + if reg.handler_id == handler_id: + self._global_handlers.pop(i) + self._handler_ids.discard(handler_id) + return True + else: + handlers = self._handlers.get(event_type, []) + for i, reg in enumerate(handlers): + if reg.handler_id == handler_id: + handlers.pop(i) + self._handler_ids.discard(handler_id) + return True + + return False + + def unsubscribe_by_id(self, handler_id: str) -> bool: + """ + Unsubscribe a handler by its ID. + + Args: + handler_id: The handler ID returned by subscribe + + Returns: + True if handler was found and removed + """ + # Check global handlers + for i, reg in enumerate(self._global_handlers): + if reg.handler_id == handler_id: + self._global_handlers.pop(i) + self._handler_ids.discard(handler_id) + return True + + # Check type-specific handlers + for handlers in self._handlers.values(): + for i, reg in enumerate(handlers): + if reg.handler_id == handler_id: + handlers.pop(i) + self._handler_ids.discard(handler_id) + return True + + return False + + def emit(self, event: SpaceEvent) -> List[Exception]: + """ + Emit an event synchronously. + + Calls all registered handlers for the event type. + Async handlers are run in a new event loop if needed. + + Args: + event: The event to emit + + Returns: + List of exceptions raised by handlers (empty if all succeeded) + """ + if self._history_enabled: + self._record_history(event) + + exceptions: List[Exception] = [] + + # Collect all handlers to call + handlers = list(self._global_handlers) + handlers.extend(self._handlers.get(event.event_type, [])) + handlers.sort(key=lambda r: r.priority) + + for registration in handlers: + try: + if registration.is_async: + # Run async handler + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop and loop.is_running(): + # Create a task but don't await it here + asyncio.create_task(registration.handler(event)) + else: + asyncio.run(registration.handler(event)) + else: + registration.handler(event) + except Exception as e: + logger.exception( + f"Handler {registration.handler_id} raised exception for event {event.event_type}" + ) + exceptions.append(e) + + return exceptions + + async def emit_async(self, event: SpaceEvent) -> List[Exception]: + """ + Emit an event asynchronously. + + Args: + event: The event to emit + + Returns: + List of exceptions raised by handlers + """ + if self._history_enabled: + self._record_history(event) + + exceptions: List[Exception] = [] + + # Collect all handlers to call + handlers = list(self._global_handlers) + handlers.extend(self._handlers.get(event.event_type, [])) + handlers.sort(key=lambda r: r.priority) + + for registration in handlers: + try: + if registration.is_async: + await registration.handler(event) + else: + registration.handler(event) + except Exception as e: + logger.exception( + f"Handler {registration.handler_id} raised exception for event {event.event_type}" + ) + exceptions.append(e) + + return exceptions + + def has_handlers(self, event_type: SpaceEventType) -> bool: + """ + Check if there are any handlers for an event type. + + Args: + event_type: The event type to check + + Returns: + True if there are handlers (including global handlers) + """ + return bool(self._global_handlers) or bool( + self._handlers.get(event_type, []) + ) + + def handler_count(self, event_type: Optional[SpaceEventType] = None) -> int: + """ + Get the number of handlers registered. + + Args: + event_type: Specific event type, or None for total count + + Returns: + Number of handlers + """ + if event_type is None: + count = len(self._global_handlers) + for handlers in self._handlers.values(): + count += len(handlers) + return count + else: + return len(self._handlers.get(event_type, [])) + len( + self._global_handlers + ) + + def clear(self, event_type: Optional[SpaceEventType] = None) -> int: + """ + Clear handlers. + + Args: + event_type: Specific event type, or None to clear all + + Returns: + Number of handlers removed + """ + if event_type is None: + count = self.handler_count() + self._handlers.clear() + self._global_handlers.clear() + self._handler_ids.clear() + return count + else: + handlers = self._handlers.pop(event_type, []) + for reg in handlers: + self._handler_ids.discard(reg.handler_id) + return len(handlers) + + # History management + + def enable_history(self, max_events: int = 1000) -> None: + """Enable event history recording.""" + self._history_enabled = True + self._max_history = max_events + + def disable_history(self) -> None: + """Disable event history recording.""" + self._history_enabled = False + + def clear_history(self) -> None: + """Clear the event history.""" + self._history.clear() + + def get_history( + self, + event_type: Optional[SpaceEventType] = None, + space_id: Optional[str] = None, + limit: Optional[int] = None, + ) -> List[SpaceEvent]: + """ + Get event history. + + Args: + event_type: Filter by event type + space_id: Filter by space ID + limit: Maximum number of events to return + + Returns: + List of events (newest first) + """ + events = list(reversed(self._history)) + + if event_type is not None: + events = [e for e in events if e.event_type == event_type] + + if space_id is not None: + events = [e for e in events if e.space_id == space_id] + + if limit is not None: + events = events[:limit] + + return events + + def _record_history(self, event: SpaceEvent) -> None: + """Record an event in history.""" + self._history.append(event) + if len(self._history) > self._max_history: + self._history = self._history[-self._max_history :] + + +# Global event bus instance (optional singleton pattern) +_default_bus: Optional[EventBus] = None + + +def get_event_bus() -> EventBus: + """Get the default global event bus instance.""" + global _default_bus + if _default_bus is None: + _default_bus = EventBus() + return _default_bus + + +def reset_event_bus() -> None: + """Reset the default global event bus (useful for testing).""" + global _default_bus + _default_bus = None diff --git a/markitect/spaces/events/models.py b/markitect/spaces/events/models.py new file mode 100644 index 00000000..e00cb957 --- /dev/null +++ b/markitect/spaces/events/models.py @@ -0,0 +1,239 @@ +""" +Event models for Information Spaces. + +This module defines the event types and data structures used +by the event system for space operations. +""" + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, Optional +import uuid + + +class SpaceEventType(Enum): + """ + Types of events that can occur in the space system. + + Events are organized by category: + - Space lifecycle: creation, updates, deletion, status changes + - Document operations: add, update, remove, move + - Variable operations: set, delete + - Rendering: start, complete, fail + - Sync: start, complete, conflict + """ + + # Space lifecycle events + SPACE_CREATED = "space.created" + SPACE_UPDATED = "space.updated" + SPACE_DELETED = "space.deleted" + SPACE_ACTIVATED = "space.activated" + SPACE_ARCHIVED = "space.archived" + + # Document events + DOCUMENT_ADDED = "document.added" + DOCUMENT_UPDATED = "document.updated" + DOCUMENT_REMOVED = "document.removed" + DOCUMENT_MOVED = "document.moved" + DOCUMENT_CONTENT_CHANGED = "document.content_changed" + + # Variable events + VARIABLE_SET = "variable.set" + VARIABLE_DELETED = "variable.deleted" + + # Reference events + REFERENCE_ADDED = "reference.added" + REFERENCE_CLEARED = "reference.cleared" + + # Rendering events + RENDER_STARTED = "render.started" + RENDER_COMPLETED = "render.completed" + RENDER_FAILED = "render.failed" + + # Sync events + SYNC_STARTED = "sync.started" + SYNC_COMPLETED = "sync.completed" + SYNC_CONFLICT = "sync.conflict" + + # Cache events + CACHE_INVALIDATED = "cache.invalidated" + + +@dataclass +class SpaceEvent: + """ + Represents an event in the space system. + + Events are immutable records of operations that have occurred. + They carry enough information for handlers to react appropriately. + + Attributes: + event_type: The type of event + space_id: The ID of the affected space + payload: Event-specific data + event_id: Unique identifier for this event + timestamp: When the event occurred + source: Optional identifier of the event source + correlation_id: Optional ID to correlate related events + """ + + event_type: SpaceEventType + space_id: str + payload: Dict[str, Any] = field(default_factory=dict) + event_id: str = field(default_factory=lambda: str(uuid.uuid4())) + timestamp: datetime = field(default_factory=datetime.now) + source: Optional[str] = None + correlation_id: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert the event to a dictionary for serialization.""" + return { + "event_id": self.event_id, + "event_type": self.event_type.value, + "space_id": self.space_id, + "payload": self.payload, + "timestamp": self.timestamp.isoformat(), + "source": self.source, + "correlation_id": self.correlation_id, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "SpaceEvent": + """Create an event from a dictionary.""" + return cls( + event_id=data.get("event_id", str(uuid.uuid4())), + event_type=SpaceEventType(data["event_type"]), + space_id=data["space_id"], + payload=data.get("payload", {}), + timestamp=datetime.fromisoformat(data["timestamp"]) + if data.get("timestamp") + else datetime.now(), + source=data.get("source"), + correlation_id=data.get("correlation_id"), + ) + + +# Convenience factory functions for common events + + +def space_created_event( + space_id: str, + name: str, + source: Optional[str] = None, +) -> SpaceEvent: + """Create a SPACE_CREATED event.""" + return SpaceEvent( + event_type=SpaceEventType.SPACE_CREATED, + space_id=space_id, + payload={"name": name}, + source=source, + ) + + +def space_updated_event( + space_id: str, + changes: Dict[str, Any], + source: Optional[str] = None, +) -> SpaceEvent: + """Create a SPACE_UPDATED event.""" + return SpaceEvent( + event_type=SpaceEventType.SPACE_UPDATED, + space_id=space_id, + payload={"changes": changes}, + source=source, + ) + + +def space_deleted_event( + space_id: str, + name: str, + source: Optional[str] = None, +) -> SpaceEvent: + """Create a SPACE_DELETED event.""" + return SpaceEvent( + event_type=SpaceEventType.SPACE_DELETED, + space_id=space_id, + payload={"name": name}, + source=source, + ) + + +def document_added_event( + space_id: str, + document_id: str, + space_path: str, + source: Optional[str] = None, +) -> SpaceEvent: + """Create a DOCUMENT_ADDED event.""" + return SpaceEvent( + event_type=SpaceEventType.DOCUMENT_ADDED, + space_id=space_id, + payload={"document_id": document_id, "space_path": space_path}, + source=source, + ) + + +def document_updated_event( + space_id: str, + document_id: str, + changes: Dict[str, Any], + source: Optional[str] = None, +) -> SpaceEvent: + """Create a DOCUMENT_UPDATED event.""" + return SpaceEvent( + event_type=SpaceEventType.DOCUMENT_UPDATED, + space_id=space_id, + payload={"document_id": document_id, "changes": changes}, + source=source, + ) + + +def document_removed_event( + space_id: str, + document_id: str, + space_path: str, + source: Optional[str] = None, +) -> SpaceEvent: + """Create a DOCUMENT_REMOVED event.""" + return SpaceEvent( + event_type=SpaceEventType.DOCUMENT_REMOVED, + space_id=space_id, + payload={"document_id": document_id, "space_path": space_path}, + source=source, + ) + + +def document_content_changed_event( + space_id: str, + document_id: str, + old_hash: Optional[str], + new_hash: str, + source: Optional[str] = None, +) -> SpaceEvent: + """Create a DOCUMENT_CONTENT_CHANGED event.""" + return SpaceEvent( + event_type=SpaceEventType.DOCUMENT_CONTENT_CHANGED, + space_id=space_id, + payload={ + "document_id": document_id, + "old_hash": old_hash, + "new_hash": new_hash, + }, + source=source, + ) + + +def cache_invalidated_event( + space_id: str, + document_ids: list, + reason: str, + source: Optional[str] = None, +) -> SpaceEvent: + """Create a CACHE_INVALIDATED event.""" + return SpaceEvent( + event_type=SpaceEventType.CACHE_INVALIDATED, + space_id=space_id, + payload={"document_ids": document_ids, "reason": reason}, + source=source, + ) diff --git a/markitect/spaces/services/space_service.py b/markitect/spaces/services/space_service.py index 995a7c3b..93ed8c0c 100644 --- a/markitect/spaces/services/space_service.py +++ b/markitect/spaces/services/space_service.py @@ -23,6 +23,17 @@ from ..repositories.interfaces import ( IVariableRepository, IReferenceRepository, ) +from ..events import ( + EventBus, + SpaceEvent, + SpaceEventType, + space_created_event, + space_updated_event, + space_deleted_event, + document_added_event, + document_removed_event, + document_content_changed_event, +) class SpaceService: @@ -54,6 +65,7 @@ class SpaceService: document_repo: IDocumentAssociationRepository, variable_repo: IVariableRepository, reference_repo: IReferenceRepository, + event_bus: Optional[EventBus] = None, ): """ Initialize the SpaceService. @@ -63,11 +75,18 @@ class SpaceService: document_repo: Repository for document associations variable_repo: Repository for space variables reference_repo: Repository for transclusion references + event_bus: Optional event bus for emitting events """ self._space_repo = space_repo self._document_repo = document_repo self._variable_repo = variable_repo self._reference_repo = reference_repo + self._event_bus = event_bus + + def _emit(self, event: SpaceEvent) -> None: + """Emit an event if event bus is configured.""" + if self._event_bus: + self._event_bus.emit(event) # ========================================================================= # Space CRUD Operations @@ -114,7 +133,9 @@ class SpaceService: parent_space_id=parent_space_id, ) - return self._space_repo.create(space) + created_space = self._space_repo.create(space) + self._emit(space_created_event(created_space.id, created_space.name)) + return created_space def get_space(self, space_id: str) -> Optional[InformationSpace]: """ @@ -198,7 +219,21 @@ class SpaceService: if metadata is not None: space.metadata = metadata - return self._space_repo.update(space) + updated_space = self._space_repo.update(space) + + # Build changes dict for event + changes = {} + if name is not None: + changes["name"] = name + if description is not None: + changes["description"] = description + if config is not None: + changes["config"] = "updated" + if metadata is not None: + changes["metadata"] = "updated" + + self._emit(space_updated_event(space_id, changes)) + return updated_space def delete_space(self, space_id: str, cascade: bool = True) -> bool: """ @@ -230,7 +265,11 @@ class SpaceService: for child in children: self.delete_space(child.id, cascade=True) - return self._space_repo.delete(space_id) + space_name = space.name + result = self._space_repo.delete(space_id) + if result: + self._emit(space_deleted_event(space_id, space_name)) + return result def activate_space(self, space_id: str) -> InformationSpace: """ @@ -250,7 +289,13 @@ class SpaceService: raise ValueError(f"Space '{space_id}' not found") space.activate() - return self._space_repo.update(space) + updated_space = self._space_repo.update(space) + self._emit(SpaceEvent( + event_type=SpaceEventType.SPACE_ACTIVATED, + space_id=space_id, + payload={"status": "active"}, + )) + return updated_space def archive_space(self, space_id: str) -> InformationSpace: """ @@ -270,7 +315,13 @@ class SpaceService: raise ValueError(f"Space '{space_id}' not found") space.archive() - return self._space_repo.update(space) + updated_space = self._space_repo.update(space) + self._emit(SpaceEvent( + event_type=SpaceEventType.SPACE_ARCHIVED, + space_id=space_id, + payload={"status": "archived"}, + )) + return updated_space def get_child_spaces(self, parent_space_id: str) -> List[InformationSpace]: """ @@ -330,7 +381,9 @@ class SpaceService: content_hash=content_hash, ) - return self._document_repo.add_document(document) + added_doc = self._document_repo.add_document(document) + self._emit(document_added_event(space_id, added_doc.id, space_path)) + return added_doc def get_document(self, document_id: str) -> Optional[SpaceDocument]: """ @@ -384,12 +437,20 @@ class SpaceService: Returns: True if removed, False if not found """ - # Clear any references from this document first + # Get document info before removal for event document = self._document_repo.get_document(document_id) - if document: - self._reference_repo.clear_references_from(document_id, document.space_id) + if not document: + return False - return self._document_repo.remove_document(document_id) + # Clear any references from this document first + self._reference_repo.clear_references_from(document_id, document.space_id) + + result = self._document_repo.remove_document(document_id) + if result: + self._emit(document_removed_event( + document.space_id, document_id, document.space_path + )) + return result def move_document(self, document_id: str, new_path: str) -> SpaceDocument: """ @@ -405,9 +466,26 @@ class SpaceService: Raises: ValueError: If document not found or new path exists """ + # Get old path for event + old_doc = self._document_repo.get_document(document_id) + old_path = old_doc.space_path if old_doc else None + if not new_path.startswith("/"): new_path = "/" + new_path - return self._document_repo.move_document(document_id, new_path) + + moved_doc = self._document_repo.move_document(document_id, new_path) + + if old_doc: + self._emit(SpaceEvent( + event_type=SpaceEventType.DOCUMENT_MOVED, + space_id=moved_doc.space_id, + payload={ + "document_id": document_id, + "old_path": old_path, + "new_path": new_path, + }, + )) + return moved_doc def reorder_documents(self, space_id: str, document_ids: List[str]) -> None: """ @@ -427,8 +505,20 @@ class SpaceService: document_id: The document association ID content_hash: The new content hash """ + # Get old hash for event + document = self._document_repo.get_document(document_id) + old_hash = document.content_hash if document else None + self._document_repo.update_content_hash(document_id, content_hash) + if document and old_hash != content_hash: + self._emit(document_content_changed_event( + document.space_id, + document_id, + old_hash, + content_hash, + )) + # ========================================================================= # Variable Operations # ========================================================================= @@ -465,7 +555,13 @@ class SpaceService: scope=scope, ) - return self._variable_repo.set_variable(variable) + saved_var = self._variable_repo.set_variable(variable) + self._emit(SpaceEvent( + event_type=SpaceEventType.VARIABLE_SET, + space_id=space_id, + payload={"name": name, "scope": scope}, + )) + return saved_var def get_variable(self, space_id: str, name: str) -> Optional[SpaceVariable]: """ @@ -506,7 +602,14 @@ class SpaceService: Returns: True if deleted, False if not found """ - return self._variable_repo.delete_variable(space_id, name) + result = self._variable_repo.delete_variable(space_id, name) + if result: + self._emit(SpaceEvent( + event_type=SpaceEventType.VARIABLE_DELETED, + space_id=space_id, + payload={"name": name}, + )) + return result def get_variables_dict(self, space_id: str) -> Dict[str, Any]: """ diff --git a/tests/integration/spaces/test_event_propagation.py b/tests/integration/spaces/test_event_propagation.py new file mode 100644 index 00000000..f5d6f36a --- /dev/null +++ b/tests/integration/spaces/test_event_propagation.py @@ -0,0 +1,407 @@ +""" +Integration tests for event propagation in SpaceService. + +Tests that events are correctly emitted for all space operations. +""" + +import pytest +import tempfile +import os +from unittest.mock import Mock + +from markitect.spaces import ( + SpaceService, + SqliteSpaceRepository, + SqliteDocumentRepository, + SqliteVariableRepository, + SqliteReferenceRepository, +) +from markitect.spaces.events import ( + EventBus, + SpaceEvent, + SpaceEventType, +) + + +@pytest.fixture +def temp_db(): + """Create a temporary database file for testing.""" + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + yield path + if os.path.exists(path): + os.unlink(path) + + +@pytest.fixture +def event_bus(): + """Create an EventBus for testing.""" + return EventBus() + + +@pytest.fixture +def space_service(temp_db, event_bus): + """Create a SpaceService with EventBus for testing.""" + return SpaceService( + space_repo=SqliteSpaceRepository(temp_db), + document_repo=SqliteDocumentRepository(temp_db), + variable_repo=SqliteVariableRepository(temp_db), + reference_repo=SqliteReferenceRepository(temp_db), + event_bus=event_bus, + ) + + +class TestSpaceEvents: + """Tests for space lifecycle events.""" + + def test_space_created_event(self, space_service, event_bus): + """Test that SPACE_CREATED event is emitted.""" + received_events = [] + event_bus.subscribe(SpaceEventType.SPACE_CREATED, received_events.append) + + space = space_service.create_space(name="test-space") + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.SPACE_CREATED + assert event.space_id == space.id + assert event.payload["name"] == "test-space" + + def test_space_updated_event(self, space_service, event_bus): + """Test that SPACE_UPDATED event is emitted.""" + space = space_service.create_space(name="original") + + received_events = [] + event_bus.subscribe(SpaceEventType.SPACE_UPDATED, received_events.append) + + space_service.update_space(space.id, name="updated", description="New desc") + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.SPACE_UPDATED + assert event.space_id == space.id + assert event.payload["changes"]["name"] == "updated" + assert event.payload["changes"]["description"] == "New desc" + + def test_space_deleted_event(self, space_service, event_bus): + """Test that SPACE_DELETED event is emitted.""" + space = space_service.create_space(name="to-delete") + + received_events = [] + event_bus.subscribe(SpaceEventType.SPACE_DELETED, received_events.append) + + space_service.delete_space(space.id) + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.SPACE_DELETED + assert event.space_id == space.id + assert event.payload["name"] == "to-delete" + + def test_space_activated_event(self, space_service, event_bus): + """Test that SPACE_ACTIVATED event is emitted.""" + space = space_service.create_space(name="to-activate") + + received_events = [] + event_bus.subscribe(SpaceEventType.SPACE_ACTIVATED, received_events.append) + + space_service.activate_space(space.id) + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.SPACE_ACTIVATED + assert event.space_id == space.id + + def test_space_archived_event(self, space_service, event_bus): + """Test that SPACE_ARCHIVED event is emitted.""" + space = space_service.create_space(name="to-archive") + + received_events = [] + event_bus.subscribe(SpaceEventType.SPACE_ARCHIVED, received_events.append) + + space_service.archive_space(space.id) + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.SPACE_ARCHIVED + assert event.space_id == space.id + + def test_cascade_delete_emits_multiple_events(self, space_service, event_bus): + """Test that cascade delete emits events for all deleted spaces.""" + parent = space_service.create_space(name="parent") + child1 = space_service.create_space(name="child1", parent_space_id=parent.id) + child2 = space_service.create_space(name="child2", parent_space_id=parent.id) + + received_events = [] + event_bus.subscribe(SpaceEventType.SPACE_DELETED, received_events.append) + + space_service.delete_space(parent.id, cascade=True) + + # Should have events for parent and both children + assert len(received_events) == 3 + deleted_ids = {e.space_id for e in received_events} + assert parent.id in deleted_ids + assert child1.id in deleted_ids + assert child2.id in deleted_ids + + +class TestDocumentEvents: + """Tests for document events.""" + + def test_document_added_event(self, space_service, event_bus): + """Test that DOCUMENT_ADDED event is emitted.""" + space = space_service.create_space(name="doc-space") + + received_events = [] + event_bus.subscribe(SpaceEventType.DOCUMENT_ADDED, received_events.append) + + doc = space_service.add_document(space.id, "/intro.md") + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.DOCUMENT_ADDED + assert event.space_id == space.id + assert event.payload["document_id"] == doc.id + assert event.payload["space_path"] == "/intro.md" + + def test_document_removed_event(self, space_service, event_bus): + """Test that DOCUMENT_REMOVED event is emitted.""" + space = space_service.create_space(name="doc-space") + doc = space_service.add_document(space.id, "/to-remove.md") + + received_events = [] + event_bus.subscribe(SpaceEventType.DOCUMENT_REMOVED, received_events.append) + + space_service.remove_document(doc.id) + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.DOCUMENT_REMOVED + assert event.space_id == space.id + assert event.payload["document_id"] == doc.id + assert event.payload["space_path"] == "/to-remove.md" + + def test_document_moved_event(self, space_service, event_bus): + """Test that DOCUMENT_MOVED event is emitted.""" + space = space_service.create_space(name="doc-space") + doc = space_service.add_document(space.id, "/old-path.md") + + received_events = [] + event_bus.subscribe(SpaceEventType.DOCUMENT_MOVED, received_events.append) + + space_service.move_document(doc.id, "/new-path.md") + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.DOCUMENT_MOVED + assert event.space_id == space.id + assert event.payload["old_path"] == "/old-path.md" + assert event.payload["new_path"] == "/new-path.md" + + def test_document_content_changed_event(self, space_service, event_bus): + """Test that DOCUMENT_CONTENT_CHANGED event is emitted.""" + space = space_service.create_space(name="doc-space") + doc = space_service.add_document( + space.id, "/content.md", content_hash="hash-v1" + ) + + received_events = [] + event_bus.subscribe( + SpaceEventType.DOCUMENT_CONTENT_CHANGED, received_events.append + ) + + space_service.update_document_hash(doc.id, "hash-v2") + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.DOCUMENT_CONTENT_CHANGED + assert event.space_id == space.id + assert event.payload["old_hash"] == "hash-v1" + assert event.payload["new_hash"] == "hash-v2" + + def test_no_event_if_hash_unchanged(self, space_service, event_bus): + """Test that no event is emitted if hash is unchanged.""" + space = space_service.create_space(name="doc-space") + doc = space_service.add_document( + space.id, "/content.md", content_hash="same-hash" + ) + + received_events = [] + event_bus.subscribe( + SpaceEventType.DOCUMENT_CONTENT_CHANGED, received_events.append + ) + + space_service.update_document_hash(doc.id, "same-hash") + + assert len(received_events) == 0 + + +class TestVariableEvents: + """Tests for variable events.""" + + def test_variable_set_event(self, space_service, event_bus): + """Test that VARIABLE_SET event is emitted.""" + space = space_service.create_space(name="var-space") + + received_events = [] + event_bus.subscribe(SpaceEventType.VARIABLE_SET, received_events.append) + + space_service.set_variable(space.id, "version", "1.0.0") + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.VARIABLE_SET + assert event.space_id == space.id + assert event.payload["name"] == "version" + + def test_variable_deleted_event(self, space_service, event_bus): + """Test that VARIABLE_DELETED event is emitted.""" + space = space_service.create_space(name="var-space") + space_service.set_variable(space.id, "temp", "value") + + received_events = [] + event_bus.subscribe(SpaceEventType.VARIABLE_DELETED, received_events.append) + + space_service.delete_variable(space.id, "temp") + + assert len(received_events) == 1 + event = received_events[0] + assert event.event_type == SpaceEventType.VARIABLE_DELETED + assert event.space_id == space.id + assert event.payload["name"] == "temp" + + def test_no_event_if_variable_not_found(self, space_service, event_bus): + """Test that no event is emitted if variable doesn't exist.""" + space = space_service.create_space(name="var-space") + + received_events = [] + event_bus.subscribe(SpaceEventType.VARIABLE_DELETED, received_events.append) + + space_service.delete_variable(space.id, "non-existent") + + assert len(received_events) == 0 + + +class TestEventHandlerIntegration: + """Tests for event handler integration patterns.""" + + def test_global_handler_receives_all_events(self, space_service, event_bus): + """Test that a global handler receives all event types.""" + received_events = [] + event_bus.subscribe_all(received_events.append) + + space = space_service.create_space(name="test") + space_service.add_document(space.id, "/doc.md") + space_service.set_variable(space.id, "var", "value") + space_service.delete_space(space.id) + + # Should have: SPACE_CREATED, DOCUMENT_ADDED, VARIABLE_SET, SPACE_DELETED + assert len(received_events) >= 4 + + event_types = {e.event_type for e in received_events} + assert SpaceEventType.SPACE_CREATED in event_types + assert SpaceEventType.DOCUMENT_ADDED in event_types + assert SpaceEventType.VARIABLE_SET in event_types + assert SpaceEventType.SPACE_DELETED in event_types + + def test_event_history_records_all_events(self, space_service, event_bus): + """Test that event history captures all events.""" + event_bus.enable_history() + + space = space_service.create_space(name="history-test") + space_service.add_document(space.id, "/doc1.md") + space_service.add_document(space.id, "/doc2.md") + space_service.activate_space(space.id) + + history = event_bus.get_history() + assert len(history) == 4 + + # Newest first + assert history[0].event_type == SpaceEventType.SPACE_ACTIVATED + assert history[1].event_type == SpaceEventType.DOCUMENT_ADDED + assert history[2].event_type == SpaceEventType.DOCUMENT_ADDED + assert history[3].event_type == SpaceEventType.SPACE_CREATED + + def test_filter_history_by_space(self, space_service, event_bus): + """Test filtering event history by space ID.""" + event_bus.enable_history() + + space1 = space_service.create_space(name="space1") + space2 = space_service.create_space(name="space2") + space_service.add_document(space1.id, "/doc.md") + space_service.add_document(space2.id, "/doc.md") + + history_space1 = event_bus.get_history(space_id=space1.id) + assert len(history_space1) == 2 # create + add_document + + history_space2 = event_bus.get_history(space_id=space2.id) + assert len(history_space2) == 2 + + def test_multiple_handlers_for_same_event(self, space_service, event_bus): + """Test that multiple handlers can listen to the same event.""" + handler1_events = [] + handler2_events = [] + + event_bus.subscribe(SpaceEventType.SPACE_CREATED, handler1_events.append) + event_bus.subscribe(SpaceEventType.SPACE_CREATED, handler2_events.append) + + space_service.create_space(name="multi-handler-test") + + assert len(handler1_events) == 1 + assert len(handler2_events) == 1 + assert handler1_events[0].event_id == handler2_events[0].event_id + + +class TestEventDrivenWorkflow: + """Tests for event-driven workflow patterns.""" + + def test_cache_invalidation_pattern(self, space_service, event_bus): + """Test a cache invalidation pattern using events.""" + cache = {} + + def on_content_changed(event: SpaceEvent): + doc_id = event.payload["document_id"] + if doc_id in cache: + del cache[doc_id] + + event_bus.subscribe( + SpaceEventType.DOCUMENT_CONTENT_CHANGED, on_content_changed + ) + + # Setup + space = space_service.create_space(name="cache-test") + doc = space_service.add_document(space.id, "/doc.md", content_hash="v1") + + # Simulate cached content + cache[doc.id] = "cached content" + + # Update triggers cache invalidation via event + space_service.update_document_hash(doc.id, "v2") + + assert doc.id not in cache + + def test_audit_log_pattern(self, space_service, event_bus): + """Test an audit log pattern using events.""" + audit_log = [] + + def log_event(event: SpaceEvent): + audit_log.append({ + "event_type": event.event_type.value, + "space_id": event.space_id, + "timestamp": event.timestamp.isoformat(), + }) + + event_bus.subscribe_all(log_event) + + space = space_service.create_space(name="audit-test") + space_service.activate_space(space.id) + space_service.archive_space(space.id) + space_service.delete_space(space.id) + + # Should have complete audit trail + assert len(audit_log) == 4 + event_types = [entry["event_type"] for entry in audit_log] + assert "space.created" in event_types + assert "space.activated" in event_types + assert "space.archived" in event_types + assert "space.deleted" in event_types diff --git a/tests/unit/spaces/test_events.py b/tests/unit/spaces/test_events.py new file mode 100644 index 00000000..476ee954 --- /dev/null +++ b/tests/unit/spaces/test_events.py @@ -0,0 +1,581 @@ +""" +Unit tests for the space event system. + +Tests the event models and EventBus functionality. +""" + +import pytest +import asyncio +from datetime import datetime +from unittest.mock import Mock, AsyncMock + +from markitect.spaces.events import ( + SpaceEvent, + SpaceEventType, + EventBus, + get_event_bus, + reset_event_bus, + space_created_event, + space_updated_event, + space_deleted_event, + document_added_event, + document_updated_event, + document_removed_event, + document_content_changed_event, + cache_invalidated_event, +) + + +class TestSpaceEventType: + """Tests for SpaceEventType enum.""" + + def test_event_type_values(self): + """Test that event types have expected string values.""" + assert SpaceEventType.SPACE_CREATED.value == "space.created" + assert SpaceEventType.DOCUMENT_ADDED.value == "document.added" + assert SpaceEventType.CACHE_INVALIDATED.value == "cache.invalidated" + + def test_all_event_types_have_values(self): + """Test that all event types have non-empty values.""" + for event_type in SpaceEventType: + assert event_type.value + assert "." in event_type.value + + +class TestSpaceEvent: + """Tests for SpaceEvent dataclass.""" + + def test_event_creation(self): + """Test basic event creation.""" + event = SpaceEvent( + event_type=SpaceEventType.SPACE_CREATED, + space_id="space-123", + payload={"name": "test-space"}, + ) + + assert event.event_type == SpaceEventType.SPACE_CREATED + assert event.space_id == "space-123" + assert event.payload["name"] == "test-space" + assert event.event_id is not None + assert event.timestamp is not None + + def test_event_with_source_and_correlation(self): + """Test event with source and correlation ID.""" + event = SpaceEvent( + event_type=SpaceEventType.DOCUMENT_ADDED, + space_id="space-123", + source="api", + correlation_id="req-456", + ) + + assert event.source == "api" + assert event.correlation_id == "req-456" + + def test_event_to_dict(self): + """Test event serialization.""" + event = SpaceEvent( + event_type=SpaceEventType.SPACE_UPDATED, + space_id="space-123", + payload={"changes": {"name": "new-name"}}, + source="cli", + ) + + data = event.to_dict() + + assert data["event_type"] == "space.updated" + assert data["space_id"] == "space-123" + assert data["payload"]["changes"]["name"] == "new-name" + assert data["source"] == "cli" + assert "event_id" in data + assert "timestamp" in data + + def test_event_from_dict(self): + """Test event deserialization.""" + data = { + "event_id": "evt-123", + "event_type": "document.added", + "space_id": "space-456", + "payload": {"document_id": "doc-1", "space_path": "/intro.md"}, + "timestamp": "2025-01-15T10:30:00", + "source": "sync", + } + + event = SpaceEvent.from_dict(data) + + assert event.event_id == "evt-123" + assert event.event_type == SpaceEventType.DOCUMENT_ADDED + assert event.space_id == "space-456" + assert event.payload["document_id"] == "doc-1" + assert event.source == "sync" + + def test_event_roundtrip(self): + """Test that to_dict and from_dict are inverses.""" + original = SpaceEvent( + event_type=SpaceEventType.RENDER_COMPLETED, + space_id="space-123", + payload={"output_path": "/tmp/output"}, + source="renderer", + correlation_id="batch-1", + ) + + data = original.to_dict() + restored = SpaceEvent.from_dict(data) + + assert restored.event_type == original.event_type + assert restored.space_id == original.space_id + assert restored.payload == original.payload + assert restored.source == original.source + assert restored.correlation_id == original.correlation_id + + +class TestEventFactories: + """Tests for event factory functions.""" + + def test_space_created_event(self): + """Test space_created_event factory.""" + event = space_created_event("space-1", "my-docs", source="api") + + assert event.event_type == SpaceEventType.SPACE_CREATED + assert event.space_id == "space-1" + assert event.payload["name"] == "my-docs" + assert event.source == "api" + + def test_space_updated_event(self): + """Test space_updated_event factory.""" + event = space_updated_event( + "space-1", + changes={"description": "Updated description"}, + ) + + assert event.event_type == SpaceEventType.SPACE_UPDATED + assert event.payload["changes"]["description"] == "Updated description" + + def test_space_deleted_event(self): + """Test space_deleted_event factory.""" + event = space_deleted_event("space-1", "old-space") + + assert event.event_type == SpaceEventType.SPACE_DELETED + assert event.payload["name"] == "old-space" + + def test_document_added_event(self): + """Test document_added_event factory.""" + event = document_added_event("space-1", "doc-1", "/intro.md") + + assert event.event_type == SpaceEventType.DOCUMENT_ADDED + assert event.payload["document_id"] == "doc-1" + assert event.payload["space_path"] == "/intro.md" + + def test_document_content_changed_event(self): + """Test document_content_changed_event factory.""" + event = document_content_changed_event( + "space-1", "doc-1", old_hash="abc", new_hash="xyz" + ) + + assert event.event_type == SpaceEventType.DOCUMENT_CONTENT_CHANGED + assert event.payload["old_hash"] == "abc" + assert event.payload["new_hash"] == "xyz" + + def test_cache_invalidated_event(self): + """Test cache_invalidated_event factory.""" + event = cache_invalidated_event( + "space-1", + document_ids=["doc-1", "doc-2"], + reason="dependency_changed", + ) + + assert event.event_type == SpaceEventType.CACHE_INVALIDATED + assert event.payload["document_ids"] == ["doc-1", "doc-2"] + assert event.payload["reason"] == "dependency_changed" + + +class TestEventBus: + """Tests for EventBus.""" + + @pytest.fixture + def bus(self): + """Create a fresh EventBus for each test.""" + return EventBus() + + def test_subscribe_and_emit(self, bus): + """Test basic subscribe and emit.""" + received_events = [] + + def handler(event: SpaceEvent): + received_events.append(event) + + bus.subscribe(SpaceEventType.SPACE_CREATED, handler) + + event = space_created_event("space-1", "test") + bus.emit(event) + + assert len(received_events) == 1 + assert received_events[0].event_type == SpaceEventType.SPACE_CREATED + + def test_subscribe_returns_handler_id(self, bus): + """Test that subscribe returns a handler ID.""" + handler_id = bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None) + assert handler_id is not None + assert isinstance(handler_id, str) + + def test_unsubscribe(self, bus): + """Test unsubscribing a handler.""" + received_events = [] + + def handler(event: SpaceEvent): + received_events.append(event) + + bus.subscribe(SpaceEventType.SPACE_CREATED, handler) + bus.emit(space_created_event("s1", "test1")) + + assert len(received_events) == 1 + + result = bus.unsubscribe(SpaceEventType.SPACE_CREATED, handler) + assert result is True + + bus.emit(space_created_event("s2", "test2")) + assert len(received_events) == 1 # No new events + + def test_unsubscribe_by_id(self, bus): + """Test unsubscribing by handler ID.""" + handler = Mock() + handler_id = bus.subscribe(SpaceEventType.SPACE_CREATED, handler) + + bus.emit(space_created_event("s1", "test")) + assert handler.call_count == 1 + + result = bus.unsubscribe_by_id(handler_id) + assert result is True + + bus.emit(space_created_event("s2", "test")) + assert handler.call_count == 1 # No additional calls + + def test_multiple_handlers(self, bus): + """Test multiple handlers for same event type.""" + handler1 = Mock() + handler2 = Mock() + + bus.subscribe(SpaceEventType.SPACE_CREATED, handler1) + bus.subscribe(SpaceEventType.SPACE_CREATED, handler2) + + bus.emit(space_created_event("s1", "test")) + + handler1.assert_called_once() + handler2.assert_called_once() + + def test_handler_receives_correct_event(self, bus): + """Test that handlers receive the emitted event.""" + received = [] + + def handler(event: SpaceEvent): + received.append(event) + + bus.subscribe(SpaceEventType.DOCUMENT_ADDED, handler) + + event = document_added_event("s1", "doc-1", "/intro.md") + bus.emit(event) + + assert len(received) == 1 + assert received[0].event_id == event.event_id + + def test_handler_isolation_by_event_type(self, bus): + """Test that handlers only receive their subscribed event types.""" + space_handler = Mock() + doc_handler = Mock() + + bus.subscribe(SpaceEventType.SPACE_CREATED, space_handler) + bus.subscribe(SpaceEventType.DOCUMENT_ADDED, doc_handler) + + bus.emit(space_created_event("s1", "test")) + + space_handler.assert_called_once() + doc_handler.assert_not_called() + + def test_global_handler(self, bus): + """Test handlers subscribed to all events.""" + global_handler = Mock() + specific_handler = Mock() + + bus.subscribe_all(global_handler) + bus.subscribe(SpaceEventType.SPACE_CREATED, specific_handler) + + bus.emit(space_created_event("s1", "test")) + bus.emit(document_added_event("s1", "doc-1", "/intro.md")) + + assert global_handler.call_count == 2 + assert specific_handler.call_count == 1 + + def test_handler_priority(self, bus): + """Test that handlers are called in priority order.""" + call_order = [] + + def handler_low(event): + call_order.append("low") + + def handler_high(event): + call_order.append("high") + + def handler_medium(event): + call_order.append("medium") + + bus.subscribe(SpaceEventType.SPACE_CREATED, handler_low, priority=100) + bus.subscribe(SpaceEventType.SPACE_CREATED, handler_high, priority=10) + bus.subscribe(SpaceEventType.SPACE_CREATED, handler_medium, priority=50) + + bus.emit(space_created_event("s1", "test")) + + assert call_order == ["high", "medium", "low"] + + def test_handler_exception_does_not_stop_others(self, bus): + """Test that one handler's exception doesn't prevent other handlers.""" + handler1 = Mock() + handler2 = Mock(side_effect=ValueError("Test error")) + handler3 = Mock() + + bus.subscribe(SpaceEventType.SPACE_CREATED, handler1, priority=1) + bus.subscribe(SpaceEventType.SPACE_CREATED, handler2, priority=2) + bus.subscribe(SpaceEventType.SPACE_CREATED, handler3, priority=3) + + exceptions = bus.emit(space_created_event("s1", "test")) + + handler1.assert_called_once() + handler2.assert_called_once() + handler3.assert_called_once() + assert len(exceptions) == 1 + assert isinstance(exceptions[0], ValueError) + + def test_has_handlers(self, bus): + """Test checking for handlers.""" + assert bus.has_handlers(SpaceEventType.SPACE_CREATED) is False + + bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None) + + assert bus.has_handlers(SpaceEventType.SPACE_CREATED) is True + assert bus.has_handlers(SpaceEventType.DOCUMENT_ADDED) is False + + def test_has_handlers_with_global(self, bus): + """Test that has_handlers considers global handlers.""" + assert bus.has_handlers(SpaceEventType.DOCUMENT_ADDED) is False + + bus.subscribe_all(lambda e: None) + + assert bus.has_handlers(SpaceEventType.DOCUMENT_ADDED) is True + + def test_handler_count(self, bus): + """Test counting handlers.""" + assert bus.handler_count() == 0 + assert bus.handler_count(SpaceEventType.SPACE_CREATED) == 0 + + bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None) + bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None) + bus.subscribe(SpaceEventType.DOCUMENT_ADDED, lambda e: None) + + assert bus.handler_count() == 3 + assert bus.handler_count(SpaceEventType.SPACE_CREATED) == 2 + assert bus.handler_count(SpaceEventType.DOCUMENT_ADDED) == 1 + + def test_clear_all(self, bus): + """Test clearing all handlers.""" + bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None) + bus.subscribe(SpaceEventType.DOCUMENT_ADDED, lambda e: None) + bus.subscribe_all(lambda e: None) + + count = bus.clear() + + assert count == 3 + assert bus.handler_count() == 0 + + def test_clear_by_event_type(self, bus): + """Test clearing handlers for specific event type.""" + bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None) + bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None) + bus.subscribe(SpaceEventType.DOCUMENT_ADDED, lambda e: None) + + count = bus.clear(SpaceEventType.SPACE_CREATED) + + assert count == 2 + assert bus.handler_count(SpaceEventType.SPACE_CREATED) == 0 + assert bus.handler_count(SpaceEventType.DOCUMENT_ADDED) == 1 + + +class TestEventBusHistory: + """Tests for EventBus history feature.""" + + @pytest.fixture + def bus(self): + """Create a fresh EventBus with history enabled.""" + bus = EventBus() + bus.enable_history() + return bus + + def test_history_disabled_by_default(self): + """Test that history is disabled by default.""" + bus = EventBus() + bus.emit(space_created_event("s1", "test")) + + history = bus.get_history() + assert len(history) == 0 + + def test_history_records_events(self, bus): + """Test that history records emitted events.""" + bus.emit(space_created_event("s1", "test1")) + bus.emit(space_created_event("s2", "test2")) + + history = bus.get_history() + assert len(history) == 2 + + def test_history_newest_first(self, bus): + """Test that history returns newest events first.""" + bus.emit(space_created_event("s1", "first")) + bus.emit(space_created_event("s2", "second")) + + history = bus.get_history() + assert history[0].payload["name"] == "second" + assert history[1].payload["name"] == "first" + + def test_history_filter_by_event_type(self, bus): + """Test filtering history by event type.""" + bus.emit(space_created_event("s1", "test")) + bus.emit(document_added_event("s1", "doc-1", "/intro.md")) + bus.emit(space_created_event("s2", "test2")) + + history = bus.get_history(event_type=SpaceEventType.SPACE_CREATED) + assert len(history) == 2 + + def test_history_filter_by_space_id(self, bus): + """Test filtering history by space ID.""" + bus.emit(space_created_event("s1", "test1")) + bus.emit(space_created_event("s2", "test2")) + bus.emit(document_added_event("s1", "doc-1", "/intro.md")) + + history = bus.get_history(space_id="s1") + assert len(history) == 2 + + def test_history_limit(self, bus): + """Test history limit.""" + for i in range(10): + bus.emit(space_created_event(f"s{i}", f"test{i}")) + + history = bus.get_history(limit=3) + assert len(history) == 3 + + def test_history_max_size(self): + """Test that history respects max size.""" + bus = EventBus() + bus.enable_history(max_events=5) + + for i in range(10): + bus.emit(space_created_event(f"s{i}", f"test{i}")) + + history = bus.get_history() + assert len(history) == 5 + # Should have the most recent events + assert history[0].payload["name"] == "test9" + + def test_clear_history(self, bus): + """Test clearing history.""" + bus.emit(space_created_event("s1", "test")) + bus.emit(space_created_event("s2", "test")) + + bus.clear_history() + + history = bus.get_history() + assert len(history) == 0 + + def test_disable_history(self, bus): + """Test disabling history.""" + bus.emit(space_created_event("s1", "test")) + bus.disable_history() + bus.emit(space_created_event("s2", "test")) + + history = bus.get_history() + assert len(history) == 1 + + +class TestEventBusAsync: + """Tests for async EventBus functionality.""" + + @pytest.fixture + def bus(self): + """Create a fresh EventBus.""" + return EventBus() + + @pytest.mark.asyncio + async def test_async_handler(self, bus): + """Test async event handler.""" + received = [] + + async def async_handler(event: SpaceEvent): + await asyncio.sleep(0.01) + received.append(event) + + bus.subscribe(SpaceEventType.SPACE_CREATED, async_handler) + + await bus.emit_async(space_created_event("s1", "test")) + + assert len(received) == 1 + + @pytest.mark.asyncio + async def test_mixed_sync_async_handlers(self, bus): + """Test mixing sync and async handlers.""" + sync_received = [] + async_received = [] + + def sync_handler(event: SpaceEvent): + sync_received.append(event) + + async def async_handler(event: SpaceEvent): + await asyncio.sleep(0.01) + async_received.append(event) + + bus.subscribe(SpaceEventType.SPACE_CREATED, sync_handler) + bus.subscribe(SpaceEventType.SPACE_CREATED, async_handler) + + await bus.emit_async(space_created_event("s1", "test")) + + assert len(sync_received) == 1 + assert len(async_received) == 1 + + @pytest.mark.asyncio + async def test_async_handler_exception(self, bus): + """Test that async handler exceptions are caught.""" + async def failing_handler(event: SpaceEvent): + raise ValueError("Async error") + + bus.subscribe(SpaceEventType.SPACE_CREATED, failing_handler) + + exceptions = await bus.emit_async(space_created_event("s1", "test")) + + assert len(exceptions) == 1 + assert isinstance(exceptions[0], ValueError) + + +class TestGlobalEventBus: + """Tests for global event bus singleton.""" + + def test_get_event_bus_returns_same_instance(self): + """Test that get_event_bus returns the same instance.""" + reset_event_bus() + bus1 = get_event_bus() + bus2 = get_event_bus() + + assert bus1 is bus2 + + def test_reset_event_bus(self): + """Test that reset_event_bus creates new instance.""" + bus1 = get_event_bus() + reset_event_bus() + bus2 = get_event_bus() + + assert bus1 is not bus2 + + def test_reset_clears_handlers(self): + """Test that reset clears all handlers.""" + bus = get_event_bus() + bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None) + + assert bus.handler_count() == 1 + + reset_event_bus() + new_bus = get_event_bus() + + assert new_bus.handler_count() == 0