Milestone 0 plus the manual-registry spine from Milestone 1

This commit is contained in:
2026-04-25 22:00:26 +02:00
parent a833d4f82c
commit 3b2d1667bb
13 changed files with 1000 additions and 2 deletions

View File

@@ -0,0 +1,338 @@
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any
from repo_registry.core.models import (
Ability,
Capability,
Evidence,
Feature,
Repository,
RepositoryAbilityMap,
SearchResult,
)
class NotFoundError(ValueError):
pass
class RegistryStore:
def __init__(self, database_path: str | Path) -> None:
self.database_path = str(database_path)
def initialize(self) -> None:
migration_path = Path(__file__).parents[3] / "migrations" / "0001_initial.sql"
with self.connect() as connection:
connection.executescript(migration_path.read_text(encoding="utf-8"))
def connect(self) -> sqlite3.Connection:
connection = sqlite3.connect(self.database_path)
connection.row_factory = sqlite3.Row
connection.execute("PRAGMA foreign_keys = ON")
return connection
def create_repository(
self,
*,
name: str,
url: str,
description: str | None,
branch: str,
) -> Repository:
with self.connect() as connection:
cursor = connection.execute(
"""
INSERT INTO repositories (name, url, description, branch)
VALUES (?, ?, ?, ?)
""",
(name, url, description, branch),
)
repository_id = int(cursor.lastrowid)
return self.get_repository(repository_id)
def list_repositories(self) -> list[Repository]:
with self.connect() as connection:
rows = connection.execute(
"""
SELECT id, name, url, description, branch, status
FROM repositories
ORDER BY created_at DESC, id DESC
"""
).fetchall()
return [self._repository_from_row(row) for row in rows]
def get_repository(self, repository_id: int) -> Repository:
with self.connect() as connection:
row = connection.execute(
"""
SELECT id, name, url, description, branch, status
FROM repositories
WHERE id = ?
""",
(repository_id,),
).fetchone()
if row is None:
raise NotFoundError(f"repository {repository_id} was not found")
return self._repository_from_row(row)
def create_ability(
self,
repository_id: int,
*,
name: str,
description: str,
confidence: float,
) -> int:
with self.connect() as connection:
cursor = connection.execute(
"""
INSERT INTO approved_abilities
(repository_id, name, description, confidence)
VALUES (?, ?, ?, ?)
""",
(repository_id, name, description, confidence),
)
return int(cursor.lastrowid)
def ensure_ability(self, repository_id: int, ability_id: int) -> None:
with self.connect() as connection:
row = connection.execute(
"""
SELECT id FROM approved_abilities
WHERE id = ? AND repository_id = ?
""",
(ability_id, repository_id),
).fetchone()
if row is None:
raise NotFoundError(
f"ability {ability_id} was not found for repository {repository_id}"
)
def create_capability(
self,
repository_id: int,
ability_id: int,
*,
name: str,
description: str,
inputs: list[str],
outputs: list[str],
confidence: float,
) -> int:
with self.connect() as connection:
cursor = connection.execute(
"""
INSERT INTO approved_capabilities
(repository_id, ability_id, name, description, inputs, outputs, confidence)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
repository_id,
ability_id,
name,
description,
json.dumps(inputs),
json.dumps(outputs),
confidence,
),
)
return int(cursor.lastrowid)
def ensure_capability(self, repository_id: int, capability_id: int) -> None:
with self.connect() as connection:
row = connection.execute(
"""
SELECT id FROM approved_capabilities
WHERE id = ? AND repository_id = ?
""",
(capability_id, repository_id),
).fetchone()
if row is None:
raise NotFoundError(
f"capability {capability_id} was not found for repository {repository_id}"
)
def create_feature(
self,
repository_id: int,
capability_id: int,
*,
name: str,
type: str,
location: str,
confidence: float,
) -> int:
with self.connect() as connection:
cursor = connection.execute(
"""
INSERT INTO approved_features
(repository_id, capability_id, name, type, location, confidence)
VALUES (?, ?, ?, ?, ?, ?)
""",
(repository_id, capability_id, name, type, location, confidence),
)
return int(cursor.lastrowid)
def create_evidence(
self,
repository_id: int,
capability_id: int,
*,
type: str,
reference: str,
strength: str,
) -> int:
with self.connect() as connection:
cursor = connection.execute(
"""
INSERT INTO approved_evidence
(repository_id, capability_id, type, reference, strength)
VALUES (?, ?, ?, ?, ?)
""",
(repository_id, capability_id, type, reference, strength),
)
return int(cursor.lastrowid)
def get_ability_map(self, repository_id: int) -> RepositoryAbilityMap:
repository = self.get_repository(repository_id)
with self.connect() as connection:
ability_rows = connection.execute(
"""
SELECT id, name, description, confidence
FROM approved_abilities
WHERE repository_id = ?
ORDER BY id
""",
(repository_id,),
).fetchall()
capability_rows = connection.execute(
"""
SELECT id, ability_id, name, description, inputs, outputs, confidence
FROM approved_capabilities
WHERE repository_id = ?
ORDER BY id
""",
(repository_id,),
).fetchall()
feature_rows = connection.execute(
"""
SELECT id, capability_id, name, type, location, confidence
FROM approved_features
WHERE repository_id = ?
ORDER BY id
""",
(repository_id,),
).fetchall()
evidence_rows = connection.execute(
"""
SELECT id, capability_id, type, reference, strength
FROM approved_evidence
WHERE repository_id = ?
ORDER BY id
""",
(repository_id,),
).fetchall()
features_by_capability: dict[int, list[Feature]] = {}
for row in feature_rows:
features_by_capability.setdefault(row["capability_id"], []).append(
Feature(
id=row["id"],
name=row["name"],
type=row["type"],
location=row["location"],
confidence=row["confidence"],
)
)
evidence_by_capability: dict[int, list[Evidence]] = {}
for row in evidence_rows:
evidence_by_capability.setdefault(row["capability_id"], []).append(
Evidence(
id=row["id"],
type=row["type"],
reference=row["reference"],
strength=row["strength"],
)
)
capabilities_by_ability: dict[int, list[Capability]] = {}
for row in capability_rows:
capabilities_by_ability.setdefault(row["ability_id"], []).append(
Capability(
id=row["id"],
name=row["name"],
description=row["description"],
inputs=json.loads(row["inputs"]),
outputs=json.loads(row["outputs"]),
confidence=row["confidence"],
features=features_by_capability.get(row["id"], []),
evidence=evidence_by_capability.get(row["id"], []),
)
)
abilities = [
Ability(
id=row["id"],
name=row["name"],
description=row["description"],
confidence=row["confidence"],
capabilities=capabilities_by_ability.get(row["id"], []),
)
for row in ability_rows
]
return RepositoryAbilityMap(repository=repository, abilities=abilities)
def search(self, query: str) -> list[SearchResult]:
needle = f"%{query.strip()}%"
if needle == "%%":
return []
with self.connect() as connection:
rows = connection.execute(
"""
SELECT r.id AS repository_id, r.name AS repository_name,
'repository' AS match_type, r.name AS match_name,
1.0 AS confidence
FROM repositories r
WHERE r.name LIKE ? OR COALESCE(r.description, '') LIKE ?
UNION ALL
SELECT r.id, r.name, 'ability', a.name, a.confidence
FROM approved_abilities a
JOIN repositories r ON r.id = a.repository_id
WHERE a.name LIKE ? OR a.description LIKE ?
UNION ALL
SELECT r.id, r.name, 'capability', c.name, c.confidence
FROM approved_capabilities c
JOIN repositories r ON r.id = c.repository_id
WHERE c.name LIKE ? OR c.description LIKE ?
ORDER BY confidence DESC, repository_name ASC, match_name ASC
""",
(needle, needle, needle, needle, needle, needle),
).fetchall()
return [
SearchResult(
repository_id=row["repository_id"],
repository_name=row["repository_name"],
match_type=row["match_type"],
match_name=row["match_name"],
confidence=row["confidence"],
)
for row in rows
]
@staticmethod
def _repository_from_row(row: sqlite3.Row) -> Repository:
return Repository(
id=row["id"],
name=row["name"],
url=row["url"],
description=row["description"],
branch=row["branch"],
status=row["status"],
)