Files
repo-scoping/src/repo_registry/storage/sqlite.py

2870 lines
105 KiB
Python

from __future__ import annotations

import json
import sqlite3
from contextlib import closing
from pathlib import Path

from repo_registry.candidate_graph.generator import CandidateAbilityDraft
from repo_registry.content_indexing.extractor import ContentChunkCandidate
from repo_registry.core.logging import log_operation
from repo_registry.core.models import (
    Ability,
    AbilitySummary,
    AnalysisRun,
    CandidateAbility,
    CandidateCapability,
    CandidateEvidence,
    CandidateFeature,
    CandidateGraph,
    Capability,
    CapabilitySummary,
    ContentChunk,
    Evidence,
    ExpectationGap,
    Feature,
    ObservedFact,
    Repository,
    RepositoryAbilityMap,
    RepositorySnapshot,
    ReviewDecision,
    SearchResult,
    Scope,
    SourceReference,
    confidence_label,
)
from repo_registry.repo_scanning.scanner import FactCandidate, ScanResult
class NotFoundError(ValueError):
    """Signal that a lookup or update matched no row in the registry."""
class RegistryStore:
def __init__(self, database_path: str | Path) -> None:
self.database_path = str(database_path)
def initialize(self) -> None:
    """Apply the initial migration and bring older databases up to date.

    Executes ``migrations/0001_initial.sql`` (located through
    ``Path(__file__).parents[3]``) and then every ``_ensure_*`` helper, all
    on one connection and inside one ``with`` block.
    """
    migration_path = Path(__file__).parents[3] / "migrations" / "0001_initial.sql"
    # closing() releases the connection; the inner context only commits
    # or rolls back and would otherwise leave the connection open.
    with closing(self.connect()) as connection, connection:
        connection.executescript(migration_path.read_text(encoding="utf-8"))
        self._ensure_content_chunks_table(connection)
        self._ensure_repository_scopes_table(connection)
        self._ensure_approved_source_ref_columns(connection)
        self._ensure_evidence_relationship_columns(connection)
        self._ensure_characteristic_classification_columns(connection)
        self._ensure_expectation_gaps_table(connection)
def connect(self) -> sqlite3.Connection:
connection = sqlite3.connect(self.database_path)
connection.row_factory = sqlite3.Row
connection.execute("PRAGMA foreign_keys = ON")
return connection
def _ensure_approved_source_ref_columns(
self,
connection: sqlite3.Connection,
) -> None:
for table in ("approved_features", "approved_evidence"):
columns = {
row["name"]
for row in connection.execute(f"PRAGMA table_info({table})").fetchall()
}
if "source_refs" not in columns:
connection.execute(
f"ALTER TABLE {table} ADD COLUMN source_refs TEXT NOT NULL DEFAULT '[]'"
)
def _ensure_evidence_relationship_columns(
self,
connection: sqlite3.Connection,
) -> None:
for table in ("candidate_evidence", "approved_evidence"):
columns = {
row["name"]
for row in connection.execute(f"PRAGMA table_info({table})").fetchall()
}
if "target_kind" not in columns:
connection.execute(
f"ALTER TABLE {table} ADD COLUMN target_kind TEXT NOT NULL DEFAULT 'capability'"
)
if "target_id" not in columns:
connection.execute(f"ALTER TABLE {table} ADD COLUMN target_id INTEGER")
if "reference_kind" not in columns:
connection.execute(
f"ALTER TABLE {table} ADD COLUMN reference_kind TEXT NOT NULL DEFAULT 'source'"
)
if "reference_id" not in columns:
connection.execute(
f"ALTER TABLE {table} ADD COLUMN reference_id INTEGER"
)
connection.execute(
f"""
UPDATE {table}
SET target_kind = COALESCE(NULLIF(target_kind, ''), 'capability'),
target_id = COALESCE(target_id, capability_id),
reference_kind = COALESCE(NULLIF(reference_kind, ''), 'source')
WHERE target_id IS NULL
OR target_kind = ''
OR reference_kind = ''
"""
)
def _ensure_characteristic_classification_columns(
self,
connection: sqlite3.Connection,
) -> None:
defaults = {
"candidate_abilities": "ability",
"approved_abilities": "ability",
"candidate_capabilities": "capability",
"approved_capabilities": "capability",
"candidate_features": "",
"approved_features": "",
}
for table, default_class in defaults.items():
columns = {
row["name"]
for row in connection.execute(f"PRAGMA table_info({table})").fetchall()
}
if "primary_class" not in columns:
connection.execute(
f"ALTER TABLE {table} ADD COLUMN primary_class TEXT NOT NULL DEFAULT '{default_class}'"
)
if "attributes" not in columns:
connection.execute(
f"ALTER TABLE {table} ADD COLUMN attributes TEXT NOT NULL DEFAULT '[]'"
)
for table in ("candidate_abilities", "approved_abilities"):
connection.execute(
f"""
UPDATE {table}
SET primary_class = COALESCE(NULLIF(primary_class, ''), 'ability'),
attributes = COALESCE(NULLIF(attributes, ''), '[]')
WHERE primary_class = '' OR attributes = ''
"""
)
for table in ("candidate_capabilities", "approved_capabilities"):
connection.execute(
f"""
UPDATE {table}
SET primary_class = COALESCE(NULLIF(primary_class, ''), 'capability'),
attributes = COALESCE(NULLIF(attributes, ''), '[]')
WHERE primary_class = '' OR attributes = ''
"""
)
for table in ("candidate_features", "approved_features"):
connection.execute(
f"""
UPDATE {table}
SET primary_class = COALESCE(NULLIF(primary_class, ''), type),
attributes = COALESCE(NULLIF(attributes, ''), json_array(type))
WHERE primary_class = '' OR attributes = ''
"""
)
def _ensure_content_chunks_table(self, connection: sqlite3.Connection) -> None:
connection.execute(
"""
CREATE TABLE IF NOT EXISTS content_chunks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
repository_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
analysis_run_id INTEGER NOT NULL REFERENCES analysis_runs(id) ON DELETE CASCADE,
snapshot_id INTEGER REFERENCES repository_snapshots(id) ON DELETE CASCADE,
path TEXT NOT NULL,
kind TEXT NOT NULL,
start_line INTEGER NOT NULL,
end_line INTEGER NOT NULL,
text TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
)
columns = {
row["name"]
for row in connection.execute("PRAGMA table_info(content_chunks)").fetchall()
}
if "metadata" not in columns:
connection.execute(
"ALTER TABLE content_chunks ADD COLUMN metadata TEXT NOT NULL DEFAULT '{}'"
)
connection.execute(
"CREATE INDEX IF NOT EXISTS idx_content_chunks_repository ON content_chunks(repository_id)"
)
connection.execute(
"CREATE INDEX IF NOT EXISTS idx_content_chunks_run ON content_chunks(analysis_run_id)"
)
def _ensure_repository_scopes_table(self, connection: sqlite3.Connection) -> None:
connection.execute(
"""
CREATE TABLE IF NOT EXISTS repository_scopes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
repository_id INTEGER NOT NULL UNIQUE REFERENCES repositories(id) ON DELETE CASCADE,
name TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
confidence REAL NOT NULL DEFAULT 1.0,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
)
connection.execute(
"CREATE INDEX IF NOT EXISTS idx_scopes_repository ON repository_scopes(repository_id)"
)
def _ensure_expectation_gaps_table(self, connection: sqlite3.Connection) -> None:
connection.execute(
"""
CREATE TABLE IF NOT EXISTS expectation_gaps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
repository_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
analysis_run_id INTEGER REFERENCES analysis_runs(id) ON DELETE SET NULL,
expected_type TEXT NOT NULL,
expected_name TEXT NOT NULL,
source TEXT NOT NULL,
notes TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'open',
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
)
connection.execute(
"CREATE INDEX IF NOT EXISTS idx_expectation_gaps_repository ON expectation_gaps(repository_id)"
)
connection.execute(
"CREATE INDEX IF NOT EXISTS idx_expectation_gaps_run ON expectation_gaps(analysis_run_id)"
)
def create_repository(
    self,
    *,
    name: str,
    url: str,
    description: str | None,
    branch: str,
) -> Repository:
    """Insert a repository together with its scope row.

    Both inserts run in one transaction. The scope row reuses the
    repository name, stores ``""`` when ``description`` is ``None`` and is
    given confidence ``1.0``.

    Returns:
        The new repository as loaded by ``get_repository``.
    """
    # closing() releases the connection; the inner context commits or
    # rolls back the two inserts together.
    with closing(self.connect()) as connection, connection:
        cursor = connection.execute(
            """
            INSERT INTO repositories (name, url, description, branch)
            VALUES (?, ?, ?, ?)
            """,
            (name, url, description, branch),
        )
        repository_id = int(cursor.lastrowid)
        connection.execute(
            """
            INSERT INTO repository_scopes
            (repository_id, name, description, confidence)
            VALUES (?, ?, ?, 1.0)
            """,
            (repository_id, name, description or ""),
        )
    # Re-read on a new connection, after the commit above.
    return self.get_repository(repository_id)
def update_repository(
    self,
    repository_id: int,
    *,
    name: str | None = None,
    description: str | None = None,
    branch: str | None = None,
) -> Repository:
    """Update the given repository fields and bump ``updated_at``.

    Arguments left as ``None`` are not touched. When nothing is supplied
    the repository is returned unchanged.

    Raises:
        NotFoundError: if the UPDATE matches no row. The initial
            ``get_repository`` lookup may also raise for an unknown id.
    """
    current = self.get_repository(repository_id)
    requested = (("name", name), ("description", description), ("branch", branch))
    changes = [(column, value) for column, value in requested if value is not None]
    if not changes:
        # Nothing to write: reuse the row that was just loaded.
        return current

    # Column names come from the fixed tuple above, never from the caller.
    assignments = ", ".join(f"{column} = ?" for column, _ in changes)
    values: list[str | int | None] = [value for _, value in changes]
    values.append(repository_id)

    # closing() releases the connection; the inner context commits or rolls back.
    with closing(self.connect()) as connection, connection:
        cursor = connection.execute(
            f"""
            UPDATE repositories
            SET {assignments}, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            values,
        )
        if cursor.rowcount == 0:
            raise NotFoundError(f"repository {repository_id} was not found")
    return self.get_repository(repository_id)
def delete_repository(self, repository_id: int) -> None:
with self.connect() as connection:
cursor = connection.execute(
"DELETE FROM repositories WHERE id = ?",
(repository_id,),
)
if cursor.rowcount == 0:
raise NotFoundError(f"repository {repository_id} was not found")
def update_repository_status(self, repository_id: int, status: str) -> None:
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE repositories
SET status = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(status, repository_id),
)
if cursor.rowcount == 0:
raise NotFoundError(f"repository {repository_id} was not found")
def list_repositories(self) -> list[Repository]:
    """Return every repository, newest first.

    Rows are ordered by ``created_at`` descending, then ``id`` descending.
    """
    # closing() releases the connection once the rows have been fetched.
    with closing(self.connect()) as connection, connection:
        rows = connection.execute(
            """
            SELECT id, name, url, description, branch, status
            FROM repositories
            ORDER BY created_at DESC, id DESC
            """
        ).fetchall()
    return [self._repository_from_row(row) for row in rows]
def get_repository(self, repository_id: int) -> Repository:
    """Load one repository by id.

    Raises:
        NotFoundError: if no repository row has this id.
    """
    # closing() releases the connection once the row has been fetched.
    with closing(self.connect()) as connection, connection:
        row = connection.execute(
            """
            SELECT id, name, url, description, branch, status
            FROM repositories
            WHERE id = ?
            """,
            (repository_id,),
        ).fetchone()
    if row is None:
        raise NotFoundError(f"repository {repository_id} was not found")
    return self._repository_from_row(row)
def create_analysis_run(self, repository_id: int) -> AnalysisRun:
    """Insert a new analysis run with status ``'running'``.

    The repository is looked up first, so an unknown id surfaces through
    ``get_repository`` before anything is written.

    Returns:
        The new run as loaded by ``get_analysis_run``.
    """
    self.get_repository(repository_id)
    # closing() releases the connection; the inner context commits or rolls back.
    with closing(self.connect()) as connection, connection:
        cursor = connection.execute(
            """
            INSERT INTO analysis_runs (repository_id, status)
            VALUES (?, 'running')
            """,
            (repository_id,),
        )
        analysis_run_id = int(cursor.lastrowid)
    return self.get_analysis_run(repository_id, analysis_run_id)
def complete_analysis_run(
    self,
    repository_id: int,
    analysis_run_id: int,
    scan_result: ScanResult,
) -> AnalysisRun:
    """Persist a scan result and mark the run and repository as done.

    In one transaction this inserts a ``repository_snapshots`` row, stores
    the scan's facts through ``_insert_facts``, sets the run to
    ``'completed'`` (linking the snapshot and clearing ``error_message``)
    and sets the repository status to ``'analyzed'``.

    Returns:
        The run as loaded by ``get_analysis_run`` after the commit.
    """
    # closing() releases the connection; the inner context commits or
    # rolls back all four steps together.
    with closing(self.connect()) as connection, connection:
        snapshot_cursor = connection.execute(
            """
            INSERT INTO repository_snapshots
            (repository_id, commit_hash, branch, source_path, file_count)
            VALUES (?, ?, ?, ?, ?)
            """,
            (
                repository_id,
                scan_result.commit_hash,
                scan_result.branch,
                scan_result.source_path,
                scan_result.file_count,
            ),
        )
        snapshot_id = int(snapshot_cursor.lastrowid)
        self._insert_facts(
            connection,
            repository_id=repository_id,
            analysis_run_id=analysis_run_id,
            snapshot_id=snapshot_id,
            facts=scan_result.facts,
        )
        connection.execute(
            """
            UPDATE analysis_runs
            SET status = 'completed',
            snapshot_id = ?,
            completed_at = CURRENT_TIMESTAMP,
            error_message = NULL
            WHERE id = ? AND repository_id = ?
            """,
            (snapshot_id, analysis_run_id, repository_id),
        )
        connection.execute(
            """
            UPDATE repositories
            SET status = 'analyzed', updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (repository_id,),
        )
    return self.get_analysis_run(repository_id, analysis_run_id)
def replace_candidate_graph(
    self,
    repository_id: int,
    analysis_run_id: int,
    abilities: list[CandidateAbilityDraft],
) -> None:
    """Replace the candidate graph stored for an analysis run.

    Deletes the run's ``candidate_abilities`` rows and then inserts each
    draft ability with its capabilities, and each capability with its
    features and evidence, all in one transaction.

    NOTE(review): only ``candidate_abilities`` is deleted explicitly;
    presumably the child tables are cleared by ON DELETE CASCADE in the
    migration - confirm against the schema.
    """
    # closing() releases the connection; the inner context commits or
    # rolls back the whole replacement.
    with closing(self.connect()) as connection, connection:
        connection.execute(
            "DELETE FROM candidate_abilities WHERE analysis_run_id = ?",
            (analysis_run_id,),
        )
        for ability in abilities:
            ability_cursor = connection.execute(
                """
                INSERT INTO candidate_abilities
                (repository_id, analysis_run_id, name, description, primary_class,
                attributes, confidence, source_refs)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    repository_id,
                    analysis_run_id,
                    ability.name,
                    ability.description,
                    ability.primary_class or "ability",
                    self._attributes_to_json(ability.attributes),
                    ability.confidence,
                    self._source_refs_to_json(ability.source_refs),
                ),
            )
            ability_id = int(ability_cursor.lastrowid)
            for capability in ability.capabilities:
                capability_cursor = connection.execute(
                    """
                    INSERT INTO candidate_capabilities
                    (repository_id, analysis_run_id, ability_id, name, description,
                    inputs, outputs, primary_class, attributes, confidence, source_refs)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        repository_id,
                        analysis_run_id,
                        ability_id,
                        capability.name,
                        capability.description,
                        json.dumps(capability.inputs),
                        json.dumps(capability.outputs),
                        capability.primary_class or "capability",
                        self._attributes_to_json(capability.attributes),
                        capability.confidence,
                        self._source_refs_to_json(capability.source_refs),
                    ),
                )
                capability_id = int(capability_cursor.lastrowid)
                for feature in capability.features:
                    connection.execute(
                        """
                        INSERT INTO candidate_features
                        (repository_id, analysis_run_id, capability_id, name, type,
                        primary_class, attributes, location, confidence, source_refs)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            repository_id,
                            analysis_run_id,
                            capability_id,
                            feature.name,
                            feature.type,
                            # A feature without its own class falls back to its type.
                            feature.primary_class or feature.type,
                            self._attributes_to_json(
                                feature.attributes or [feature.type]
                            ),
                            feature.location,
                            feature.confidence,
                            self._source_refs_to_json(feature.source_refs),
                        ),
                    )
                for evidence in capability.evidence:
                    connection.execute(
                        """
                        INSERT INTO candidate_evidence
                        (repository_id, analysis_run_id, capability_id,
                        target_kind, target_id, type, reference,
                        reference_kind, reference_id, strength, source_refs)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            repository_id,
                            analysis_run_id,
                            capability_id,
                            # New evidence targets its owning capability and
                            # carries a plain source reference without an id.
                            "capability",
                            capability_id,
                            evidence.type,
                            evidence.reference,
                            "source",
                            None,
                            evidence.strength,
                            self._source_refs_to_json(evidence.source_refs),
                        ),
                    )
def get_candidate_graph(
    self,
    repository_id: int,
    analysis_run_id: int,
) -> CandidateGraph:
    """Load the candidate graph of one analysis run.

    Reads abilities, capabilities, features and evidence for the
    repository/run pair (each ordered by id) and nests them: features and
    evidence under their ``capability_id``, capabilities under their
    ``ability_id``.

    Returns:
        A ``CandidateGraph`` holding the repository, the analysis run and
        the nested abilities.
    """
    repository = self.get_repository(repository_id)
    analysis_run = self.get_analysis_run(repository_id, analysis_run_id)
    scope = (repository_id, analysis_run_id)

    # closing() releases the connection once all four result sets are fetched.
    with closing(self.connect()) as connection, connection:
        ability_rows = connection.execute(
            """
            SELECT id, name, description, primary_class, attributes, confidence,
            status, source_refs
            FROM candidate_abilities
            WHERE repository_id = ? AND analysis_run_id = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()
        capability_rows = connection.execute(
            """
            SELECT id, ability_id, name, description, inputs, outputs,
            primary_class, attributes, confidence, status, source_refs
            FROM candidate_capabilities
            WHERE repository_id = ? AND analysis_run_id = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()
        feature_rows = connection.execute(
            """
            SELECT id, capability_id, name, type, primary_class, attributes,
            location, confidence, status, source_refs
            FROM candidate_features
            WHERE repository_id = ? AND analysis_run_id = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()
        evidence_rows = connection.execute(
            """
            SELECT id, capability_id, target_kind, target_id, type, reference,
            reference_kind, reference_id, strength, status, source_refs
            FROM candidate_evidence
            WHERE repository_id = ? AND analysis_run_id = ?
            ORDER BY id
            """,
            scope,
        ).fetchall()

    features_by_capability: dict[int, list[CandidateFeature]] = {}
    for row in feature_rows:
        features_by_capability.setdefault(row["capability_id"], []).append(
            CandidateFeature(
                id=row["id"],
                name=row["name"],
                type=row["type"],
                location=row["location"],
                confidence=row["confidence"],
                status=row["status"],
                source_refs=self._source_refs_from_json(row["source_refs"]),
                confidence_label=confidence_label(row["confidence"]),
                primary_class=row["primary_class"] or row["type"],
                attributes=self._attributes_from_json(row["attributes"]),
            )
        )

    evidence_by_capability: dict[int, list[CandidateEvidence]] = {}
    for row in evidence_rows:
        evidence_by_capability.setdefault(row["capability_id"], []).append(
            CandidateEvidence(
                id=row["id"],
                type=row["type"],
                reference=row["reference"],
                strength=row["strength"],
                status=row["status"],
                source_refs=self._source_refs_from_json(row["source_refs"]),
                target_kind=row["target_kind"],
                target_id=row["target_id"],
                reference_kind=row["reference_kind"],
                reference_id=row["reference_id"],
            )
        )

    capabilities_by_ability: dict[int, list[CandidateCapability]] = {}
    for row in capability_rows:
        capabilities_by_ability.setdefault(row["ability_id"], []).append(
            CandidateCapability(
                id=row["id"],
                name=row["name"],
                description=row["description"],
                inputs=json.loads(row["inputs"]),
                outputs=json.loads(row["outputs"]),
                confidence=row["confidence"],
                status=row["status"],
                source_refs=self._source_refs_from_json(row["source_refs"]),
                confidence_label=confidence_label(row["confidence"]),
                primary_class=row["primary_class"] or "capability",
                attributes=self._attributes_from_json(row["attributes"]),
                features=features_by_capability.get(row["id"], []),
                evidence=evidence_by_capability.get(row["id"], []),
            )
        )

    abilities = [
        CandidateAbility(
            id=row["id"],
            name=row["name"],
            description=row["description"],
            confidence=row["confidence"],
            status=row["status"],
            source_refs=self._source_refs_from_json(row["source_refs"]),
            confidence_label=confidence_label(row["confidence"]),
            primary_class=row["primary_class"] or "ability",
            attributes=self._attributes_from_json(row["attributes"]),
            capabilities=capabilities_by_ability.get(row["id"], []),
        )
        for row in ability_rows
    ]
    return CandidateGraph(
        repository=repository,
        analysis_run=analysis_run,
        abilities=abilities,
    )
def mark_candidate_graph_status(
self,
repository_id: int,
analysis_run_id: int,
status: str,
) -> None:
with self.connect() as connection:
for table in (
"candidate_abilities",
"candidate_capabilities",
"candidate_features",
"candidate_evidence",
):
connection.execute(
f"""
UPDATE {table}
SET status = ?
WHERE repository_id = ? AND analysis_run_id = ?
""",
(status, repository_id, analysis_run_id),
)
def mark_candidate_ability_status(
self,
repository_id: int,
analysis_run_id: int,
candidate_ability_id: int,
status: str,
) -> None:
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_abilities
SET status = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_ability_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate ability "
f"{candidate_ability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
capability_rows = connection.execute(
"""
SELECT id FROM candidate_capabilities
WHERE ability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(candidate_ability_id, repository_id, analysis_run_id),
).fetchall()
capability_ids = [row["id"] for row in capability_rows]
connection.execute(
"""
UPDATE candidate_capabilities
SET status = ?
WHERE ability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_ability_id, repository_id, analysis_run_id),
)
for capability_id in capability_ids:
self._mark_candidate_children_status(
connection,
repository_id,
analysis_run_id,
capability_id,
status,
)
def mark_candidate_capability_status(
self,
repository_id: int,
analysis_run_id: int,
candidate_capability_id: int,
status: str,
) -> None:
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_capabilities
SET status = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_capability_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate capability "
f"{candidate_capability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
self._mark_candidate_children_status(
connection,
repository_id,
analysis_run_id,
candidate_capability_id,
status,
)
def mark_candidate_feature_status(
self,
repository_id: int,
analysis_run_id: int,
candidate_feature_id: int,
status: str,
) -> None:
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_features
SET status = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_feature_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate feature "
f"{candidate_feature_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
def mark_candidate_evidence_status(
self,
repository_id: int,
analysis_run_id: int,
candidate_evidence_id: int,
status: str,
) -> None:
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_evidence
SET status = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_evidence_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate evidence "
f"{candidate_evidence_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
def _mark_candidate_children_status(
self,
connection: sqlite3.Connection,
repository_id: int,
analysis_run_id: int,
candidate_capability_id: int,
status: str,
) -> None:
connection.execute(
"""
UPDATE candidate_features
SET status = ?
WHERE capability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_capability_id, repository_id, analysis_run_id),
)
connection.execute(
"""
UPDATE candidate_evidence
SET status = ?
WHERE capability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_capability_id, repository_id, analysis_run_id),
)
def reject_candidate_ability(
self,
repository_id: int,
analysis_run_id: int,
candidate_ability_id: int,
) -> None:
with self.connect() as connection:
ability_cursor = connection.execute(
"""
UPDATE candidate_abilities
SET status = 'rejected'
WHERE id = ?
AND repository_id = ?
AND analysis_run_id = ?
AND status = 'candidate'
""",
(candidate_ability_id, repository_id, analysis_run_id),
)
if ability_cursor.rowcount == 0:
raise NotFoundError(
"candidate ability "
f"{candidate_ability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
capability_rows = connection.execute(
"""
SELECT id FROM candidate_capabilities
WHERE ability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(candidate_ability_id, repository_id, analysis_run_id),
).fetchall()
capability_ids = [row["id"] for row in capability_rows]
connection.execute(
"""
UPDATE candidate_capabilities
SET status = 'rejected'
WHERE ability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(candidate_ability_id, repository_id, analysis_run_id),
)
for capability_id in capability_ids:
connection.execute(
"""
UPDATE candidate_features
SET status = 'rejected'
WHERE capability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(capability_id, repository_id, analysis_run_id),
)
connection.execute(
"""
UPDATE candidate_evidence
SET status = 'rejected'
WHERE capability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(capability_id, repository_id, analysis_run_id),
)
def reject_candidate_capability(
self,
repository_id: int,
analysis_run_id: int,
candidate_capability_id: int,
) -> None:
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_capabilities
SET status = 'rejected'
WHERE id = ?
AND repository_id = ?
AND analysis_run_id = ?
AND status = 'candidate'
""",
(candidate_capability_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate capability "
f"{candidate_capability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
connection.execute(
"""
UPDATE candidate_features
SET status = 'rejected'
WHERE capability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(candidate_capability_id, repository_id, analysis_run_id),
)
connection.execute(
"""
UPDATE candidate_evidence
SET status = 'rejected'
WHERE capability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(candidate_capability_id, repository_id, analysis_run_id),
)
def reject_candidate_feature(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_feature_id: int,
) -> None:
    """Reject one candidate feature via ``_reject_candidate_leaf``."""
    leaf = {"table": "candidate_features", "label": "candidate feature"}
    self._reject_candidate_leaf(
        repository_id=repository_id,
        analysis_run_id=analysis_run_id,
        candidate_id=candidate_feature_id,
        **leaf,
    )
def reject_candidate_evidence(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_evidence_id: int,
) -> None:
    """Reject one candidate evidence row via ``_reject_candidate_leaf``."""
    leaf = {"table": "candidate_evidence", "label": "candidate evidence"}
    self._reject_candidate_leaf(
        repository_id=repository_id,
        analysis_run_id=analysis_run_id,
        candidate_id=candidate_evidence_id,
        **leaf,
    )
def update_candidate_ability(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_ability_id: int,
    *,
    name: str,
    description: str,
    confidence: float,
    primary_class: str = "ability",
    attributes: list[str] | None = None,
) -> None:
    """Overwrite the editable fields of a candidate ability.

    A falsy ``primary_class`` is stored as ``"ability"``; missing
    ``attributes`` are stored as an empty list.

    Raises:
        NotFoundError: if the ability does not exist for this
            repository/run pair.
    """
    # closing() releases the connection; the inner context commits or rolls back.
    with closing(self.connect()) as connection, connection:
        cursor = connection.execute(
            """
            UPDATE candidate_abilities
            SET name = ?, description = ?, primary_class = ?, attributes = ?,
            confidence = ?
            WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
            """,
            (
                name,
                description,
                primary_class or "ability",
                self._attributes_to_json(attributes or []),
                confidence,
                candidate_ability_id,
                repository_id,
                analysis_run_id,
            ),
        )
        if cursor.rowcount == 0:
            raise NotFoundError(
                "candidate ability "
                f"{candidate_ability_id} was not found for repository "
                f"{repository_id} analysis run {analysis_run_id}"
            )
def update_candidate_capability(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_capability_id: int,
    *,
    name: str,
    description: str,
    confidence: float,
    primary_class: str = "capability",
    attributes: list[str] | None = None,
) -> None:
    """Overwrite the editable fields of a candidate capability.

    A falsy ``primary_class`` is stored as ``"capability"``; missing
    ``attributes`` are stored as an empty list.

    Raises:
        NotFoundError: if the capability does not exist for this
            repository/run pair.
    """
    # closing() releases the connection; the inner context commits or rolls back.
    with closing(self.connect()) as connection, connection:
        cursor = connection.execute(
            """
            UPDATE candidate_capabilities
            SET name = ?, description = ?, primary_class = ?, attributes = ?,
            confidence = ?
            WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
            """,
            (
                name,
                description,
                primary_class or "capability",
                self._attributes_to_json(attributes or []),
                confidence,
                candidate_capability_id,
                repository_id,
                analysis_run_id,
            ),
        )
        if cursor.rowcount == 0:
            raise NotFoundError(
                "candidate capability "
                f"{candidate_capability_id} was not found for repository "
                f"{repository_id} analysis run {analysis_run_id}"
            )
def update_candidate_feature(
    self,
    repository_id: int,
    analysis_run_id: int,
    candidate_feature_id: int,
    *,
    name: str,
    type: str,
    location: str,
    confidence: float,
    primary_class: str | None = None,
    attributes: list[str] | None = None,
) -> None:
    """Overwrite the editable fields of a candidate feature.

    A missing ``primary_class`` falls back to ``type``; missing
    ``attributes`` fall back to ``[type]``.

    Raises:
        NotFoundError: if the feature does not exist for this
            repository/run pair.
    """
    # closing() releases the connection; the inner context commits or rolls back.
    with closing(self.connect()) as connection, connection:
        cursor = connection.execute(
            """
            UPDATE candidate_features
            SET name = ?, type = ?, primary_class = ?, attributes = ?,
            location = ?, confidence = ?
            WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
            """,
            (
                name,
                type,
                primary_class or type,
                self._attributes_to_json(attributes or [type]),
                location,
                confidence,
                candidate_feature_id,
                repository_id,
                analysis_run_id,
            ),
        )
        if cursor.rowcount == 0:
            raise NotFoundError(
                "candidate feature "
                f"{candidate_feature_id} was not found for repository "
                f"{repository_id} analysis run {analysis_run_id}"
            )
def relink_candidate_capability(
self,
repository_id: int,
analysis_run_id: int,
candidate_capability_id: int,
target_ability_id: int,
) -> None:
self._ensure_candidate_row(
table="candidate_abilities",
label="target candidate ability",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=target_ability_id,
)
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_capabilities
SET ability_id = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(
target_ability_id,
candidate_capability_id,
repository_id,
analysis_run_id,
),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate capability "
f"{candidate_capability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
def relink_candidate_feature(
self,
repository_id: int,
analysis_run_id: int,
candidate_feature_id: int,
target_capability_id: int,
) -> None:
self._relink_candidate_leaf(
table="candidate_features",
label="candidate feature",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=candidate_feature_id,
target_capability_id=target_capability_id,
)
def relink_candidate_evidence(
self,
repository_id: int,
analysis_run_id: int,
candidate_evidence_id: int,
target_capability_id: int,
) -> None:
self._relink_candidate_leaf(
table="candidate_evidence",
label="candidate evidence",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=candidate_evidence_id,
target_capability_id=target_capability_id,
)
def merge_candidate_ability(
self,
repository_id: int,
analysis_run_id: int,
source_ability_id: int,
target_ability_id: int,
) -> None:
if source_ability_id == target_ability_id:
raise ValueError("source and target candidate ability must be different")
self._ensure_candidate_row(
table="candidate_abilities",
label="target candidate ability",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=target_ability_id,
)
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_abilities
SET status = 'merged'
WHERE id = ?
AND repository_id = ?
AND analysis_run_id = ?
AND status = 'candidate'
""",
(source_ability_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"source candidate ability "
f"{source_ability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
connection.execute(
"""
UPDATE candidate_capabilities
SET ability_id = ?
WHERE ability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(target_ability_id, source_ability_id, repository_id, analysis_run_id),
)
def merge_candidate_capability(
self,
repository_id: int,
analysis_run_id: int,
source_capability_id: int,
target_capability_id: int,
) -> None:
if source_capability_id == target_capability_id:
raise ValueError("source and target candidate capability must be different")
self._ensure_candidate_row(
table="candidate_capabilities",
label="target candidate capability",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=target_capability_id,
)
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_capabilities
SET status = 'merged'
WHERE id = ?
AND repository_id = ?
AND analysis_run_id = ?
AND status = 'candidate'
""",
(source_capability_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"source candidate capability "
f"{source_capability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
for table in ("candidate_features", "candidate_evidence"):
connection.execute(
f"""
UPDATE {table}
SET capability_id = ?
WHERE capability_id = ?
AND repository_id = ?
AND analysis_run_id = ?
""",
(
target_capability_id,
source_capability_id,
repository_id,
analysis_run_id,
),
)
def merge_candidate_feature(
self,
repository_id: int,
analysis_run_id: int,
source_feature_id: int,
target_feature_id: int,
) -> None:
self._merge_candidate_leaf(
table="candidate_features",
label="candidate feature",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
source_id=source_feature_id,
target_id=target_feature_id,
)
def merge_candidate_evidence(
self,
repository_id: int,
analysis_run_id: int,
source_evidence_id: int,
target_evidence_id: int,
) -> None:
self._merge_candidate_leaf(
table="candidate_evidence",
label="candidate evidence",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
source_id=source_evidence_id,
target_id=target_evidence_id,
)
def _ensure_candidate_row(
self,
*,
table: str,
label: str,
repository_id: int,
analysis_run_id: int,
candidate_id: int,
) -> None:
with self.connect() as connection:
row = connection.execute(
f"""
SELECT id FROM {table}
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(candidate_id, repository_id, analysis_run_id),
).fetchone()
if row is None:
raise NotFoundError(
f"{label} {candidate_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
def _relink_candidate_leaf(
    self,
    *,
    table: str,
    label: str,
    repository_id: int,
    analysis_run_id: int,
    candidate_id: int,
    target_capability_id: int,
) -> None:
    """Re-parent a candidate leaf row under a different candidate capability.

    The target capability is validated first via ``_ensure_candidate_row``;
    only then is the leaf row's ``capability_id`` rewritten.

    Args:
        table: Leaf table to update. Interpolated into the SQL text, so it
            must be a trusted identifier.
        label: Human-readable row kind used in the error message.
        repository_id: Repository the leaf must belong to.
        analysis_run_id: Analysis run the leaf must belong to.
        candidate_id: Primary key of the leaf row.
        target_capability_id: Candidate capability to attach the leaf to.

    Raises:
        NotFoundError: If the target capability or the leaf row is missing.
    """
    self._ensure_candidate_row(
        table="candidate_capabilities",
        label="target candidate capability",
        repository_id=repository_id,
        analysis_run_id=analysis_run_id,
        candidate_id=target_capability_id,
    )
    relink_sql = f"""
        UPDATE {table}
        SET capability_id = ?
        WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
        """
    bindings = (
        target_capability_id,
        candidate_id,
        repository_id,
        analysis_run_id,
    )
    with self.connect() as connection:
        relinked = connection.execute(relink_sql, bindings).rowcount
    if relinked != 0:
        return
    raise NotFoundError(
        f"{label} {candidate_id} was not found for repository "
        f"{repository_id} analysis run {analysis_run_id}"
    )
def _merge_candidate_leaf(
    self,
    *,
    table: str,
    label: str,
    repository_id: int,
    analysis_run_id: int,
    source_id: int,
    target_id: int,
) -> None:
    """Flag a candidate row as merged into a sibling row of the same table.

    Only the source row changes: its status moves from ``'candidate'`` to
    ``'merged'``. The target row is checked for existence but not modified.

    Args:
        table: Table holding both rows. Interpolated into the SQL text, so
            it must be a trusted identifier.
        label: Human-readable row kind used in error messages.
        repository_id: Repository both rows must belong to.
        analysis_run_id: Analysis run both rows must belong to.
        source_id: Row to mark as merged.
        target_id: Row the source is merged into.

    Raises:
        ValueError: If ``source_id`` equals ``target_id``.
        NotFoundError: If the target row is missing, or no source row with
            status ``'candidate'`` matches the identifiers.
    """
    if source_id == target_id:
        raise ValueError(f"source and target {label} must be different")
    self._ensure_candidate_row(
        table=table,
        label=f"target {label}",
        repository_id=repository_id,
        analysis_run_id=analysis_run_id,
        candidate_id=target_id,
    )
    merge_sql = f"""
        UPDATE {table}
        SET status = 'merged'
        WHERE id = ?
          AND repository_id = ?
          AND analysis_run_id = ?
          AND status = 'candidate'
        """
    with self.connect() as connection:
        merged_rows = connection.execute(
            merge_sql,
            (source_id, repository_id, analysis_run_id),
        ).rowcount
    if merged_rows != 0:
        return
    raise NotFoundError(
        f"source {label} {source_id} was not found for repository "
        f"{repository_id} analysis run {analysis_run_id}"
    )
def _reject_candidate_leaf(
    self,
    *,
    table: str,
    label: str,
    repository_id: int,
    analysis_run_id: int,
    candidate_id: int,
) -> None:
    """Move a candidate row from status ``'candidate'`` to ``'rejected'``.

    Args:
        table: Table holding the row. Interpolated into the SQL text, so it
            must be a trusted identifier.
        label: Human-readable row kind used in the error message.
        repository_id: Repository the row must belong to.
        analysis_run_id: Analysis run the row must belong to.
        candidate_id: Primary key of the row to reject.

    Raises:
        NotFoundError: If no row with status ``'candidate'`` matches.
    """
    reject_sql = f"""
        UPDATE {table}
        SET status = 'rejected'
        WHERE id = ?
          AND repository_id = ?
          AND analysis_run_id = ?
          AND status = 'candidate'
        """
    with self.connect() as connection:
        rejected_rows = connection.execute(
            reject_sql,
            (candidate_id, repository_id, analysis_run_id),
        ).rowcount
    if rejected_rows != 0:
        return
    raise NotFoundError(
        f"{label} {candidate_id} was not found for repository "
        f"{repository_id} analysis run {analysis_run_id}"
    )
def create_review_decision(
    self,
    repository_id: int,
    analysis_run_id: int,
    *,
    action: str,
    notes: str = "",
) -> int:
    """Persist a review decision and return its new row id.

    After the insert, a ``review_decision_recorded`` operation is logged
    with the repository, analysis run, new decision id and action.

    Args:
        repository_id: Repository the decision applies to.
        analysis_run_id: Analysis run the decision applies to.
        action: Action name stored with the decision.
        notes: Optional free-form notes; defaults to an empty string.

    Returns:
        The primary key of the inserted ``review_decisions`` row.
    """
    insert_sql = """
        INSERT INTO review_decisions
            (repository_id, analysis_run_id, action, notes)
        VALUES (?, ?, ?, ?)
        """
    with self.connect() as connection:
        inserted = connection.execute(
            insert_sql,
            (repository_id, analysis_run_id, action, notes),
        )
        decision_id = int(inserted.lastrowid)
    log_operation(
        "review_decision_recorded",
        repository_id=repository_id,
        analysis_run_id=analysis_run_id,
        review_decision_id=decision_id,
        action=action,
    )
    return decision_id
def list_review_decisions(
    self,
    repository_id: int,
    analysis_run_id: int | None = None,
) -> list[ReviewDecision]:
    """List review decisions for a repository, newest first.

    ``get_repository`` is called up front and its result discarded
    (presumably so an unknown repository raises; confirm against that
    method).

    Args:
        repository_id: Repository whose decisions are listed.
        analysis_run_id: When given, restrict to this analysis run.

    Returns:
        Decisions ordered by ``created_at`` descending, then ``id``
        descending.
    """
    self.get_repository(repository_id)
    conditions = ["repository_id = ?"]
    bindings = [repository_id]
    if analysis_run_id is not None:
        conditions.append("analysis_run_id = ?")
        bindings.append(analysis_run_id)
    where = "WHERE " + " AND ".join(conditions)
    with self.connect() as connection:
        rows = connection.execute(
            f"""
            SELECT id, repository_id, analysis_run_id, action, notes, created_at
            FROM review_decisions
            {where}
            ORDER BY created_at DESC, id DESC
            """,
            tuple(bindings),
        ).fetchall()
    columns = (
        "id",
        "repository_id",
        "analysis_run_id",
        "action",
        "notes",
        "created_at",
    )
    return [
        ReviewDecision(**{column: row[column] for column in columns})
        for row in rows
    ]
def create_expectation_gap(
    self,
    repository_id: int,
    analysis_run_id: int | None,
    *,
    expected_type: str,
    expected_name: str,
    source: str,
    notes: str = "",
) -> ExpectationGap:
    """Record an expectation gap and return the stored row.

    The repository, and the analysis run when one is given, are looked up
    first via ``get_repository`` / ``get_analysis_run``. After the insert,
    an ``expectation_gap_recorded`` operation is logged and the row is read
    back through ``get_expectation_gap``.

    Args:
        repository_id: Repository the gap belongs to.
        analysis_run_id: Related analysis run, or ``None`` for no run.
        expected_type: Kind of item that was expected.
        expected_name: Name of the expected item.
        source: Origin of the expectation.
        notes: Optional free-form notes; defaults to an empty string.

    Returns:
        The newly stored ``ExpectationGap``.
    """
    self.get_repository(repository_id)
    if analysis_run_id is not None:
        self.get_analysis_run(repository_id, analysis_run_id)
    insert_sql = """
        INSERT INTO expectation_gaps
            (repository_id, analysis_run_id, expected_type, expected_name,
             source, notes)
        VALUES (?, ?, ?, ?, ?, ?)
        """
    bindings = (
        repository_id,
        analysis_run_id,
        expected_type,
        expected_name,
        source,
        notes,
    )
    with self.connect() as connection:
        gap_id = int(connection.execute(insert_sql, bindings).lastrowid)
    log_operation(
        "expectation_gap_recorded",
        repository_id=repository_id,
        analysis_run_id=analysis_run_id,
        expectation_gap_id=gap_id,
        expected_type=expected_type,
    )
    return self.get_expectation_gap(repository_id, gap_id)
def get_expectation_gap(
    self,
    repository_id: int,
    expectation_gap_id: int,
) -> ExpectationGap:
    """Fetch a single expectation gap belonging to a repository.

    Args:
        repository_id: Repository the gap must belong to.
        expectation_gap_id: Primary key of the gap.

    Returns:
        The row converted by ``_expectation_gap_from_row``.

    Raises:
        NotFoundError: If no gap with that id exists for the repository.
    """
    select_sql = """
        SELECT id, repository_id, analysis_run_id, expected_type,
               expected_name, source, notes, status, created_at
        FROM expectation_gaps
        WHERE repository_id = ? AND id = ?
        """
    with self.connect() as connection:
        gap_row = connection.execute(
            select_sql,
            (repository_id, expectation_gap_id),
        ).fetchone()
    if gap_row is not None:
        return self._expectation_gap_from_row(gap_row)
    raise NotFoundError(
        f"expectation gap {expectation_gap_id} was not found for repository "
        f"{repository_id}"
    )
def list_expectation_gaps(
    self,
    repository_id: int,
    analysis_run_id: int | None = None,
) -> list[ExpectationGap]:
    """List expectation gaps for a repository, newest first.

    ``get_repository`` is called up front and its result discarded
    (presumably so an unknown repository raises; confirm against that
    method).

    Args:
        repository_id: Repository whose gaps are listed.
        analysis_run_id: When given, restrict to this analysis run.

    Returns:
        Gaps ordered by ``created_at`` descending, then ``id`` descending.
    """
    self.get_repository(repository_id)
    conditions = ["repository_id = ?"]
    bindings = [repository_id]
    if analysis_run_id is not None:
        conditions.append("analysis_run_id = ?")
        bindings.append(analysis_run_id)
    where = "WHERE " + " AND ".join(conditions)
    with self.connect() as connection:
        gap_rows = connection.execute(
            f"""
            SELECT id, repository_id, analysis_run_id, expected_type,
                   expected_name, source, notes, status, created_at
            FROM expectation_gaps
            {where}
            ORDER BY created_at DESC, id DESC
            """,
            tuple(bindings),
        ).fetchall()
    gaps = []
    for gap_row in gap_rows:
        gaps.append(self._expectation_gap_from_row(gap_row))
    return gaps
def fail_analysis_run(
    self,
    repository_id: int,
    analysis_run_id: int,
    error_message: str,
) -> AnalysisRun:
    """Mark an analysis run as failed and flag its repository accordingly.

    The run is updated first and its existence verified before the
    repository row is touched. Both updates share one transaction, so an
    unknown run (or one belonging to another repository) leaves the
    repository status unchanged.

    Args:
        repository_id: Repository that owns the run.
        analysis_run_id: Run to mark as failed.
        error_message: Message stored on the run.

    Returns:
        The updated run as returned by ``get_analysis_run``.

    Raises:
        NotFoundError: If the run does not exist for the repository.
    """
    with self.connect() as connection:
        cursor = connection.execute(
            """
            UPDATE analysis_runs
            SET status = 'failed',
                completed_at = CURRENT_TIMESTAMP,
                error_message = ?
            WHERE id = ? AND repository_id = ?
            """,
            (error_message, analysis_run_id, repository_id),
        )
        if cursor.rowcount == 0:
            # Raise before touching the repository: leaving the ``with``
            # block through an exception rolls the transaction back, so a
            # bad run id cannot flip the repository to 'analysis_failed'.
            raise NotFoundError(
                f"analysis run {analysis_run_id} was not found for repository {repository_id}"
            )
        connection.execute(
            """
            UPDATE repositories
            SET status = 'analysis_failed', updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (repository_id,),
        )
    return self.get_analysis_run(repository_id, analysis_run_id)
def get_analysis_run(self, repository_id: int, analysis_run_id: int) -> AnalysisRun:
    """Fetch one analysis run that belongs to the given repository.

    Args:
        repository_id: Repository the run must belong to.
        analysis_run_id: Primary key of the run.

    Returns:
        The row converted by ``_analysis_run_from_row``.

    Raises:
        NotFoundError: If the run does not exist for the repository.
    """
    select_sql = """
        SELECT id, repository_id, snapshot_id, status, started_at,
               completed_at, error_message, scanner_version
        FROM analysis_runs
        WHERE id = ? AND repository_id = ?
        """
    with self.connect() as connection:
        run_row = connection.execute(
            select_sql,
            (analysis_run_id, repository_id),
        ).fetchone()
    if run_row is not None:
        return self._analysis_run_from_row(run_row)
    raise NotFoundError(
        f"analysis run {analysis_run_id} was not found for repository {repository_id}"
    )
def list_analysis_runs(self, repository_id: int) -> list[AnalysisRun]:
    """List a repository's analysis runs, most recently started first.

    ``get_repository`` is called up front and its result discarded
    (presumably so an unknown repository raises; confirm against that
    method).

    Args:
        repository_id: Repository whose runs are listed.

    Returns:
        Runs ordered by ``started_at`` descending, then ``id`` descending.
    """
    self.get_repository(repository_id)
    select_sql = """
        SELECT id, repository_id, snapshot_id, status, started_at,
               completed_at, error_message, scanner_version
        FROM analysis_runs
        WHERE repository_id = ?
        ORDER BY started_at DESC, id DESC
        """
    with self.connect() as connection:
        run_rows = connection.execute(select_sql, (repository_id,)).fetchall()
    runs = []
    for run_row in run_rows:
        runs.append(self._analysis_run_from_row(run_row))
    return runs
def list_abilities(self) -> list[AbilitySummary]:
    """List every approved ability across all repositories.

    Returns:
        Summaries ordered by repository name, ability name, then ability
        id. Each carries a ``confidence_label`` derived from its stored
        confidence.
    """
    select_sql = """
        SELECT a.id, a.repository_id, r.name AS repository_name,
               a.name, a.description, a.confidence
        FROM approved_abilities a
        JOIN repositories r ON r.id = a.repository_id
        ORDER BY r.name ASC, a.name ASC, a.id ASC
        """
    with self.connect() as connection:
        ability_rows = connection.execute(select_sql).fetchall()
    summaries = []
    for ability_row in ability_rows:
        confidence = ability_row["confidence"]
        summaries.append(
            AbilitySummary(
                id=ability_row["id"],
                repository_id=ability_row["repository_id"],
                repository_name=ability_row["repository_name"],
                name=ability_row["name"],
                description=ability_row["description"],
                confidence=confidence,
                confidence_label=confidence_label(confidence),
            )
        )
    return summaries
def list_capabilities(self) -> list[CapabilitySummary]:
    """List every approved capability across all repositories.

    Returns:
        Summaries ordered by repository name, ability name, capability
        name, then capability id. Each carries its parent ability's id and
        name plus a ``confidence_label`` derived from its confidence.
    """
    select_sql = """
        SELECT c.id, c.repository_id, r.name AS repository_name,
               c.ability_id, a.name AS ability_name,
               c.name, c.description, c.confidence
        FROM approved_capabilities c
        JOIN approved_abilities a ON a.id = c.ability_id
        JOIN repositories r ON r.id = c.repository_id
        ORDER BY r.name ASC, a.name ASC, c.name ASC, c.id ASC
        """
    with self.connect() as connection:
        capability_rows = connection.execute(select_sql).fetchall()
    summaries = []
    for capability_row in capability_rows:
        confidence = capability_row["confidence"]
        summaries.append(
            CapabilitySummary(
                id=capability_row["id"],
                repository_id=capability_row["repository_id"],
                repository_name=capability_row["repository_name"],
                ability_id=capability_row["ability_id"],
                ability_name=capability_row["ability_name"],
                name=capability_row["name"],
                description=capability_row["description"],
                confidence=confidence,
                confidence_label=confidence_label(confidence),
            )
        )
    return summaries
def get_snapshot(self, snapshot_id: int) -> RepositorySnapshot:
    """Fetch a repository snapshot by primary key.

    Args:
        snapshot_id: Primary key of the snapshot.

    Returns:
        The row converted by ``_snapshot_from_row``.

    Raises:
        NotFoundError: If no snapshot has that id.
    """
    select_sql = """
        SELECT id, repository_id, commit_hash, branch, source_path, file_count
        FROM repository_snapshots
        WHERE id = ?
        """
    with self.connect() as connection:
        snapshot_row = connection.execute(select_sql, (snapshot_id,)).fetchone()
    if snapshot_row is not None:
        return self._snapshot_from_row(snapshot_row)
    raise NotFoundError(f"snapshot {snapshot_id} was not found")
def list_observed_facts(
    self,
    repository_id: int,
    analysis_run_id: int | None = None,
) -> list[ObservedFact]:
    """List observed facts for a repository in a stable order.

    ``get_repository`` is called up front and its result discarded
    (presumably so an unknown repository raises; confirm against that
    method).

    Args:
        repository_id: Repository whose facts are listed.
        analysis_run_id: When given, restrict to this analysis run.

    Returns:
        Facts ordered by kind, path, name, then id (all ascending).
    """
    self.get_repository(repository_id)
    conditions = ["repository_id = ?"]
    bindings = [repository_id]
    if analysis_run_id is not None:
        conditions.append("analysis_run_id = ?")
        bindings.append(analysis_run_id)
    where = "WHERE " + " AND ".join(conditions)
    with self.connect() as connection:
        fact_rows = connection.execute(
            f"""
            SELECT id, repository_id, analysis_run_id, snapshot_id, kind,
                   path, name, value, metadata
            FROM observed_facts
            {where}
            ORDER BY kind ASC, path ASC, name ASC, id ASC
            """,
            tuple(bindings),
        ).fetchall()
    facts = []
    for fact_row in fact_rows:
        facts.append(self._observed_fact_from_row(fact_row))
    return facts
def replace_content_chunks(
    self,
    repository_id: int,
    analysis_run_id: int,
    snapshot_id: int | None,
    chunks: list[ContentChunkCandidate],
) -> None:
    """Replace all stored content chunks of an analysis run.

    Existing rows are deleted by ``analysis_run_id`` alone, then one row per
    chunk is inserted on the same connection. Chunk metadata is stored as a
    JSON string.

    Args:
        repository_id: Repository stamped on the new rows.
        analysis_run_id: Run whose chunks are replaced.
        snapshot_id: Snapshot stamped on the new rows; may be ``None``.
        chunks: Chunks to store.
    """
    insert_sql = """
        INSERT INTO content_chunks
            (repository_id, analysis_run_id, snapshot_id, path, kind,
             start_line, end_line, text, metadata)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    with self.connect() as connection:
        connection.execute(
            "DELETE FROM content_chunks WHERE analysis_run_id = ?",
            (analysis_run_id,),
        )
        chunk_rows = []
        for chunk in chunks:
            chunk_rows.append(
                (
                    repository_id,
                    analysis_run_id,
                    snapshot_id,
                    chunk.path,
                    chunk.kind,
                    chunk.start_line,
                    chunk.end_line,
                    chunk.text,
                    json.dumps(chunk.metadata),
                )
            )
        connection.executemany(insert_sql, chunk_rows)
def list_content_chunks(
    self,
    repository_id: int,
    analysis_run_id: int | None = None,
) -> list[ContentChunk]:
    """List stored content chunks for a repository in file order.

    ``get_repository`` is called up front and its result discarded
    (presumably so an unknown repository raises; confirm against that
    method).

    Args:
        repository_id: Repository whose chunks are listed.
        analysis_run_id: When given, restrict to this analysis run.

    Returns:
        Chunks ordered by path, start line, then id (all ascending).
    """
    self.get_repository(repository_id)
    conditions = ["repository_id = ?"]
    bindings = [repository_id]
    if analysis_run_id is not None:
        conditions.append("analysis_run_id = ?")
        bindings.append(analysis_run_id)
    where = "WHERE " + " AND ".join(conditions)
    with self.connect() as connection:
        chunk_rows = connection.execute(
            f"""
            SELECT id, repository_id, analysis_run_id, snapshot_id, path, kind,
                   start_line, end_line, text, metadata
            FROM content_chunks
            {where}
            ORDER BY path ASC, start_line ASC, id ASC
            """,
            tuple(bindings),
        ).fetchall()
    chunks = []
    for chunk_row in chunk_rows:
        chunks.append(self._content_chunk_from_row(chunk_row))
    return chunks
def create_ability(
self,
repository_id: int,
*,
name: str,
description: str,
confidence: float,
primary_class: str = "ability",
attributes: list[str] | None = None,
) -> int:
with self.connect() as connection:
cursor = connection.execute(
"""
INSERT INTO approved_abilities
(repository_id, name, description, primary_class, attributes, confidence)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
repository_id,
name,
description,
primary_class or "ability",
self._attributes_to_json(attributes or []),
confidence,
),
)
return int(cursor.lastrowid)
def ensure_ability(self, repository_id: int, ability_id: int) -> None:
    """Verify that an approved ability exists for a repository.

    Args:
        repository_id: Repository the ability must belong to.
        ability_id: Primary key of the ability.

    Raises:
        NotFoundError: If no such ability exists for the repository.
    """
    lookup_sql = """
        SELECT id FROM approved_abilities
        WHERE id = ? AND repository_id = ?
        """
    with self.connect() as connection:
        match = connection.execute(lookup_sql, (ability_id, repository_id)).fetchone()
    if match is not None:
        return
    raise NotFoundError(
        f"ability {ability_id} was not found for repository {repository_id}"
    )
def update_ability(
self,
repository_id: int,
ability_id: int,
*,
name: str | None = None,
description: str | None = None,
confidence: float | None = None,
primary_class: str | None = None,
attributes: list[str] | None = None,
) -> None:
self._update_approved_row(
table="approved_abilities",
label="ability",
repository_id=repository_id,
row_id=ability_id,
values={
"name": name,
"description": description,
"confidence": confidence,
"primary_class": primary_class,
"attributes": (
self._attributes_to_json(attributes)
if attributes is not None
else None
),
},
)
def delete_ability(self, repository_id: int, ability_id: int) -> None:
    """Delete an approved ability through ``_delete_approved_row``.

    Args:
        repository_id: Repository that owns the ability.
        ability_id: Primary key of the ability to delete.
    """
    target = {
        "table": "approved_abilities",
        "label": "ability",
        "repository_id": repository_id,
        "row_id": ability_id,
    }
    self._delete_approved_row(**target)
def create_capability(
self,
repository_id: int,
ability_id: int,
*,
name: str,
description: str,
inputs: list[str],
outputs: list[str],
confidence: float,
primary_class: str = "capability",
attributes: list[str] | None = None,
) -> int:
with self.connect() as connection:
cursor = connection.execute(
"""
INSERT INTO approved_capabilities
(repository_id, ability_id, name, description, inputs, outputs,
primary_class, attributes, confidence)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
repository_id,
ability_id,
name,
description,
json.dumps(inputs),
json.dumps(outputs),
primary_class or "capability",
self._attributes_to_json(attributes or []),
confidence,
),
)
return int(cursor.lastrowid)
def ensure_capability(self, repository_id: int, capability_id: int) -> None:
    """Verify that an approved capability exists for a repository.

    Args:
        repository_id: Repository the capability must belong to.
        capability_id: Primary key of the capability.

    Raises:
        NotFoundError: If no such capability exists for the repository.
    """
    lookup_sql = """
        SELECT id FROM approved_capabilities
        WHERE id = ? AND repository_id = ?
        """
    with self.connect() as connection:
        match = connection.execute(lookup_sql, (capability_id, repository_id)).fetchone()
    if match is not None:
        return
    raise NotFoundError(
        f"capability {capability_id} was not found for repository {repository_id}"
    )
def update_capability(
self,
repository_id: int,
capability_id: int,
*,
name: str | None = None,
description: str | None = None,
inputs: list[str] | None = None,
outputs: list[str] | None = None,
confidence: float | None = None,
primary_class: str | None = None,
attributes: list[str] | None = None,
) -> None:
self._update_approved_row(
table="approved_capabilities",
label="capability",
repository_id=repository_id,
row_id=capability_id,
values={
"name": name,
"description": description,
"inputs": json.dumps(inputs) if inputs is not None else None,
"outputs": json.dumps(outputs) if outputs is not None else None,
"confidence": confidence,
"primary_class": primary_class,
"attributes": (
self._attributes_to_json(attributes)
if attributes is not None
else None
),
},
)
def delete_capability(self, repository_id: int, capability_id: int) -> None:
    """Delete an approved capability through ``_delete_approved_row``.

    Args:
        repository_id: Repository that owns the capability.
        capability_id: Primary key of the capability to delete.
    """
    target = {
        "table": "approved_capabilities",
        "label": "capability",
        "repository_id": repository_id,
        "row_id": capability_id,
    }
    self._delete_approved_row(**target)
def create_feature(
    self,
    repository_id: int,
    capability_id: int,
    *,
    name: str,
    type: str,
    location: str,
    confidence: float,
    source_refs: list[SourceReference] | None = None,
    primary_class: str | None = None,
    attributes: list[str] | None = None,
) -> int:
    """Insert an approved feature and return its row id.

    Args:
        repository_id: Repository that owns the feature.
        capability_id: Parent approved capability.
        name: Feature name.
        type: Feature type.
        location: Where the feature lives in the repository.
        confidence: Confidence score stored with the feature.
        source_refs: Source references; ``None`` or empty is stored as an
            empty list via ``_source_refs_to_json``.
        primary_class: Classification; a falsy value falls back to ``type``.
        attributes: Attribute labels; ``None`` or empty falls back to
            ``[type]``.

    Returns:
        The primary key of the inserted ``approved_features`` row.
    """
    stored_class = primary_class if primary_class else type
    stored_attributes = attributes if attributes else [type]
    stored_refs = source_refs if source_refs else []
    insert_sql = """
        INSERT INTO approved_features
            (repository_id, capability_id, name, type, primary_class, attributes,
             location, confidence, source_refs)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    with self.connect() as connection:
        bindings = (
            repository_id,
            capability_id,
            name,
            type,
            stored_class,
            self._attributes_to_json(stored_attributes),
            location,
            confidence,
            self._source_refs_to_json(stored_refs),
        )
        inserted = connection.execute(insert_sql, bindings)
        return int(inserted.lastrowid)
def update_feature(
self,
repository_id: int,
feature_id: int,
*,
name: str | None = None,
type: str | None = None,
location: str | None = None,
confidence: float | None = None,
primary_class: str | None = None,
attributes: list[str] | None = None,
) -> None:
self._update_approved_row(
table="approved_features",
label="feature",
repository_id=repository_id,
row_id=feature_id,
values={
"name": name,
"type": type,
"primary_class": primary_class,
"attributes": (
self._attributes_to_json(attributes)
if attributes is not None
else None
),
"location": location,
"confidence": confidence,
},
)
def delete_feature(self, repository_id: int, feature_id: int) -> None:
    """Delete an approved feature through ``_delete_approved_row``.

    Args:
        repository_id: Repository that owns the feature.
        feature_id: Primary key of the feature to delete.
    """
    target = {
        "table": "approved_features",
        "label": "feature",
        "repository_id": repository_id,
        "row_id": feature_id,
    }
    self._delete_approved_row(**target)
def create_evidence(
    self,
    repository_id: int,
    capability_id: int,
    *,
    type: str,
    reference: str,
    strength: str,
    target_kind: str = "capability",
    target_id: int | None = None,
    reference_kind: str = "source",
    reference_id: int | None = None,
    source_refs: list[SourceReference] | None = None,
) -> int:
    """Insert an approved evidence row and return its row id.

    Args:
        repository_id: Repository that owns the evidence.
        capability_id: Approved capability the evidence is attached to.
        type: Evidence type.
        reference: What the evidence points at.
        strength: Strength label stored with the evidence.
        target_kind: Kind of item the evidence supports.
        target_id: Id of the supported item; ``None`` falls back to
            ``capability_id``.
        reference_kind: Kind of the reference.
        reference_id: Optional id of the referenced item.
        source_refs: Source references; ``None`` or empty is stored as an
            empty list via ``_source_refs_to_json``.

    Returns:
        The primary key of the inserted ``approved_evidence`` row.
    """
    if target_id is None:
        target_id = capability_id
    stored_refs = source_refs if source_refs else []
    insert_sql = """
        INSERT INTO approved_evidence
            (repository_id, capability_id, target_kind, target_id, type,
             reference, reference_kind, reference_id, strength, source_refs)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    with self.connect() as connection:
        bindings = (
            repository_id,
            capability_id,
            target_kind,
            target_id,
            type,
            reference,
            reference_kind,
            reference_id,
            strength,
            self._source_refs_to_json(stored_refs),
        )
        inserted = connection.execute(insert_sql, bindings)
        return int(inserted.lastrowid)
def update_evidence(
self,
repository_id: int,
evidence_id: int,
*,
type: str | None = None,
reference: str | None = None,
strength: str | None = None,
target_kind: str | None = None,
target_id: int | None = None,
reference_kind: str | None = None,
reference_id: int | None = None,
) -> None:
self._update_approved_row(
table="approved_evidence",
label="evidence",
repository_id=repository_id,
row_id=evidence_id,
values={
"type": type,
"reference": reference,
"strength": strength,
"target_kind": target_kind,
"target_id": target_id,
"reference_kind": reference_kind,
"reference_id": reference_id,
},
)
def delete_evidence(self, repository_id: int, evidence_id: int) -> None:
    """Delete an approved evidence row through ``_delete_approved_row``.

    Args:
        repository_id: Repository that owns the evidence.
        evidence_id: Primary key of the evidence row to delete.
    """
    target = {
        "table": "approved_evidence",
        "label": "evidence",
        "repository_id": repository_id,
        "row_id": evidence_id,
    }
    self._delete_approved_row(**target)
def _ensure_scope(self, repository_id: int) -> Scope:
    """Return the repository's scope, creating a default one if absent.

    The default scope copies the repository's name and description (an
    empty string when the description is falsy) with confidence ``1.0``.

    Args:
        repository_id: Repository whose scope is wanted.

    Returns:
        The existing scope converted by ``_scope_from_row``, or the newly
        inserted default scope.
    """
    repository = self.get_repository(repository_id)
    select_sql = """
        SELECT id, name, description, confidence
        FROM repository_scopes
        WHERE repository_id = ?
        """
    insert_sql = """
        INSERT INTO repository_scopes
            (repository_id, name, description, confidence)
        VALUES (?, ?, ?, 1.0)
        """
    with self.connect() as connection:
        existing = connection.execute(select_sql, (repository_id,)).fetchone()
        if existing is not None:
            return self._scope_from_row(existing)
        inserted = connection.execute(
            insert_sql,
            (repository_id, repository.name, repository.description or ""),
        )
        new_scope_id = int(inserted.lastrowid)
        return Scope(
            id=new_scope_id,
            name=repository.name,
            description=repository.description or "",
            confidence=1.0,
            confidence_label=confidence_label(1.0),
        )
def _get_scope(self, repository_id: int) -> Scope:
    """Return the repository's scope, delegating creation when missing.

    Args:
        repository_id: Repository whose scope is wanted.

    Returns:
        The stored scope converted by ``_scope_from_row``; when no row
        exists, whatever ``_ensure_scope`` returns.
    """
    select_sql = """
        SELECT id, name, description, confidence
        FROM repository_scopes
        WHERE repository_id = ?
        """
    with self.connect() as connection:
        scope_row = connection.execute(select_sql, (repository_id,)).fetchone()
    if scope_row is not None:
        return self._scope_from_row(scope_row)
    return self._ensure_scope(repository_id)
def _scope_from_row(self, row: sqlite3.Row) -> Scope:
    """Convert a ``repository_scopes`` row into a ``Scope``.

    Args:
        row: Row exposing ``id``, ``name``, ``description`` and
            ``confidence``.

    Returns:
        A ``Scope`` whose ``confidence_label`` is derived from the stored
        confidence.
    """
    confidence = row["confidence"]
    return Scope(
        id=row["id"],
        name=row["name"],
        description=row["description"],
        confidence=confidence,
        confidence_label=confidence_label(confidence),
    )
def update_scope(
    self,
    repository_id: int,
    *,
    name: str | None = None,
    description: str | None = None,
    confidence: float | None = None,
) -> Scope:
    """Update a repository's scope and return the stored result.

    The scope is created on demand by ``_ensure_scope``, whose return value
    supplies the row id directly instead of a second lookup. The update is
    delegated to ``_update_approved_row``; how ``None`` values are treated
    is decided there.

    Args:
        repository_id: Repository whose scope is updated.
        name: New scope name, if any.
        description: New scope description, if any.
        confidence: New scope confidence, if any.

    Returns:
        The scope as re-read through ``_get_scope`` after the update.
    """
    # _ensure_scope already returns the (possibly just created) scope, so
    # its id can be used without querying the table again.
    scope = self._ensure_scope(repository_id)
    self._update_approved_row(
        table="repository_scopes",
        label="scope",
        repository_id=repository_id,
        row_id=scope.id,
        values={
            "name": name,
            "description": description,
            "confidence": confidence,
        },
    )
    return self._get_scope(repository_id)
def replace_approved_from_candidate_graph(
    self,
    repository_id: int,
    graph: CandidateGraph,
) -> None:
    """Rebuild a repository's approved graph from a candidate graph.

    Existing ``approved_abilities`` rows of the repository are deleted, then
    every ability, capability, feature and evidence item whose status is
    ``"candidate"`` or ``"approved"`` is inserted, all on one connection.
    Children of a skipped parent are skipped with it.

    NOTE(review): only ``approved_abilities`` is deleted explicitly;
    removal of dependent capability/feature/evidence rows presumably relies
    on ``ON DELETE CASCADE`` in the schema. Confirm against the migration.

    Args:
        repository_id: Repository whose approved graph is replaced.
        graph: Candidate graph to publish; must belong to the repository.

    Raises:
        NotFoundError: If ``graph.repository.id`` differs from
            ``repository_id``.
    """
    if graph.repository.id != repository_id:
        raise NotFoundError(
            f"candidate graph for repository {graph.repository.id} does not match "
            f"repository {repository_id}"
        )
    publishable = {"candidate", "approved"}
    ability_sql = """
        INSERT INTO approved_abilities
            (repository_id, name, description, primary_class, attributes, confidence)
        VALUES (?, ?, ?, ?, ?, ?)
        """
    capability_sql = """
        INSERT INTO approved_capabilities
            (repository_id, ability_id, name, description, inputs, outputs,
             primary_class, attributes, confidence)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    feature_sql = """
        INSERT INTO approved_features
            (repository_id, capability_id, name, type, primary_class,
             attributes, location, confidence, source_refs)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    evidence_sql = """
        INSERT INTO approved_evidence
            (repository_id, capability_id, target_kind, target_id,
             type, reference, reference_kind, reference_id, strength,
             source_refs)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    with self.connect() as connection:
        connection.execute(
            "DELETE FROM approved_abilities WHERE repository_id = ?",
            (repository_id,),
        )
        for ability in graph.abilities:
            if ability.status not in publishable:
                continue
            ability_bindings = (
                repository_id,
                ability.name,
                ability.description,
                ability.primary_class or "ability",
                self._attributes_to_json(ability.attributes),
                ability.confidence,
            )
            approved_ability_id = int(
                connection.execute(ability_sql, ability_bindings).lastrowid
            )
            for capability in ability.capabilities:
                if capability.status not in publishable:
                    continue
                capability_bindings = (
                    repository_id,
                    approved_ability_id,
                    capability.name,
                    capability.description,
                    json.dumps(capability.inputs),
                    json.dumps(capability.outputs),
                    capability.primary_class or "capability",
                    self._attributes_to_json(capability.attributes),
                    capability.confidence,
                )
                approved_capability_id = int(
                    connection.execute(capability_sql, capability_bindings).lastrowid
                )
                for feature in capability.features:
                    if feature.status in publishable:
                        connection.execute(
                            feature_sql,
                            (
                                repository_id,
                                approved_capability_id,
                                feature.name,
                                feature.type,
                                feature.primary_class or feature.type,
                                self._attributes_to_json(
                                    feature.attributes or [feature.type]
                                ),
                                feature.location,
                                feature.confidence,
                                self._source_refs_to_json(feature.source_refs),
                            ),
                        )
                for evidence in capability.evidence:
                    if evidence.status in publishable:
                        connection.execute(
                            evidence_sql,
                            (
                                repository_id,
                                approved_capability_id,
                                evidence.target_kind,
                                # A falsy target id falls back to the newly
                                # approved capability's id.
                                evidence.target_id or approved_capability_id,
                                evidence.type,
                                evidence.reference,
                                evidence.reference_kind,
                                evidence.reference_id,
                                evidence.strength,
                                self._source_refs_to_json(evidence.source_refs),
                            ),
                        )
def get_ability_map(self, repository_id: int) -> RepositoryAbilityMap:
    """Assemble the approved ability map of a repository.

    Abilities, capabilities, features and evidence are each read with one
    query (ordered by id) and then nested in memory: features and evidence
    under their capability, capabilities under their ability.

    Args:
        repository_id: Repository whose map is built.

    Returns:
        A ``RepositoryAbilityMap`` holding the repository, its scope (created
        on demand by ``_ensure_scope``) and the nested abilities.
    """
    repository = self.get_repository(repository_id)
    scope = self._ensure_scope(repository_id)
    with self.connect() as connection:

        def fetch_for_repository(sql: str) -> list[sqlite3.Row]:
            return connection.execute(sql, (repository_id,)).fetchall()

        ability_rows = fetch_for_repository(
            """
            SELECT id, name, description, primary_class, attributes, confidence
            FROM approved_abilities
            WHERE repository_id = ?
            ORDER BY id
            """
        )
        capability_rows = fetch_for_repository(
            """
            SELECT id, ability_id, name, description, inputs, outputs,
                   primary_class, attributes, confidence
            FROM approved_capabilities
            WHERE repository_id = ?
            ORDER BY id
            """
        )
        feature_rows = fetch_for_repository(
            """
            SELECT id, capability_id, name, type, primary_class, attributes,
                   location, confidence, source_refs
            FROM approved_features
            WHERE repository_id = ?
            ORDER BY id
            """
        )
        evidence_rows = fetch_for_repository(
            """
            SELECT id, capability_id, target_kind, target_id, type, reference,
                   reference_kind, reference_id, strength, source_refs
            FROM approved_evidence
            WHERE repository_id = ?
            ORDER BY id
            """
        )

    features_by_capability: dict[int, list[Feature]] = {}
    for feature_row in feature_rows:
        siblings = features_by_capability.setdefault(feature_row["capability_id"], [])
        feature_confidence = feature_row["confidence"]
        siblings.append(
            Feature(
                id=feature_row["id"],
                name=feature_row["name"],
                type=feature_row["type"],
                location=feature_row["location"],
                confidence=feature_confidence,
                confidence_label=confidence_label(feature_confidence),
                source_refs=self._source_refs_from_json(feature_row["source_refs"]),
                # Rows without a stored class fall back to the feature type.
                primary_class=feature_row["primary_class"] or feature_row["type"],
                attributes=self._attributes_from_json(feature_row["attributes"]),
            )
        )

    evidence_by_capability: dict[int, list[Evidence]] = {}
    for evidence_row in evidence_rows:
        siblings = evidence_by_capability.setdefault(evidence_row["capability_id"], [])
        siblings.append(
            Evidence(
                id=evidence_row["id"],
                type=evidence_row["type"],
                reference=evidence_row["reference"],
                strength=evidence_row["strength"],
                source_refs=self._source_refs_from_json(evidence_row["source_refs"]),
                target_kind=evidence_row["target_kind"],
                target_id=evidence_row["target_id"],
                reference_kind=evidence_row["reference_kind"],
                reference_id=evidence_row["reference_id"],
            )
        )

    capabilities_by_ability: dict[int, list[Capability]] = {}
    for capability_row in capability_rows:
        siblings = capabilities_by_ability.setdefault(capability_row["ability_id"], [])
        capability_id = capability_row["id"]
        capability_confidence = capability_row["confidence"]
        siblings.append(
            Capability(
                id=capability_id,
                name=capability_row["name"],
                description=capability_row["description"],
                inputs=json.loads(capability_row["inputs"]),
                outputs=json.loads(capability_row["outputs"]),
                confidence=capability_confidence,
                confidence_label=confidence_label(capability_confidence),
                primary_class=capability_row["primary_class"] or "capability",
                attributes=self._attributes_from_json(capability_row["attributes"]),
                features=features_by_capability.get(capability_id, []),
                evidence=evidence_by_capability.get(capability_id, []),
            )
        )

    abilities = []
    for ability_row in ability_rows:
        ability_confidence = ability_row["confidence"]
        abilities.append(
            Ability(
                id=ability_row["id"],
                name=ability_row["name"],
                description=ability_row["description"],
                confidence=ability_confidence,
                confidence_label=confidence_label(ability_confidence),
                primary_class=ability_row["primary_class"] or "ability",
                attributes=self._attributes_from_json(ability_row["attributes"]),
                capabilities=capabilities_by_ability.get(ability_row["id"], []),
            )
        )
    return RepositoryAbilityMap(repository=repository, scope=scope, abilities=abilities)
def search(
    self,
    query: str,
    *,
    status: str | None = None,
    language: str | None = None,
    framework: str | None = None,
    ability: str | None = None,
    capability: str | None = None,
) -> list[SearchResult]:
    """Search repositories and their approved graph for a literal substring.

    The stripped query is matched with SQL ``LIKE`` against repository
    name/description, ability and capability name/description, feature
    name/type/location and evidence type/reference/strength. ``%``, ``_``
    and ``\\`` in the query are escaped so they match literally; this keeps
    the SQL match consistent with the plain substring test (``_matches``)
    used to work out ``matched_field``.

    Args:
        query: Text to look for; surrounding whitespace is ignored.
        status: Optional repository status filter.
        language: Optional language filter.
        framework: Optional framework filter.
        ability: Optional ability name/description filter.
        capability: Optional capability name/description filter.

    Returns:
        Matches sorted by descending ``hybrid_score``, descending
        ``confidence``, then repository name, match type and match name.
        An empty list when the query is blank or the filters exclude every
        repository.
    """
    term = query.strip()
    if not term:
        return []
    # Neutralise LIKE metacharacters. Without this, a query such as
    # "my_repo" also selects "myXrepo", a row _matches() cannot attribute
    # to any field.
    escaped_term = (
        term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    needle = f"%{escaped_term}%"
    like = "LIKE ? ESCAPE '\\'"
    with self.connect() as connection:
        repository_ids = self._search_filter_repository_ids(
            connection,
            status=status,
            language=language,
            framework=framework,
            ability=ability,
            capability=capability,
        )
        # Filters were supplied but no repository satisfies all of them.
        if repository_ids is not None and not repository_ids:
            return []
        repository_filter, repository_params = self._repository_filter_sql(
            repository_ids,
        )
        two_needles = (needle, needle, *repository_params)
        three_needles = (needle, needle, needle, *repository_params)
        repository_rows = connection.execute(
            f"""
            SELECT r.id AS repository_id, r.name AS repository_name,
                   r.description
            FROM repositories r
            WHERE (r.name {like} OR COALESCE(r.description, '') {like})
            {repository_filter}
            """,
            two_needles,
        ).fetchall()
        ability_rows = connection.execute(
            f"""
            SELECT r.id AS repository_id, r.name AS repository_name,
                   a.id AS ability_id, a.name AS ability_name,
                   a.description AS ability_description, a.confidence
            FROM approved_abilities a
            JOIN repositories r ON r.id = a.repository_id
            WHERE (a.name {like} OR a.description {like})
            {repository_filter}
            """,
            two_needles,
        ).fetchall()
        capability_rows = connection.execute(
            f"""
            SELECT r.id AS repository_id, r.name AS repository_name,
                   a.id AS ability_id, a.name AS ability_name,
                   c.id AS capability_id, c.name AS capability_name,
                   c.description AS capability_description, c.confidence
            FROM approved_capabilities c
            JOIN approved_abilities a ON a.id = c.ability_id
            JOIN repositories r ON r.id = c.repository_id
            WHERE (c.name {like} OR c.description {like})
            {repository_filter}
            """,
            two_needles,
        ).fetchall()
        feature_rows = connection.execute(
            f"""
            SELECT r.id AS repository_id, r.name AS repository_name,
                   a.id AS ability_id, a.name AS ability_name,
                   c.id AS capability_id, c.name AS capability_name,
                   f.name AS feature_name, f.type AS feature_type,
                   f.location, f.confidence
            FROM approved_features f
            JOIN approved_capabilities c ON c.id = f.capability_id
            JOIN approved_abilities a ON a.id = c.ability_id
            JOIN repositories r ON r.id = f.repository_id
            WHERE (f.name {like} OR f.type {like} OR f.location {like})
            {repository_filter}
            """,
            three_needles,
        ).fetchall()
        evidence_rows = connection.execute(
            f"""
            SELECT r.id AS repository_id, r.name AS repository_name,
                   a.id AS ability_id, a.name AS ability_name,
                   c.id AS capability_id, c.name AS capability_name,
                   e.type AS evidence_type, e.reference, e.strength
            FROM approved_evidence e
            JOIN approved_capabilities c ON c.id = e.capability_id
            JOIN approved_abilities a ON a.id = c.ability_id
            JOIN repositories r ON r.id = e.repository_id
            WHERE (e.type {like} OR e.reference {like} OR e.strength {like})
            {repository_filter}
            """,
            three_needles,
        ).fetchall()

    results: list[SearchResult] = []
    for row in repository_rows:
        # Repository hits carry no stored confidence; they score 1.0.
        matched_field = (
            "name" if self._matches(row["repository_name"], term) else "description"
        )
        results.append(
            SearchResult(
                repository_id=row["repository_id"],
                repository_name=row["repository_name"],
                match_type="repository",
                match_name=row["repository_name"],
                confidence=1.0,
                confidence_label=confidence_label(1.0),
                match_description=row["description"] or "",
                matched_field=matched_field,
                text_score=1.0,
                hybrid_score=1.0,
            )
        )
    for row in ability_rows:
        matched_field = (
            "name" if self._matches(row["ability_name"], term) else "description"
        )
        results.append(
            SearchResult(
                repository_id=row["repository_id"],
                repository_name=row["repository_name"],
                match_type="ability",
                match_name=row["ability_name"],
                confidence=row["confidence"],
                confidence_label=confidence_label(row["confidence"]),
                match_description=row["ability_description"],
                matched_field=matched_field,
                ability_id=row["ability_id"],
                ability_name=row["ability_name"],
                text_score=1.0,
                hybrid_score=row["confidence"],
            )
        )
    for row in capability_rows:
        matched_field = (
            "name"
            if self._matches(row["capability_name"], term)
            else "description"
        )
        results.append(
            SearchResult(
                repository_id=row["repository_id"],
                repository_name=row["repository_name"],
                match_type="capability",
                match_name=row["capability_name"],
                confidence=row["confidence"],
                confidence_label=confidence_label(row["confidence"]),
                match_description=row["capability_description"],
                matched_field=matched_field,
                ability_id=row["ability_id"],
                ability_name=row["ability_name"],
                capability_id=row["capability_id"],
                capability_name=row["capability_name"],
                text_score=1.0,
                hybrid_score=row["confidence"],
            )
        )
    for row in feature_rows:
        matched_field = self._first_matched_field(
            term,
            {
                "name": row["feature_name"],
                "type": row["feature_type"],
                "location": row["location"],
            },
        )
        results.append(
            SearchResult(
                repository_id=row["repository_id"],
                repository_name=row["repository_name"],
                match_type="feature",
                match_name=row["feature_name"],
                confidence=row["confidence"],
                confidence_label=confidence_label(row["confidence"]),
                match_description=row["feature_type"],
                matched_field=matched_field,
                ability_id=row["ability_id"],
                ability_name=row["ability_name"],
                capability_id=row["capability_id"],
                capability_name=row["capability_name"],
                source_reference=row["location"],
                text_score=1.0,
                hybrid_score=row["confidence"],
            )
        )
    for row in evidence_rows:
        matched_field = self._first_matched_field(
            term,
            {
                "type": row["evidence_type"],
                "reference": row["reference"],
                "strength": row["strength"],
            },
        )
        # Evidence stores a strength label rather than a number; derive the
        # score once and reuse it for confidence, label and hybrid score.
        evidence_score = self._evidence_confidence(row["strength"])
        results.append(
            SearchResult(
                repository_id=row["repository_id"],
                repository_name=row["repository_name"],
                match_type="evidence",
                match_name=row["reference"],
                confidence=evidence_score,
                confidence_label=confidence_label(evidence_score),
                match_description=row["evidence_type"],
                matched_field=matched_field,
                ability_id=row["ability_id"],
                ability_name=row["ability_name"],
                capability_id=row["capability_id"],
                capability_name=row["capability_name"],
                evidence_level=row["strength"],
                source_reference=row["reference"],
                text_score=1.0,
                hybrid_score=evidence_score,
            )
        )
    return sorted(
        results,
        key=lambda result: (
            -result.hybrid_score,
            -result.confidence,
            result.repository_name.lower(),
            result.match_type,
            result.match_name.lower(),
        ),
    )
def _matches(self, value: str | None, term: str) -> bool:
return term.lower() in (value or "").lower()
def _first_matched_field(self, term: str, values: dict[str, str | None]) -> str:
for field, value in values.items():
if self._matches(value, term):
return field
return ""
def _evidence_confidence(self, strength: str) -> float:
    """Map an evidence strength label to a numeric confidence.

    Args:
        strength: Strength label.

    Returns:
        ``0.9`` for ``"strong"``, ``0.6`` for ``"medium"``, ``0.3`` for
        ``"weak"``, and ``0.5`` for any other value.
    """
    known_scores = dict(strong=0.9, medium=0.6, weak=0.3)
    try:
        return known_scores[strength]
    except KeyError:
        return 0.5
def _search_filter_repository_ids(
self,
connection: sqlite3.Connection,
*,
status: str | None,
language: str | None,
framework: str | None,
ability: str | None,
capability: str | None,
) -> list[int] | None:
filters: list[set[int]] = []
if status:
rows = connection.execute(
"SELECT id FROM repositories WHERE status = ?",
(status,),
).fetchall()
filters.append({row["id"] for row in rows})
if language:
rows = connection.execute(
"""
SELECT DISTINCT repository_id
FROM observed_facts
WHERE kind = 'language' AND name LIKE ?
""",
(language,),
).fetchall()
filters.append({row["repository_id"] for row in rows})
if framework:
rows = connection.execute(
"""
SELECT DISTINCT repository_id
FROM observed_facts
WHERE kind = 'framework' AND name LIKE ?
""",
(framework,),
).fetchall()
filters.append({row["repository_id"] for row in rows})
if ability:
rows = connection.execute(
"""
SELECT DISTINCT repository_id
FROM approved_abilities
WHERE name LIKE ? OR description LIKE ?
""",
(f"%{ability}%", f"%{ability}%"),
).fetchall()
filters.append({row["repository_id"] for row in rows})
if capability:
rows = connection.execute(
"""
SELECT DISTINCT repository_id
FROM approved_capabilities
WHERE name LIKE ? OR description LIKE ?
""",
(f"%{capability}%", f"%{capability}%"),
).fetchall()
filters.append({row["repository_id"] for row in rows})
if not filters:
return None
repository_ids = set.intersection(*filters)
return sorted(repository_ids)
def _repository_filter_sql(
self,
repository_ids: list[int] | None,
) -> tuple[str, tuple[int, ...]]:
if repository_ids is None:
return "", ()
placeholders = ", ".join("?" for _ in repository_ids)
return f"AND r.id IN ({placeholders})", tuple(repository_ids)
def _insert_facts(
self,
connection: sqlite3.Connection,
*,
repository_id: int,
analysis_run_id: int,
snapshot_id: int,
facts: list[FactCandidate],
) -> None:
connection.executemany(
"""
INSERT INTO observed_facts
(repository_id, analysis_run_id, snapshot_id, kind, path, name, value, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
[
(
repository_id,
analysis_run_id,
snapshot_id,
fact.kind,
fact.path,
fact.name,
fact.value,
json.dumps(fact.metadata),
)
for fact in facts
],
)
def _update_approved_row(
    self,
    *,
    table: str,
    label: str,
    repository_id: int,
    row_id: int,
    values: dict[str, str | float | int | None],
) -> None:
    """Update the supplied columns of one approved row.

    Entries of ``values`` whose value is ``None`` are skipped. When nothing
    is left to update, the row's existence is still verified.

    ``table`` and the keys of ``values`` are interpolated into the SQL text,
    so they must be trusted identifiers; the values themselves are bound as
    parameters.

    Raises:
        NotFoundError: If no row with ``row_id`` exists for ``repository_id``.
    """
    changes = {
        column: value for column, value in values.items() if value is not None
    }
    if not changes:
        # Nothing to write, but callers still expect a missing row to raise.
        self._ensure_approved_row(
            table=table,
            label=label,
            repository_id=repository_id,
            row_id=row_id,
        )
        return

    set_clause = ", ".join(f"{column} = ?" for column in changes)
    params: list[str | float | int] = [*changes.values(), row_id, repository_id]
    with self.connect() as connection:
        cursor = connection.execute(
            f"""
            UPDATE {table}
            SET {set_clause}
            WHERE id = ? AND repository_id = ?
            """,
            params,
        )
        if cursor.rowcount == 0:
            raise NotFoundError(
                f"{label} {row_id} was not found for repository {repository_id}"
            )
def _delete_approved_row(
    self,
    *,
    table: str,
    label: str,
    repository_id: int,
    row_id: int,
) -> None:
    """Delete one approved row that belongs to the given repository.

    ``table`` is interpolated into the SQL text and must be a trusted
    identifier; the ids are bound as parameters.

    Raises:
        NotFoundError: If no row with ``row_id`` exists for ``repository_id``.
    """
    statement = f"DELETE FROM {table} WHERE id = ? AND repository_id = ?"
    with self.connect() as connection:
        deleted = connection.execute(statement, (row_id, repository_id)).rowcount
        if deleted == 0:
            raise NotFoundError(
                f"{label} {row_id} was not found for repository {repository_id}"
            )
def _ensure_approved_row(
    self,
    *,
    table: str,
    label: str,
    repository_id: int,
    row_id: int,
) -> None:
    """Verify that an approved row exists for the given repository.

    ``table`` is interpolated into the SQL text and must be a trusted
    identifier; the ids are bound as parameters.

    Raises:
        NotFoundError: If no row with ``row_id`` exists for ``repository_id``.
    """
    lookup = f"SELECT id FROM {table} WHERE id = ? AND repository_id = ?"
    with self.connect() as connection:
        found = connection.execute(lookup, (row_id, repository_id)).fetchone()
        if found is not None:
            return
        raise NotFoundError(
            f"{label} {row_id} was not found for repository {repository_id}"
        )
def _source_refs_to_json(self, source_refs: list[SourceReference]) -> str:
return json.dumps(
[
{
"fact_id": source_ref.fact_id,
"path": source_ref.path,
"kind": source_ref.kind,
"name": source_ref.name,
"line": source_ref.line,
}
for source_ref in source_refs
]
)
def _attributes_to_json(self, attributes: list[str]) -> str:
return json.dumps([item.strip() for item in attributes if item.strip()])
def _attributes_from_json(self, value: str) -> list[str]:
if not value:
return []
parsed = json.loads(value)
if not isinstance(parsed, list):
return []
return [str(item) for item in parsed if str(item).strip()]
def _source_refs_from_json(self, value: str) -> list[SourceReference]:
return [
SourceReference(
fact_id=item.get("fact_id"),
path=item.get("path", ""),
kind=item.get("kind", ""),
name=item.get("name", ""),
line=item.get("line"),
)
for item in json.loads(value)
]
@staticmethod
def _repository_from_row(row: sqlite3.Row) -> Repository:
    """Build a ``Repository`` from a row, copying its same-named columns."""
    columns = ("id", "name", "url", "description", "branch", "status")
    return Repository(**{column: row[column] for column in columns})
@staticmethod
def _snapshot_from_row(row: sqlite3.Row) -> RepositorySnapshot:
    """Build a ``RepositorySnapshot`` from a row, copying its same-named columns."""
    columns = (
        "id",
        "repository_id",
        "commit_hash",
        "branch",
        "source_path",
        "file_count",
    )
    return RepositorySnapshot(**{column: row[column] for column in columns})
@staticmethod
def _analysis_run_from_row(row: sqlite3.Row) -> AnalysisRun:
    """Build an ``AnalysisRun`` from a row, copying its same-named columns."""
    columns = (
        "id",
        "repository_id",
        "snapshot_id",
        "status",
        "started_at",
        "completed_at",
        "error_message",
        "scanner_version",
    )
    return AnalysisRun(**{column: row[column] for column in columns})
@staticmethod
def _observed_fact_from_row(row: sqlite3.Row) -> ObservedFact:
    """Build an ``ObservedFact`` from a row.

    Plain columns are copied as-is; the ``metadata`` column is decoded from
    JSON text.
    """
    plain_columns = (
        "id",
        "repository_id",
        "analysis_run_id",
        "snapshot_id",
        "kind",
        "path",
        "name",
        "value",
    )
    fields = {column: row[column] for column in plain_columns}
    fields["metadata"] = json.loads(row["metadata"])
    return ObservedFact(**fields)
@staticmethod
def _content_chunk_from_row(row: sqlite3.Row) -> ContentChunk:
    """Build a ``ContentChunk`` from a row.

    Plain columns are copied as-is; the ``metadata`` column is decoded from
    JSON text.
    """
    plain_columns = (
        "id",
        "repository_id",
        "analysis_run_id",
        "snapshot_id",
        "path",
        "kind",
        "start_line",
        "end_line",
        "text",
    )
    fields = {column: row[column] for column in plain_columns}
    fields["metadata"] = json.loads(row["metadata"])
    return ContentChunk(**fields)
@staticmethod
def _expectation_gap_from_row(row: sqlite3.Row) -> ExpectationGap:
    """Build an ``ExpectationGap`` from a row, copying its same-named columns."""
    columns = (
        "id",
        "repository_id",
        "analysis_run_id",
        "expected_type",
        "expected_name",
        "source",
        "notes",
        "status",
        "created_at",
    )
    return ExpectationGap(**{column: row[column] for column in columns})