Files
the-custodian/state-hub/api/routers/repos.py
tegwick 17303d2519 feat(state-hub): Interface Change Registry (CUST-WP-0033 T01-T06)
Adds first-class tracking for API and interface mutations across the
agent ecosystem. Breaking changes are documented, affected repos are
notified via inbox, and agents discover pending changes at session
start via the dispatch endpoint.

- Migration q4l5m6n7o8p9: interface_changes table
- Model/schema: InterfaceChange with draft→published→resolved lifecycle
- Router: POST/GET/PATCH /interface-changes/, /publish, /resolve actions
  (auto-notify affected repo agents on publish; progress event on origin)
- Dispatch: GET /repos/{slug}/dispatch now returns pending_interface_changes
- MCP tools: register_interface_change, list_interface_changes,
  publish_interface_change, resolve_interface_change
- Dashboard: /interface-changes page with type badges, planned calendar,
  published cards, and draft table
- EP-CUST-ICR-001 registered: webhook subscriptions (deliberately deferred)

First record: trailing-slash normalisation (2026-04-26), published,
affecting repo-registry — visible in repo-registry dispatch immediately.

223 tests passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-26 15:29:08 +02:00

557 lines
21 KiB
Python

import asyncio
import json
import socket
import subprocess
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import case, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from api.config import settings
from api.database import get_session
from api.doi_engine import compute_fingerprint, evaluate as _doi_evaluate
from api.models.doi_cache import DOICache
from api.models.domain import Domain
from api.models.interface_change import InterfaceChange
from api.models.managed_repo import ManagedRepo
from api.models.repo_goal import RepoGoal
from api.models.tpsc import TPSCSnapshot
from api.models.task import Task
from api.models.workstream import Workstream
from api.schemas.doi import DoICriterion, DoIReport, DoISummaryEntry
from api.schemas.managed_repo import (
DispatchTask,
DispatchWorkstream,
PendingInterfaceChange,
RepoCreate,
RepoDispatch,
RepoPathRegister,
RepoRead,
RepoUpdate,
)
# Router for the managed-repo endpoints; every route below is mounted under /repos.
router = APIRouter(prefix="/repos", tags=["repos"])
@router.get("/", response_model=list[RepoRead])
async def list_repos(
    domain: str | None = None,
    session: AsyncSession = Depends(get_session),
) -> list[ManagedRepo]:
    """List managed repos sorted by name, optionally limited to one domain.

    Args:
        domain: Optional domain slug. When given, only repos whose
            ``domain_id`` matches that domain are returned.
        session: Database session injected by FastAPI.

    Returns:
        The matching ``ManagedRepo`` rows ordered by ``name``.

    Raises:
        HTTPException: 404 when ``domain`` is given but no domain has that slug.
    """
    stmt = select(ManagedRepo).order_by(ManagedRepo.name)

    if domain:
        # Resolve the slug to a domain row first so an unknown slug yields a
        # 404 instead of an empty list.
        domain_lookup = await session.execute(
            select(Domain).where(Domain.slug == domain)
        )
        matched_domain = domain_lookup.scalar_one_or_none()
        if matched_domain is None:
            raise HTTPException(status_code=404, detail=f"Domain '{domain}' not found")
        stmt = stmt.where(ManagedRepo.domain_id == matched_domain.id)

    rows = await session.execute(stmt)
    return list(rows.scalars().all())
@router.post("/", response_model=RepoRead, status_code=status.HTTP_201_CREATED)
async def register_repo(
    body: RepoCreate,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Create a managed repo under an existing domain.

    Args:
        body: Creation payload; ``domain_slug`` selects the owning domain and
            ``slug`` must not already be taken.
        session: Database session injected by FastAPI.

    Returns:
        The newly committed ``ManagedRepo``, refreshed from the database.

    Raises:
        HTTPException: 404 when the domain slug is unknown; 409 when a repo
            with the same slug already exists.
    """
    domain_lookup = await session.execute(
        select(Domain).where(Domain.slug == body.domain_slug)
    )
    owning_domain = domain_lookup.scalar_one_or_none()
    if owning_domain is None:
        raise HTTPException(status_code=404, detail=f"Domain '{body.domain_slug}' not found")

    slug_lookup = await session.execute(
        select(ManagedRepo).where(ManagedRepo.slug == body.slug)
    )
    if slug_lookup.scalar_one_or_none():
        raise HTTPException(status_code=409, detail=f"Repo slug '{body.slug}' already exists")

    new_repo = ManagedRepo(
        domain_id=owning_domain.id,
        slug=body.slug,
        name=body.name,
        local_path=body.local_path,
        remote_url=body.remote_url,
        git_fingerprint=body.git_fingerprint,
        description=body.description,
        topic_id=body.topic_id,
    )
    session.add(new_repo)
    await session.commit()
    # Reload so server-generated columns are populated on the returned object.
    await session.refresh(new_repo)
    return new_repo
@router.get("/by-fingerprint", response_model=list[RepoRead])
async def get_repo_by_fingerprint(
    hash: str,
    remote_url: str | None = None,
    session: AsyncSession = Depends(get_session),
) -> list[ManagedRepo]:
    """Find repos by their git root-commit SHA-1 fingerprint.

    The fingerprint is what ``git rev-list --max-parents=0 HEAD`` prints; it is
    the same in every clone of a repository, and repos sharing git history
    (forks, monorepo splits) share it too.

    Args:
        hash: Fingerprint to match against ``git_fingerprint``.
        remote_url: Optional remote URL used to narrow the result when several
            repos share the same ancestor commit.
        session: Database session injected by FastAPI.

    Returns:
        All matching repos; an empty list when nothing matches.
    """
    conditions = [ManagedRepo.git_fingerprint == hash]
    if remote_url:
        conditions.append(ManagedRepo.remote_url == remote_url)

    matches = await session.execute(select(ManagedRepo).where(*conditions))
    return list(matches.scalars().all())
@router.get("/by-remote", response_model=RepoRead)
async def get_repo_by_remote_url(
    url: str,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Fetch the repo registered with the given git remote URL.

    This is the fallback lookup; ``/by-fingerprint`` is the preferred one.

    Args:
        url: Remote URL compared for exact equality with ``remote_url``.
        session: Database session injected by FastAPI.

    Raises:
        HTTPException: 404 when no repo has that remote URL.
    """
    lookup = await session.execute(
        select(ManagedRepo).where(ManagedRepo.remote_url == url)
    )
    found = lookup.scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"No repo with remote_url '{url}' found")
@router.get("/doi/summary", response_model=list[DoISummaryEntry])
async def doi_summary(session: AsyncSession = Depends(get_session)) -> list[DoISummaryEntry]:
    """Return the DoI tier of every active repo, worst tier first.

    Results are cached in ``doi_cache``. A repo is re-evaluated only when its
    fingerprint changes (repo record updated, new TPSC snapshot, goal change,
    or a key file mtime changes on disk); otherwise the stored result is
    returned as-is. Cache misses are evaluated with ``skip_consistency=True``
    and written back before the response is built.

    Args:
        session: Database session injected by FastAPI.

    Returns:
        One ``DoISummaryEntry`` per active repo, sorted none < core <
        standard < full (unknown tiers sort with "none").
    """
    active_rows = await session.execute(
        select(ManagedRepo).where(ManagedRepo.status == "active").order_by(ManagedRepo.name)
    )
    active_repos = list(active_rows.scalars().all())
    active_ids = [repo.id for repo in active_repos]
    slug_by_repo_id = {repo.id: repo.slug for repo in active_repos}

    # ── Bulk DB queries for fingerprint inputs ───────────────────────────────
    all_domains = (await session.execute(select(Domain))).scalars().all()
    slug_by_domain_id = {dom.id: dom.slug for dom in all_domains}
    status_by_domain_slug = {dom.slug: dom.status for dom in all_domains}

    # Snapshot count and newest snapshot timestamp per repo.
    tpsc_rows = await session.execute(
        select(
            TPSCSnapshot.repo_id,
            func.count().label("cnt"),
            func.max(TPSCSnapshot.snapshot_at).label("latest"),
        )
        .where(TPSCSnapshot.repo_id.in_(active_ids))
        .group_by(TPSCSnapshot.repo_id)
    )
    snapshot_counts: dict = {}
    snapshot_latest: dict = {}
    for row in tpsc_rows:
        if row.repo_id not in slug_by_repo_id:
            continue
        row_slug = slug_by_repo_id[row.repo_id]
        snapshot_counts[row_slug] = row.cnt
        snapshot_latest[row_slug] = str(row.latest)

    # Active-goal count and newest goal update per repo.
    goal_rows = await session.execute(
        select(
            RepoGoal.repo_id,
            func.count().label("total"),
            func.sum(case((RepoGoal.status == "active", 1), else_=0)).label("active_cnt"),
            func.max(RepoGoal.updated_at).label("latest"),
        )
        .where(RepoGoal.repo_id.in_(active_ids))
        .group_by(RepoGoal.repo_id)
    )
    active_goal_counts: dict = {}
    goals_latest: dict = {}
    for row in goal_rows:
        if row.repo_id not in slug_by_repo_id:
            continue
        row_slug = slug_by_repo_id[row.repo_id]
        active_goal_counts[row_slug] = int(row.active_cnt or 0)
        goals_latest[row_slug] = str(row.latest)

    # Existing cache rows, keyed by repo id.
    cache_rows = await session.execute(
        select(DOICache).where(DOICache.repo_id.in_(active_ids))
    )
    cache_by_repo_id = {entry.repo_id: entry for entry in cache_rows.scalars().all()}
    # ─────────────────────────────────────────────────────────────────────────

    # Pre-computed lookups handed to the evaluator so it need not re-query.
    prefetch = {
        "domain_status": status_by_domain_slug,
        "tpsc_snap_counts": snapshot_counts,
        "active_goal_counts": active_goal_counts,
    }

    async def _get_or_refresh(repo: ManagedRepo) -> DoISummaryEntry:
        """Return the cached entry for ``repo``, re-evaluating on fingerprint mismatch."""
        repo_slug = repo.slug
        domain_slug = slug_by_domain_id.get(repo.domain_id)

        def _entry(source, checked_at: str) -> DoISummaryEntry:
            # ``source`` is either the cache row or a fresh report; both expose
            # the tier / *_pass attributes read here.
            return DoISummaryEntry(
                repo_slug=repo_slug,
                domain_slug=domain_slug,
                tier=source.tier,
                core_pass=source.core_pass,
                standard_pass=source.standard_pass,
                full_pass=source.full_pass,
                checked_at=checked_at,
            )

        repo_dict = {
            "slug": repo_slug,
            "domain_slug": domain_slug,
            "local_path": repo.local_path,
            "remote_url": repo.remote_url,
            "host_paths": repo.host_paths or {},
            "last_sbom_at": str(repo.last_sbom_at) if repo.last_sbom_at else None,
            "updated_at": str(repo.updated_at) if repo.updated_at else "",
        }
        fingerprint = compute_fingerprint(
            repo_dict,
            snapshot_latest.get(repo_slug),
            goals_latest.get(repo_slug),
        )

        cache_row = cache_by_repo_id.get(repo.id)
        if cache_row and cache_row.fingerprint == fingerprint:
            # Cache hit: nothing relevant changed since the last evaluation.
            return _entry(cache_row, cache_row.checked_at.isoformat())

        # Cache miss: evaluate now and persist the outcome.
        report = await _doi_evaluate(repo_dict, skip_consistency=True, prefetch=prefetch)
        now = datetime.now(tz=timezone.utc)
        fresh_values = {
            "tier": report.tier,
            "core_pass": report.core_pass,
            "standard_pass": report.standard_pass,
            "full_pass": report.full_pass,
            "criteria": [
                {
                    "id": crit.id,
                    "label": crit.label,
                    "tier": crit.tier,
                    "status": crit.status,
                    "detail": crit.detail,
                }
                for crit in report.criteria
            ],
            "fingerprint": fingerprint,
            "checked_at": now,
            "updated_at": now,
        }
        if cache_row:
            for column, value in fresh_values.items():
                setattr(cache_row, column, value)
        else:
            session.add(DOICache(repo_id=repo.id, **fresh_values))

        return _entry(report, now.isoformat())

    gathered = await asyncio.gather(*(_get_or_refresh(repo) for repo in active_repos))
    entries: list[DoISummaryEntry] = list(gathered)
    # A single commit persists every cache row touched above.
    await session.commit()

    tier_order = {"none": 0, "core": 1, "standard": 2, "full": 3}
    entries.sort(key=lambda entry: tier_order.get(entry.tier, 0))
    return entries
@router.get("/{slug}/doi", response_model=DoIReport)
async def get_repo_doi(
    slug: str,
    force_refresh: bool = False,
    session: AsyncSession = Depends(get_session),
) -> DoIReport:
    """Evaluate the 14 DoI criteria for one repo (full check including C7/C13).

    The result is cached per repo together with a fingerprint; a cached report
    is served only while the fingerprint still matches and the cached criteria
    list is non-empty. Pass ``?force_refresh=true`` to bypass the cache.

    Args:
        slug: Slug of the repo to evaluate.
        force_refresh: When true, always re-evaluate and overwrite the cache.
        session: Database session injected by FastAPI.

    Returns:
        The cached or freshly computed ``DoIReport``.

    Raises:
        HTTPException: 404 when no repo has the given slug.
    """
    repo = await _get_repo_by_slug(slug, session)

    domain_lookup = await session.execute(
        select(Domain).where(Domain.id == repo.domain_id)
    )
    owning_domain = domain_lookup.scalar_one_or_none()

    # Fingerprint inputs for this single repo.
    tpsc_stmt = select(
        func.count().label("cnt"),
        func.max(TPSCSnapshot.snapshot_at).label("latest"),
    ).where(TPSCSnapshot.repo_id == repo.id)
    tpsc_row = (await session.execute(tpsc_stmt)).one()

    goal_stmt = select(func.max(RepoGoal.updated_at).label("latest")).where(
        RepoGoal.repo_id == repo.id
    )
    goal_row = (await session.execute(goal_stmt)).one()

    repo_dict = {
        "slug": repo.slug,
        "domain_slug": owning_domain.slug if owning_domain else None,
        "local_path": repo.local_path,
        "remote_url": repo.remote_url,
        "host_paths": repo.host_paths or {},
        "last_sbom_at": str(repo.last_sbom_at) if repo.last_sbom_at else None,
        "updated_at": str(repo.updated_at) if repo.updated_at else "",
    }
    latest_snapshot = str(tpsc_row.latest) if tpsc_row.latest else None
    latest_goal = str(goal_row.latest) if goal_row.latest else None
    fingerprint = compute_fingerprint(repo_dict, latest_snapshot, latest_goal)

    # Look up the cache row (needed even on force_refresh so it can be updated).
    cache_lookup = await session.execute(
        select(DOICache).where(DOICache.repo_id == repo.id)
    )
    cache_row = cache_lookup.scalar_one_or_none()

    must_evaluate = (
        force_refresh
        or not cache_row
        or cache_row.fingerprint != fingerprint
        or not cache_row.criteria
    )
    if not must_evaluate:
        return DoIReport(
            repo_slug=slug,
            tier=cache_row.tier,
            core_pass=cache_row.core_pass,
            standard_pass=cache_row.standard_pass,
            full_pass=cache_row.full_pass,
            checked_at=cache_row.checked_at.isoformat(),
            criteria=[DoICriterion(**stored) for stored in cache_row.criteria],
        )

    # Full evaluation (includes C7/C13 consistency subprocesses).
    report = await _doi_evaluate(repo_dict)
    now = datetime.now(tz=timezone.utc)
    criteria_json = [
        {
            "id": crit.id,
            "label": crit.label,
            "tier": crit.tier,
            "status": crit.status,
            "detail": crit.detail,
        }
        for crit in report.criteria
    ]

    if cache_row:
        cache_row.tier = report.tier
        cache_row.core_pass = report.core_pass
        cache_row.standard_pass = report.standard_pass
        cache_row.full_pass = report.full_pass
        cache_row.criteria = criteria_json
        cache_row.fingerprint = fingerprint
        cache_row.checked_at = now
        cache_row.updated_at = now
    else:
        session.add(
            DOICache(
                repo_id=repo.id,
                tier=report.tier,
                core_pass=report.core_pass,
                standard_pass=report.standard_pass,
                full_pass=report.full_pass,
                criteria=criteria_json,
                fingerprint=fingerprint,
                checked_at=now,
                updated_at=now,
            )
        )
    await session.commit()

    return DoIReport(
        repo_slug=report.repo_slug,
        tier=report.tier,
        core_pass=report.core_pass,
        standard_pass=report.standard_pass,
        full_pass=report.full_pass,
        checked_at=report.checked_at,
        criteria=[
            DoICriterion(
                id=crit.id,
                label=crit.label,
                tier=crit.tier,
                status=crit.status,
                detail=crit.detail,
            )
            for crit in report.criteria
        ],
    )
@router.get("/by-id/{repo_id}", response_model=RepoRead)
async def get_repo_by_id(
    repo_id: uuid.UUID,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Fetch a managed repo by its primary-key UUID.

    Args:
        repo_id: Primary key of the repo.
        session: Database session injected by FastAPI.

    Raises:
        HTTPException: 404 when no repo has that id.
    """
    found = await session.get(ManagedRepo, repo_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"Repo '{repo_id}' not found")
@router.get("/{slug}", response_model=RepoRead)
async def get_repo(
    slug: str,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Fetch a managed repo by slug.

    Raises:
        HTTPException: 404 when no repo has the given slug.
    """
    repo = await _get_repo_by_slug(slug, session)
    return repo
@router.patch("/{slug}", response_model=RepoRead)
async def update_repo(
    slug: str,
    body: RepoUpdate,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Apply a partial update to a managed repo.

    Only the fields explicitly present in the request body are written;
    fields left unset keep their current value.

    Args:
        slug: Slug of the repo to update.
        body: Partial update payload.
        session: Database session injected by FastAPI.

    Returns:
        The updated repo, refreshed from the database.

    Raises:
        HTTPException: 404 when no repo has the given slug.
    """
    target = await _get_repo_by_slug(slug, session)

    changes = body.model_dump(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    await session.commit()
    await session.refresh(target)
    return target
@router.post("/{slug}/paths", response_model=RepoRead)
async def register_host_path(
    slug: str,
    body: RepoPathRegister,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Register or update where a repo lives on one specific host.

    The ``{host: path}`` pair is merged into ``host_paths`` and entries for
    other hosts are left untouched. Use this when the same repo sits at a
    different absolute path on different machines.

    Args:
        slug: Slug of the repo.
        body: Host name and the path of the repo on that host.
        session: Database session injected by FastAPI.

    Returns:
        The updated repo, refreshed from the database.

    Raises:
        HTTPException: 404 when no repo has the given slug.
    """
    target = await _get_repo_by_slug(slug, session)

    # Assign a brand-new dict rather than mutating the stored one in place.
    target.host_paths = {**(target.host_paths or {}), body.host: body.path}

    await session.commit()
    await session.refresh(target)
    return target
@router.patch("/{slug}/archive", response_model=RepoRead)
async def archive_repo(
    slug: str,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Mark a managed repo as archived by setting its status to "archived".

    Args:
        slug: Slug of the repo to archive.
        session: Database session injected by FastAPI.

    Returns:
        The archived repo, refreshed from the database.

    Raises:
        HTTPException: 404 when no repo has the given slug.
    """
    target = await _get_repo_by_slug(slug, session)
    setattr(target, "status", "archived")
    await session.commit()
    await session.refresh(target)
    return target
@router.get("/{slug}/dispatch", response_model=RepoDispatch)
async def get_repo_dispatch(
    slug: str,
    session: AsyncSession = Depends(get_session),
) -> RepoDispatch:
    """Return everything an agent needs to pick up work on a repo.

    The payload contains the active goal, the active workstreams with their
    pending tasks, the subset of those tasks flagged ``needs_human``, and the
    published interface changes that list this repo as affected.

    This endpoint is the foundation for autonomous agent sessions: an agent can
    call it at session start to discover what work is pending without needing
    to read the state-hub summary or scan workplan files manually.

    Args:
        slug: Slug of the repo to build the dispatch for.
        session: Database session injected by FastAPI.

    Returns:
        A ``RepoDispatch`` for the repo.

    Raises:
        HTTPException: 404 when no repo has the given slug.
    """
    repo = await _get_repo_by_slug(slug, session)

    # Active goal: the first active goal when ordered by priority.
    goal_result = await session.execute(
        select(RepoGoal)
        .where(RepoGoal.repo_id == repo.id, RepoGoal.status == "active")
        .order_by(RepoGoal.priority)
        .limit(1)
    )
    goal_obj = goal_result.scalar_one_or_none()
    active_goal = None
    if goal_obj:
        active_goal = {
            "id": str(goal_obj.id),
            "title": goal_obj.title,
            "description": goal_obj.description,
            "priority": goal_obj.priority,
        }

    # Active workstreams, oldest first.
    ws_result = await session.execute(
        select(Workstream)
        .where(Workstream.repo_id == repo.id, Workstream.status == "active")
        .order_by(Workstream.created_at)
    )
    workstreams = list(ws_result.scalars().all())

    # Fetch the pending tasks of all workstreams in ONE query instead of one
    # query per workstream. Ordering by created_at globally and then grouping
    # keeps each workstream's tasks in created_at order.
    tasks_by_workstream: dict = {}
    if workstreams:
        task_result = await session.execute(
            select(Task)
            .where(
                Task.workstream_id.in_([ws.id for ws in workstreams]),
                Task.status.in_(["todo", "in_progress"]),
            )
            .order_by(Task.created_at)
        )
        for task in task_result.scalars().all():
            tasks_by_workstream.setdefault(task.workstream_id, []).append(task)

    dispatch_workstreams: list[DispatchWorkstream] = []
    all_interventions: list[DispatchTask] = []
    for ws in workstreams:
        pending = [
            DispatchTask(
                id=t.id,
                title=t.title,
                priority=t.priority,
                status=t.status,
                needs_human=t.needs_human,
            )
            for t in tasks_by_workstream.get(ws.id, [])
        ]
        # Tasks needing a human are also surfaced in a flat, top-level list.
        all_interventions.extend(t for t in pending if t.needs_human)
        dispatch_workstreams.append(
            DispatchWorkstream(
                id=ws.id,
                title=ws.title,
                status=ws.status,
                pending_tasks=pending,
            )
        )

    # Published interface changes that affect this repo and are not yet resolved.
    ic_result = await session.execute(
        select(InterfaceChange)
        .where(
            InterfaceChange.status == "published",
            InterfaceChange.affected_repo_slugs.contains([slug]),
        )
        .order_by(InterfaceChange.published_at.desc())
    )
    # NOTE(review): ``ic.repo`` is read on rows loaded through an AsyncSession;
    # this presumably relies on the relationship being eagerly loaded in the
    # model definition — confirm, since implicit lazy loads fail under asyncio.
    pending_changes = [
        PendingInterfaceChange(
            id=ic.id,
            title=ic.title,
            change_type=ic.change_type,
            interface_type=ic.interface_type,
            origin_repo_slug=ic.repo.slug,
            affected_paths=ic.affected_paths or [],
            planned_for=ic.planned_for,
            published_at=ic.published_at,
        )
        for ic in ic_result.scalars().all()
    ]

    return RepoDispatch(
        repo_slug=slug,
        active_goal=active_goal,
        active_workstreams=dispatch_workstreams,
        human_interventions=all_interventions,
        pending_interface_changes=pending_changes,
        last_state_synced_at=repo.last_state_synced_at,
    )
@router.post("/{slug}/sync")
async def sync_repo_consistency(
    slug: str,
    fix: bool = True,
    session: AsyncSession = Depends(get_session),
) -> dict:
    """Run the ADR-001 consistency check (and optional --fix) for a repo via HTTP.

    Intended for non-Claude-Code agents (e.g. Codex) that cannot use MCP tools
    but need to sync workplan file state to the state-hub DB after making
    changes.

    Args:
        slug: Slug of the repo to check.
        fix: When true (default) the script is run with ``--fix``; pass
            ``?fix=false`` for a check-only run that writes nothing.
        session: Database session injected by FastAPI.

    Returns:
        The parsed JSON output of ``consistency_check.py``.

    Raises:
        HTTPException: 404 when the repo is unknown; 503 when the repo has no
            registered, existing path on this host; 500 when the script's
            stdout is not valid JSON.
    """
    repo = await _get_repo_by_slug(slug, session)

    hostname = socket.gethostname()
    host_paths = repo.host_paths or {}
    # The path is only used as a reachability check; the script itself is
    # given the slug, not the path.
    repo_path = host_paths.get(hostname)
    if not repo_path or not Path(repo_path).exists():
        raise HTTPException(
            status_code=503,
            detail=(
                f"No accessible path for repo '{slug}' on host '{hostname}'. "
                f"Register with: POST /repos/{slug}/paths/"
            ),
        )

    script = Path(__file__).parent.parent.parent / "scripts" / "consistency_check.py"
    cmd = [sys.executable, str(script), "--repo", slug, "--json",
           "--api-base", settings.api_base]
    if fix:
        cmd.append("--fix")

    # subprocess.run blocks, so push it to a worker thread to keep the event
    # loop responsive while the script runs.
    result = await asyncio.to_thread(
        subprocess.run, cmd, capture_output=True, text=True
    )

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        # Non-JSON stdout means the script crashed or printed diagnostics;
        # surface whatever it emitted and keep the parse error as the cause.
        raise HTTPException(
            status_code=500,
            detail=f"Consistency check failed: {result.stderr or result.stdout or '(no output)'}",
        ) from exc
async def _get_repo_by_slug(slug: str, session: AsyncSession) -> ManagedRepo:
    """Load the managed repo with the given slug or fail with a 404.

    Args:
        slug: Slug to match against ``ManagedRepo.slug``.
        session: Open database session used for the lookup.

    Raises:
        HTTPException: 404 when no repo has that slug.
    """
    lookup = await session.execute(
        select(ManagedRepo).where(ManagedRepo.slug == slug)
    )
    found = lookup.scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"Repo '{slug}' not found")