feat(spaces): implement Phase 2 Event System

Week 4 - Event Infrastructure:
- Create SpaceEventType enum with 18 event types covering space lifecycle,
  document operations, variables, references, rendering, sync, and cache
- Create SpaceEvent dataclass with serialization/deserialization
- Create EventBus with sync/async handler support, priority ordering,
  global handlers, and optional event history
- Add event factory functions for common events

Week 5 - Event Integration:
- Wire EventBus into SpaceService as optional dependency
- Emit events for all space operations:
  - SPACE_CREATED, SPACE_UPDATED, SPACE_DELETED, SPACE_ACTIVATED, SPACE_ARCHIVED
  - DOCUMENT_ADDED, DOCUMENT_REMOVED, DOCUMENT_MOVED, DOCUMENT_CONTENT_CHANGED
  - VARIABLE_SET, VARIABLE_DELETED
- Create integration tests for event propagation patterns

Test coverage: 187 tests total
- 43 unit tests for event system
- 20 integration tests for event propagation
- 124 existing tests continue to pass

Capabilities delivered:
- CAP-010: SpaceEvent base with type, payload, timestamp
- CAP-011: EventBus with in-process publish/subscribe
- CAP-012: Event handlers registry with priority support
- CAP-013: Change detection via content hash comparison

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-08 07:41:47 +01:00
parent 9b12875681
commit 0a494b2011
7 changed files with 1807 additions and 19 deletions

View File

@@ -51,6 +51,15 @@ from .repositories import (
initialize_space_tables,
)
# Phase 2: Event System
from .events import (
SpaceEvent,
SpaceEventType,
EventBus,
get_event_bus,
reset_event_bus,
)
__all__ = [
# Models
"InformationSpace",
@@ -73,4 +82,10 @@ __all__ = [
"SqliteVariableRepository",
"SqliteReferenceRepository",
"initialize_space_tables",
# Event System
"SpaceEvent",
"SpaceEventType",
"EventBus",
"get_event_bus",
"reset_event_bus",
]

View File

@@ -3,14 +3,55 @@ Event system for Information Spaces.
This package provides event-driven architecture for space operations:
- SpaceEvent: Event dataclass with type, payload, timestamp
- SpaceEventType: Enum of all event types
- EventBus: In-process publish/subscribe for space events
- Event handlers and registration
- Event factory functions for common events
Events emitted:
- SPACE_CREATED, SPACE_UPDATED, SPACE_DELETED
- DOCUMENT_ADDED, DOCUMENT_UPDATED, DOCUMENT_REMOVED
- RENDER_COMPLETED, SYNC_COMPLETED
- Space lifecycle: SPACE_CREATED, SPACE_UPDATED, SPACE_DELETED, SPACE_ACTIVATED, SPACE_ARCHIVED
- Document: DOCUMENT_ADDED, DOCUMENT_UPDATED, DOCUMENT_REMOVED, DOCUMENT_MOVED, DOCUMENT_CONTENT_CHANGED
- Variable: VARIABLE_SET, VARIABLE_DELETED
- Reference: REFERENCE_ADDED, REFERENCE_CLEARED
- Rendering: RENDER_STARTED, RENDER_COMPLETED, RENDER_FAILED
- Sync: SYNC_STARTED, SYNC_COMPLETED, SYNC_CONFLICT
- Cache: CACHE_INVALIDATED
"""
# Events will be implemented in Phase 2
__all__ = []
from .models import (
SpaceEvent,
SpaceEventType,
space_created_event,
space_updated_event,
space_deleted_event,
document_added_event,
document_updated_event,
document_removed_event,
document_content_changed_event,
cache_invalidated_event,
)
from .bus import (
EventBus,
HandlerRegistration,
get_event_bus,
reset_event_bus,
)
__all__ = [
# Event models
"SpaceEvent",
"SpaceEventType",
# Event factories
"space_created_event",
"space_updated_event",
"space_deleted_event",
"document_added_event",
"document_updated_event",
"document_removed_event",
"document_content_changed_event",
"cache_invalidated_event",
# Event bus
"EventBus",
"HandlerRegistration",
"get_event_bus",
"reset_event_bus",
]

View File

@@ -0,0 +1,402 @@
"""
EventBus for Information Spaces.
This module provides an in-process publish/subscribe system
for space events. It supports both synchronous and asynchronous
event handlers.
"""
import asyncio
import logging
from collections import defaultdict
from typing import Callable, Dict, List, Optional, Set, Union, Any
from dataclasses import dataclass, field
import weakref
from .models import SpaceEvent, SpaceEventType
# Type aliases for handlers
SyncHandler = Callable[[SpaceEvent], None]
AsyncHandler = Callable[[SpaceEvent], Any] # Coroutine
Handler = Union[SyncHandler, AsyncHandler]
logger = logging.getLogger(__name__)
@dataclass
class HandlerRegistration:
"""
Registration info for an event handler.
Attributes:
handler: The handler function
is_async: Whether the handler is async
priority: Handler priority (lower = earlier execution)
handler_id: Unique ID for this registration
"""
handler: Handler
is_async: bool = False
priority: int = 100
handler_id: str = field(default_factory=lambda: str(id(None)))
def __post_init__(self):
self.handler_id = str(id(self.handler))
class EventBus:
"""
In-process event bus for space events.
Provides publish/subscribe functionality for space operations.
Handlers can be registered for specific event types or for all events.
Usage:
bus = EventBus()
# Register a handler
def on_space_created(event: SpaceEvent):
print(f"Space created: {event.payload['name']}")
bus.subscribe(SpaceEventType.SPACE_CREATED, on_space_created)
# Emit an event
bus.emit(SpaceEvent(
event_type=SpaceEventType.SPACE_CREATED,
space_id="space-123",
payload={"name": "my-docs"}
))
# Unsubscribe
bus.unsubscribe(SpaceEventType.SPACE_CREATED, on_space_created)
"""
def __init__(self):
"""Initialize the event bus."""
# Handlers indexed by event type
self._handlers: Dict[SpaceEventType, List[HandlerRegistration]] = defaultdict(
list
)
# Handlers that receive all events
self._global_handlers: List[HandlerRegistration] = []
# Track all handler IDs for quick lookup
self._handler_ids: Set[str] = set()
# Event history (optional, for debugging)
self._history: List[SpaceEvent] = []
self._history_enabled: bool = False
self._max_history: int = 1000
def subscribe(
self,
event_type: Optional[SpaceEventType],
handler: Handler,
priority: int = 100,
) -> str:
"""
Subscribe a handler to events.
Args:
event_type: The event type to subscribe to, or None for all events
handler: The handler function (sync or async)
priority: Handler priority (lower = earlier execution)
Returns:
Handler ID that can be used to unsubscribe
"""
is_async = asyncio.iscoroutinefunction(handler)
registration = HandlerRegistration(
handler=handler,
is_async=is_async,
priority=priority,
)
if event_type is None:
self._global_handlers.append(registration)
self._global_handlers.sort(key=lambda r: r.priority)
else:
self._handlers[event_type].append(registration)
self._handlers[event_type].sort(key=lambda r: r.priority)
self._handler_ids.add(registration.handler_id)
return registration.handler_id
def subscribe_all(self, handler: Handler, priority: int = 100) -> str:
"""
Subscribe a handler to all events.
Args:
handler: The handler function
priority: Handler priority
Returns:
Handler ID
"""
return self.subscribe(None, handler, priority)
def unsubscribe(
self,
event_type: Optional[SpaceEventType],
handler: Handler,
) -> bool:
"""
Unsubscribe a handler from events.
Args:
event_type: The event type, or None for global handlers
handler: The handler to remove
Returns:
True if handler was found and removed
"""
handler_id = str(id(handler))
if event_type is None:
for i, reg in enumerate(self._global_handlers):
if reg.handler_id == handler_id:
self._global_handlers.pop(i)
self._handler_ids.discard(handler_id)
return True
else:
handlers = self._handlers.get(event_type, [])
for i, reg in enumerate(handlers):
if reg.handler_id == handler_id:
handlers.pop(i)
self._handler_ids.discard(handler_id)
return True
return False
def unsubscribe_by_id(self, handler_id: str) -> bool:
"""
Unsubscribe a handler by its ID.
Args:
handler_id: The handler ID returned by subscribe
Returns:
True if handler was found and removed
"""
# Check global handlers
for i, reg in enumerate(self._global_handlers):
if reg.handler_id == handler_id:
self._global_handlers.pop(i)
self._handler_ids.discard(handler_id)
return True
# Check type-specific handlers
for handlers in self._handlers.values():
for i, reg in enumerate(handlers):
if reg.handler_id == handler_id:
handlers.pop(i)
self._handler_ids.discard(handler_id)
return True
return False
def emit(self, event: SpaceEvent) -> List[Exception]:
"""
Emit an event synchronously.
Calls all registered handlers for the event type.
Async handlers are run in a new event loop if needed.
Args:
event: The event to emit
Returns:
List of exceptions raised by handlers (empty if all succeeded)
"""
if self._history_enabled:
self._record_history(event)
exceptions: List[Exception] = []
# Collect all handlers to call
handlers = list(self._global_handlers)
handlers.extend(self._handlers.get(event.event_type, []))
handlers.sort(key=lambda r: r.priority)
for registration in handlers:
try:
if registration.is_async:
# Run async handler
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
# Create a task but don't await it here
asyncio.create_task(registration.handler(event))
else:
asyncio.run(registration.handler(event))
else:
registration.handler(event)
except Exception as e:
logger.exception(
f"Handler {registration.handler_id} raised exception for event {event.event_type}"
)
exceptions.append(e)
return exceptions
async def emit_async(self, event: SpaceEvent) -> List[Exception]:
"""
Emit an event asynchronously.
Args:
event: The event to emit
Returns:
List of exceptions raised by handlers
"""
if self._history_enabled:
self._record_history(event)
exceptions: List[Exception] = []
# Collect all handlers to call
handlers = list(self._global_handlers)
handlers.extend(self._handlers.get(event.event_type, []))
handlers.sort(key=lambda r: r.priority)
for registration in handlers:
try:
if registration.is_async:
await registration.handler(event)
else:
registration.handler(event)
except Exception as e:
logger.exception(
f"Handler {registration.handler_id} raised exception for event {event.event_type}"
)
exceptions.append(e)
return exceptions
def has_handlers(self, event_type: SpaceEventType) -> bool:
"""
Check if there are any handlers for an event type.
Args:
event_type: The event type to check
Returns:
True if there are handlers (including global handlers)
"""
return bool(self._global_handlers) or bool(
self._handlers.get(event_type, [])
)
def handler_count(self, event_type: Optional[SpaceEventType] = None) -> int:
"""
Get the number of handlers registered.
Args:
event_type: Specific event type, or None for total count
Returns:
Number of handlers
"""
if event_type is None:
count = len(self._global_handlers)
for handlers in self._handlers.values():
count += len(handlers)
return count
else:
return len(self._handlers.get(event_type, [])) + len(
self._global_handlers
)
def clear(self, event_type: Optional[SpaceEventType] = None) -> int:
"""
Clear handlers.
Args:
event_type: Specific event type, or None to clear all
Returns:
Number of handlers removed
"""
if event_type is None:
count = self.handler_count()
self._handlers.clear()
self._global_handlers.clear()
self._handler_ids.clear()
return count
else:
handlers = self._handlers.pop(event_type, [])
for reg in handlers:
self._handler_ids.discard(reg.handler_id)
return len(handlers)
# History management
def enable_history(self, max_events: int = 1000) -> None:
"""Enable event history recording."""
self._history_enabled = True
self._max_history = max_events
def disable_history(self) -> None:
"""Disable event history recording."""
self._history_enabled = False
def clear_history(self) -> None:
"""Clear the event history."""
self._history.clear()
def get_history(
self,
event_type: Optional[SpaceEventType] = None,
space_id: Optional[str] = None,
limit: Optional[int] = None,
) -> List[SpaceEvent]:
"""
Get event history.
Args:
event_type: Filter by event type
space_id: Filter by space ID
limit: Maximum number of events to return
Returns:
List of events (newest first)
"""
events = list(reversed(self._history))
if event_type is not None:
events = [e for e in events if e.event_type == event_type]
if space_id is not None:
events = [e for e in events if e.space_id == space_id]
if limit is not None:
events = events[:limit]
return events
def _record_history(self, event: SpaceEvent) -> None:
"""Record an event in history."""
self._history.append(event)
if len(self._history) > self._max_history:
self._history = self._history[-self._max_history :]
# Global event bus instance (optional singleton pattern)
_default_bus: Optional[EventBus] = None
def get_event_bus() -> EventBus:
"""Get the default global event bus instance."""
global _default_bus
if _default_bus is None:
_default_bus = EventBus()
return _default_bus
def reset_event_bus() -> None:
"""Reset the default global event bus (useful for testing)."""
global _default_bus
_default_bus = None

View File

@@ -0,0 +1,239 @@
"""
Event models for Information Spaces.
This module defines the event types and data structures used
by the event system for space operations.
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
import uuid
class SpaceEventType(Enum):
"""
Types of events that can occur in the space system.
Events are organized by category:
- Space lifecycle: creation, updates, deletion, status changes
- Document operations: add, update, remove, move
- Variable operations: set, delete
- Rendering: start, complete, fail
- Sync: start, complete, conflict
"""
# Space lifecycle events
SPACE_CREATED = "space.created"
SPACE_UPDATED = "space.updated"
SPACE_DELETED = "space.deleted"
SPACE_ACTIVATED = "space.activated"
SPACE_ARCHIVED = "space.archived"
# Document events
DOCUMENT_ADDED = "document.added"
DOCUMENT_UPDATED = "document.updated"
DOCUMENT_REMOVED = "document.removed"
DOCUMENT_MOVED = "document.moved"
DOCUMENT_CONTENT_CHANGED = "document.content_changed"
# Variable events
VARIABLE_SET = "variable.set"
VARIABLE_DELETED = "variable.deleted"
# Reference events
REFERENCE_ADDED = "reference.added"
REFERENCE_CLEARED = "reference.cleared"
# Rendering events
RENDER_STARTED = "render.started"
RENDER_COMPLETED = "render.completed"
RENDER_FAILED = "render.failed"
# Sync events
SYNC_STARTED = "sync.started"
SYNC_COMPLETED = "sync.completed"
SYNC_CONFLICT = "sync.conflict"
# Cache events
CACHE_INVALIDATED = "cache.invalidated"
@dataclass
class SpaceEvent:
"""
Represents an event in the space system.
Events are immutable records of operations that have occurred.
They carry enough information for handlers to react appropriately.
Attributes:
event_type: The type of event
space_id: The ID of the affected space
payload: Event-specific data
event_id: Unique identifier for this event
timestamp: When the event occurred
source: Optional identifier of the event source
correlation_id: Optional ID to correlate related events
"""
event_type: SpaceEventType
space_id: str
payload: Dict[str, Any] = field(default_factory=dict)
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
source: Optional[str] = None
correlation_id: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert the event to a dictionary for serialization."""
return {
"event_id": self.event_id,
"event_type": self.event_type.value,
"space_id": self.space_id,
"payload": self.payload,
"timestamp": self.timestamp.isoformat(),
"source": self.source,
"correlation_id": self.correlation_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "SpaceEvent":
"""Create an event from a dictionary."""
return cls(
event_id=data.get("event_id", str(uuid.uuid4())),
event_type=SpaceEventType(data["event_type"]),
space_id=data["space_id"],
payload=data.get("payload", {}),
timestamp=datetime.fromisoformat(data["timestamp"])
if data.get("timestamp")
else datetime.now(),
source=data.get("source"),
correlation_id=data.get("correlation_id"),
)
# Convenience factory functions for common events
def space_created_event(
space_id: str,
name: str,
source: Optional[str] = None,
) -> SpaceEvent:
"""Create a SPACE_CREATED event."""
return SpaceEvent(
event_type=SpaceEventType.SPACE_CREATED,
space_id=space_id,
payload={"name": name},
source=source,
)
def space_updated_event(
space_id: str,
changes: Dict[str, Any],
source: Optional[str] = None,
) -> SpaceEvent:
"""Create a SPACE_UPDATED event."""
return SpaceEvent(
event_type=SpaceEventType.SPACE_UPDATED,
space_id=space_id,
payload={"changes": changes},
source=source,
)
def space_deleted_event(
space_id: str,
name: str,
source: Optional[str] = None,
) -> SpaceEvent:
"""Create a SPACE_DELETED event."""
return SpaceEvent(
event_type=SpaceEventType.SPACE_DELETED,
space_id=space_id,
payload={"name": name},
source=source,
)
def document_added_event(
space_id: str,
document_id: str,
space_path: str,
source: Optional[str] = None,
) -> SpaceEvent:
"""Create a DOCUMENT_ADDED event."""
return SpaceEvent(
event_type=SpaceEventType.DOCUMENT_ADDED,
space_id=space_id,
payload={"document_id": document_id, "space_path": space_path},
source=source,
)
def document_updated_event(
space_id: str,
document_id: str,
changes: Dict[str, Any],
source: Optional[str] = None,
) -> SpaceEvent:
"""Create a DOCUMENT_UPDATED event."""
return SpaceEvent(
event_type=SpaceEventType.DOCUMENT_UPDATED,
space_id=space_id,
payload={"document_id": document_id, "changes": changes},
source=source,
)
def document_removed_event(
space_id: str,
document_id: str,
space_path: str,
source: Optional[str] = None,
) -> SpaceEvent:
"""Create a DOCUMENT_REMOVED event."""
return SpaceEvent(
event_type=SpaceEventType.DOCUMENT_REMOVED,
space_id=space_id,
payload={"document_id": document_id, "space_path": space_path},
source=source,
)
def document_content_changed_event(
space_id: str,
document_id: str,
old_hash: Optional[str],
new_hash: str,
source: Optional[str] = None,
) -> SpaceEvent:
"""Create a DOCUMENT_CONTENT_CHANGED event."""
return SpaceEvent(
event_type=SpaceEventType.DOCUMENT_CONTENT_CHANGED,
space_id=space_id,
payload={
"document_id": document_id,
"old_hash": old_hash,
"new_hash": new_hash,
},
source=source,
)
def cache_invalidated_event(
space_id: str,
document_ids: list,
reason: str,
source: Optional[str] = None,
) -> SpaceEvent:
"""Create a CACHE_INVALIDATED event."""
return SpaceEvent(
event_type=SpaceEventType.CACHE_INVALIDATED,
space_id=space_id,
payload={"document_ids": document_ids, "reason": reason},
source=source,
)

View File

@@ -23,6 +23,17 @@ from ..repositories.interfaces import (
IVariableRepository,
IReferenceRepository,
)
from ..events import (
EventBus,
SpaceEvent,
SpaceEventType,
space_created_event,
space_updated_event,
space_deleted_event,
document_added_event,
document_removed_event,
document_content_changed_event,
)
class SpaceService:
@@ -54,6 +65,7 @@ class SpaceService:
document_repo: IDocumentAssociationRepository,
variable_repo: IVariableRepository,
reference_repo: IReferenceRepository,
event_bus: Optional[EventBus] = None,
):
"""
Initialize the SpaceService.
@@ -63,11 +75,18 @@ class SpaceService:
document_repo: Repository for document associations
variable_repo: Repository for space variables
reference_repo: Repository for transclusion references
event_bus: Optional event bus for emitting events
"""
self._space_repo = space_repo
self._document_repo = document_repo
self._variable_repo = variable_repo
self._reference_repo = reference_repo
self._event_bus = event_bus
def _emit(self, event: SpaceEvent) -> None:
"""Emit an event if event bus is configured."""
if self._event_bus:
self._event_bus.emit(event)
# =========================================================================
# Space CRUD Operations
@@ -114,7 +133,9 @@ class SpaceService:
parent_space_id=parent_space_id,
)
return self._space_repo.create(space)
created_space = self._space_repo.create(space)
self._emit(space_created_event(created_space.id, created_space.name))
return created_space
def get_space(self, space_id: str) -> Optional[InformationSpace]:
"""
@@ -198,7 +219,21 @@ class SpaceService:
if metadata is not None:
space.metadata = metadata
return self._space_repo.update(space)
updated_space = self._space_repo.update(space)
# Build changes dict for event
changes = {}
if name is not None:
changes["name"] = name
if description is not None:
changes["description"] = description
if config is not None:
changes["config"] = "updated"
if metadata is not None:
changes["metadata"] = "updated"
self._emit(space_updated_event(space_id, changes))
return updated_space
def delete_space(self, space_id: str, cascade: bool = True) -> bool:
"""
@@ -230,7 +265,11 @@ class SpaceService:
for child in children:
self.delete_space(child.id, cascade=True)
return self._space_repo.delete(space_id)
space_name = space.name
result = self._space_repo.delete(space_id)
if result:
self._emit(space_deleted_event(space_id, space_name))
return result
def activate_space(self, space_id: str) -> InformationSpace:
"""
@@ -250,7 +289,13 @@ class SpaceService:
raise ValueError(f"Space '{space_id}' not found")
space.activate()
return self._space_repo.update(space)
updated_space = self._space_repo.update(space)
self._emit(SpaceEvent(
event_type=SpaceEventType.SPACE_ACTIVATED,
space_id=space_id,
payload={"status": "active"},
))
return updated_space
def archive_space(self, space_id: str) -> InformationSpace:
"""
@@ -270,7 +315,13 @@ class SpaceService:
raise ValueError(f"Space '{space_id}' not found")
space.archive()
return self._space_repo.update(space)
updated_space = self._space_repo.update(space)
self._emit(SpaceEvent(
event_type=SpaceEventType.SPACE_ARCHIVED,
space_id=space_id,
payload={"status": "archived"},
))
return updated_space
def get_child_spaces(self, parent_space_id: str) -> List[InformationSpace]:
"""
@@ -330,7 +381,9 @@ class SpaceService:
content_hash=content_hash,
)
return self._document_repo.add_document(document)
added_doc = self._document_repo.add_document(document)
self._emit(document_added_event(space_id, added_doc.id, space_path))
return added_doc
def get_document(self, document_id: str) -> Optional[SpaceDocument]:
"""
@@ -384,12 +437,20 @@ class SpaceService:
Returns:
True if removed, False if not found
"""
# Clear any references from this document first
# Get document info before removal for event
document = self._document_repo.get_document(document_id)
if document:
self._reference_repo.clear_references_from(document_id, document.space_id)
if not document:
return False
return self._document_repo.remove_document(document_id)
# Clear any references from this document first
self._reference_repo.clear_references_from(document_id, document.space_id)
result = self._document_repo.remove_document(document_id)
if result:
self._emit(document_removed_event(
document.space_id, document_id, document.space_path
))
return result
def move_document(self, document_id: str, new_path: str) -> SpaceDocument:
"""
@@ -405,9 +466,26 @@ class SpaceService:
Raises:
ValueError: If document not found or new path exists
"""
# Get old path for event
old_doc = self._document_repo.get_document(document_id)
old_path = old_doc.space_path if old_doc else None
if not new_path.startswith("/"):
new_path = "/" + new_path
return self._document_repo.move_document(document_id, new_path)
moved_doc = self._document_repo.move_document(document_id, new_path)
if old_doc:
self._emit(SpaceEvent(
event_type=SpaceEventType.DOCUMENT_MOVED,
space_id=moved_doc.space_id,
payload={
"document_id": document_id,
"old_path": old_path,
"new_path": new_path,
},
))
return moved_doc
def reorder_documents(self, space_id: str, document_ids: List[str]) -> None:
"""
@@ -427,8 +505,20 @@ class SpaceService:
document_id: The document association ID
content_hash: The new content hash
"""
# Get old hash for event
document = self._document_repo.get_document(document_id)
old_hash = document.content_hash if document else None
self._document_repo.update_content_hash(document_id, content_hash)
if document and old_hash != content_hash:
self._emit(document_content_changed_event(
document.space_id,
document_id,
old_hash,
content_hash,
))
# =========================================================================
# Variable Operations
# =========================================================================
@@ -465,7 +555,13 @@ class SpaceService:
scope=scope,
)
return self._variable_repo.set_variable(variable)
saved_var = self._variable_repo.set_variable(variable)
self._emit(SpaceEvent(
event_type=SpaceEventType.VARIABLE_SET,
space_id=space_id,
payload={"name": name, "scope": scope},
))
return saved_var
def get_variable(self, space_id: str, name: str) -> Optional[SpaceVariable]:
"""
@@ -506,7 +602,14 @@ class SpaceService:
Returns:
True if deleted, False if not found
"""
return self._variable_repo.delete_variable(space_id, name)
result = self._variable_repo.delete_variable(space_id, name)
if result:
self._emit(SpaceEvent(
event_type=SpaceEventType.VARIABLE_DELETED,
space_id=space_id,
payload={"name": name},
))
return result
def get_variables_dict(self, space_id: str) -> Dict[str, Any]:
"""

View File

@@ -0,0 +1,407 @@
"""
Integration tests for event propagation in SpaceService.
Tests that events are correctly emitted for all space operations.
"""
import pytest
import tempfile
import os
from unittest.mock import Mock
from markitect.spaces import (
SpaceService,
SqliteSpaceRepository,
SqliteDocumentRepository,
SqliteVariableRepository,
SqliteReferenceRepository,
)
from markitect.spaces.events import (
EventBus,
SpaceEvent,
SpaceEventType,
)
@pytest.fixture
def temp_db():
"""Create a temporary database file for testing."""
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
yield path
if os.path.exists(path):
os.unlink(path)
@pytest.fixture
def event_bus():
"""Create an EventBus for testing."""
return EventBus()
@pytest.fixture
def space_service(temp_db, event_bus):
"""Create a SpaceService with EventBus for testing."""
return SpaceService(
space_repo=SqliteSpaceRepository(temp_db),
document_repo=SqliteDocumentRepository(temp_db),
variable_repo=SqliteVariableRepository(temp_db),
reference_repo=SqliteReferenceRepository(temp_db),
event_bus=event_bus,
)
class TestSpaceEvents:
"""Tests for space lifecycle events."""
def test_space_created_event(self, space_service, event_bus):
"""Test that SPACE_CREATED event is emitted."""
received_events = []
event_bus.subscribe(SpaceEventType.SPACE_CREATED, received_events.append)
space = space_service.create_space(name="test-space")
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.SPACE_CREATED
assert event.space_id == space.id
assert event.payload["name"] == "test-space"
def test_space_updated_event(self, space_service, event_bus):
"""Test that SPACE_UPDATED event is emitted."""
space = space_service.create_space(name="original")
received_events = []
event_bus.subscribe(SpaceEventType.SPACE_UPDATED, received_events.append)
space_service.update_space(space.id, name="updated", description="New desc")
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.SPACE_UPDATED
assert event.space_id == space.id
assert event.payload["changes"]["name"] == "updated"
assert event.payload["changes"]["description"] == "New desc"
def test_space_deleted_event(self, space_service, event_bus):
"""Test that SPACE_DELETED event is emitted."""
space = space_service.create_space(name="to-delete")
received_events = []
event_bus.subscribe(SpaceEventType.SPACE_DELETED, received_events.append)
space_service.delete_space(space.id)
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.SPACE_DELETED
assert event.space_id == space.id
assert event.payload["name"] == "to-delete"
def test_space_activated_event(self, space_service, event_bus):
"""Test that SPACE_ACTIVATED event is emitted."""
space = space_service.create_space(name="to-activate")
received_events = []
event_bus.subscribe(SpaceEventType.SPACE_ACTIVATED, received_events.append)
space_service.activate_space(space.id)
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.SPACE_ACTIVATED
assert event.space_id == space.id
def test_space_archived_event(self, space_service, event_bus):
"""Test that SPACE_ARCHIVED event is emitted."""
space = space_service.create_space(name="to-archive")
received_events = []
event_bus.subscribe(SpaceEventType.SPACE_ARCHIVED, received_events.append)
space_service.archive_space(space.id)
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.SPACE_ARCHIVED
assert event.space_id == space.id
def test_cascade_delete_emits_multiple_events(self, space_service, event_bus):
"""Test that cascade delete emits events for all deleted spaces."""
parent = space_service.create_space(name="parent")
child1 = space_service.create_space(name="child1", parent_space_id=parent.id)
child2 = space_service.create_space(name="child2", parent_space_id=parent.id)
received_events = []
event_bus.subscribe(SpaceEventType.SPACE_DELETED, received_events.append)
space_service.delete_space(parent.id, cascade=True)
# Should have events for parent and both children
assert len(received_events) == 3
deleted_ids = {e.space_id for e in received_events}
assert parent.id in deleted_ids
assert child1.id in deleted_ids
assert child2.id in deleted_ids
class TestDocumentEvents:
"""Tests for document events."""
def test_document_added_event(self, space_service, event_bus):
"""Test that DOCUMENT_ADDED event is emitted."""
space = space_service.create_space(name="doc-space")
received_events = []
event_bus.subscribe(SpaceEventType.DOCUMENT_ADDED, received_events.append)
doc = space_service.add_document(space.id, "/intro.md")
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.DOCUMENT_ADDED
assert event.space_id == space.id
assert event.payload["document_id"] == doc.id
assert event.payload["space_path"] == "/intro.md"
def test_document_removed_event(self, space_service, event_bus):
"""Test that DOCUMENT_REMOVED event is emitted."""
space = space_service.create_space(name="doc-space")
doc = space_service.add_document(space.id, "/to-remove.md")
received_events = []
event_bus.subscribe(SpaceEventType.DOCUMENT_REMOVED, received_events.append)
space_service.remove_document(doc.id)
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.DOCUMENT_REMOVED
assert event.space_id == space.id
assert event.payload["document_id"] == doc.id
assert event.payload["space_path"] == "/to-remove.md"
def test_document_moved_event(self, space_service, event_bus):
"""Test that DOCUMENT_MOVED event is emitted."""
space = space_service.create_space(name="doc-space")
doc = space_service.add_document(space.id, "/old-path.md")
received_events = []
event_bus.subscribe(SpaceEventType.DOCUMENT_MOVED, received_events.append)
space_service.move_document(doc.id, "/new-path.md")
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.DOCUMENT_MOVED
assert event.space_id == space.id
assert event.payload["old_path"] == "/old-path.md"
assert event.payload["new_path"] == "/new-path.md"
def test_document_content_changed_event(self, space_service, event_bus):
"""Test that DOCUMENT_CONTENT_CHANGED event is emitted."""
space = space_service.create_space(name="doc-space")
doc = space_service.add_document(
space.id, "/content.md", content_hash="hash-v1"
)
received_events = []
event_bus.subscribe(
SpaceEventType.DOCUMENT_CONTENT_CHANGED, received_events.append
)
space_service.update_document_hash(doc.id, "hash-v2")
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.DOCUMENT_CONTENT_CHANGED
assert event.space_id == space.id
assert event.payload["old_hash"] == "hash-v1"
assert event.payload["new_hash"] == "hash-v2"
def test_no_event_if_hash_unchanged(self, space_service, event_bus):
"""Test that no event is emitted if hash is unchanged."""
space = space_service.create_space(name="doc-space")
doc = space_service.add_document(
space.id, "/content.md", content_hash="same-hash"
)
received_events = []
event_bus.subscribe(
SpaceEventType.DOCUMENT_CONTENT_CHANGED, received_events.append
)
space_service.update_document_hash(doc.id, "same-hash")
assert len(received_events) == 0
class TestVariableEvents:
"""Tests for variable events."""
def test_variable_set_event(self, space_service, event_bus):
"""Test that VARIABLE_SET event is emitted."""
space = space_service.create_space(name="var-space")
received_events = []
event_bus.subscribe(SpaceEventType.VARIABLE_SET, received_events.append)
space_service.set_variable(space.id, "version", "1.0.0")
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.VARIABLE_SET
assert event.space_id == space.id
assert event.payload["name"] == "version"
def test_variable_deleted_event(self, space_service, event_bus):
"""Test that VARIABLE_DELETED event is emitted."""
space = space_service.create_space(name="var-space")
space_service.set_variable(space.id, "temp", "value")
received_events = []
event_bus.subscribe(SpaceEventType.VARIABLE_DELETED, received_events.append)
space_service.delete_variable(space.id, "temp")
assert len(received_events) == 1
event = received_events[0]
assert event.event_type == SpaceEventType.VARIABLE_DELETED
assert event.space_id == space.id
assert event.payload["name"] == "temp"
def test_no_event_if_variable_not_found(self, space_service, event_bus):
"""Test that no event is emitted if variable doesn't exist."""
space = space_service.create_space(name="var-space")
received_events = []
event_bus.subscribe(SpaceEventType.VARIABLE_DELETED, received_events.append)
space_service.delete_variable(space.id, "non-existent")
assert len(received_events) == 0
class TestEventHandlerIntegration:
"""Tests for event handler integration patterns."""
def test_global_handler_receives_all_events(self, space_service, event_bus):
"""Test that a global handler receives all event types."""
received_events = []
event_bus.subscribe_all(received_events.append)
space = space_service.create_space(name="test")
space_service.add_document(space.id, "/doc.md")
space_service.set_variable(space.id, "var", "value")
space_service.delete_space(space.id)
# Should have: SPACE_CREATED, DOCUMENT_ADDED, VARIABLE_SET, SPACE_DELETED
assert len(received_events) >= 4
event_types = {e.event_type for e in received_events}
assert SpaceEventType.SPACE_CREATED in event_types
assert SpaceEventType.DOCUMENT_ADDED in event_types
assert SpaceEventType.VARIABLE_SET in event_types
assert SpaceEventType.SPACE_DELETED in event_types
def test_event_history_records_all_events(self, space_service, event_bus):
"""Test that event history captures all events."""
event_bus.enable_history()
space = space_service.create_space(name="history-test")
space_service.add_document(space.id, "/doc1.md")
space_service.add_document(space.id, "/doc2.md")
space_service.activate_space(space.id)
history = event_bus.get_history()
assert len(history) == 4
# Newest first
assert history[0].event_type == SpaceEventType.SPACE_ACTIVATED
assert history[1].event_type == SpaceEventType.DOCUMENT_ADDED
assert history[2].event_type == SpaceEventType.DOCUMENT_ADDED
assert history[3].event_type == SpaceEventType.SPACE_CREATED
def test_filter_history_by_space(self, space_service, event_bus):
"""Test filtering event history by space ID."""
event_bus.enable_history()
space1 = space_service.create_space(name="space1")
space2 = space_service.create_space(name="space2")
space_service.add_document(space1.id, "/doc.md")
space_service.add_document(space2.id, "/doc.md")
history_space1 = event_bus.get_history(space_id=space1.id)
assert len(history_space1) == 2 # create + add_document
history_space2 = event_bus.get_history(space_id=space2.id)
assert len(history_space2) == 2
def test_multiple_handlers_for_same_event(self, space_service, event_bus):
"""Test that multiple handlers can listen to the same event."""
handler1_events = []
handler2_events = []
event_bus.subscribe(SpaceEventType.SPACE_CREATED, handler1_events.append)
event_bus.subscribe(SpaceEventType.SPACE_CREATED, handler2_events.append)
space_service.create_space(name="multi-handler-test")
assert len(handler1_events) == 1
assert len(handler2_events) == 1
assert handler1_events[0].event_id == handler2_events[0].event_id
class TestEventDrivenWorkflow:
"""Tests for event-driven workflow patterns."""
def test_cache_invalidation_pattern(self, space_service, event_bus):
"""Test a cache invalidation pattern using events."""
cache = {}
def on_content_changed(event: SpaceEvent):
doc_id = event.payload["document_id"]
if doc_id in cache:
del cache[doc_id]
event_bus.subscribe(
SpaceEventType.DOCUMENT_CONTENT_CHANGED, on_content_changed
)
# Setup
space = space_service.create_space(name="cache-test")
doc = space_service.add_document(space.id, "/doc.md", content_hash="v1")
# Simulate cached content
cache[doc.id] = "cached content"
# Update triggers cache invalidation via event
space_service.update_document_hash(doc.id, "v2")
assert doc.id not in cache
def test_audit_log_pattern(self, space_service, event_bus):
"""Test an audit log pattern using events."""
audit_log = []
def log_event(event: SpaceEvent):
audit_log.append({
"event_type": event.event_type.value,
"space_id": event.space_id,
"timestamp": event.timestamp.isoformat(),
})
event_bus.subscribe_all(log_event)
space = space_service.create_space(name="audit-test")
space_service.activate_space(space.id)
space_service.archive_space(space.id)
space_service.delete_space(space.id)
# Should have complete audit trail
assert len(audit_log) == 4
event_types = [entry["event_type"] for entry in audit_log]
assert "space.created" in event_types
assert "space.activated" in event_types
assert "space.archived" in event_types
assert "space.deleted" in event_types

View File

@@ -0,0 +1,581 @@
"""
Unit tests for the space event system.
Tests the event models and EventBus functionality.
"""
import pytest
import asyncio
from datetime import datetime
from unittest.mock import Mock, AsyncMock
from markitect.spaces.events import (
SpaceEvent,
SpaceEventType,
EventBus,
get_event_bus,
reset_event_bus,
space_created_event,
space_updated_event,
space_deleted_event,
document_added_event,
document_updated_event,
document_removed_event,
document_content_changed_event,
cache_invalidated_event,
)
class TestSpaceEventType:
"""Tests for SpaceEventType enum."""
def test_event_type_values(self):
"""Test that event types have expected string values."""
assert SpaceEventType.SPACE_CREATED.value == "space.created"
assert SpaceEventType.DOCUMENT_ADDED.value == "document.added"
assert SpaceEventType.CACHE_INVALIDATED.value == "cache.invalidated"
def test_all_event_types_have_values(self):
"""Test that all event types have non-empty values."""
for event_type in SpaceEventType:
assert event_type.value
assert "." in event_type.value
class TestSpaceEvent:
"""Tests for SpaceEvent dataclass."""
def test_event_creation(self):
"""Test basic event creation."""
event = SpaceEvent(
event_type=SpaceEventType.SPACE_CREATED,
space_id="space-123",
payload={"name": "test-space"},
)
assert event.event_type == SpaceEventType.SPACE_CREATED
assert event.space_id == "space-123"
assert event.payload["name"] == "test-space"
assert event.event_id is not None
assert event.timestamp is not None
def test_event_with_source_and_correlation(self):
"""Test event with source and correlation ID."""
event = SpaceEvent(
event_type=SpaceEventType.DOCUMENT_ADDED,
space_id="space-123",
source="api",
correlation_id="req-456",
)
assert event.source == "api"
assert event.correlation_id == "req-456"
def test_event_to_dict(self):
"""Test event serialization."""
event = SpaceEvent(
event_type=SpaceEventType.SPACE_UPDATED,
space_id="space-123",
payload={"changes": {"name": "new-name"}},
source="cli",
)
data = event.to_dict()
assert data["event_type"] == "space.updated"
assert data["space_id"] == "space-123"
assert data["payload"]["changes"]["name"] == "new-name"
assert data["source"] == "cli"
assert "event_id" in data
assert "timestamp" in data
def test_event_from_dict(self):
"""Test event deserialization."""
data = {
"event_id": "evt-123",
"event_type": "document.added",
"space_id": "space-456",
"payload": {"document_id": "doc-1", "space_path": "/intro.md"},
"timestamp": "2025-01-15T10:30:00",
"source": "sync",
}
event = SpaceEvent.from_dict(data)
assert event.event_id == "evt-123"
assert event.event_type == SpaceEventType.DOCUMENT_ADDED
assert event.space_id == "space-456"
assert event.payload["document_id"] == "doc-1"
assert event.source == "sync"
def test_event_roundtrip(self):
"""Test that to_dict and from_dict are inverses."""
original = SpaceEvent(
event_type=SpaceEventType.RENDER_COMPLETED,
space_id="space-123",
payload={"output_path": "/tmp/output"},
source="renderer",
correlation_id="batch-1",
)
data = original.to_dict()
restored = SpaceEvent.from_dict(data)
assert restored.event_type == original.event_type
assert restored.space_id == original.space_id
assert restored.payload == original.payload
assert restored.source == original.source
assert restored.correlation_id == original.correlation_id
class TestEventFactories:
"""Tests for event factory functions."""
def test_space_created_event(self):
"""Test space_created_event factory."""
event = space_created_event("space-1", "my-docs", source="api")
assert event.event_type == SpaceEventType.SPACE_CREATED
assert event.space_id == "space-1"
assert event.payload["name"] == "my-docs"
assert event.source == "api"
def test_space_updated_event(self):
"""Test space_updated_event factory."""
event = space_updated_event(
"space-1",
changes={"description": "Updated description"},
)
assert event.event_type == SpaceEventType.SPACE_UPDATED
assert event.payload["changes"]["description"] == "Updated description"
def test_space_deleted_event(self):
"""Test space_deleted_event factory."""
event = space_deleted_event("space-1", "old-space")
assert event.event_type == SpaceEventType.SPACE_DELETED
assert event.payload["name"] == "old-space"
def test_document_added_event(self):
"""Test document_added_event factory."""
event = document_added_event("space-1", "doc-1", "/intro.md")
assert event.event_type == SpaceEventType.DOCUMENT_ADDED
assert event.payload["document_id"] == "doc-1"
assert event.payload["space_path"] == "/intro.md"
def test_document_content_changed_event(self):
"""Test document_content_changed_event factory."""
event = document_content_changed_event(
"space-1", "doc-1", old_hash="abc", new_hash="xyz"
)
assert event.event_type == SpaceEventType.DOCUMENT_CONTENT_CHANGED
assert event.payload["old_hash"] == "abc"
assert event.payload["new_hash"] == "xyz"
def test_cache_invalidated_event(self):
"""Test cache_invalidated_event factory."""
event = cache_invalidated_event(
"space-1",
document_ids=["doc-1", "doc-2"],
reason="dependency_changed",
)
assert event.event_type == SpaceEventType.CACHE_INVALIDATED
assert event.payload["document_ids"] == ["doc-1", "doc-2"]
assert event.payload["reason"] == "dependency_changed"
class TestEventBus:
"""Tests for EventBus."""
@pytest.fixture
def bus(self):
"""Create a fresh EventBus for each test."""
return EventBus()
def test_subscribe_and_emit(self, bus):
"""Test basic subscribe and emit."""
received_events = []
def handler(event: SpaceEvent):
received_events.append(event)
bus.subscribe(SpaceEventType.SPACE_CREATED, handler)
event = space_created_event("space-1", "test")
bus.emit(event)
assert len(received_events) == 1
assert received_events[0].event_type == SpaceEventType.SPACE_CREATED
def test_subscribe_returns_handler_id(self, bus):
"""Test that subscribe returns a handler ID."""
handler_id = bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None)
assert handler_id is not None
assert isinstance(handler_id, str)
def test_unsubscribe(self, bus):
"""Test unsubscribing a handler."""
received_events = []
def handler(event: SpaceEvent):
received_events.append(event)
bus.subscribe(SpaceEventType.SPACE_CREATED, handler)
bus.emit(space_created_event("s1", "test1"))
assert len(received_events) == 1
result = bus.unsubscribe(SpaceEventType.SPACE_CREATED, handler)
assert result is True
bus.emit(space_created_event("s2", "test2"))
assert len(received_events) == 1 # No new events
def test_unsubscribe_by_id(self, bus):
"""Test unsubscribing by handler ID."""
handler = Mock()
handler_id = bus.subscribe(SpaceEventType.SPACE_CREATED, handler)
bus.emit(space_created_event("s1", "test"))
assert handler.call_count == 1
result = bus.unsubscribe_by_id(handler_id)
assert result is True
bus.emit(space_created_event("s2", "test"))
assert handler.call_count == 1 # No additional calls
def test_multiple_handlers(self, bus):
"""Test multiple handlers for same event type."""
handler1 = Mock()
handler2 = Mock()
bus.subscribe(SpaceEventType.SPACE_CREATED, handler1)
bus.subscribe(SpaceEventType.SPACE_CREATED, handler2)
bus.emit(space_created_event("s1", "test"))
handler1.assert_called_once()
handler2.assert_called_once()
def test_handler_receives_correct_event(self, bus):
"""Test that handlers receive the emitted event."""
received = []
def handler(event: SpaceEvent):
received.append(event)
bus.subscribe(SpaceEventType.DOCUMENT_ADDED, handler)
event = document_added_event("s1", "doc-1", "/intro.md")
bus.emit(event)
assert len(received) == 1
assert received[0].event_id == event.event_id
def test_handler_isolation_by_event_type(self, bus):
"""Test that handlers only receive their subscribed event types."""
space_handler = Mock()
doc_handler = Mock()
bus.subscribe(SpaceEventType.SPACE_CREATED, space_handler)
bus.subscribe(SpaceEventType.DOCUMENT_ADDED, doc_handler)
bus.emit(space_created_event("s1", "test"))
space_handler.assert_called_once()
doc_handler.assert_not_called()
def test_global_handler(self, bus):
"""Test handlers subscribed to all events."""
global_handler = Mock()
specific_handler = Mock()
bus.subscribe_all(global_handler)
bus.subscribe(SpaceEventType.SPACE_CREATED, specific_handler)
bus.emit(space_created_event("s1", "test"))
bus.emit(document_added_event("s1", "doc-1", "/intro.md"))
assert global_handler.call_count == 2
assert specific_handler.call_count == 1
def test_handler_priority(self, bus):
"""Test that handlers are called in priority order."""
call_order = []
def handler_low(event):
call_order.append("low")
def handler_high(event):
call_order.append("high")
def handler_medium(event):
call_order.append("medium")
bus.subscribe(SpaceEventType.SPACE_CREATED, handler_low, priority=100)
bus.subscribe(SpaceEventType.SPACE_CREATED, handler_high, priority=10)
bus.subscribe(SpaceEventType.SPACE_CREATED, handler_medium, priority=50)
bus.emit(space_created_event("s1", "test"))
assert call_order == ["high", "medium", "low"]
def test_handler_exception_does_not_stop_others(self, bus):
"""Test that one handler's exception doesn't prevent other handlers."""
handler1 = Mock()
handler2 = Mock(side_effect=ValueError("Test error"))
handler3 = Mock()
bus.subscribe(SpaceEventType.SPACE_CREATED, handler1, priority=1)
bus.subscribe(SpaceEventType.SPACE_CREATED, handler2, priority=2)
bus.subscribe(SpaceEventType.SPACE_CREATED, handler3, priority=3)
exceptions = bus.emit(space_created_event("s1", "test"))
handler1.assert_called_once()
handler2.assert_called_once()
handler3.assert_called_once()
assert len(exceptions) == 1
assert isinstance(exceptions[0], ValueError)
def test_has_handlers(self, bus):
"""Test checking for handlers."""
assert bus.has_handlers(SpaceEventType.SPACE_CREATED) is False
bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None)
assert bus.has_handlers(SpaceEventType.SPACE_CREATED) is True
assert bus.has_handlers(SpaceEventType.DOCUMENT_ADDED) is False
def test_has_handlers_with_global(self, bus):
"""Test that has_handlers considers global handlers."""
assert bus.has_handlers(SpaceEventType.DOCUMENT_ADDED) is False
bus.subscribe_all(lambda e: None)
assert bus.has_handlers(SpaceEventType.DOCUMENT_ADDED) is True
def test_handler_count(self, bus):
"""Test counting handlers."""
assert bus.handler_count() == 0
assert bus.handler_count(SpaceEventType.SPACE_CREATED) == 0
bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None)
bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None)
bus.subscribe(SpaceEventType.DOCUMENT_ADDED, lambda e: None)
assert bus.handler_count() == 3
assert bus.handler_count(SpaceEventType.SPACE_CREATED) == 2
assert bus.handler_count(SpaceEventType.DOCUMENT_ADDED) == 1
def test_clear_all(self, bus):
"""Test clearing all handlers."""
bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None)
bus.subscribe(SpaceEventType.DOCUMENT_ADDED, lambda e: None)
bus.subscribe_all(lambda e: None)
count = bus.clear()
assert count == 3
assert bus.handler_count() == 0
def test_clear_by_event_type(self, bus):
"""Test clearing handlers for specific event type."""
bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None)
bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None)
bus.subscribe(SpaceEventType.DOCUMENT_ADDED, lambda e: None)
count = bus.clear(SpaceEventType.SPACE_CREATED)
assert count == 2
assert bus.handler_count(SpaceEventType.SPACE_CREATED) == 0
assert bus.handler_count(SpaceEventType.DOCUMENT_ADDED) == 1
class TestEventBusHistory:
"""Tests for EventBus history feature."""
@pytest.fixture
def bus(self):
"""Create a fresh EventBus with history enabled."""
bus = EventBus()
bus.enable_history()
return bus
def test_history_disabled_by_default(self):
"""Test that history is disabled by default."""
bus = EventBus()
bus.emit(space_created_event("s1", "test"))
history = bus.get_history()
assert len(history) == 0
def test_history_records_events(self, bus):
"""Test that history records emitted events."""
bus.emit(space_created_event("s1", "test1"))
bus.emit(space_created_event("s2", "test2"))
history = bus.get_history()
assert len(history) == 2
def test_history_newest_first(self, bus):
"""Test that history returns newest events first."""
bus.emit(space_created_event("s1", "first"))
bus.emit(space_created_event("s2", "second"))
history = bus.get_history()
assert history[0].payload["name"] == "second"
assert history[1].payload["name"] == "first"
def test_history_filter_by_event_type(self, bus):
"""Test filtering history by event type."""
bus.emit(space_created_event("s1", "test"))
bus.emit(document_added_event("s1", "doc-1", "/intro.md"))
bus.emit(space_created_event("s2", "test2"))
history = bus.get_history(event_type=SpaceEventType.SPACE_CREATED)
assert len(history) == 2
def test_history_filter_by_space_id(self, bus):
"""Test filtering history by space ID."""
bus.emit(space_created_event("s1", "test1"))
bus.emit(space_created_event("s2", "test2"))
bus.emit(document_added_event("s1", "doc-1", "/intro.md"))
history = bus.get_history(space_id="s1")
assert len(history) == 2
def test_history_limit(self, bus):
"""Test history limit."""
for i in range(10):
bus.emit(space_created_event(f"s{i}", f"test{i}"))
history = bus.get_history(limit=3)
assert len(history) == 3
def test_history_max_size(self):
"""Test that history respects max size."""
bus = EventBus()
bus.enable_history(max_events=5)
for i in range(10):
bus.emit(space_created_event(f"s{i}", f"test{i}"))
history = bus.get_history()
assert len(history) == 5
# Should have the most recent events
assert history[0].payload["name"] == "test9"
def test_clear_history(self, bus):
"""Test clearing history."""
bus.emit(space_created_event("s1", "test"))
bus.emit(space_created_event("s2", "test"))
bus.clear_history()
history = bus.get_history()
assert len(history) == 0
def test_disable_history(self, bus):
"""Test disabling history."""
bus.emit(space_created_event("s1", "test"))
bus.disable_history()
bus.emit(space_created_event("s2", "test"))
history = bus.get_history()
assert len(history) == 1
class TestEventBusAsync:
"""Tests for async EventBus functionality."""
@pytest.fixture
def bus(self):
"""Create a fresh EventBus."""
return EventBus()
@pytest.mark.asyncio
async def test_async_handler(self, bus):
"""Test async event handler."""
received = []
async def async_handler(event: SpaceEvent):
await asyncio.sleep(0.01)
received.append(event)
bus.subscribe(SpaceEventType.SPACE_CREATED, async_handler)
await bus.emit_async(space_created_event("s1", "test"))
assert len(received) == 1
@pytest.mark.asyncio
async def test_mixed_sync_async_handlers(self, bus):
"""Test mixing sync and async handlers."""
sync_received = []
async_received = []
def sync_handler(event: SpaceEvent):
sync_received.append(event)
async def async_handler(event: SpaceEvent):
await asyncio.sleep(0.01)
async_received.append(event)
bus.subscribe(SpaceEventType.SPACE_CREATED, sync_handler)
bus.subscribe(SpaceEventType.SPACE_CREATED, async_handler)
await bus.emit_async(space_created_event("s1", "test"))
assert len(sync_received) == 1
assert len(async_received) == 1
@pytest.mark.asyncio
async def test_async_handler_exception(self, bus):
"""Test that async handler exceptions are caught."""
async def failing_handler(event: SpaceEvent):
raise ValueError("Async error")
bus.subscribe(SpaceEventType.SPACE_CREATED, failing_handler)
exceptions = await bus.emit_async(space_created_event("s1", "test"))
assert len(exceptions) == 1
assert isinstance(exceptions[0], ValueError)
class TestGlobalEventBus:
"""Tests for global event bus singleton."""
def test_get_event_bus_returns_same_instance(self):
"""Test that get_event_bus returns the same instance."""
reset_event_bus()
bus1 = get_event_bus()
bus2 = get_event_bus()
assert bus1 is bus2
def test_reset_event_bus(self):
"""Test that reset_event_bus creates new instance."""
bus1 = get_event_bus()
reset_event_bus()
bus2 = get_event_bus()
assert bus1 is not bus2
def test_reset_clears_handlers(self):
"""Test that reset clears all handlers."""
bus = get_event_bus()
bus.subscribe(SpaceEventType.SPACE_CREATED, lambda e: None)
assert bus.handler_count() == 1
reset_event_bus()
new_bus = get_event_bus()
assert new_bus.handler_count() == 0