repos with username password

This commit is contained in:
2026-04-26 19:17:36 +02:00
parent dda00b70ac
commit 345aeaf353
6 changed files with 155 additions and 11 deletions

View File

@@ -62,9 +62,16 @@ class RegistryService:
name: str | None = None, name: str | None = None,
description: str | None = None, description: str | None = None,
branch: str = "main", branch: str = "main",
access_username: str | None = None,
access_password: str | None = None,
) -> Repository: ) -> Repository:
if name is None or description is None: if name is None or description is None:
checkout = self.ingestion.resolve(url, branch=branch) checkout = self.ingestion.resolve(
url,
branch=branch,
access_username=access_username,
access_password=access_password,
)
metadata = self.metadata_extractor.extract(checkout.source_path, url) metadata = self.metadata_extractor.extract(checkout.source_path, url)
else: else:
metadata = None metadata = None

View File

@@ -3,8 +3,10 @@ from __future__ import annotations
import hashlib import hashlib
import shutil import shutil
import subprocess import subprocess
import os
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from base64 import b64encode
from urllib.parse import urlparse from urllib.parse import urlparse
@@ -18,7 +20,14 @@ class GitIngestionService:
def __init__(self, checkout_root: str | Path = "var/checkouts") -> None: def __init__(self, checkout_root: str | Path = "var/checkouts") -> None:
self.checkout_root = Path(checkout_root) self.checkout_root = Path(checkout_root)
def resolve(self, url_or_path: str, *, branch: str = "main") -> Checkout: def resolve(
self,
url_or_path: str,
*,
branch: str = "main",
access_username: str | None = None,
access_password: str | None = None,
) -> Checkout:
local_path = self._local_path(url_or_path) local_path = self._local_path(url_or_path)
if local_path is not None: if local_path is not None:
return Checkout(source_path=local_path.resolve(), was_cloned=False) return Checkout(source_path=local_path.resolve(), was_cloned=False)
@@ -26,17 +35,49 @@ class GitIngestionService:
checkout_path = self.checkout_root / self._checkout_key(url_or_path) checkout_path = self.checkout_root / self._checkout_key(url_or_path)
self.checkout_root.mkdir(parents=True, exist_ok=True) self.checkout_root.mkdir(parents=True, exist_ok=True)
if checkout_path.exists(): if checkout_path.exists():
self._run_git(["fetch", "--all", "--prune"], cwd=checkout_path) self._run_git(
["fetch", "--all", "--prune"],
cwd=checkout_path,
access_username=access_username,
access_password=access_password,
)
else: else:
self._run_git(["clone", url_or_path, str(checkout_path)], cwd=None) self._run_git(
["clone", url_or_path, str(checkout_path)],
cwd=None,
access_username=access_username,
access_password=access_password,
)
self._checkout_branch(checkout_path, branch) self._checkout_branch(
checkout_path,
branch,
access_username=access_username,
access_password=access_password,
)
return Checkout(source_path=checkout_path.resolve(), was_cloned=True) return Checkout(source_path=checkout_path.resolve(), was_cloned=True)
def _checkout_branch(self, checkout_path: Path, branch: str) -> None: def _checkout_branch(
self,
checkout_path: Path,
branch: str,
*,
access_username: str | None = None,
access_password: str | None = None,
) -> None:
if branch: if branch:
self._run_git(["checkout", branch], cwd=checkout_path) self._run_git(
self._run_git(["pull", "--ff-only"], cwd=checkout_path) ["checkout", branch],
cwd=checkout_path,
access_username=access_username,
access_password=access_password,
)
self._run_git(
["pull", "--ff-only"],
cwd=checkout_path,
access_username=access_username,
access_password=access_password,
)
def _local_path(self, value: str) -> Path | None: def _local_path(self, value: str) -> Path | None:
parsed = urlparse(value) parsed = urlparse(value)
@@ -60,10 +101,23 @@ class GitIngestionService:
safe = "".join(char if char.isalnum() or char in "-_" else "-" for char in value) safe = "".join(char if char.isalnum() or char in "-_" else "-" for char in value)
return safe.strip("-") or "repository" return safe.strip("-") or "repository"
def _run_git(self, args: list[str], *, cwd: Path | None) -> None: def _run_git(
self,
args: list[str],
*,
cwd: Path | None,
access_username: str | None = None,
access_password: str | None = None,
) -> None:
if shutil.which("git") is None: if shutil.which("git") is None:
raise RuntimeError("git executable was not found") raise RuntimeError("git executable was not found")
command = ["git", *args] auth_config = self._auth_config(access_username, access_password)
command = ["git", *auth_config, *args]
env = {
**os.environ,
"GIT_TERMINAL_PROMPT": "0",
"GIT_ASKPASS": "echo",
}
try: try:
result = subprocess.run( result = subprocess.run(
command, command,
@@ -72,11 +126,46 @@ class GitIngestionService:
capture_output=True, capture_output=True,
text=True, text=True,
timeout=120, timeout=120,
env=env,
) )
except subprocess.TimeoutExpired as exc: except subprocess.TimeoutExpired as exc:
raise RuntimeError( raise RuntimeError(
f"git {' '.join(args)} timed out after {exc.timeout} seconds" f"git {' '.join(args)} timed out after {exc.timeout} seconds. "
"If this is a private repository, provide HTTP access credentials."
) from exc ) from exc
if result.returncode != 0: if result.returncode != 0:
message = result.stderr.strip() or result.stdout.strip() message = result.stderr.strip() or result.stdout.strip()
if self._looks_like_auth_failure(message):
raise RuntimeError(
f"git {' '.join(args)} failed: authentication required. "
"Provide a username and password or access token for this repository."
)
raise RuntimeError(f"git {' '.join(args)} failed: {message}") raise RuntimeError(f"git {' '.join(args)} failed: {message}")
def _auth_config(
self,
access_username: str | None,
access_password: str | None,
) -> list[str]:
if not access_username or not access_password:
return []
token = b64encode(
f"{access_username}:{access_password}".encode("utf-8")
).decode("ascii")
return ["-c", f"http.extraHeader=Authorization: Basic {token}"]
def _looks_like_auth_failure(self, message: str) -> bool:
lowered = message.lower()
return any(
phrase in lowered
for phrase in (
"authentication failed",
"could not read username",
"could not read password",
"terminal prompts disabled",
"authentication required",
"access denied",
"401",
"403",
)
)

View File

@@ -52,6 +52,8 @@ class RepositoryCreate(BaseModel):
name: str | None = None name: str | None = None
description: str | None = None description: str | None = None
branch: str = "main" branch: str = "main"
access_username: str | None = None
access_password: str | None = Field(default=None, repr=False)
model_config = { model_config = {
"json_schema_extra": { "json_schema_extra": {
@@ -61,6 +63,8 @@ class RepositoryCreate(BaseModel):
"name": "Example Repository", "name": "Example Repository",
"description": "Optional human-readable repository summary.", "description": "Optional human-readable repository summary.",
"branch": "main", "branch": "main",
"access_username": None,
"access_password": None,
} }
] ]
} }

View File

@@ -201,6 +201,8 @@ def render_repository_index(
<form class="stack" method="post" action="/ui/repos"> <form class="stack" method="post" action="/ui/repos">
<label>Git URL or local path <input name="url" required></label> <label>Git URL or local path <input name="url" required></label>
<label>Branch <input name="branch" value="main"></label> <label>Branch <input name="branch" value="main"></label>
<label>Username <input name="access_username" autocomplete="username" placeholder="Optional for private HTTP(S) repos"></label>
<label>Password or access token <input name="access_password" type="password" autocomplete="current-password" placeholder="Used for this Git operation only"></label>
<div class="actions"> <div class="actions">
<button type="submit">Register</button> <button type="submit">Register</button>
<span data-pending>Registering repository...</span> <span data-pending>Registering repository...</span>
@@ -410,12 +412,16 @@ def search_page(
def create_repository_from_form( def create_repository_from_form(
url: str = Form(...), url: str = Form(...),
branch: str = Form("main"), branch: str = Form("main"),
access_username: str = Form(""),
access_password: str = Form(""),
service: RegistryService = Depends(get_service), service: RegistryService = Depends(get_service),
): ):
try: try:
repository = service.register_repository( repository = service.register_repository(
url=url, url=url,
branch=branch or "main", branch=branch or "main",
access_username=access_username or None,
access_password=access_password or None,
) )
except (RuntimeError, ValueError) as exc: except (RuntimeError, ValueError) as exc:
return render_repository_index( return render_repository_index(

View File

@@ -45,3 +45,38 @@ def test_ingestion_clones_file_url(tmp_path):
text=True, text=True,
).stdout.strip() ).stdout.strip()
assert branch == "main" assert branch == "main"
def test_git_commands_fail_fast_and_accept_ephemeral_http_credentials(monkeypatch):
calls = []
def fake_run(command, **kwargs):
calls.append((command, kwargs))
return subprocess.CompletedProcess(
command,
128,
stdout="",
stderr="fatal: could not read Username for 'https://example.com': terminal prompts disabled",
)
monkeypatch.setattr(subprocess, "run", fake_run)
service = GitIngestionService()
try:
service._run_git(
["clone", "https://example.com/private.git", "/tmp/private"],
cwd=None,
access_username="user",
access_password="secret",
)
except RuntimeError as exc:
message = str(exc)
else:
raise AssertionError("expected authentication failure")
command, kwargs = calls[0]
assert command[:3] == ["git", "-c", "http.extraHeader=Authorization: Basic dXNlcjpzZWNyZXQ="]
assert kwargs["env"]["GIT_TERMINAL_PROMPT"] == "0"
assert "authentication required" in message
assert "secret" not in message

View File

@@ -1074,12 +1074,15 @@ def test_ui_register_analyze_and_approve_loop(tmp_path):
assert index_response.status_code == 200 assert index_response.status_code == 200
assert "Register Repository" in index_response.text assert "Register Repository" in index_response.text
assert "Registering repository..." in index_response.text assert "Registering repository..." in index_response.text
assert "Password or access token" in index_response.text
create_response = client.post( create_response = client.post(
"/ui/repos", "/ui/repos",
data={ data={
"url": str(source), "url": str(source),
"branch": "main", "branch": "main",
"access_username": "",
"access_password": "",
}, },
follow_redirects=False, follow_redirects=False,
) )