Files
repo-scoping/src/repo_registry/repo_ingestion/git.py

77 lines
2.6 KiB
Python

from __future__ import annotations
import hashlib
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlparse
@dataclass(frozen=True)
class Checkout:
    """Immutable result of resolving a repository reference to a directory.

    Attributes:
        source_path: Resolved filesystem path of the working tree to read.
        was_cloned: ``True`` when the tree is a git checkout managed under the
            ingestion service's checkout root; ``False`` when the caller
            supplied a path that already existed on disk and is used in place.
    """

    source_path: Path
    was_cloned: bool
class GitIngestionService:
    """Resolve a repository reference (URL or local path) to a working tree.

    A reference that names an existing local path is used in place. Anything
    else is handed to ``git clone`` and cached under ``checkout_root`` in a
    directory keyed by the repository name plus a hash of the reference, so
    later calls with the same reference reuse and refresh the same clone.
    """

    def __init__(self, checkout_root: str | Path = "var/checkouts") -> None:
        """Remember where clones are stored.

        Args:
            checkout_root: Directory that holds cached clones. It is created
                on demand by :meth:`resolve`, not here.
        """
        self.checkout_root = Path(checkout_root)

    def resolve(self, url_or_path: str, *, branch: str = "main") -> Checkout:
        """Return a local working tree for ``url_or_path``.

        Args:
            url_or_path: An existing local path, or a repository location
                understood by ``git clone``.
            branch: Branch to check out and fast-forward in the cached clone.
                An empty string keeps whatever branch is currently checked
                out. Ignored when ``url_or_path`` is an existing local path.

        Returns:
            A :class:`Checkout` whose ``was_cloned`` is ``False`` for an
            existing local path and ``True`` for a tree under
            ``checkout_root`` (whether freshly cloned or merely refreshed).

        Raises:
            RuntimeError: If git is missing, a git command fails, or
                ``branch`` is not an acceptable branch name.
            subprocess.TimeoutExpired: If a git command exceeds its timeout.
        """
        local_path = self._local_path(url_or_path)
        if local_path is not None:
            return Checkout(source_path=local_path.resolve(), was_cloned=False)

        checkout_path = self.checkout_root / self._checkout_key(url_or_path)
        self.checkout_root.mkdir(parents=True, exist_ok=True)
        self._sync_checkout(url_or_path, checkout_path)
        self._checkout_branch(checkout_path, branch)
        return Checkout(source_path=checkout_path.resolve(), was_cloned=True)

    def _sync_checkout(self, url: str, checkout_path: Path) -> None:
        """Fetch into the cached clone at ``checkout_path``, or create it."""
        # Test for ``.git`` rather than the directory itself: running git in a
        # directory that is not a repository makes it search the parent
        # directories and operate on whatever enclosing repository it finds.
        if (checkout_path / ".git").exists():
            self._run_git(["fetch", "--all", "--prune"], cwd=checkout_path)
            return

        # Anything already at this path is debris from an interrupted clone.
        self._discard_checkout(checkout_path)
        try:
            # ``--`` ends option parsing so a reference that starts with "-"
            # cannot be interpreted as a git option.
            self._run_git(["clone", "--", url, str(checkout_path)], cwd=None)
        except (RuntimeError, subprocess.TimeoutExpired):
            # Do not leave a half-written clone behind for the next call.
            self._discard_checkout(checkout_path)
            raise

    def _discard_checkout(self, checkout_path: Path) -> None:
        """Remove whatever exists at ``checkout_path``; no-op if nothing does."""
        if checkout_path.is_dir() and not checkout_path.is_symlink():
            shutil.rmtree(checkout_path, ignore_errors=True)
        elif checkout_path.exists() or checkout_path.is_symlink():
            checkout_path.unlink()

    def _checkout_branch(self, checkout_path: Path, branch: str) -> None:
        """Switch to ``branch`` (when non-empty) and fast-forward it.

        Raises:
            RuntimeError: If ``branch`` starts with ``-`` or a git command
                fails.
        """
        if branch:
            # Git forbids branch names that begin with a dash; passing one on
            # would make ``git checkout`` parse it as an option instead.
            if branch.startswith("-"):
                raise RuntimeError(f"invalid branch name: {branch!r}")
            self._run_git(["checkout", branch], cwd=checkout_path)
        self._run_git(["pull", "--ff-only"], cwd=checkout_path)

    def _local_path(self, value: str) -> Path | None:
        """Return ``value`` as a path if it is scheme-less and exists on disk.

        A leading ``~`` is expanded. Returns ``None`` when ``value`` parses
        with a URL scheme or does not name an existing filesystem entry.
        """
        if urlparse(value).scheme:
            return None
        path = Path(value).expanduser()
        return path if path.exists() else None

    def _checkout_key(self, url: str) -> str:
        """Build the cache directory name for ``url``.

        The name is ``<sanitised last path segment>-<12 hex digits>``, where
        the hex digits come from the SHA-256 of the full reference so that
        same-named repositories from different locations do not collide.
        """
        parsed = urlparse(url)
        name = Path(parsed.path.rstrip("/")).name or "repository"
        if name.endswith(".git"):
            name = name[:-4]
        digest = hashlib.sha256(url.encode("utf-8")).hexdigest()[:12]
        return f"{self._safe_name(name)}-{digest}"

    def _safe_name(self, value: str) -> str:
        """Replace every character other than alphanumerics, ``-`` and ``_``.

        Disallowed characters become ``-``; leading and trailing dashes are
        stripped, and ``"repository"`` is returned if nothing remains.
        """
        safe = "".join(
            char if char.isalnum() or char in "-_" else "-" for char in value
        )
        return safe.strip("-") or "repository"

    def _run_git(self, args: list[str], *, cwd: Path | None) -> None:
        """Run ``git <args>`` and raise if it does not succeed.

        Args:
            args: Arguments placed after ``git`` on the command line.
            cwd: Working directory for the command, or ``None`` to inherit
                the current one.

        Raises:
            RuntimeError: If no ``git`` executable is on ``PATH`` or the
                command exits non-zero; the message carries git's stderr,
                falling back to stdout when stderr is empty.
            subprocess.TimeoutExpired: If the command runs longer than 120
                seconds.
        """
        if shutil.which("git") is None:
            raise RuntimeError("git executable was not found")
        result = subprocess.run(
            ["git", *args],
            cwd=cwd,
            check=False,
            capture_output=True,
            text=True,
            timeout=120,
        )
        if result.returncode != 0:
            message = result.stderr.strip() or result.stdout.strip()
            raise RuntimeError(f"git {' '.join(args)} failed: {message}")