Files
markitect-main/markitect/spaces/history/git_backend.py
tegwick 4588cbeee8 feat(spaces): implement Phase 8 Git History Tracking
Implements optional git-based version control for information spaces:
- HistoryConfig model for configuring history tracking
- Commit, Branch, HistoryEntry, DiffResult models
- IHistoryBackend and IHistoryQuery interfaces
- GitHistoryBackend using git CLI for version control
- GitHistoryEventHandler for event-driven auto-commits
- HistoryEventCoordinator for managing space history
- HistoryQueryService for high-level history queries
- Automatic commits on DOCUMENT_ADDED/REMOVED/CONTENT_CHANGED events
- Support for:
  * Commit log with pagination and filtering
  * Diff between versions
  * File content at specific versions
  * Branch creation and switching
  * Version restoration
  * Uncommitted changes detection
- 43 comprehensive unit tests with git availability checks

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-08 18:03:35 +01:00

522 lines
16 KiB
Python

"""
Git implementation of the history backend.
Uses git commands to provide version control for space content.
"""
import subprocess
import os
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime
import re
from .interfaces import IHistoryBackend
from .models import Commit, Branch, DiffResult, DiffLine, DiffType, HistoryConfig
class GitError(Exception):
    """Signal that a git invocation failed or that git cannot be used.

    Raised by the git history backend when the git executable is missing,
    a command exits with a non-zero status, or a command times out.
    """
class GitHistoryBackend(IHistoryBackend):
    """
    Git-based history backend implementation.

    Uses the git CLI to manage version control. Requires git to be
    installed on the system. Every operation shells out to ``git`` with the
    target directory as working directory; failures surface as ``GitError``
    or, for query-style methods, as an empty / ``None`` / ``False`` result.
    """

    # Field separator used in ``git log`` records. A control character is
    # used instead of "|" so that a commit subject containing "|" cannot
    # shift the remaining fields. (\x01 is deliberately not a whitespace
    # character, so ``str.strip()`` never eats a trailing empty field.)
    _FIELD_SEP = "\x01"

    # Fields: full hash, subject, "name <email>", strict ISO-8601 author
    # date, space-separated parent hashes.
    _LOG_FORMAT = "--format=%H%x01%s%x01%an <%ae>%x01%aI%x01%P"

    def __init__(self, default_author: str = "markitect <markitect@local>"):
        """
        Initialize the git backend.

        Args:
            default_author: Default author for commits. It is stored on the
                instance; ``commit()`` only passes an author to git when its
                own ``author`` argument is given.

        Raises:
            GitError: If git cannot be executed on this system.
        """
        self._default_author = default_author
        self._verify_git_available()

    def _verify_git_available(self) -> None:
        """
        Verify git is available on the system.

        Raises:
            GitError: If the executable is missing, cannot be started,
                times out, or exits with a non-zero status.
        """
        try:
            result = subprocess.run(
                ["git", "--version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except FileNotFoundError as exc:
            raise GitError("Git is not installed") from exc
        except subprocess.TimeoutExpired as exc:
            raise GitError("Git command timed out") from exc
        except OSError as exc:
            # e.g. the executable exists but is not permitted to run
            raise GitError("Git is not available") from exc
        if result.returncode != 0:
            raise GitError("Git is not available")

    def _run_git(
        self,
        directory: Path,
        args: List[str],
        check: bool = True,
        capture_output: bool = True,
    ) -> subprocess.CompletedProcess:
        """
        Run a git command.

        Args:
            directory: Working directory
            args: Git command arguments (without 'git')
            check: Whether to raise on non-zero exit
            capture_output: Whether to capture stdout/stderr

        Returns:
            CompletedProcess result (stdout/stderr decoded as UTF-8 text)

        Raises:
            GitError: If the command times out, cannot be started, produces
                output that is not valid UTF-8, or (when ``check`` is true)
                exits with a non-zero status.
        """
        cmd = ["git"] + args
        try:
            result = subprocess.run(
                cmd,
                cwd=str(directory),
                capture_output=capture_output,
                text=True,
                # Decode as UTF-8 instead of the platform locale encoding.
                encoding="utf-8",
                timeout=30,
            )
        except subprocess.TimeoutExpired as exc:
            raise GitError(f"Git command timed out: {' '.join(cmd)}") from exc
        except OSError as exc:
            # Missing working directory or missing git executable.
            raise GitError(f"Git command could not be started: {exc}") from exc
        except UnicodeDecodeError as exc:
            raise GitError(
                f"Git output is not valid UTF-8: {' '.join(cmd)}"
            ) from exc

        if check and result.returncode != 0:
            # git reports some failures (e.g. "nothing to commit") on stdout;
            # stderr is None when output was not captured.
            detail = result.stderr or result.stdout or ""
            raise GitError(f"Git command failed: {detail}")
        return result

    def initialize(self, directory: Path, config: HistoryConfig) -> None:
        """
        Initialize a git repository.

        Creates the directory if needed, runs ``git init`` when there is no
        ``.git`` directory yet, sets the repository-local user name/email
        from ``config`` and, if no ``.gitignore`` exists, writes one and
        commits it.

        Args:
            directory: Root directory of the repository
            config: History configuration providing ``author_name`` and
                ``author_email``

        Raises:
            GitError: If any git command fails.
        """
        if not directory.exists():
            directory.mkdir(parents=True, exist_ok=True)

        if not self.is_initialized(directory):
            self._run_git(directory, ["init"])

        # Configure user for this repo
        self._run_git(directory, ["config", "user.name", config.author_name])
        self._run_git(directory, ["config", "user.email", config.author_email])

        # Create initial .gitignore
        gitignore = directory / ".gitignore"
        if not gitignore.exists():
            gitignore.write_text("*.pyc\n__pycache__/\n.DS_Store\n")
            self._run_git(directory, ["add", ".gitignore"])
            self._run_git(
                directory,
                ["commit", "-m", "Initial commit: add .gitignore"],
            )

    def is_initialized(self, directory: Path) -> bool:
        """Check if directory is a git repository (has a ``.git`` directory)."""
        return (directory / ".git").is_dir()

    def commit(
        self,
        directory: Path,
        message: str,
        files: Optional[List[str]] = None,
        author: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Commit:
        """
        Create a commit.

        Args:
            directory: Root directory of the repository
            message: Commit message
            files: Paths (relative to ``directory``) to stage; when empty or
                ``None`` all changes are staged
            author: Optional ``"Name <email>"`` passed to ``--author``
            metadata: Optional key/value pairs added as commit trailers

        Returns:
            The newly created HEAD commit

        Raises:
            GitError: If the directory is not a repository, nothing is
                staged after staging, or a git command fails.
        """
        if not self.is_initialized(directory):
            raise GitError(f"Not a git repository: {directory}")

        # Stage files. "--" keeps file names that start with "-" from being
        # interpreted as options.
        if files:
            for f in files:
                if (directory / f).exists():
                    self._run_git(directory, ["add", "--", f])
                else:
                    # File was deleted; best effort (it may never have been
                    # tracked), so failures are ignored.
                    self._run_git(directory, ["add", "-u", "--", f], check=False)
        else:
            # Stage all changes
            self._run_git(directory, ["add", "-A"])

        # Check the index, not the working tree: unrelated unstaged or
        # untracked files must not make an empty commit look non-empty.
        # Exit status: 0 = nothing staged, 1 = staged changes present.
        staged = self._run_git(
            directory, ["diff", "--cached", "--quiet"], check=False
        )
        if staged.returncode == 0:
            raise GitError("No changes to commit")
        if staged.returncode != 1:
            detail = staged.stderr or staged.stdout or ""
            raise GitError(f"Git command failed: {detail}")

        # Build commit command
        commit_args = ["commit", "-m", message]
        if author:
            commit_args.extend(["--author", author])

        # Add metadata as trailers
        if metadata:
            for key, value in metadata.items():
                commit_args.extend(["--trailer", f"{key}={value}"])

        self._run_git(directory, commit_args)

        # Get the commit info
        return self._get_head_commit(directory)

    def _get_head_commit(self, directory: Path) -> Commit:
        """Get the HEAD commit."""
        result = self._run_git(directory, ["log", "-1", self._LOG_FORMAT])
        return self._parse_commit_line(result.stdout.strip(), directory)

    def _parse_commit_line(self, line: str, directory: Path) -> Commit:
        """
        Parse a commit from log format.

        Args:
            line: One log record: hash, subject, author, ISO timestamp and
                parent hashes, separated by ``_FIELD_SEP``. Records
                separated by "|" (the older format) are still accepted.
            directory: Repository root, used to look up the changed files
                and the commit body

        Returns:
            The parsed commit, including changed files and metadata

        Raises:
            GitError: If the record has fewer than four fields or the
                changed-file lookup fails.
        """
        separator = self._FIELD_SEP if self._FIELD_SEP in line else "|"
        parts = line.split(separator)
        if len(parts) < 4:
            raise GitError(f"Invalid commit format: {line}")

        commit_id = parts[0]
        message = parts[1]
        author = parts[2]
        timestamp_str = parts[3]
        parent_ids = parts[4].split() if len(parts) > 4 and parts[4] else []

        # Parse ISO timestamp
        try:
            timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        except ValueError:
            # Keep the fallback timezone-aware, like the parsed values.
            timestamp = datetime.now().astimezone()

        # Get changed files
        files_result = self._run_git(
            directory,
            ["show", "--name-only", "--format=", commit_id],
        )
        files_changed = [f for f in files_result.stdout.strip().split("\n") if f]

        # Extract metadata from commit body (git trailers). Every non-empty
        # body line containing ":" or "=" is treated as a key/value pair.
        metadata = {}
        try:
            body_result = self._run_git(
                directory,
                ["log", "-1", "--format=%b", commit_id],
            )
        except GitError:
            # Metadata is optional; a failed body lookup leaves it empty.
            pass
        else:
            for raw in body_result.stdout.strip().split("\n"):
                entry = raw.strip()
                if not entry:
                    continue
                # Try "Key: value" format first, then "Key=value"
                if ":" in entry:
                    key, value = entry.split(":", 1)
                elif "=" in entry:
                    key, value = entry.split("=", 1)
                else:
                    continue
                metadata[key.strip()] = value.strip()

        return Commit(
            id=commit_id,
            message=message,
            author=author,
            timestamp=timestamp,
            parent_ids=parent_ids,
            files_changed=files_changed,
            metadata=metadata,
        )

    def get_commit(self, directory: Path, commit_id: str) -> Optional[Commit]:
        """
        Get a commit by ID.

        Returns:
            The commit, or ``None`` if the directory is not a repository or
            the lookup fails.
        """
        if not self.is_initialized(directory):
            return None

        try:
            # Trailing "--" marks commit_id as a revision, never a path.
            result = self._run_git(
                directory,
                ["log", "-1", self._LOG_FORMAT, commit_id, "--"],
            )
            record = result.stdout.strip()
            if record:
                return self._parse_commit_line(record, directory)
        except GitError:
            pass
        return None

    def get_log(
        self,
        directory: Path,
        limit: int = 50,
        offset: int = 0,
        path: Optional[str] = None,
    ) -> List[Commit]:
        """
        Get commit history.

        Args:
            directory: Root directory of the repository
            limit: Maximum number of commits to return
            offset: Number of commits to skip from the newest
            path: Optional path to restrict the history to

        Returns:
            Commits, newest first; an empty list if the directory is not a
            repository, ``limit`` is not positive, or git fails.
        """
        if not self.is_initialized(directory) or limit <= 0:
            return []

        args = [
            "log",
            f"--skip={offset}",
            f"--max-count={limit}",
            self._LOG_FORMAT,
        ]
        if path:
            args.extend(["--", path])

        try:
            result = self._run_git(directory, args)
            return [
                self._parse_commit_line(record, directory)
                for record in result.stdout.strip().split("\n")
                if record
            ]
        except GitError:
            return []

    def get_diff(
        self,
        directory: Path,
        old_version: Optional[str] = None,
        new_version: Optional[str] = None,
        path: Optional[str] = None,
    ) -> List[DiffResult]:
        """
        Get diff between versions.

        With both versions the range ``old..new`` is diffed; with only
        ``old_version`` it is compared to the working tree; with only
        ``new_version`` HEAD is compared to it; with neither, plain
        ``git diff`` is run.

        Returns:
            One DiffResult per changed file; an empty list if the directory
            is not a repository or git fails.
        """
        if not self.is_initialized(directory):
            return []

        # Force plain unified output regardless of user configuration so the
        # parser below sees the format it expects.
        args = ["diff", "--no-color", "--no-ext-diff"]
        if old_version and new_version:
            args.append(f"{old_version}..{new_version}")
        elif old_version:
            args.append(old_version)
        elif new_version:
            args.extend(["HEAD", new_version])

        # Always terminate the revision list so a revision name that is also
        # a file name is not ambiguous.
        args.append("--")
        if path:
            args.append(path)

        try:
            result = self._run_git(directory, args)
            return self._parse_diff_output(result.stdout)
        except GitError:
            return []

    def _parse_diff_output(self, output: str) -> List[DiffResult]:
        """
        Parse git diff output into DiffResult objects.

        File headers (``index``, ``---``, ``+++`` ...) only occur between a
        ``diff --git`` line and the first ``@@`` hunk header, so they are
        skipped by tracking whether a hunk has started. Inside a hunk every
        "+" / "-" line is content, including lines whose content itself
        begins with "++" or "--".
        """
        results: List[DiffResult] = []
        current_diff: Optional[DiffResult] = None
        in_hunk = False
        old_line = 0
        new_line = 0

        for line in output.split("\n"):
            # New file diff
            if line.startswith("diff --git"):
                if current_diff is not None:
                    results.append(current_diff)
                # Extract path from "diff --git a/path b/path"
                match = re.search(r"diff --git a/(.*) b/", line)
                path = match.group(1) if match else "unknown"
                current_diff = DiffResult(path=path)
                in_hunk = False
                continue

            if current_diff is None:
                continue

            if line.startswith("@@"):
                # Parse hunk header: @@ -old_start,old_count +new_start,new_count @@
                match = re.search(r"@@ -(\d+)", line)
                if match:
                    old_line = int(match.group(1))
                match = re.search(r"\+(\d+)", line)
                if match:
                    new_line = int(match.group(1))
                in_hunk = True
                continue

            if not in_hunk or not line:
                continue

            marker, content = line[0], line[1:]
            if marker == "+":
                current_diff.lines.append(DiffLine(
                    line_type=DiffType.ADDITION,
                    content=content,
                    new_line_no=new_line,
                ))
                current_diff.additions += 1
                new_line += 1
            elif marker == "-":
                current_diff.lines.append(DiffLine(
                    line_type=DiffType.DELETION,
                    content=content,
                    old_line_no=old_line,
                ))
                current_diff.deletions += 1
                old_line += 1
            elif marker == " ":
                current_diff.lines.append(DiffLine(
                    line_type=DiffType.CONTEXT,
                    content=content,
                    old_line_no=old_line,
                    new_line_no=new_line,
                ))
                old_line += 1
                new_line += 1
            # Anything else (e.g. "\ No newline at end of file") is ignored.

        if current_diff is not None:
            results.append(current_diff)

        return results

    def checkout(
        self,
        directory: Path,
        version: str,
        path: Optional[str] = None,
    ) -> bool:
        """
        Checkout a specific version.

        Args:
            directory: Root directory of the repository
            version: Revision to check out
            path: When given, only this path is checked out from ``version``

        Returns:
            True on success, False if the directory is not a repository or
            git fails.
        """
        if not self.is_initialized(directory):
            return False

        args = ["checkout", version]
        if path:
            args.extend(["--", path])

        try:
            self._run_git(directory, args)
            return True
        except GitError:
            return False

    def get_file_at_version(
        self,
        directory: Path,
        path: str,
        version: str,
    ) -> Optional[str]:
        """
        Get file content at a specific version.

        Returns:
            The file content as text, or ``None`` if the directory is not a
            repository or ``git show`` fails.
        """
        if not self.is_initialized(directory):
            return None

        try:
            result = self._run_git(directory, ["show", f"{version}:{path}"])
            return result.stdout
        except GitError:
            return None

    def list_branches(self, directory: Path) -> List[Branch]:
        """
        List all local branches.

        Reads ``refs/heads/`` directly so that the "(HEAD detached at ...)"
        pseudo-entry printed by ``git branch`` is never reported as a branch.

        Returns:
            Local branches; an empty list if the directory is not a
            repository or git fails.
        """
        if not self.is_initialized(directory):
            return []

        try:
            result = self._run_git(
                directory,
                [
                    "for-each-ref",
                    "--format=%(refname:short)|%(objectname)|%(HEAD)",
                    "refs/heads/",
                ],
            )
        except GitError:
            return []

        branches = []
        for record in result.stdout.strip().split("\n"):
            if not record:
                continue
            # Split from the right: the object id and HEAD marker never
            # contain "|", a branch name might.
            parts = record.rsplit("|", 2)
            if len(parts) == 3:
                branches.append(Branch(
                    name=parts[0],
                    head_commit_id=parts[1],
                    is_current=(parts[2] == "*"),
                ))
        return branches

    def create_branch(
        self,
        directory: Path,
        name: str,
        start_point: Optional[str] = None,
    ) -> Branch:
        """
        Create a new branch (without switching to it).

        Args:
            directory: Root directory of the repository
            name: Name of the new branch
            start_point: Optional revision the branch starts at

        Returns:
            The created branch with ``is_current`` set to False

        Raises:
            GitError: If the directory is not a repository or git fails.
        """
        if not self.is_initialized(directory):
            raise GitError(f"Not a git repository: {directory}")

        args = ["branch", name]
        if start_point:
            args.append(start_point)
        self._run_git(directory, args)

        # Resolve the full ref so a tag or file with the same name cannot be
        # picked up instead of the branch that was just created.
        result = self._run_git(
            directory,
            ["rev-parse", "--verify", f"refs/heads/{name}"],
        )

        return Branch(
            name=name,
            head_commit_id=result.stdout.strip(),
            is_current=False,
        )

    def switch_branch(self, directory: Path, name: str) -> bool:
        """
        Switch to a branch.

        Returns:
            True on success, False if the directory is not a repository or
            the checkout fails.
        """
        return self.checkout(directory, name)

    def get_current_branch(self, directory: Path) -> Optional[str]:
        """
        Get the current branch name.

        Returns:
            The branch name, or ``None`` if the directory is not a
            repository, git fails, or git reports the literal name "HEAD".
        """
        if not self.is_initialized(directory):
            return None

        try:
            result = self._run_git(
                directory,
                ["rev-parse", "--abbrev-ref", "HEAD"],
            )
        except GitError:
            return None

        branch = result.stdout.strip()
        return branch if branch != "HEAD" else None

    def has_changes(self, directory: Path) -> bool:
        """
        Check if there are uncommitted changes.

        Returns:
            True if ``git status --porcelain`` prints anything; False if it
            prints nothing, the directory is not a repository, or git fails.
        """
        if not self.is_initialized(directory):
            return False

        try:
            result = self._run_git(directory, ["status", "--porcelain"])
            return bool(result.stdout.strip())
        except GitError:
            return False

    def restore_file(
        self,
        directory: Path,
        path: str,
        version: str,
    ) -> bool:
        """
        Restore a file to a specific version.

        Gets the file content at the version and writes it to the working tree.

        Args:
            directory: Root directory
            path: File path
            version: Version to restore from

        Returns:
            True if successful, False if the content could not be read
        """
        content = self.get_file_at_version(directory, path, version)
        if content is None:
            return False

        file_path = directory / path
        file_path.parent.mkdir(parents=True, exist_ok=True)
        # Write with the same encoding the content was decoded with.
        file_path.write_text(content, encoding="utf-8")
        return True