Files
markitect-main/markitect/spaces/sync/importer.py
tegwick 535b83996b feat(spaces): implement Phase 5 Directory Sync Mode
Implements directory synchronization for Information Spaces:

- SpaceDirectoryExporter: Export space to directory structure
  - Multiple variants: flat, hierarchical, by_path
  - Manifest generation for reimport
  - Incremental export (skip unchanged files)
  - Metadata file export
  - IncrementalExporter for change detection

- DirectorySpaceImporter: Import directory content into space
  - Recursive directory scanning
  - Multiple file pattern support
  - Conflict detection with strategies (skip/overwrite/rename)
  - Manifest-based import for intelligent reimport
  - Structure preservation in space paths

- BidirectionalSyncCoordinator: Two-way sync with conflict detection
  - Sync directions: space-to-directory, directory-to-space, bidirectional
  - Conflict resolution strategies: space_wins, directory_wins, newer_wins, manual, skip
  - Dry-run mode for preview
  - Orphan cleanup option
  - Event emission for progress tracking

45 unit tests covering all sync components.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-08 12:11:37 +01:00

473 lines
15 KiB
Python

"""
Directory to Space Importer.
Imports directory content into an Information Space, handling
various directory structures and conflict detection.
"""
import fnmatch
import hashlib
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

from ..models import InformationSpace, SpaceDocument, SpaceStatus
from ..events import EventBus, SpaceEventType, SpaceEvent
logger = logging.getLogger(__name__)
@dataclass
class ImportConfig:
    """
    Settings that control how a directory is imported.

    Attributes:
        file_patterns: Glob patterns selecting the files to import
            (defaults to ``*.md`` and ``*.markdown``)
        recursive: Descend into subdirectories while scanning
        ignore_patterns: Name patterns that exclude a file or directory
        preserve_structure: Keep the relative directory layout in space_path
        conflict_strategy: Conflict handling mode
            ('skip', 'overwrite' or 'rename')
        import_metadata: Read .markitect-* metadata files when present
    """

    # ``list.copy`` bound to a literal acts as the factory: every instance
    # receives its own fresh list, so mutating one config never leaks into
    # another.
    file_patterns: List[str] = field(
        default_factory=["*.md", "*.markdown"].copy
    )
    recursive: bool = True
    ignore_patterns: List[str] = field(
        default_factory=[".*", "__pycache__", "node_modules"].copy
    )
    preserve_structure: bool = True
    conflict_strategy: str = "skip"  # one of: skip, overwrite, rename
    import_metadata: bool = True
@dataclass
class ImportedDocument:
    """
    One document that an import run produced.

    Attributes:
        file_path: File on disk the content was read from
        space_path: Location of the document inside the space
        document_id: ID assigned to the document
        content_hash: Hash of the content that was imported
        size: Length of the content in bytes
        is_new: True when no document existed at the space path before
    """

    # Where the content came from and where it went.
    file_path: Path
    space_path: str

    # Identity and fingerprint of the imported content.
    document_id: str
    content_hash: str
    size: int

    is_new: bool = True
@dataclass
class ImportConflict:
    """
    A clash between a source file and a document already in the space.

    Attributes:
        file_path: File on disk that triggered the conflict
        space_path: Space path the file was headed for
        reason: Why the conflict was raised
        resolution: What was done about it
    """

    # The two sides of the clash.
    file_path: Path
    space_path: str

    # Diagnosis and outcome.
    reason: str
    resolution: str
@dataclass
class ImportResult:
    """
    Outcome of a single import run.

    Attributes:
        source_directory: Directory the run read from
        space_id: ID of the target space, when one is known
        imported_documents: Documents that were imported
        conflicts: Conflicts that came up along the way
        errors: Error messages keyed by what failed
        space_metadata: Space metadata read from the directory, if any
        duration_ms: How long the run took, in milliseconds
    """

    source_directory: Path
    space_id: Optional[str] = None
    imported_documents: List[ImportedDocument] = field(default_factory=list)
    conflicts: List[ImportConflict] = field(default_factory=list)
    errors: Dict[str, str] = field(default_factory=dict)
    space_metadata: Optional[Dict[str, Any]] = None
    duration_ms: int = 0

    @property
    def success(self) -> bool:
        """True when the run recorded no errors."""
        return not self.errors

    @property
    def document_count(self) -> int:
        """How many documents the run imported."""
        imported = self.imported_documents
        return len(imported)
class DirectorySpaceImporter:
    """
    Imports directory content into Information Space.

    Features:
    - Multiple file pattern support
    - Recursive directory scanning
    - Conflict detection and resolution
    - Metadata file handling
    - Event emission for progress tracking
    """

    # Values of ``ImportConfig.conflict_strategy`` that produce a conflict
    # record; anything else is reported with a warning and ignored.
    _CONFLICT_STRATEGIES = ("skip", "overwrite", "rename")

    def __init__(
        self,
        config: Optional[ImportConfig] = None,
        event_bus: Optional[EventBus] = None,
    ):
        """
        Initialize the importer.

        Args:
            config: Import configuration (a default ImportConfig when omitted)
            event_bus: Event bus for notifications; no events are emitted
                when it is missing
        """
        self.config = config or ImportConfig()
        self.event_bus = event_bus

    def scan_directory(self, source_directory: Path) -> List[Path]:
        """
        Scan directory for importable files.

        Each file is returned once, even when it matches several of the
        configured file patterns.

        Args:
            source_directory: Directory to scan

        Returns:
            Sorted list of file paths to import

        Raises:
            ValueError: If ``source_directory`` does not exist
        """
        if not source_directory.exists():
            raise ValueError(f"Directory does not exist: {source_directory}")

        # A set keeps overlapping patterns (e.g. "*.md" and "README*") from
        # yielding the same file twice and importing it twice.
        found: Set[Path] = set()
        for pattern in self.config.file_patterns:
            if self.config.recursive:
                matches = source_directory.rglob(pattern)
            else:
                matches = source_directory.glob(pattern)
            found.update(
                path
                for path in matches
                if self._should_include(path, source_directory)
            )
        return sorted(found)

    def import_directory(
        self,
        source_directory: Path,
        existing_documents: Optional[Dict[str, SpaceDocument]] = None,
        document_creator: Optional[Callable[[str, str], str]] = None,
    ) -> ImportResult:
        """
        Import directory content.

        A failure on a single file is recorded under that file's path in
        ``result.errors`` and the run continues; a failure of the run itself
        is recorded under the ``"_import"`` key. Nothing is raised.

        Args:
            source_directory: Directory to import
            existing_documents: Map of space_path to existing documents
            document_creator: Function(space_path, content) -> document_id

        Returns:
            ImportResult with details of the import
        """
        # Monotonic clock: a wall-clock adjustment during the run must not
        # distort (or negate) the reported duration.
        started = time.monotonic()
        result = ImportResult(source_directory=source_directory)
        existing_documents = existing_documents or {}

        # Metadata has not been read yet, so the space ID is still unknown
        # here and the event carries the "pending" placeholder.
        self._emit_event(
            SpaceEventType.SYNC_STARTED,
            result.space_id or "pending",
            {"direction": "import", "source": str(source_directory)},
        )

        try:
            # Load space metadata if available
            result.space_metadata = self._load_space_metadata(source_directory)
            if result.space_metadata:
                result.space_id = result.space_metadata.get("id")

            # Import each file; one bad file must not abort the others.
            for file_path in self.scan_directory(source_directory):
                try:
                    imported = self._import_file(
                        file_path,
                        source_directory,
                        existing_documents,
                        document_creator,
                        result,
                    )
                except Exception as e:
                    logger.error("Failed to import %s: %s", file_path, e)
                    result.errors[str(file_path)] = str(e)
                else:
                    if imported:
                        result.imported_documents.append(imported)

            result.duration_ms = self._elapsed_ms(started)
            self._emit_event(
                SpaceEventType.SYNC_COMPLETED,
                result.space_id or "imported",
                {
                    "direction": "import",
                    "document_count": result.document_count,
                    "conflicts": len(result.conflicts),
                    "errors": len(result.errors),
                },
            )
        except Exception as e:
            logger.error("Import failed: %s", e)
            result.errors["_import"] = str(e)
            # Report how long the failed run took instead of leaving 0.
            result.duration_ms = self._elapsed_ms(started)

        return result

    @staticmethod
    def _elapsed_ms(started: float) -> int:
        """Whole milliseconds elapsed since the ``time.monotonic()`` reading."""
        return int((time.monotonic() - started) * 1000)

    def _should_include(self, path: Path, base_dir: Path) -> bool:
        """
        Check if a path should be included in import.

        Directories are never included. A file is excluded when any component
        of its path relative to ``base_dir`` matches one of the configured
        ignore patterns. Patterns are shell-style globs matched
        case-sensitively against a whole component, so ``.*`` hides every
        dot-prefixed name while ``node_modules`` hides only that exact name.
        """
        if path.is_dir():
            return False

        relative_parts = path.relative_to(base_dir).parts
        return not any(
            fnmatch.fnmatchcase(part, pattern)
            for pattern in self.config.ignore_patterns
            for part in relative_parts
        )

    def _import_file(
        self,
        file_path: Path,
        source_directory: Path,
        existing_documents: Dict[str, SpaceDocument],
        document_creator: Optional[Callable[[str, str], str]],
        result: ImportResult,
    ) -> Optional[ImportedDocument]:
        """
        Import a single file.

        Any conflict with an existing document is appended to
        ``result.conflicts``. Depending on its resolution the file is
        skipped, written to the same space path (overwrite), or written to a
        free sibling path (rename).

        Args:
            file_path: File to import
            source_directory: Root the space path is computed against
            existing_documents: Map of space_path to existing documents
            document_creator: Function(space_path, content) -> document_id;
                a random ID is generated when omitted
            result: Result object that collects conflicts

        Returns:
            The imported document record, or None when the file was skipped

        Raises:
            ValueError: If the file cannot be read as UTF-8 text
        """
        space_path = self._file_to_space_path(file_path, source_directory)

        try:
            content = file_path.read_text(encoding="utf-8")
        except (OSError, UnicodeError) as e:
            # Chain the cause so the original traceback is not lost.
            raise ValueError(f"Failed to read file: {e}") from e

        content_hash = self._compute_hash(content)
        size = len(content.encode("utf-8"))

        existing_doc = existing_documents.get(space_path)
        is_new = existing_doc is None

        if existing_doc is not None:
            conflict = self._handle_conflict(
                file_path, space_path, content_hash, existing_doc
            )
            if conflict:
                result.conflicts.append(conflict)
                if conflict.resolution == "skip":
                    return None
                if conflict.resolution == "rename":
                    # Writing to the conflicting path would be an overwrite;
                    # pick a path used neither by the space nor by a document
                    # imported earlier in this run.
                    imported_paths = {
                        doc.space_path for doc in result.imported_documents
                    }
                    space_path = self._unique_space_path(
                        space_path,
                        lambda candidate: candidate in existing_documents
                        or candidate in imported_paths,
                    )
                    is_new = True

        if document_creator:
            document_id = document_creator(space_path, content)
        else:
            document_id = self._generate_document_id(space_path)

        return ImportedDocument(
            file_path=file_path,
            space_path=space_path,
            document_id=document_id,
            content_hash=content_hash,
            size=size,
            is_new=is_new,
        )

    @staticmethod
    def _unique_space_path(
        space_path: str, is_taken: Callable[[str], bool]
    ) -> str:
        """
        Return a free variant of ``space_path`` for the rename strategy.

        A ``-N`` counter (starting at 1) is inserted before the file
        extension: ``/docs/a.md`` becomes ``/docs/a-1.md``, then
        ``/docs/a-2.md``, and so on until ``is_taken`` reports a free path.
        """
        directory, _, filename = space_path.rpartition("/")
        stem, dot, suffix = filename.rpartition(".")
        if not stem:
            # No extension (or a bare dotfile): append the counter at the end.
            stem, dot, suffix = filename, "", ""

        counter = 1
        while True:
            candidate = f"{directory}/{stem}-{counter}{dot}{suffix}"
            if not is_taken(candidate):
                return candidate
            counter += 1

    def _file_to_space_path(self, file_path: Path, source_directory: Path) -> str:
        """
        Convert file path to space path.

        The result always starts with "/" and uses forward slashes. With
        ``preserve_structure`` the path relative to ``source_directory`` is
        kept; otherwise only the file name is used.
        """
        if self.config.preserve_structure:
            relative = file_path.relative_to(source_directory)
            # as_posix() converts separators only where they really are
            # separators, so a literal backslash in a POSIX file name survives.
            return "/" + relative.as_posix()
        return "/" + file_path.name

    def _handle_conflict(
        self,
        file_path: Path,
        space_path: str,
        content_hash: str,
        existing_doc: SpaceDocument,
    ) -> Optional[ImportConflict]:
        """
        Handle a conflict with existing document.

        Returns:
            None when the existing document's ``content_hash`` equals
            ``content_hash`` (nothing to resolve) or when the configured
            strategy is unknown; otherwise a conflict record whose
            resolution is the configured strategy.
        """
        existing_hash = getattr(existing_doc, "content_hash", None)
        if existing_hash == content_hash:
            return None  # No actual conflict, content is same

        strategy = self.config.conflict_strategy
        if strategy not in self._CONFLICT_STRATEGIES:
            # No record is produced for an unknown strategy, so the caller
            # goes on to write the document; make that visible in the log.
            logger.warning(
                "Unknown conflict strategy %r for %s; no conflict recorded",
                strategy,
                space_path,
            )
            return None

        return ImportConflict(
            file_path=file_path,
            space_path=space_path,
            reason="document_exists",
            resolution=strategy,
        )

    def _generate_document_id(self, space_path: str) -> str:
        """
        Generate a new random document ID.

        ``space_path`` is accepted for interface compatibility but does not
        influence the ID: every call returns a fresh UUID4 string.
        """
        return str(uuid.uuid4())

    def _compute_hash(self, content: str) -> str:
        """Return the first 16 hex characters of the content's SHA-256."""
        return hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]

    @staticmethod
    def _read_json_object(path: Path, label: str) -> Optional[Dict[str, Any]]:
        """
        Read ``path`` as a JSON object, best effort.

        Returns None when the file is missing, unreadable, not valid JSON,
        or valid JSON that is not an object; every case except a missing
        file is logged as a warning that names ``label``.
        """
        if not path.exists():
            return None
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError, RecursionError) as e:
            logger.warning("Failed to load %s: %s", label, e)
            return None
        if not isinstance(data, dict):
            # Callers use .get() on the result; a list or scalar would crash.
            logger.warning(
                "Ignoring %s: expected a JSON object, got %s",
                label,
                type(data).__name__,
            )
            return None
        return data

    def _load_space_metadata(self, directory: Path) -> Optional[Dict[str, Any]]:
        """
        Load space metadata from .markitect-space.json.

        Returns None when ``import_metadata`` is disabled or the file is
        missing or unusable.
        """
        if not self.config.import_metadata:
            return None
        return self._read_json_object(
            directory / ".markitect-space.json", "space metadata"
        )

    def _load_manifest(self, directory: Path) -> Optional[Dict[str, Any]]:
        """
        Load export manifest from .markitect-manifest.json.

        Returns None when the file is missing or unusable.
        """
        return self._read_json_object(
            directory / ".markitect-manifest.json", "manifest"
        )

    def _emit_event(
        self, event_type: SpaceEventType, space_id: str, payload: Dict[str, Any]
    ) -> None:
        """Emit an event if event bus is available."""
        if not self.event_bus:
            return
        event = SpaceEvent(
            event_type=event_type,
            space_id=space_id,
            payload=payload,
        )
        self.event_bus.emit(event)
class ManifestImporter(DirectorySpaceImporter):
    """
    Importer that loads the export manifest alongside an import.

    ``import_with_manifest`` reads .markitect-manifest.json from the source
    directory and keeps it on the instance, so that ``has_changed`` can
    compare a file's current hash with the hash recorded at export time.
    The import itself is delegated unchanged to ``import_directory``;
    callers that want to skip unmodified files consult ``has_changed``.
    """

    def __init__(
        self,
        config: Optional[ImportConfig] = None,
        event_bus: Optional[EventBus] = None,
    ):
        """
        Initialize manifest-aware importer.

        Args:
            config: Import configuration
            event_bus: Event bus for notifications
        """
        super().__init__(config, event_bus)
        # Manifest of the most recent import_with_manifest() call, if any.
        self._manifest: Optional[Dict[str, Any]] = None

    def import_with_manifest(
        self,
        source_directory: Path,
        existing_documents: Optional[Dict[str, SpaceDocument]] = None,
        document_creator: Optional[Callable[[str, str], str]] = None,
    ) -> ImportResult:
        """
        Load the directory's manifest, then import the directory.

        Args:
            source_directory: Directory to import
            existing_documents: Map of space_path to existing documents
            document_creator: Function(space_path, content) -> document_id

        Returns:
            ImportResult of the delegated ``import_directory`` call
        """
        manifest = self._load_manifest(source_directory)
        if manifest is not None and not isinstance(manifest, dict):
            # A manifest file holding a JSON list or scalar cannot be queried
            # with .get(); ignore it rather than crash before the import.
            logger.warning(
                "Ignoring manifest: expected a JSON object, got %s",
                type(manifest).__name__,
            )
            manifest = None
        self._manifest = manifest

        if self._manifest:
            logger.info(
                "Using manifest from previous export at %s",
                self._manifest.get("exported_at", "unknown"),
            )

        return self.import_directory(
            source_directory, existing_documents, document_creator
        )

    def _get_manifest_hash(self, space_path: str) -> Optional[str]:
        """
        Get content hash from manifest for a space path.

        Returns None when no manifest is loaded, when the manifest has no
        usable "files" list, or when no entry matches ``space_path``.
        Malformed entries are skipped.
        """
        if not isinstance(self._manifest, dict):
            return None

        # "files" may be missing or null in a hand-edited manifest.
        files = self._manifest.get("files")
        if not isinstance(files, (list, tuple)):
            return None

        for file_info in files:
            if (
                isinstance(file_info, dict)
                and file_info.get("space_path") == space_path
            ):
                return file_info.get("content_hash")
        return None

    def has_changed(self, space_path: str, current_hash: str) -> bool:
        """
        Check if file has changed since last export.

        A path without a recorded hash counts as changed (new file).
        """
        manifest_hash = self._get_manifest_hash(space_path)
        if manifest_hash is None:
            return True  # New file
        return manifest_hash != current_hash