"""
Git implementation of the history backend.

Uses git commands to provide version control for space content.
"""

import subprocess
import os
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime
import re

from .interfaces import IHistoryBackend
from .models import Commit, Branch, DiffResult, DiffLine, DiffType, HistoryConfig


# "diff --git a/<path> b/<path>" -- captures the a-side path.
_DIFF_FILE_RE = re.compile(r"diff --git a/(.*) b/")
# "@@ -old_start[,old_count] +new_start[,new_count] @@" -- captures both starts.
_DIFF_HUNK_RE = re.compile(r"@@ -(\d+)(?:,\d+)? \+(\d+)")


class GitError(Exception):
    """Raised when a git operation fails."""
    pass


class GitHistoryBackend(IHistoryBackend):
    """
    Git-based history backend implementation.

    Uses the git CLI to manage version control.
    Requires git to be installed on the system.
    """

    # Single-line log format shared by every method that lists commits:
    # hash | subject | "name <email>" | ISO-8601 author date | parent hashes
    _LOG_FORMAT = "--format=%H|%s|%an <%ae>|%aI|%P"

    def __init__(self, default_author: str = "markitect "):
        """
        Initialize the git backend.

        Args:
            default_author: Default author for commits

        Raises:
            GitError: If the git executable cannot be run.
        """
        self._default_author = default_author
        self._verify_git_available()

    def _verify_git_available(self) -> None:
        """
        Verify git is available on the system.

        Raises:
            GitError: If ``git --version`` is missing, fails, or times out.
        """
        try:
            result = subprocess.run(
                ["git", "--version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode != 0:
                raise GitError("Git is not available")
        except FileNotFoundError as exc:
            raise GitError("Git is not installed") from exc
        except subprocess.TimeoutExpired as exc:
            raise GitError("Git command timed out") from exc

    def _run_git(
        self,
        directory: Path,
        args: List[str],
        check: bool = True,
        capture_output: bool = True,
    ) -> subprocess.CompletedProcess:
        """
        Run a git command.

        Args:
            directory: Working directory
            args: Git command arguments (without 'git')
            check: Whether to raise on non-zero exit
            capture_output: Whether to capture stdout/stderr

        Returns:
            CompletedProcess result

        Raises:
            GitError: If the command times out (30 seconds), or exits
                non-zero while ``check`` is true.
        """
        cmd = ["git"] + args
        try:
            result = subprocess.run(
                cmd,
                cwd=str(directory),
                capture_output=capture_output,
                text=True,
                timeout=30,
            )
        except subprocess.TimeoutExpired as exc:
            raise GitError(f"Git command timed out: {' '.join(cmd)}") from exc

        if check and result.returncode != 0:
            raise GitError(f"Git command failed: {result.stderr}")
        return result

    def initialize(self, directory: Path, config: HistoryConfig) -> None:
        """
        Initialize a git repository.

        Creates the directory if needed, runs ``git init`` when no
        repository exists yet, sets the repository-local user name and
        e-mail from ``config``, and commits a starter ``.gitignore`` when
        the directory has none.

        Args:
            directory: Root directory of the repository
            config: Supplies ``author_name`` and ``author_email``

        Raises:
            GitError: If any git command fails.
        """
        if not directory.exists():
            directory.mkdir(parents=True, exist_ok=True)

        if not self.is_initialized(directory):
            self._run_git(directory, ["init"])

        # Configure user for this repo
        self._run_git(
            directory,
            ["config", "user.name", config.author_name],
        )
        self._run_git(
            directory,
            ["config", "user.email", config.author_email],
        )

        # Create initial .gitignore
        gitignore = directory / ".gitignore"
        if not gitignore.exists():
            gitignore.write_text("*.pyc\n__pycache__/\n.DS_Store\n")
            self._run_git(directory, ["add", ".gitignore"])
            self._run_git(
                directory,
                ["commit", "-m", "Initial commit: add .gitignore"],
            )

    def is_initialized(self, directory: Path) -> bool:
        """Check if directory is a git repository (has a ``.git`` directory)."""
        git_dir = directory / ".git"
        return git_dir.exists() and git_dir.is_dir()

    def commit(
        self,
        directory: Path,
        message: str,
        files: Optional[List[str]] = None,
        author: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Commit:
        """
        Create a commit.

        Args:
            directory: Root directory of the repository
            message: Commit message
            files: Paths (relative to ``directory``) to stage; when empty
                or None every change is staged with ``git add -A``
            author: Optional value for ``git commit --author``
            metadata: Optional key/value pairs added as commit trailers

        Returns:
            The newly created HEAD commit

        Raises:
            GitError: If ``directory`` is not a repository, nothing is
                staged after the add step, or a git command fails.
        """
        if not self.is_initialized(directory):
            raise GitError(f"Not a git repository: {directory}")

        # Stage files. "--" keeps a path that starts with "-" from being
        # interpreted as a git option.
        if files:
            for f in files:
                if (directory / f).exists():
                    self._run_git(directory, ["add", "--", f])
                else:
                    # File was deleted; best-effort, the path may be untracked
                    self._run_git(directory, ["add", "-u", "--", f], check=False)
        else:
            # Stage all changes
            self._run_git(directory, ["add", "-A"])

        # Check the index rather than the working tree: with an explicit
        # file list, unrelated dirty/untracked files must not make an
        # empty commit look possible. Exit code 0 means "no staged diff".
        staged = self._run_git(
            directory,
            ["diff", "--cached", "--quiet"],
            check=False,
        )
        if staged.returncode == 0:
            raise GitError("No changes to commit")

        # Build commit command
        commit_args = ["commit", "-m", message]
        if author:
            commit_args.extend(["--author", author])

        # Add metadata as trailers
        if metadata:
            for key, value in metadata.items():
                commit_args.extend(["--trailer", f"{key}={value}"])

        self._run_git(directory, commit_args)

        # Get the commit info
        return self._get_head_commit(directory)

    def _get_head_commit(self, directory: Path) -> Commit:
        """Get the HEAD commit."""
        result = self._run_git(
            directory,
            ["log", "-1", self._LOG_FORMAT],
        )
        return self._parse_commit_line(result.stdout.strip(), directory)

    def _parse_commit_line(self, line: str, directory: Path) -> Commit:
        """
        Parse a commit from log format.

        Args:
            line: One line of ``hash|subject|author|iso-date|parents``
                (the trailing parents field may be absent or empty)
            directory: Repository used for the follow-up lookups of
                changed files and the commit body

        Returns:
            The parsed Commit, including changed files and metadata

        Raises:
            GitError: If the line has fewer than four fields, or the
                changed-files lookup fails.
        """
        parts = line.split("|")
        if len(parts) < 4:
            raise GitError(f"Invalid commit format: {line}")
        if len(parts) > 5:
            # The subject itself contained "|". The hash is always first and
            # author / timestamp / parents are always the last three fields,
            # so everything in between belongs to the subject.
            parts = [parts[0], "|".join(parts[1:-3]), *parts[-3:]]

        commit_id = parts[0]
        message = parts[1]
        author = parts[2]
        timestamp_str = parts[3]
        parent_ids = parts[4].split() if len(parts) > 4 and parts[4] else []

        # Parse ISO timestamp
        try:
            timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        except ValueError:
            # Fall back to "now", timezone-aware like the parsed value so
            # callers never end up comparing naive and aware datetimes.
            timestamp = datetime.now().astimezone()

        # Get changed files
        files_result = self._run_git(
            directory,
            ["show", "--name-only", "--format=", commit_id],
        )
        files_changed = [f for f in files_result.stdout.strip().split("\n") if f]

        # Extract metadata from the commit body. Every non-empty body line
        # that looks like "Key: value" (the form git writes trailers in) or,
        # failing that, "Key=value" is recorded.
        metadata = {}
        try:
            body_result = self._run_git(
                directory,
                ["log", "-1", "--format=%b", commit_id],
            )
            for raw_line in body_result.stdout.strip().split("\n"):
                body_line = raw_line.strip()
                if not body_line:
                    continue
                if ":" in body_line:
                    key, value = body_line.split(":", 1)
                elif "=" in body_line:
                    key, value = body_line.split("=", 1)
                else:
                    continue
                metadata[key.strip()] = value.strip()
        except GitError:
            # Metadata is best-effort: a commit without a readable body is
            # still returned, just with whatever metadata was collected.
            pass

        return Commit(
            id=commit_id,
            message=message,
            author=author,
            timestamp=timestamp,
            parent_ids=parent_ids,
            files_changed=files_changed,
            metadata=metadata,
        )

    def get_commit(self, directory: Path, commit_id: str) -> Optional[Commit]:
        """
        Get a commit by ID.

        Returns:
            The commit, or None when the directory is not a repository or
            git cannot resolve ``commit_id``.
        """
        if not self.is_initialized(directory):
            return None

        try:
            result = self._run_git(
                directory,
                ["log", "-1", self._LOG_FORMAT, commit_id],
            )
            line = result.stdout.strip()
            if line:
                return self._parse_commit_line(line, directory)
        except GitError:
            pass
        return None

    def get_log(
        self,
        directory: Path,
        limit: int = 50,
        offset: int = 0,
        path: Optional[str] = None,
    ) -> List[Commit]:
        """
        Get commit history.

        Args:
            directory: Root directory of the repository
            limit: Maximum number of commits to return
            offset: Number of most-recent commits to skip
            path: Optional path to restrict the history to

        Returns:
            Commits in ``git log`` order; empty when the directory is not a
            repository or git fails.
        """
        if not self.is_initialized(directory):
            return []

        args = [
            "log",
            f"--skip={offset}",
            f"-{limit}",
            self._LOG_FORMAT,
        ]
        if path:
            args.extend(["--", path])

        try:
            result = self._run_git(directory, args)
            return [
                self._parse_commit_line(line, directory)
                for line in result.stdout.strip().split("\n")
                if line
            ]
        except GitError:
            return []

    def get_diff(
        self,
        directory: Path,
        old_version: Optional[str] = None,
        new_version: Optional[str] = None,
        path: Optional[str] = None,
    ) -> List[DiffResult]:
        """
        Get diff between versions.

        Args:
            directory: Root directory of the repository
            old_version: Older revision; alone, it is diffed against the
                working tree
            new_version: Newer revision; alone, it is diffed against HEAD
            path: Optional path to restrict the diff to

        Returns:
            One DiffResult per changed file; empty when the directory is
            not a repository or git fails. With neither version given this
            is the plain ``git diff`` of the working tree.
        """
        if not self.is_initialized(directory):
            return []

        args = ["diff"]
        if old_version and new_version:
            args.append(f"{old_version}..{new_version}")
        elif old_version:
            args.append(old_version)
        elif new_version:
            args.extend(["HEAD", new_version])

        if path:
            args.extend(["--", path])

        try:
            result = self._run_git(directory, args)
            return self._parse_diff_output(result.stdout)
        except GitError:
            return []

    def _parse_diff_output(self, output: str) -> List[DiffResult]:
        """
        Parse git diff output into DiffResult objects.

        Lines between a ``diff --git`` header and the first ``@@`` hunk
        header (``index``, ``---``, ``+++``, mode lines) are skipped. Once
        inside a hunk every line is classified purely by its first
        character, so content that itself starts with ``++`` or ``--`` is
        kept as an addition or deletion.

        Args:
            output: Raw unified diff text

        Returns:
            One DiffResult per ``diff --git`` section, in input order
        """
        results = []
        current_diff: Optional[DiffResult] = None
        in_hunk = False
        old_line = 0
        new_line = 0

        for line in output.split("\n"):
            # New file diff
            if line.startswith("diff --git"):
                if current_diff is not None:
                    results.append(current_diff)
                # Extract path from "diff --git a/path b/path"
                match = _DIFF_FILE_RE.search(line)
                path = match.group(1) if match else "unknown"
                current_diff = DiffResult(path=path)
                in_hunk = False
                continue

            if current_diff is None:
                # Anything before the first file header is not diff content
                continue

            if line.startswith("@@"):
                # Hunk header: @@ -old_start,old_count +new_start,new_count @@
                match = _DIFF_HUNK_RE.search(line)
                if match:
                    old_line = int(match.group(1))
                    new_line = int(match.group(2))
                in_hunk = True
                continue

            if not in_hunk:
                # Per-file header lines ("index", "--- a/x", "+++ b/x", ...)
                continue

            if line.startswith("+"):
                current_diff.lines.append(DiffLine(
                    line_type=DiffType.ADDITION,
                    content=line[1:],
                    new_line_no=new_line,
                ))
                current_diff.additions += 1
                new_line += 1
            elif line.startswith("-"):
                current_diff.lines.append(DiffLine(
                    line_type=DiffType.DELETION,
                    content=line[1:],
                    old_line_no=old_line,
                ))
                current_diff.deletions += 1
                old_line += 1
            elif line.startswith(" "):
                current_diff.lines.append(DiffLine(
                    line_type=DiffType.CONTEXT,
                    content=line[1:],
                    old_line_no=old_line,
                    new_line_no=new_line,
                ))
                old_line += 1
                new_line += 1
            # Other lines (empty, "\ No newline at end of file") are ignored

        if current_diff is not None:
            results.append(current_diff)

        return results

    def checkout(
        self,
        directory: Path,
        version: str,
        path: Optional[str] = None,
    ) -> bool:
        """
        Checkout a specific version.

        Args:
            directory: Root directory of the repository
            version: Revision to check out
            path: When given, only this path is checked out from ``version``

        Returns:
            True on success, False when the directory is not a repository
            or git fails.
        """
        if not self.is_initialized(directory):
            return False

        try:
            if path:
                self._run_git(directory, ["checkout", version, "--", path])
            else:
                self._run_git(directory, ["checkout", version])
            return True
        except GitError:
            return False

    def get_file_at_version(
        self,
        directory: Path,
        path: str,
        version: str,
    ) -> Optional[str]:
        """
        Get file content at a specific version.

        Returns:
            The output of ``git show <version>:<path>``, or None when the
            directory is not a repository or git fails.
        """
        if not self.is_initialized(directory):
            return None

        try:
            result = self._run_git(
                directory,
                ["show", f"{version}:{path}"],
            )
            return result.stdout
        except GitError:
            return None

    def list_branches(self, directory: Path) -> List[Branch]:
        """
        List all branches.

        Returns:
            One Branch per local branch reported by ``git branch``; empty
            when the directory is not a repository or git fails.
        """
        if not self.is_initialized(directory):
            return []

        try:
            result = self._run_git(
                directory,
                ["branch", "-v", "--format=%(refname:short)|%(objectname)|%(HEAD)"],
            )
            branches = []
            for line in result.stdout.strip().split("\n"):
                if not line:
                    continue
                parts = line.split("|")
                if len(parts) >= 3:
                    branches.append(Branch(
                        name=parts[0],
                        head_commit_id=parts[1],
                        # %(HEAD) prints "*" for the checked-out branch
                        is_current=(parts[2] == "*"),
                    ))
            return branches
        except GitError:
            return []

    def create_branch(
        self,
        directory: Path,
        name: str,
        start_point: Optional[str] = None,
    ) -> Branch:
        """
        Create a new branch.

        Args:
            directory: Root directory of the repository
            name: Name of the branch to create
            start_point: Optional revision the branch starts from

        Returns:
            The new branch (not checked out, so ``is_current`` is False)

        Raises:
            GitError: If ``directory`` is not a repository or git fails.
        """
        if not self.is_initialized(directory):
            raise GitError(f"Not a git repository: {directory}")

        args = ["branch", name]
        if start_point:
            args.append(start_point)

        self._run_git(directory, args)

        # Get the created branch info
        result = self._run_git(
            directory,
            ["rev-parse", name],
        )

        return Branch(
            name=name,
            head_commit_id=result.stdout.strip(),
            is_current=False,
        )

    def switch_branch(self, directory: Path, name: str) -> bool:
        """
        Switch to a branch.

        Returns:
            True on success, False when the directory is not a repository
            or ``git checkout`` fails.
        """
        if not self.is_initialized(directory):
            return False

        try:
            self._run_git(directory, ["checkout", name])
            return True
        except GitError:
            return False

    def get_current_branch(self, directory: Path) -> Optional[str]:
        """
        Get the current branch name.

        Returns:
            The branch name, or None when the directory is not a
            repository, git fails, or HEAD is detached (git reports
            "HEAD").
        """
        if not self.is_initialized(directory):
            return None

        try:
            result = self._run_git(
                directory,
                ["rev-parse", "--abbrev-ref", "HEAD"],
            )
            branch = result.stdout.strip()
            return branch if branch != "HEAD" else None
        except GitError:
            return None

    def has_changes(self, directory: Path) -> bool:
        """
        Check if there are uncommitted changes.

        Returns:
            True when ``git status --porcelain`` prints anything; False
            when it prints nothing, the directory is not a repository, or
            git fails.
        """
        if not self.is_initialized(directory):
            return False

        try:
            result = self._run_git(directory, ["status", "--porcelain"])
            return bool(result.stdout.strip())
        except GitError:
            return False

    def restore_file(
        self,
        directory: Path,
        path: str,
        version: str,
    ) -> bool:
        """
        Restore a file to a specific version.

        Gets the file content at the version and writes it to the
        working tree.

        Args:
            directory: Root directory
            path: File path
            version: Version to restore from

        Returns:
            True if successful, False when the content could not be read
        """
        content = self.get_file_at_version(directory, path, version)
        if content is None:
            return False

        file_path = directory / path
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text(content)
        return True