Milestone 2’s core deterministic scanner path

This commit is contained in:
2026-04-25 22:32:05 +02:00
parent 3b2d1667bb
commit 3d9032a386
11 changed files with 853 additions and 2 deletions

View File

@@ -46,3 +46,30 @@ Then add abilities, capabilities, features, and evidence under that repository a
curl http://127.0.0.1:8000/repos/1/ability-map
curl 'http://127.0.0.1:8000/search?q=classify'
```
## Deterministic Analysis
For local development, repository URLs may be local filesystem paths. Trigger a deterministic scan:
```bash
curl -X POST http://127.0.0.1:8000/repos/1/analysis-runs \
-H 'content-type: application/json' \
-d '{}'
```
Or override the scan source path explicitly:
```bash
curl -X POST http://127.0.0.1:8000/repos/1/analysis-runs \
-H 'content-type: application/json' \
-d '{"source_path":"/path/to/repository"}'
```
Inspect recorded facts:
```bash
curl http://127.0.0.1:8000/repos/1/analysis-runs
curl http://127.0.0.1:8000/repos/1/observed-facts
```
The deterministic scanner records observed facts only: languages, documentation files, examples, tests, package manifests, configuration files, framework hints, and likely API/CLI interfaces.

View File

@@ -9,6 +9,40 @@ CREATE TABLE IF NOT EXISTS repositories (
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Source-tree state captured for a repository: the path that was read, the
-- commit hash and branch recorded for it, and the number of files counted.
-- Rows are deleted together with their repository (ON DELETE CASCADE).
CREATE TABLE IF NOT EXISTS repository_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
repository_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
commit_hash TEXT NOT NULL,
branch TEXT NOT NULL,
source_path TEXT NOT NULL,
file_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- One analysis run against a repository.
-- snapshot_id is nullable and is reset to NULL if the referenced snapshot is
-- deleted (ON DELETE SET NULL). completed_at and error_message are nullable;
-- scanner_version defaults to 'deterministic-v0.1' when not supplied.
CREATE TABLE IF NOT EXISTS analysis_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
repository_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
snapshot_id INTEGER REFERENCES repository_snapshots(id) ON DELETE SET NULL,
status TEXT NOT NULL,
started_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
completed_at TEXT,
error_message TEXT,
scanner_version TEXT NOT NULL DEFAULT 'deterministic-v0.1'
);
-- Facts recorded by an analysis run, keyed by kind / path / name.
-- Rows are deleted with their repository, their analysis run, or their
-- snapshot (all three references use ON DELETE CASCADE).
-- path and value default to the empty string; metadata is stored as text and
-- defaults to '{}'.
CREATE TABLE IF NOT EXISTS observed_facts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
repository_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
analysis_run_id INTEGER NOT NULL REFERENCES analysis_runs(id) ON DELETE CASCADE,
snapshot_id INTEGER REFERENCES repository_snapshots(id) ON DELETE CASCADE,
kind TEXT NOT NULL,
path TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL,
value TEXT NOT NULL DEFAULT '',
metadata TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS approved_abilities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
repository_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
@@ -52,6 +86,10 @@ CREATE TABLE IF NOT EXISTS approved_evidence (
);
CREATE INDEX IF NOT EXISTS idx_repositories_status ON repositories(status);
CREATE INDEX IF NOT EXISTS idx_snapshots_repository ON repository_snapshots(repository_id);
CREATE INDEX IF NOT EXISTS idx_analysis_runs_repository ON analysis_runs(repository_id);
CREATE INDEX IF NOT EXISTS idx_observed_facts_repository ON observed_facts(repository_id);
CREATE INDEX IF NOT EXISTS idx_observed_facts_run ON observed_facts(analysis_run_id);
CREATE INDEX IF NOT EXISTS idx_abilities_repository ON approved_abilities(repository_id);
CREATE INDEX IF NOT EXISTS idx_capabilities_repository ON approved_capabilities(repository_id);
CREATE INDEX IF NOT EXISTS idx_features_repository ON approved_features(repository_id);

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
@@ -13,6 +14,48 @@ class Repository:
status: str
@dataclass(frozen=True)
class RepositorySnapshot:
    """Immutable view of one stored repository snapshot row.

    Carries the snapshot's identity, the repository it belongs to, the
    commit hash and branch recorded for it, the path that was read, and the
    number of files counted.
    """

    # Primary key of the snapshot row.
    id: int
    # Repository this snapshot belongs to.
    repository_id: int
    commit_hash: str
    branch: str
    # Filesystem path the snapshot was taken from.
    source_path: str
    file_count: int
@dataclass(frozen=True)
class AnalysisRun:
    """Immutable view of one stored analysis run row.

    ``snapshot_id``, ``completed_at`` and ``error_message`` may be ``None``;
    the remaining fields are always populated.
    """

    # Primary key of the run row.
    id: int
    repository_id: int
    # Snapshot linked to this run, if any.
    snapshot_id: int | None
    status: str
    started_at: str
    completed_at: str | None
    error_message: str | None
    scanner_version: str
@dataclass(frozen=True)
class ObservedFact:
    """Immutable view of one stored observed-fact row.

    A fact is identified by its ``kind``, ``path`` and ``name`` and carries
    a string ``value`` plus a free-form ``metadata`` mapping.
    """

    # Primary key of the fact row.
    id: int
    repository_id: int
    # Analysis run that recorded this fact.
    analysis_run_id: int
    snapshot_id: int | None
    kind: str
    path: str
    name: str
    value: str
    # Decoded metadata mapping attached to the fact.
    metadata: dict[str, Any]
@dataclass(frozen=True)
class ScanSummary:
    """Result bundle of one analysis: the run, its snapshot, and its facts."""

    analysis_run: AnalysisRun
    # ``None`` when no snapshot is attached to the run.
    snapshot: RepositorySnapshot | None
    facts: list[ObservedFact]
@dataclass(frozen=True)
class Evidence:
id: int

View File

@@ -2,7 +2,15 @@ from __future__ import annotations
from collections.abc import Sequence
from repo_registry.core.models import Repository, RepositoryAbilityMap, SearchResult
from repo_registry.core.models import (
AnalysisRun,
ObservedFact,
Repository,
RepositoryAbilityMap,
ScanSummary,
SearchResult,
)
from repo_registry.repo_scanning.scanner import DeterministicScanner
from repo_registry.storage.sqlite import RegistryStore
@@ -11,6 +19,7 @@ class RegistryService:
def __init__(self, store: RegistryStore) -> None:
    """Bind the service to *store* and create its deterministic scanner."""
    self.scanner = DeterministicScanner()
    self.store = store
def register_repository(
self,
@@ -33,6 +42,48 @@ class RegistryService:
def get_repository(self, repository_id: int) -> Repository:
    """Return the repository with *repository_id*, as provided by the store."""
    repository = self.store.get_repository(repository_id)
    return repository
def analyze_repository(
    self,
    repository_id: int,
    *,
    source_path: str | None = None,
) -> ScanSummary:
    """Run the deterministic scanner for a repository and persist the outcome.

    A new analysis run is created and the repository is marked
    ``"analyzing"``. The scanner reads from *source_path* when given
    (non-empty), otherwise from the repository's ``url``.

    Returns:
        A ``ScanSummary``. When the scan itself fails, the run is recorded
        as failed and the summary carries no snapshot and no facts.

    Raises:
        Whatever the store raises for an unknown repository, and any error
        raised while persisting a successful scan. In the latter case the
        run is first marked as failed so it is not left in ``"running"``.
    """
    repository = self.store.get_repository(repository_id)
    run = self.store.create_analysis_run(repository_id)
    self.store.update_repository_status(repository_id, "analyzing")

    try:
        scan_result = self.scanner.scan(source_path or repository.url)
    except Exception as exc:
        # Any scanner failure is a recorded outcome, not a crash of the API.
        failed_run = self.store.fail_analysis_run(repository_id, run.id, str(exc))
        return ScanSummary(analysis_run=failed_run, snapshot=None, facts=[])

    try:
        completed_run = self.store.complete_analysis_run(
            repository_id,
            run.id,
            scan_result,
        )
    except Exception as exc:
        # Persisting the results failed: record that on the run so neither
        # the run nor the repository stays in an in-progress state, then let
        # the caller see the original error.
        self.store.fail_analysis_run(repository_id, run.id, str(exc))
        raise

    snapshot = None
    if completed_run.snapshot_id is not None:
        snapshot = self.store.get_snapshot(completed_run.snapshot_id)
    facts = self.store.list_observed_facts(repository_id, completed_run.id)
    return ScanSummary(
        analysis_run=completed_run,
        snapshot=snapshot,
        facts=facts,
    )
def list_analysis_runs(self, repository_id: int) -> list[AnalysisRun]:
    """Return the analysis runs the store holds for *repository_id*."""
    runs = self.store.list_analysis_runs(repository_id)
    return runs
def list_observed_facts(
    self,
    repository_id: int,
    analysis_run_id: int | None = None,
) -> list[ObservedFact]:
    """Return observed facts for a repository, optionally for a single run.

    Both arguments are forwarded unchanged to the store.
    """
    facts = self.store.list_observed_facts(repository_id, analysis_run_id)
    return facts
def add_ability(
self,
repository_id: int,

View File

@@ -0,0 +1 @@
"""Deterministic repository scanning."""

View File

@@ -0,0 +1,271 @@
from __future__ import annotations
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
# Path component names that exclude a file from the scan when they appear
# anywhere in its path relative to the scan root (VCS metadata, caches,
# virtualenvs, build output, vendored dependencies).
IGNORED_DIRS = {
".git",
".hg",
".mypy_cache",
".pytest_cache",
".ruff_cache",
".tox",
".venv",
"__pycache__",
"build",
"dist",
"node_modules",
"target",
"vendor",
}
# Maps a file suffix (including the leading dot) to the language name used
# for "language" facts. Suffixes not listed here are not counted.
LANGUAGE_BY_EXTENSION = {
".go": "Go",
".java": "Java",
".js": "JavaScript",
".jsx": "JavaScript",
".kt": "Kotlin",
".php": "PHP",
".py": "Python",
".rb": "Ruby",
".rs": "Rust",
".ts": "TypeScript",
".tsx": "TypeScript",
}
# Per-manifest framework hints: manifest file name -> {needle: framework}.
# A needle is searched for in the lowercased manifest text. Note that the
# manifest names are case-sensitive keys and "Cargo.toml" is mixed-case.
MANIFEST_FRAMEWORK_HINTS = {
"pyproject.toml": {
"fastapi": "FastAPI",
"django": "Django",
"flask": "Flask",
"typer": "Typer",
"click": "Click",
"pytest": "pytest",
},
"requirements.txt": {
"fastapi": "FastAPI",
"django": "Django",
"flask": "Flask",
"typer": "Typer",
"click": "Click",
"pytest": "pytest",
},
"package.json": {
"next": "Next.js",
"react": "React",
"express": "Express",
"vite": "Vite",
"jest": "Jest",
"vitest": "Vitest",
},
"Cargo.toml": {
"axum": "Axum",
"actix-web": "Actix Web",
"clap": "Clap",
"tokio": "Tokio",
},
}
@dataclass(frozen=True)
class FactCandidate:
    """A fact produced by the scanner, not yet stored.

    ``kind`` and ``name`` are required; ``path`` and ``value`` default to
    the empty string and ``metadata`` to a fresh empty dict per instance.
    """

    kind: str
    name: str
    # Path of the file the fact refers to, when it refers to one.
    path: str = ""
    value: str = ""
    # default_factory gives each instance its own dict.
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class ScanResult:
    """Everything one scan produced: source location, git identity, facts."""

    # Resolved path of the scanned directory.
    source_path: str
    commit_hash: str
    branch: str
    # Number of files the scan considered.
    file_count: int
    facts: list[FactCandidate]
class DeterministicScanner:
version = "deterministic-v0.1"
def scan(self, source_path: str | Path) -> ScanResult:
    """Scan the directory at *source_path* and return the collected facts.

    The path is user-expanded and resolved first. Facts from all collectors
    are returned sorted by ``(kind, path, name)``. The commit hash falls
    back to ``"working-tree"`` and the branch to ``"unknown"`` when git
    yields no value.

    Raises:
        ValueError: if the resolved path is missing or not a directory.
    """
    root = Path(source_path).expanduser().resolve()
    if not (root.exists() and root.is_dir()):
        raise ValueError(f"source path does not exist or is not a directory: {root}")

    files = list(self._iter_files(root))
    collectors = (
        self._language_facts,
        self._classified_file_facts,
        self._framework_facts,
        self._interface_facts,
    )
    collected: list[FactCandidate] = []
    for collect in collectors:
        collected.extend(collect(files, root))

    commit_hash = self._git_value(root, "rev-parse", "HEAD") or "working-tree"
    branch = self._git_value(root, "branch", "--show-current") or "unknown"
    ordered = sorted(collected, key=lambda fact: (fact.kind, fact.path, fact.name))
    return ScanResult(
        source_path=str(root),
        commit_hash=commit_hash,
        branch=branch,
        file_count=len(files),
        facts=ordered,
    )
def _iter_files(self, root: Path) -> list[Path]:
files: list[Path] = []
for path in root.rglob("*"):
if not path.is_file():
continue
relative_parts = path.relative_to(root).parts
if any(part in IGNORED_DIRS for part in relative_parts):
continue
files.append(path)
return files
def _language_facts(self, files: list[Path], root: Path) -> list[FactCandidate]:
    """Count files per language and return one ``language`` fact for each.

    The language is looked up from the file suffix in
    ``LANGUAGE_BY_EXTENSION``. The suffix is lowercased first so ``Main.PY``
    counts as Python, matching the case-insensitive file classification
    used elsewhere in the scanner. *root* is accepted for signature parity
    with the other collectors and is not used.
    """
    counts: dict[str, int] = {}
    for path in files:
        language = LANGUAGE_BY_EXTENSION.get(path.suffix.lower())
        if language is not None:
            counts[language] = counts.get(language, 0) + 1
    return [
        FactCandidate(
            kind="language",
            name=language,
            value=str(count),
            metadata={"file_count": count},
        )
        for language, count in counts.items()
    ]
def _classified_file_facts(
    self, files: list[Path], root: Path
) -> list[FactCandidate]:
    """Classify files by name and location into structural facts.

    A file may produce several facts. Matching is case-insensitive:

    * ``documentation``: names starting with ``readme``, otherwise anything
      under a top-level ``docs/`` or ``doc/`` directory.
    * ``example``: anything under top-level ``examples/`` or ``example/``.
    * ``test``: anything under top-level ``tests/`` or ``test/``, or names
      matching ``test_*``, ``*_test.py``, ``*.test.ts``, ``*.spec.ts``.
    * ``manifest``: known manifest and lock file names.
    * ``config``: ``.yaml``, ``.yml``, ``.toml``, ``.ini``, ``.env.example``.
    """
    # File names are lowercased before comparison, so the manifest names
    # must be lowercased too; otherwise the mixed-case "Cargo.toml" key
    # could never match.
    manifest_names = {key.lower() for key in MANIFEST_FRAMEWORK_HINTS} | {
        "requirements.txt",
        "poetry.lock",
        "package-lock.json",
        "pnpm-lock.yaml",
        "yarn.lock",
        "go.mod",
    }
    test_suffixes = ("_test.py", ".test.ts", ".spec.ts")
    config_suffixes = (".yaml", ".yml", ".toml", ".ini", ".env.example")

    facts: list[FactCandidate] = []
    for path in files:
        relative = path.relative_to(root).as_posix()
        lower = relative.lower()
        name = path.name.lower()

        if name.startswith("readme"):
            facts.append(FactCandidate("documentation", "README", relative))
        elif lower.startswith(("docs/", "doc/")):
            facts.append(FactCandidate("documentation", path.name, relative))

        if lower.startswith(("examples/", "example/")):
            facts.append(FactCandidate("example", path.name, relative))

        if (
            lower.startswith(("tests/", "test/"))
            or name.startswith("test_")
            or name.endswith(test_suffixes)
        ):
            facts.append(FactCandidate("test", path.name, relative))

        if name in manifest_names:
            facts.append(FactCandidate("manifest", path.name, relative))

        if lower.endswith(config_suffixes):
            facts.append(FactCandidate("config", path.name, relative))
    return facts
def _framework_facts(self, files: list[Path], root: Path) -> list[FactCandidate]:
    """Derive ``framework`` facts from dependency names found in manifests.

    Only files whose name is a key of ``MANIFEST_FRAMEWORK_HINTS`` are read.
    Each needle must appear in the lowercased text as a whole word, so
    ``vitest`` does not report Vite, ``preact`` does not report React and
    ``clickhouse`` does not report Click. Hyphenated companions such as
    ``react-dom`` or ``pytest-cov`` still count. Unreadable manifests are
    skipped.
    """
    import re  # local import keeps this method self-contained

    facts: list[FactCandidate] = []
    seen: set[tuple[str, str]] = set()
    for path in files:
        hints = MANIFEST_FRAMEWORK_HINTS.get(path.name)
        if hints is None:
            continue
        try:
            text = path.read_text(encoding="utf-8", errors="ignore").lower()
        except OSError:
            continue
        relative = path.relative_to(root).as_posix()
        for needle, framework in hints.items():
            # \b on both sides rejects matches embedded in a longer word.
            if re.search(rf"\b{re.escape(needle)}\b", text) is None:
                continue
            key = (framework, relative)
            if key in seen:
                continue
            seen.add(key)
            facts.append(
                FactCandidate(
                    kind="framework",
                    name=framework,
                    path=relative,
                    metadata={"source": "manifest_hint", "needle": needle},
                )
            )
    return facts
def _interface_facts(self, files: list[Path], root: Path) -> list[FactCandidate]:
facts: list[FactCandidate] = []
for path in files:
relative = path.relative_to(root).as_posix()
lower = relative.lower()
if path.suffix == ".py":
facts.extend(self._python_interface_facts(path, relative))
if "cli" in lower or lower.endswith("/commands.py"):
facts.append(FactCandidate("interface", "possible CLI", relative))
if "routes" in lower or "api" in lower:
facts.append(FactCandidate("interface", "possible API surface", relative))
return facts
def _python_interface_facts(self, path: Path, relative: str) -> list[FactCandidate]:
facts: list[FactCandidate] = []
try:
lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
except OSError:
return facts
for line_number, line in enumerate(lines, start=1):
stripped = line.strip()
if stripped.startswith("@app.") or stripped.startswith("@router."):
facts.append(
FactCandidate(
kind="interface",
name="python route decorator",
path=relative,
value=stripped,
metadata={"line": line_number},
)
)
elif stripped.startswith("@click.command") or stripped.startswith("@app.command"):
facts.append(
FactCandidate(
kind="interface",
name="python CLI command decorator",
path=relative,
value=stripped,
metadata={"line": line_number},
)
)
return facts
def _git_value(self, root: Path, *args: str) -> str | None:
try:
result = subprocess.run(
["git", *args],
cwd=root,
check=False,
capture_output=True,
text=True,
timeout=5,
)
except (OSError, subprocess.SubprocessError):
return None
if result.returncode != 0:
return None
return result.stdout.strip() or None

View File

@@ -3,17 +3,20 @@ from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any
from repo_registry.core.models import (
Ability,
AnalysisRun,
Capability,
Evidence,
Feature,
ObservedFact,
Repository,
RepositoryAbilityMap,
RepositorySnapshot,
SearchResult,
)
from repo_registry.repo_scanning.scanner import FactCandidate, ScanResult
class NotFoundError(ValueError):
@@ -54,6 +57,19 @@ class RegistryStore:
repository_id = int(cursor.lastrowid)
return self.get_repository(repository_id)
def update_repository_status(self, repository_id: int, status: str) -> None:
    """Set a repository's status and refresh its ``updated_at`` timestamp.

    Raises:
        NotFoundError: if no repository row has *repository_id*.
    """
    statement = """
        UPDATE repositories
        SET status = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """
    with self.connect() as connection:
        updated_rows = connection.execute(statement, (status, repository_id)).rowcount
        if updated_rows == 0:
            raise NotFoundError(f"repository {repository_id} was not found")
def list_repositories(self) -> list[Repository]:
with self.connect() as connection:
rows = connection.execute(
@@ -79,6 +95,172 @@ class RegistryStore:
raise NotFoundError(f"repository {repository_id} was not found")
return self._repository_from_row(row)
def create_analysis_run(self, repository_id: int) -> AnalysisRun:
    """Insert a new analysis run in status ``'running'`` and return it.

    The repository is looked up first, so a missing repository surfaces as
    whatever ``get_repository`` raises before anything is inserted.
    """
    self.get_repository(repository_id)
    insert_run = """
        INSERT INTO analysis_runs (repository_id, status)
        VALUES (?, 'running')
    """
    with self.connect() as connection:
        new_id = int(connection.execute(insert_run, (repository_id,)).lastrowid)
    return self.get_analysis_run(repository_id, new_id)
def complete_analysis_run(
    self,
    repository_id: int,
    analysis_run_id: int,
    scan_result: ScanResult,
) -> AnalysisRun:
    """Persist a successful scan and mark its run as completed.

    Using one connection, this inserts a snapshot row, stores the scan's
    facts against that snapshot, sets the run to ``'completed'`` with the
    snapshot linked and the error cleared, and sets the repository status
    to ``'analyzed'``.

    Raises:
        NotFoundError: if the run does not exist for this repository. The
            check happens before anything is written, so no orphan snapshot
            or facts are created and the repository status is untouched.
    """
    with self.connect() as connection:
        # Verify the target run up front, mirroring fail_analysis_run.
        existing = connection.execute(
            "SELECT 1 FROM analysis_runs WHERE id = ? AND repository_id = ?",
            (analysis_run_id, repository_id),
        ).fetchone()
        if existing is None:
            raise NotFoundError(
                f"analysis run {analysis_run_id} was not found for repository {repository_id}"
            )

        snapshot_cursor = connection.execute(
            """
            INSERT INTO repository_snapshots
            (repository_id, commit_hash, branch, source_path, file_count)
            VALUES (?, ?, ?, ?, ?)
            """,
            (
                repository_id,
                scan_result.commit_hash,
                scan_result.branch,
                scan_result.source_path,
                scan_result.file_count,
            ),
        )
        snapshot_id = int(snapshot_cursor.lastrowid)
        self._insert_facts(
            connection,
            repository_id=repository_id,
            analysis_run_id=analysis_run_id,
            snapshot_id=snapshot_id,
            facts=scan_result.facts,
        )
        connection.execute(
            """
            UPDATE analysis_runs
            SET status = 'completed',
            snapshot_id = ?,
            completed_at = CURRENT_TIMESTAMP,
            error_message = NULL
            WHERE id = ? AND repository_id = ?
            """,
            (snapshot_id, analysis_run_id, repository_id),
        )
        connection.execute(
            """
            UPDATE repositories
            SET status = 'analyzed', updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (repository_id,),
        )
    return self.get_analysis_run(repository_id, analysis_run_id)
def fail_analysis_run(
    self,
    repository_id: int,
    analysis_run_id: int,
    error_message: str,
) -> AnalysisRun:
    """Mark a run as failed and the repository as ``'analysis_failed'``.

    The run gets status ``'failed'``, a completion timestamp and
    *error_message*.

    Raises:
        NotFoundError: if the run does not exist for this repository. This
            is checked before the repository row is touched, so an unknown
            run never changes the repository's status.
    """
    with self.connect() as connection:
        cursor = connection.execute(
            """
            UPDATE analysis_runs
            SET status = 'failed',
            completed_at = CURRENT_TIMESTAMP,
            error_message = ?
            WHERE id = ? AND repository_id = ?
            """,
            (error_message, analysis_run_id, repository_id),
        )
        # Bail out before updating the repository when no run matched.
        if cursor.rowcount == 0:
            raise NotFoundError(
                f"analysis run {analysis_run_id} was not found for repository {repository_id}"
            )
        connection.execute(
            """
            UPDATE repositories
            SET status = 'analysis_failed', updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (repository_id,),
        )
    return self.get_analysis_run(repository_id, analysis_run_id)
def get_analysis_run(self, repository_id: int, analysis_run_id: int) -> AnalysisRun:
    """Fetch one analysis run, scoped to its repository.

    Raises:
        NotFoundError: if no run with *analysis_run_id* exists for
            *repository_id*.
    """
    query = """
        SELECT id, repository_id, snapshot_id, status, started_at,
        completed_at, error_message, scanner_version
        FROM analysis_runs
        WHERE id = ? AND repository_id = ?
    """
    with self.connect() as connection:
        row = connection.execute(query, (analysis_run_id, repository_id)).fetchone()
    if row is not None:
        return self._analysis_run_from_row(row)
    raise NotFoundError(
        f"analysis run {analysis_run_id} was not found for repository {repository_id}"
    )
def list_analysis_runs(self, repository_id: int) -> list[AnalysisRun]:
    """Return a repository's analysis runs, newest first.

    Runs are ordered by ``started_at`` descending, then ``id`` descending.
    The repository is looked up first, so a missing repository surfaces as
    whatever ``get_repository`` raises.
    """
    self.get_repository(repository_id)
    query = """
        SELECT id, repository_id, snapshot_id, status, started_at,
        completed_at, error_message, scanner_version
        FROM analysis_runs
        WHERE repository_id = ?
        ORDER BY started_at DESC, id DESC
    """
    with self.connect() as connection:
        rows = connection.execute(query, (repository_id,)).fetchall()
    runs = []
    for row in rows:
        runs.append(self._analysis_run_from_row(row))
    return runs
def get_snapshot(self, snapshot_id: int) -> RepositorySnapshot:
    """Fetch one repository snapshot by its id.

    Raises:
        NotFoundError: if no snapshot row has *snapshot_id*.
    """
    query = """
        SELECT id, repository_id, commit_hash, branch, source_path, file_count
        FROM repository_snapshots
        WHERE id = ?
    """
    with self.connect() as connection:
        row = connection.execute(query, (snapshot_id,)).fetchone()
    if row is not None:
        return self._snapshot_from_row(row)
    raise NotFoundError(f"snapshot {snapshot_id} was not found")
def list_observed_facts(
    self,
    repository_id: int,
    analysis_run_id: int | None = None,
) -> list[ObservedFact]:
    """Return a repository's observed facts, optionally limited to one run.

    Facts are ordered by kind, path, name and id. The repository is looked
    up first, so a missing repository surfaces as whatever
    ``get_repository`` raises.
    """
    self.get_repository(repository_id)

    # Only fixed SQL fragments are interpolated; all values are bound.
    conditions = ["repository_id = ?"]
    bound: list[int] = [repository_id]
    if analysis_run_id is not None:
        conditions.append("analysis_run_id = ?")
        bound.append(analysis_run_id)
    where = "WHERE " + " AND ".join(conditions)
    params: tuple[int, ...] = tuple(bound)

    with self.connect() as connection:
        rows = connection.execute(
            f"""
            SELECT id, repository_id, analysis_run_id, snapshot_id, kind,
            path, name, value, metadata
            FROM observed_facts
            {where}
            ORDER BY kind ASC, path ASC, name ASC, id ASC
            """,
            params,
        ).fetchall()
    return [self._observed_fact_from_row(row) for row in rows]
def create_ability(
self,
repository_id: int,
@@ -326,6 +508,36 @@ class RegistryStore:
for row in rows
]
def _insert_facts(
self,
connection: sqlite3.Connection,
*,
repository_id: int,
analysis_run_id: int,
snapshot_id: int,
facts: list[FactCandidate],
) -> None:
connection.executemany(
"""
INSERT INTO observed_facts
(repository_id, analysis_run_id, snapshot_id, kind, path, name, value, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
[
(
repository_id,
analysis_run_id,
snapshot_id,
fact.kind,
fact.path,
fact.name,
fact.value,
json.dumps(fact.metadata),
)
for fact in facts
],
)
@staticmethod
def _repository_from_row(row: sqlite3.Row) -> Repository:
return Repository(
@@ -336,3 +548,41 @@ class RegistryStore:
branch=row["branch"],
status=row["status"],
)
@staticmethod
def _snapshot_from_row(row: sqlite3.Row) -> RepositorySnapshot:
    """Build a ``RepositorySnapshot`` from a row, reading columns by name."""
    columns = (
        "id",
        "repository_id",
        "commit_hash",
        "branch",
        "source_path",
        "file_count",
    )
    return RepositorySnapshot(**{column: row[column] for column in columns})
@staticmethod
def _analysis_run_from_row(row: sqlite3.Row) -> AnalysisRun:
    """Build an ``AnalysisRun`` from a row, reading columns by name."""
    columns = (
        "id",
        "repository_id",
        "snapshot_id",
        "status",
        "started_at",
        "completed_at",
        "error_message",
        "scanner_version",
    )
    return AnalysisRun(**{column: row[column] for column in columns})
@staticmethod
def _observed_fact_from_row(row: sqlite3.Row) -> ObservedFact:
    """Build an ``ObservedFact`` from a row, reading columns by name.

    The ``metadata`` column is decoded with ``json.loads``; every other
    column is passed through unchanged.
    """
    plain_columns = (
        "id",
        "repository_id",
        "analysis_run_id",
        "snapshot_id",
        "kind",
        "path",
        "name",
        "value",
    )
    values = {column: row[column] for column in plain_columns}
    decoded_metadata = json.loads(row["metadata"])
    return ObservedFact(metadata=decoded_metadata, **values)

View File

@@ -63,6 +63,10 @@ class EvidenceCreate(BaseModel):
strength: str = "medium"
class AnalysisRunCreate(BaseModel):
    """Request body for starting an analysis run."""

    # Optional path to scan; when omitted the field is None.
    source_path: str | None = None
app = FastAPI(title="Repository Ability Registry", version="0.1.0")
@@ -101,6 +105,48 @@ def get_repository(
raise HTTPException(status_code=404, detail=str(exc)) from exc
@app.post("/repos/{repository_id}/analysis-runs", status_code=201)
def create_analysis_run(
    repository_id: int,
    payload: AnalysisRunCreate,
    service: RegistryService = Depends(get_service),
) -> dict[str, object]:
    """Run an analysis for a repository and return the scan summary as a dict.

    Responds with 404 when the service raises ``NotFoundError``.
    """
    # NOTE(review): source_path comes straight from the request body and is
    # handed to the service unchecked - confirm this endpoint is only exposed
    # in trusted/local setups.
    try:
        summary = service.analyze_repository(
            repository_id,
            source_path=payload.source_path,
        )
    except NotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    else:
        return asdict(summary)
@app.get("/repos/{repository_id}/analysis-runs")
def list_analysis_runs(
    repository_id: int,
    service: RegistryService = Depends(get_service),
) -> list[dict[str, object]]:
    """Return a repository's analysis runs as a list of dicts.

    Responds with 404 when the service raises ``NotFoundError``.
    """
    try:
        runs = service.list_analysis_runs(repository_id)
    except NotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return [asdict(run) for run in runs]
@app.get("/repos/{repository_id}/observed-facts")
def list_observed_facts(
    repository_id: int,
    analysis_run_id: int | None = None,
    service: RegistryService = Depends(get_service),
) -> list[dict[str, object]]:
    """Return a repository's observed facts as a list of dicts.

    ``analysis_run_id`` optionally restricts the result to one run.
    Responds with 404 when the service raises ``NotFoundError``.
    """
    try:
        facts = service.list_observed_facts(repository_id, analysis_run_id)
    except NotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return [asdict(fact) for fact in facts]
@app.post("/repos/{repository_id}/abilities", status_code=201)
def create_ability(
repository_id: int,

View File

@@ -96,3 +96,51 @@ def test_capability_must_belong_to_repository(tmp_path):
assert "ability" in str(exc)
else:
raise AssertionError("expected a NotFoundError")
def test_analyze_repository_records_snapshot_and_observed_facts(tmp_path):
    """A successful analysis stores a snapshot and the expected facts."""
    repo_dir = tmp_path / "repo"
    repo_dir.mkdir()
    fixture_files = {
        "README.md": "# Example\n",
        "requirements.txt": "fastapi\n",
        "app.py": (
            "from fastapi import FastAPI\n"
            "app = FastAPI()\n"
            '@app.get("/health")\n'
            "def health():\n"
            " return {}\n"
        ),
    }
    for filename, content in fixture_files.items():
        (repo_dir / filename).write_text(content, encoding="utf-8")

    service = make_service(tmp_path)
    repository = service.register_repository(
        name="Example",
        url=str(repo_dir),
        description="A local fixture repository",
    )

    summary = service.analyze_repository(repository.id)

    assert summary.analysis_run.status == "completed"
    assert summary.snapshot is not None
    assert summary.snapshot.file_count == 3
    assert service.get_repository(repository.id).status == "analyzed"

    recorded = {(fact.kind, fact.name, fact.path) for fact in summary.facts}
    expected = [
        ("documentation", "README", "README.md"),
        ("framework", "FastAPI", "requirements.txt"),
        ("interface", "python route decorator", "app.py"),
    ]
    for triple in expected:
        assert triple in recorded
def test_analyze_repository_failure_is_recorded(tmp_path):
    """Scanning a missing path yields a failed run and repository status."""
    missing_dir = tmp_path / "does-not-exist"
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="Missing",
        url=str(missing_dir),
    )

    summary = service.analyze_repository(repository.id)
    failed_run = summary.analysis_run

    assert failed_run.status == "failed"
    assert summary.snapshot is None
    assert "does not exist" in (failed_run.error_message or "")
    assert service.get_repository(repository.id).status == "analysis_failed"

View File

@@ -0,0 +1,36 @@
from repo_registry.repo_scanning.scanner import DeterministicScanner
def test_deterministic_scanner_extracts_structural_facts(tmp_path):
    """The scanner reports docs, manifest, tests, frameworks, routes, languages."""
    repo = tmp_path / "sample"
    for directory in (repo, repo / "src", repo / "tests"):
        directory.mkdir()

    fixture_files = {
        repo / "README.md": "# MailRouter\n",
        repo / "pyproject.toml": '[project]\ndependencies = ["fastapi", "pytest"]\n',
        repo / "src" / "routes.py": (
            "from fastapi import APIRouter\n"
            "router = APIRouter()\n"
            '@router.post("/classify-email")\n'
            "def classify_email():\n"
            " return {}\n"
        ),
        repo / "tests" / "test_routes.py": "def test_ok(): pass\n",
    }
    for file_path, content in fixture_files.items():
        file_path.write_text(content, encoding="utf-8")

    result = DeterministicScanner().scan(repo)
    facts = {(fact.kind, fact.name, fact.path) for fact in result.facts}

    assert result.file_count == 4
    expected = [
        ("documentation", "README", "README.md"),
        ("manifest", "pyproject.toml", "pyproject.toml"),
        ("test", "test_routes.py", "tests/test_routes.py"),
        ("framework", "FastAPI", "pyproject.toml"),
        ("framework", "pytest", "pyproject.toml"),
        ("interface", "python route decorator", "src/routes.py"),
    ]
    for triple in expected:
        assert triple in facts

    languages = {}
    for fact in result.facts:
        if fact.kind == "language":
            languages[fact.name] = fact.metadata["file_count"]
    assert languages == {"Python": 2}

View File

@@ -67,3 +67,43 @@ def test_api_manual_registry_loop(tmp_path):
assert search_response.json()
finally:
app.dependency_overrides.clear()
def test_api_analysis_run_loop(tmp_path):
    """Register a repo over HTTP, analyze it, and read back its facts."""
    repo_dir = tmp_path / "repo"
    repo_dir.mkdir()
    (repo_dir / "README.md").write_text("# Searchable\n", encoding="utf-8")
    (repo_dir / "package.json").write_text(
        '{"dependencies":{"react":"latest","vite":"latest"}}',
        encoding="utf-8",
    )
    database_file = str(tmp_path / "api-analysis.sqlite3")

    def override_settings():
        return Settings(database_path=database_file)

    app.dependency_overrides[get_settings] = override_settings
    client = TestClient(app)
    try:
        created = client.post(
            "/repos",
            json={"name": "Frontend", "url": str(repo_dir)},
        )
        repository_id = created.json()["id"]

        run_response = client.post(f"/repos/{repository_id}/analysis-runs", json={})
        assert run_response.status_code == 201
        summary = run_response.json()
        assert summary["analysis_run"]["status"] == "completed"
        assert summary["snapshot"]["file_count"] == 2

        facts_response = client.get(f"/repos/{repository_id}/observed-facts")
        assert facts_response.status_code == 200
        recorded = set()
        for fact in facts_response.json():
            recorded.add((fact["kind"], fact["name"], fact["path"]))
        assert ("documentation", "README", "README.md") in recorded
        assert ("framework", "React", "package.json") in recorded
        assert ("framework", "Vite", "package.json") in recorded
    finally:
        app.dependency_overrides.clear()