generated from coulomb/repo-seed
Fixed and improved token tracking
This commit is contained in:
171
api/services/token_sources/attribution.py
Normal file
171
api/services/token_sources/attribution.py
Normal file
@@ -0,0 +1,171 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class RepoRef:
    """Immutable description of a known repository used for attribution.

    ``resolve_repo`` compares a working directory against these fields:
    first the git identity (``git_fingerprint``, ``remote_url``), then the
    filesystem locations (``local_path`` and the values of ``host_paths``).
    """

    # Identifier of the repository (stored as a string).
    repo_id: str
    # Short name; also compared with the directory basename on exact path hits.
    slug: str
    # Checkout location; may be a Windows or WSL-UNC style path, since paths
    # are passed through ``normalise_cwd`` before being compared.
    local_path: str | None = None
    # Mapping whose *values* are additional checkout locations. The keys are
    # not interpreted in this module (presumably host names -- TODO confirm).
    host_paths: dict[str, Any] | None = None
    # Remote URL; compared after ``normalise_remote_url``.
    remote_url: str | None = None
    # Compared against the output of ``git rev-list --max-parents=0 HEAD``.
    git_fingerprint: str | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class RepoMatch:
    """Result of attributing a working directory to a repository."""

    # Identifier of the matched repository.
    repo_id: str
    # Slug of the matched repository.
    slug: str
    # Name of the strategy that produced the match. ``resolve_repo`` emits
    # "git_fingerprint", "git_fingerprint_remote", "remote_url",
    # "path_exact_slug", "path_exact" or "path_prefix".
    method: str
    # Score attached to the strategy; ``resolve_repo`` uses values between
    # 0.75 (path prefix) and 0.99 (fingerprint plus remote).
    confidence: float
|
||||
|
||||
|
||||
def normalise_cwd(raw: str | None) -> str | None:
    """Convert a working-directory string into a POSIX-style path.

    * Backslashes are replaced with forward slashes.
    * A WSL UNC prefix for the ``Ubuntu-24.04`` distribution
      (``//wsl.localhost/...`` or ``//wsl$/...``) is removed, leaving the
      path inside the distribution (``/`` when nothing follows the prefix).
    * A Windows drive path such as ``C:/Users`` becomes ``/mnt/c/Users``.
    * Anything else is returned with only the slash replacement applied.

    Returns ``None`` when *raw* is ``None`` or empty.
    """
    if not raw:
        return None
    value = raw.replace("\\", "/")
    prefixes = (
        "//wsl.localhost/Ubuntu-24.04",
        "//wsl$/Ubuntu-24.04",
    )
    for prefix in prefixes:
        if value.startswith(prefix):
            remainder = value[len(prefix):]
            # Only strip at a path-component boundary; otherwise a share
            # named e.g. "Ubuntu-24.04-old" would be truncated to "-old/...".
            if not remainder or remainder.startswith("/"):
                return remainder or "/"
    drive = value[0]
    # Require an ASCII letter so inputs like "1:/x" are not treated as drives.
    if len(value) >= 3 and value[1:3] == ":/" and drive.isascii() and drive.isalpha():
        return f"/mnt/{drive.lower()}{value[2:]}"
    return value
|
||||
|
||||
|
||||
def normalise_remote_url(raw: str | None) -> str | None:
    """Return a canonical, lower-cased form of a git remote URL for comparison.

    * Surrounding whitespace and trailing slashes are removed.
    * A trailing ``.git`` suffix is removed.
    * scp-style ``git@host:path`` becomes ``ssh://host/path``.

    Returns ``None`` when *raw* is ``None``, empty or only whitespace.
    """
    if not raw:
        return None
    # Drop trailing slashes first so "repo.git/" still loses its ".git".
    value = raw.strip().rstrip("/")
    if value.endswith(".git"):
        value = value[:-4]
    if value.startswith("git@") and ":" in value:
        host, path = value[4:].split(":", 1)
        # "git@host:/abs/path" must not turn into "ssh://host//abs/path".
        value = f"ssh://{host}/{path.lstrip('/')}"
    return value.lower().rstrip("/") or None
|
||||
|
||||
|
||||
def repo_refs_from_api(repos: list[dict[str, Any]]) -> list[RepoRef]:
    """Build ``RepoRef`` objects from repository dictionaries.

    Each dictionary is read for the keys ``id``, ``slug``, ``local_path``,
    ``host_paths``, ``remote_url`` and ``git_fingerprint``. Entries that are
    not dictionaries, or that lack a truthy ``id`` or ``slug``, are skipped.
    ``id`` and ``slug`` are converted to strings; a ``host_paths`` value that
    is not a dictionary is replaced by an empty one.
    """
    refs = []
    for repo in repos:
        if not isinstance(repo, dict):
            # Tolerate malformed payload entries instead of raising
            # AttributeError on ``.get``.
            continue
        repo_id = repo.get("id")
        slug = repo.get("slug")
        if not repo_id or not slug:
            continue
        host_paths = repo.get("host_paths")
        refs.append(
            RepoRef(
                repo_id=str(repo_id),
                slug=str(slug),
                local_path=repo.get("local_path"),
                host_paths=host_paths if isinstance(host_paths, dict) else {},
                remote_url=repo.get("remote_url"),
                git_fingerprint=repo.get("git_fingerprint"),
            )
        )
    return refs
|
||||
|
||||
|
||||
def _git(cwd: str, *args: str) -> str | None:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", *args],
|
||||
cwd=cwd,
|
||||
check=False,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
)
|
||||
except (OSError, subprocess.SubprocessError):
|
||||
return None
|
||||
if result.returncode != 0:
|
||||
return None
|
||||
value = result.stdout.strip().splitlines()
|
||||
return value[0] if value else None
|
||||
|
||||
|
||||
def git_fingerprint_for_path(cwd: str | None) -> str | None:
    """Return a fingerprint for the git checkout containing *cwd*.

    The fingerprint is the first line printed by
    ``git rev-list --max-parents=0 HEAD`` when run at the checkout's top
    level. ``None`` is returned when *cwd* is empty, does not exist after
    normalisation, is not inside a git work tree, or git fails.
    """
    normalised = normalise_cwd(cwd)
    if normalised and Path(normalised).exists():
        # Resolve the work-tree root first so the result does not depend on
        # which sub-directory the caller happened to be in.
        toplevel = _git(normalised, "rev-parse", "--show-toplevel")
        if toplevel:
            return _git(toplevel, "rev-list", "--max-parents=0", "HEAD")
    return None
|
||||
|
||||
|
||||
def git_remote_for_path(cwd: str | None) -> str | None:
    """Return the URL of the ``origin`` remote for the checkout containing *cwd*.

    The value is the first line printed by ``git remote get-url origin`` when
    run at the checkout's top level. ``None`` is returned when *cwd* is empty,
    does not exist after normalisation, is not inside a git work tree, or git
    fails (for example because no ``origin`` remote is configured).
    """
    normalised = normalise_cwd(cwd)
    if normalised and Path(normalised).exists():
        toplevel = _git(normalised, "rev-parse", "--show-toplevel")
        if toplevel:
            return _git(toplevel, "remote", "get-url", "origin")
    return None
|
||||
|
||||
|
||||
def _repo_paths(repo: RepoRef) -> list[str]:
    """Collect the normalised filesystem locations recorded for *repo*.

    ``local_path`` comes first, followed by every truthy value of
    ``host_paths``. Empty values and the ``"(unknown)"`` placeholder are
    dropped; the rest are passed through ``normalise_cwd`` and stripped of
    trailing slashes.
    """
    candidates = [repo.local_path]
    candidates += [str(value) for value in (repo.host_paths or {}).values() if value]

    cleaned = []
    for candidate in candidates:
        if not candidate or candidate == "(unknown)":
            continue
        normalised = normalise_cwd(str(candidate))
        if normalised:
            cleaned.append(normalised.rstrip("/"))
    return cleaned
|
||||
|
||||
|
||||
def resolve_repo(cwd: str | None, repos: list[RepoRef]) -> RepoMatch | None:
    """Attribute a working directory to one of *repos*.

    Strategies are tried in this order and the first unambiguous hit wins:

    1. ``git_fingerprint`` (0.98): exactly one repo has the fingerprint of
       the checkout containing *cwd*.
    2. ``git_fingerprint_remote`` (0.99): several repos share the
       fingerprint and exactly one of them also has the checkout's remote.
    3. ``remote_url`` (0.90): exactly one repo has the checkout's remote.
    4. ``path_exact_slug`` (0.85): *cwd* equals a recorded repo path and the
       directory name equals that repo's slug.
    5. ``path_exact`` (0.80): *cwd* equals a recorded repo path.
    6. ``path_prefix`` (0.75): *cwd* lies below a recorded repo path; the
       longest such path wins.

    Returns ``None`` when no strategy produces a match.
    """
    path = normalise_cwd(cwd)

    fingerprint = git_fingerprint_for_path(path)
    fingerprint_matches = (
        [repo for repo in repos if repo.git_fingerprint == fingerprint]
        if fingerprint
        else []
    )
    if len(fingerprint_matches) == 1:
        repo = fingerprint_matches[0]
        return RepoMatch(repo.repo_id, repo.slug, "git_fingerprint", 0.98)

    # The remote is only needed from here on; looking it up lazily avoids
    # two git subprocesses when the fingerprint already decided the match.
    remote = normalise_remote_url(git_remote_for_path(path))
    if remote:
        if fingerprint_matches:
            narrowed = [
                repo for repo in fingerprint_matches
                if normalise_remote_url(repo.remote_url) == remote
            ]
            if len(narrowed) == 1:
                repo = narrowed[0]
                return RepoMatch(repo.repo_id, repo.slug, "git_fingerprint_remote", 0.99)

        remote_matches = [
            repo for repo in repos
            if normalise_remote_url(repo.remote_url) == remote
        ]
        if len(remote_matches) == 1:
            repo = remote_matches[0]
            return RepoMatch(repo.repo_id, repo.slug, "remote_url", 0.90)

    if not path:
        return None

    # Recorded repo paths carry no trailing slash, so drop it here as well;
    # otherwise "/repo/" could never be an exact match. Keep "/" for root.
    target = path.rstrip("/") or "/"

    path_matches: list[tuple[str, RepoRef]] = []
    for repo in repos:
        for repo_path in _repo_paths(repo):
            if target == repo_path or target.startswith(f"{repo_path}/"):
                path_matches.append((repo_path, repo))
    if not path_matches:
        return None

    exact = [repo for repo_path, repo in path_matches if repo_path == target]
    if exact:
        basename = Path(target).name
        for repo in exact:
            if repo.slug == basename:
                return RepoMatch(repo.repo_id, repo.slug, "path_exact_slug", 0.85)
        repo = exact[0]
        return RepoMatch(repo.repo_id, repo.slug, "path_exact", 0.80)

    # Longest recorded path is the most specific; max() keeps the first of
    # equally long candidates, matching a stable descending sort.
    _, repo = max(path_matches, key=lambda item: len(item[0]))
    return RepoMatch(repo.repo_id, repo.slug, "path_prefix", 0.75)
|
||||
Reference in New Issue
Block a user