from __future__ import annotations import subprocess from dataclasses import dataclass from pathlib import Path from typing import Any @dataclass(frozen=True) class RepoRef: repo_id: str slug: str local_path: str | None = None host_paths: dict[str, Any] | None = None remote_url: str | None = None git_fingerprint: str | None = None @dataclass(frozen=True) class RepoMatch: repo_id: str slug: str method: str confidence: float def normalise_cwd(raw: str | None) -> str | None: if not raw: return None value = raw.replace("\\", "/") prefixes = ( "//wsl.localhost/Ubuntu-24.04", "//wsl$/Ubuntu-24.04", ) for prefix in prefixes: if value.startswith(prefix): return value[len(prefix):] or "/" if len(value) >= 3 and value[1:3] == ":/": drive = value[0].lower() return f"/mnt/{drive}{value[2:]}" return value def normalise_remote_url(raw: str | None) -> str | None: if not raw: return None value = raw.strip() if value.endswith(".git"): value = value[:-4] if value.startswith("git@") and ":" in value: host, path = value[4:].split(":", 1) value = f"ssh://{host}/{path}" return value.lower().rstrip("/") def repo_refs_from_api(repos: list[dict[str, Any]]) -> list[RepoRef]: refs = [] for repo in repos: repo_id = repo.get("id") slug = repo.get("slug") if not repo_id or not slug: continue refs.append( RepoRef( repo_id=str(repo_id), slug=str(slug), local_path=repo.get("local_path"), host_paths=repo.get("host_paths") if isinstance(repo.get("host_paths"), dict) else {}, remote_url=repo.get("remote_url"), git_fingerprint=repo.get("git_fingerprint"), ) ) return refs def _git(cwd: str, *args: str) -> str | None: try: result = subprocess.run( ["git", *args], cwd=cwd, check=False, capture_output=True, text=True, timeout=5, ) except (OSError, subprocess.SubprocessError): return None if result.returncode != 0: return None value = result.stdout.strip().splitlines() return value[0] if value else None def git_fingerprint_for_path(cwd: str | None) -> str | None: path = normalise_cwd(cwd) if not path or not Path(path).exists(): return None root = _git(path, "rev-parse", "--show-toplevel") if not root: return None return _git(root, "rev-list", "--max-parents=0", "HEAD") def git_remote_for_path(cwd: str | None) -> str | None: path = normalise_cwd(cwd) if not path or not Path(path).exists(): return None root = _git(path, "rev-parse", "--show-toplevel") if not root: return None return _git(root, "remote", "get-url", "origin") def _repo_paths(repo: RepoRef) -> list[str]: paths = [repo.local_path] if repo.host_paths: paths.extend(str(v) for v in repo.host_paths.values() if v) result = [] for raw in paths: path = normalise_cwd(str(raw)) if raw and raw != "(unknown)" else None if path: result.append(path.rstrip("/")) return result def resolve_repo(cwd: str | None, repos: list[RepoRef]) -> RepoMatch | None: path = normalise_cwd(cwd) fingerprint = git_fingerprint_for_path(path) remote = normalise_remote_url(git_remote_for_path(path)) if fingerprint: candidates = [repo for repo in repos if repo.git_fingerprint == fingerprint] if len(candidates) == 1: repo = candidates[0] return RepoMatch(repo.repo_id, repo.slug, "git_fingerprint", 0.98) if remote: remote_candidates = [ repo for repo in candidates if normalise_remote_url(repo.remote_url) == remote ] if len(remote_candidates) == 1: repo = remote_candidates[0] return RepoMatch(repo.repo_id, repo.slug, "git_fingerprint_remote", 0.99) if remote: candidates = [repo for repo in repos if normalise_remote_url(repo.remote_url) == remote] if len(candidates) == 1: repo = candidates[0] return RepoMatch(repo.repo_id, repo.slug, "remote_url", 0.90) if not path: return None path_matches: list[tuple[str, RepoRef]] = [] for repo in repos: for repo_path in _repo_paths(repo): if path == repo_path or path.startswith(f"{repo_path}/"): path_matches.append((repo_path, repo)) if not path_matches: return None path_matches.sort(key=lambda item: len(item[0]), reverse=True) exact = [item for item in path_matches if path == item[0]] if exact: basename = Path(path).name for _, repo in exact: if repo.slug == basename: return RepoMatch(repo.repo_id, repo.slug, "path_exact_slug", 0.85) repo = exact[0][1] return RepoMatch(repo.repo_id, repo.slug, "path_exact", 0.80) repo = path_matches[0][1] return RepoMatch(repo.repo_id, repo.slug, "path_prefix", 0.75)