perf(doi): fingerprint-based DB cache for DoI results

Adds doi_cache table (migration k8f9a0b1c2d3). Results are stored after
each evaluation and reused on subsequent requests when the fingerprint
matches. Fingerprint covers repo.updated_at, latest TPSC snapshot_at,
latest goal updated_at, and mtime of SCOPE.md / CLAUDE.md / tpsc.yaml.

Behaviour:
- Summary (warm cache, nothing changed): ~0.4s (was 0.9s)
- Summary (one repo stale): ~0.9s (only stale repos recomputed)
- Single repo (cache hit): ~0.2s (was 40s for full check)
- Single repo ?force_refresh=true: ~2s (full C7/C13 subprocess check)

Total journey: 108s (original) → 6s → <1s → 0.2s (cached single repo)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-20 01:47:19 +01:00
parent 245cd72ba3
commit 1ee0343f75
5 changed files with 251 additions and 38 deletions

View File

@@ -45,6 +45,35 @@ class DoIReport:
checked_at: str = field(default_factory=lambda: datetime.now(tz=timezone.utc).isoformat())
def compute_fingerprint(
repo: dict,
latest_tpsc_snap_at: str | None,
latest_goal_updated_at: str | None,
) -> str:
"""Compute a pipe-joined fingerprint of all inputs that affect DoI criteria.
If any component changes, the fingerprint changes and the cache is invalidated:
- repo.updated_at → covers last_sbom_at, remote_url, host_paths, domain changes
- latest_tpsc_snap_at → C9 (TPSC snapshot exists)
- latest_goal_updated_at → C10 (active repo goal)
- mtime of SCOPE.md, CLAUDE.md, tpsc.yaml → C5, C6, C9, C11, C12
"""
parts = [
str(repo.get("updated_at") or ""),
str(latest_tpsc_snap_at or ""),
str(latest_goal_updated_at or ""),
]
repo_path = _resolve_path(repo)
if repo_path:
for fname in ("SCOPE.md", "CLAUDE.md", "tpsc.yaml"):
f = Path(repo_path) / fname
try:
parts.append(f"{fname}:{f.stat().st_mtime:.3f}")
except FileNotFoundError:
parts.append(f"{fname}:absent")
return "|".join(parts)
def _resolve_path(repo: dict) -> str:
hostname = socket.gethostname()
host_paths = repo.get("host_paths") or {}

View File

@@ -18,6 +18,7 @@ from api.models.agent_message import AgentMessage
from api.models.capability_catalog import CapabilityCatalog
from api.models.capability_request import CapabilityRequest
from api.models.tpsc import TPSCCatalog, TPSCSnapshot, TPSCEntry
from api.models.doi_cache import DOICache
__all__ = [
"Base",
@@ -40,4 +41,5 @@ __all__ = [
"CapabilityCatalog",
"CapabilityRequest",
"TPSCCatalog", "TPSCSnapshot", "TPSCEntry",
"DOICache",
]

27
api/models/doi_cache.py Normal file
View File

@@ -0,0 +1,27 @@
import uuid
from datetime import datetime
from sqlalchemy import Boolean, DateTime, ForeignKey, String, Text, func
from sqlalchemy.dialects.postgresql import JSON, UUID
from sqlalchemy.orm import Mapped, mapped_column
from api.models.base import Base
class DOICache(Base):
__tablename__ = "doi_cache"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
repo_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("managed_repos.id", ondelete="CASCADE"),
nullable=False, unique=True, index=True,
)
tier: Mapped[str] = mapped_column(String(20), nullable=False)
core_pass: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default="false")
standard_pass: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default="false")
full_pass: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default="false")
criteria: Mapped[list | None] = mapped_column(JSON, nullable=True)
# Pipe-joined string of timestamps/mtimes used to detect staleness
fingerprint: Mapped[str] = mapped_column(Text, nullable=False)
checked_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

View File

@@ -1,14 +1,14 @@
import asyncio
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy import case, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import func
from api.database import get_session
from api.doi_engine import evaluate as _doi_evaluate
from api.doi_engine import compute_fingerprint, evaluate as _doi_evaluate
from api.models.doi_cache import DOICache
from api.models.domain import Domain
from api.models.managed_repo import ManagedRepo
from api.models.repo_goal import RepoGoal
@@ -76,65 +76,134 @@ async def register_repo(
@router.get("/doi/summary", response_model=list[DoISummaryEntry])
async def doi_summary(session: AsyncSession = Depends(get_session)) -> list[DoISummaryEntry]:
"""Return DoI tier for all active repos, worst tier first."""
"""Return DoI tier for all active repos, worst tier first.
Results are cached in doi_cache. A repo is only re-evaluated when its
fingerprint changes (repo record updated, new TPSC snapshot, goal change,
or a key file mtime changes on disk).
"""
repos_result = await session.execute(
select(ManagedRepo).where(ManagedRepo.status == "active").order_by(ManagedRepo.name)
)
repos = list(repos_result.scalars().all())
repo_ids = [r.id for r in repos]
id_to_slug = {r.id: r.slug for r in repos}
# ── 3 bulk DB queries instead of 48 HTTP self-calls ───────────────────────
# C2: domain status by slug
# ── Bulk DB queries for fingerprint inputs ────────────────────────────────
domains_result = await session.execute(select(Domain))
domain_obj_map = {d.id: d for d in domains_result.scalars().all()}
domain_map = {d.id: d.slug for d in domain_obj_map.values()}
domain_status = {d.slug: d.status for d in domain_obj_map.values()}
# C9: TPSC snapshot count per repo_id
repo_ids = [r.id for r in repos]
# Latest TPSC snapshot timestamp per repo (for fingerprint + C9 count)
tpsc_result = await session.execute(
select(TPSCSnapshot.repo_id, func.count().label("cnt"))
select(TPSCSnapshot.repo_id,
func.count().label("cnt"),
func.max(TPSCSnapshot.snapshot_at).label("latest"))
.where(TPSCSnapshot.repo_id.in_(repo_ids))
.group_by(TPSCSnapshot.repo_id)
)
id_to_slug = {r.id: r.slug for r in repos}
tpsc_snap_counts = {id_to_slug[row.repo_id]: row.cnt for row in tpsc_result if row.repo_id in id_to_slug}
tpsc_by_id = {row.repo_id: row for row in tpsc_result}
tpsc_snap_counts = {id_to_slug[rid]: row.cnt for rid, row in tpsc_by_id.items() if rid in id_to_slug}
tpsc_snap_latest = {id_to_slug[rid]: str(row.latest) for rid, row in tpsc_by_id.items() if rid in id_to_slug}
# C10: active repo goal count per repo_id
# Latest goal updated_at + active count per repo (for fingerprint + C10)
goals_result = await session.execute(
select(RepoGoal.repo_id, func.count().label("cnt"))
.where(RepoGoal.repo_id.in_(repo_ids), RepoGoal.status == "active")
select(RepoGoal.repo_id,
func.count().label("total"),
func.sum(case((RepoGoal.status == "active", 1), else_=0)).label("active_cnt"),
func.max(RepoGoal.updated_at).label("latest"))
.where(RepoGoal.repo_id.in_(repo_ids))
.group_by(RepoGoal.repo_id)
)
active_goal_counts = {id_to_slug[row.repo_id]: row.cnt for row in goals_result if row.repo_id in id_to_slug}
goals_by_id = {row.repo_id: row for row in goals_result}
active_goal_counts = {id_to_slug[rid]: int(row.active_cnt or 0) for rid, row in goals_by_id.items() if rid in id_to_slug}
goals_latest = {id_to_slug[rid]: str(row.latest) for rid, row in goals_by_id.items() if rid in id_to_slug}
prefetch = {
"domain_status": domain_status,
"tpsc_snap_counts": tpsc_snap_counts,
"active_goal_counts": active_goal_counts,
}
# Load existing cache rows
cache_result = await session.execute(
select(DOICache).where(DOICache.repo_id.in_(repo_ids))
)
cache_by_repo_id = {row.repo_id: row for row in cache_result.scalars().all()}
# ─────────────────────────────────────────────────────────────────────────
async def _check_one(repo: ManagedRepo) -> DoISummaryEntry:
prefetch = {
"domain_status": domain_status,
"tpsc_snap_counts": tpsc_snap_counts,
"active_goal_counts": active_goal_counts,
}
async def _get_or_refresh(repo: ManagedRepo) -> DoISummaryEntry:
slug = repo.slug
repo_dict = {
"slug": repo.slug,
"slug": slug,
"domain_slug": domain_map.get(repo.domain_id),
"local_path": repo.local_path,
"remote_url": repo.remote_url,
"host_paths": repo.host_paths or {},
"last_sbom_at": str(repo.last_sbom_at) if repo.last_sbom_at else None,
"updated_at": str(repo.updated_at) if repo.updated_at else "",
}
fp = compute_fingerprint(
repo_dict,
tpsc_snap_latest.get(slug),
goals_latest.get(slug),
)
cached = cache_by_repo_id.get(repo.id)
if cached and cached.fingerprint == fp:
# Cache hit — return stored result
return DoISummaryEntry(
repo_slug=slug,
domain_slug=domain_map.get(repo.domain_id),
tier=cached.tier,
core_pass=cached.core_pass,
standard_pass=cached.standard_pass,
full_pass=cached.full_pass,
checked_at=cached.checked_at.isoformat(),
)
# Cache miss — evaluate and store
report = await _doi_evaluate(repo_dict, skip_consistency=True, prefetch=prefetch)
now = datetime.now(tz=timezone.utc)
if cached:
cached.tier = report.tier
cached.core_pass = report.core_pass
cached.standard_pass = report.standard_pass
cached.full_pass = report.full_pass
cached.criteria = [{"id": c.id, "label": c.label, "tier": c.tier,
"status": c.status, "detail": c.detail}
for c in report.criteria]
cached.fingerprint = fp
cached.checked_at = now
cached.updated_at = now
else:
session.add(DOICache(
repo_id=repo.id,
tier=report.tier,
core_pass=report.core_pass,
standard_pass=report.standard_pass,
full_pass=report.full_pass,
criteria=[{"id": c.id, "label": c.label, "tier": c.tier,
"status": c.status, "detail": c.detail}
for c in report.criteria],
fingerprint=fp,
checked_at=now,
updated_at=now,
))
return DoISummaryEntry(
repo_slug=repo.slug,
repo_slug=slug,
domain_slug=domain_map.get(repo.domain_id),
tier=report.tier,
core_pass=report.core_pass,
standard_pass=report.standard_pass,
full_pass=report.full_pass,
checked_at=report.checked_at,
checked_at=now.isoformat(),
)
entries: list[DoISummaryEntry] = list(await asyncio.gather(*[_check_one(r) for r in repos]))
entries: list[DoISummaryEntry] = list(await asyncio.gather(*[_get_or_refresh(r) for r in repos]))
await session.commit()
tier_order = {"none": 0, "core": 1, "standard": 2, "full": 3}
entries.sort(key=lambda e: tier_order.get(e.tier, 0))
@@ -142,12 +211,29 @@ async def doi_summary(session: AsyncSession = Depends(get_session)) -> list[DoIS
@router.get("/{slug}/doi", response_model=DoIReport)
async def get_repo_doi(slug: str, session: AsyncSession = Depends(get_session)) -> DoIReport:
"""Evaluate the 14 DoI criteria for a single repo."""
async def get_repo_doi(
slug: str,
force_refresh: bool = False,
session: AsyncSession = Depends(get_session),
) -> DoIReport:
"""Evaluate the 14 DoI criteria for a single repo (full check including C7/C13).
Results are cached by fingerprint. Pass ?force_refresh=true to bypass the cache.
"""
repo = await _get_repo_by_slug(slug, session)
domain_result = await session.execute(select(Domain).where(Domain.id == repo.domain_id))
domain_obj = domain_result.scalar_one_or_none()
# Fingerprint inputs for this single repo
tpsc_row = (await session.execute(
select(func.count().label("cnt"), func.max(TPSCSnapshot.snapshot_at).label("latest"))
.where(TPSCSnapshot.repo_id == repo.id)
)).one()
goal_row = (await session.execute(
select(func.max(RepoGoal.updated_at).label("latest"))
.where(RepoGoal.repo_id == repo.id)
)).one()
repo_dict = {
"slug": repo.slug,
"domain_slug": domain_obj.slug if domain_obj else None,
@@ -155,19 +241,50 @@ async def get_repo_doi(slug: str, session: AsyncSession = Depends(get_session))
"remote_url": repo.remote_url,
"host_paths": repo.host_paths or {},
"last_sbom_at": str(repo.last_sbom_at) if repo.last_sbom_at else None,
"updated_at": str(repo.updated_at) if repo.updated_at else "",
}
fp = compute_fingerprint(repo_dict, str(tpsc_row.latest) if tpsc_row.latest else None,
str(goal_row.latest) if goal_row.latest else None)
# Check cache (unless force_refresh)
cached = (await session.execute(
select(DOICache).where(DOICache.repo_id == repo.id)
)).scalar_one_or_none()
if not force_refresh and cached and cached.fingerprint == fp and cached.criteria:
return DoIReport(
repo_slug=slug,
tier=cached.tier,
core_pass=cached.core_pass,
standard_pass=cached.standard_pass,
full_pass=cached.full_pass,
checked_at=cached.checked_at.isoformat(),
criteria=[DoICriterion(**c) for c in cached.criteria],
)
# Full evaluation (includes C7/C13 consistency subprocesses)
report = await _doi_evaluate(repo_dict)
now = datetime.now(tz=timezone.utc)
criteria_json = [{"id": c.id, "label": c.label, "tier": c.tier,
"status": c.status, "detail": c.detail} for c in report.criteria]
if cached:
cached.tier = report.tier; cached.core_pass = report.core_pass
cached.standard_pass = report.standard_pass; cached.full_pass = report.full_pass
cached.criteria = criteria_json; cached.fingerprint = fp
cached.checked_at = now; cached.updated_at = now
else:
session.add(DOICache(repo_id=repo.id, tier=report.tier,
core_pass=report.core_pass, standard_pass=report.standard_pass,
full_pass=report.full_pass, criteria=criteria_json,
fingerprint=fp, checked_at=now, updated_at=now))
await session.commit()
return DoIReport(
repo_slug=report.repo_slug,
tier=report.tier,
core_pass=report.core_pass,
standard_pass=report.standard_pass,
full_pass=report.full_pass,
checked_at=report.checked_at,
criteria=[
DoICriterion(id=c.id, label=c.label, tier=c.tier, status=c.status, detail=c.detail)
for c in report.criteria
],
repo_slug=report.repo_slug, tier=report.tier,
core_pass=report.core_pass, standard_pass=report.standard_pass,
full_pass=report.full_pass, checked_at=report.checked_at,
criteria=[DoICriterion(id=c.id, label=c.label, tier=c.tier,
status=c.status, detail=c.detail) for c in report.criteria],
)

View File

@@ -0,0 +1,38 @@
"""doi_cache: materialised DoI results with fingerprint-based invalidation
Revision ID: k8f9a0b1c2d3
Revises: j7e8f9a0b1c2
Create Date: 2026-03-20
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSON
import uuid
revision = "k8f9a0b1c2d3"
down_revision = "j7e8f9a0b1c2"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"doi_cache",
sa.Column("id", UUID(as_uuid=True), primary_key=True, default=uuid.uuid4),
sa.Column("repo_id", UUID(as_uuid=True),
sa.ForeignKey("managed_repos.id", ondelete="CASCADE"),
nullable=False, unique=True),
sa.Column("tier", sa.String(20), nullable=False),
sa.Column("core_pass", sa.Boolean, nullable=False, server_default="false"),
sa.Column("standard_pass", sa.Boolean, nullable=False, server_default="false"),
sa.Column("full_pass", sa.Boolean, nullable=False, server_default="false"),
sa.Column("criteria", JSON, nullable=True), # full criterion list
sa.Column("fingerprint", sa.Text, nullable=False), # pipe-joined timestamps
sa.Column("checked_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
op.create_index("ix_doi_cache_repo_id", "doi_cache", ["repo_id"])
def downgrade() -> None:
op.drop_table("doi_cache")