Files
state-hub/api/routers/repos.py
tegwick 82552b8d59 feat(repos): multi-machine path support via host_paths
Adds a JSONB column `host_paths` to managed_repos mapping
hostname → absolute local path. Fixes the consistency-checker
failure when the same repo lives at different paths on different
machines (e.g. /home/worsch/marki-docx on the workstation vs
/home/tegwick/marki-docx on custodiancore).

Changes:
- Migration g4b5c6d7e8f9: adds host_paths JSONB (default {})
- Model: host_paths Mapped[dict] column
- Schemas: host_paths in RepoRead; new RepoPathRegister schema
- Router: POST /repos/{slug}/paths/ — merges one host entry
- consistency_check.py: resolve_repo_path() prefers host_paths
  [hostname] over local_path; --repo-path CLI override added
- MCP: update_repo_path(slug, path, host?) tool
- Makefile: register-path target; REPO_PATH passthrough on
  check-consistency and fix-consistency targets
- TOOLS.md: documents update_repo_path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-16 16:30:55 +01:00

211 lines
6.6 KiB
Python

import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session
from api.models.domain import Domain
from api.models.managed_repo import ManagedRepo
from api.models.repo_goal import RepoGoal
from api.models.task import Task
from api.models.workstream import Workstream
from api.schemas.managed_repo import (
DispatchTask,
DispatchWorkstream,
RepoCreate,
RepoDispatch,
RepoPathRegister,
RepoRead,
RepoUpdate,
)
# Router for the managed-repo endpoints; every route below is served under /repos.
router = APIRouter(prefix="/repos", tags=["repos"])
@router.get("/", response_model=list[RepoRead])
async def list_repos(
    domain: str | None = None,
    session: AsyncSession = Depends(get_session),
) -> list[ManagedRepo]:
    """List managed repos sorted by name, optionally limited to one domain.

    Args:
        domain: Slug of the domain to filter by. When omitted or empty, repos
            from every domain are returned.
        session: Database session injected by FastAPI.

    Returns:
        The matching repos, ordered by ``name``.

    Raises:
        HTTPException: 404 when ``domain`` is given but no domain has that slug.
    """
    stmt = select(ManagedRepo).order_by(ManagedRepo.name)

    # No filter requested: return everything straight away.
    if not domain:
        rows = await session.execute(stmt)
        return list(rows.scalars().all())

    # Resolve the slug to a domain row first so an unknown slug yields a 404
    # instead of an empty list.
    lookup = await session.execute(select(Domain).where(Domain.slug == domain))
    matched = lookup.scalar_one_or_none()
    if matched is None:
        raise HTTPException(status_code=404, detail=f"Domain '{domain}' not found")

    rows = await session.execute(stmt.where(ManagedRepo.domain_id == matched.id))
    return list(rows.scalars().all())
@router.post("/", response_model=RepoRead, status_code=status.HTTP_201_CREATED)
async def register_repo(
    body: RepoCreate,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Create a managed repo inside an existing domain.

    Args:
        body: Payload carrying the domain slug and the new repo's attributes.
        session: Database session injected by FastAPI.

    Returns:
        The freshly persisted repo, refreshed from the database.

    Raises:
        HTTPException: 404 when ``body.domain_slug`` matches no domain;
            409 when a repo with ``body.slug`` already exists.
    """
    domain_lookup = await session.execute(
        select(Domain).where(Domain.slug == body.domain_slug)
    )
    owner = domain_lookup.scalar_one_or_none()
    if owner is None:
        raise HTTPException(status_code=404, detail=f"Domain '{body.domain_slug}' not found")

    # Reject duplicate slugs up front so the caller gets a clear 409.
    slug_lookup = await session.execute(
        select(ManagedRepo).where(ManagedRepo.slug == body.slug)
    )
    duplicate = slug_lookup.scalar_one_or_none()
    if duplicate:
        raise HTTPException(status_code=409, detail=f"Repo slug '{body.slug}' already exists")

    attributes = {
        "domain_id": owner.id,
        "slug": body.slug,
        "name": body.name,
        "local_path": body.local_path,
        "remote_url": body.remote_url,
        "description": body.description,
        "topic_id": body.topic_id,
    }
    created = ManagedRepo(**attributes)
    session.add(created)
    await session.commit()
    # Reload so database-populated columns are present on the returned object.
    await session.refresh(created)
    return created
@router.get("/{slug}/", response_model=RepoRead)
async def get_repo(
    slug: str,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Fetch a single managed repo by its slug.

    Args:
        slug: Slug of the repo to fetch.
        session: Database session injected by FastAPI.

    Returns:
        The repo with the given slug.

    Raises:
        HTTPException: 404 when no repo has that slug.
    """
    found = await _get_repo_by_slug(slug, session)
    return found
@router.patch("/{slug}/", response_model=RepoRead)
async def update_repo(
    slug: str,
    body: RepoUpdate,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Apply a partial update to a managed repo.

    Only the fields the client explicitly sent are written; everything else
    keeps its current value.

    Args:
        slug: Slug of the repo to update.
        body: Partial update payload.
        session: Database session injected by FastAPI.

    Returns:
        The updated repo, refreshed from the database.

    Raises:
        HTTPException: 404 when no repo has that slug.
    """
    target = await _get_repo_by_slug(slug, session)

    # exclude_unset drops fields the client did not send, so omitted fields
    # are never overwritten with schema defaults.
    changes = body.model_dump(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    await session.commit()
    await session.refresh(target)
    return target
@router.post("/{slug}/paths/", response_model=RepoRead)
async def register_host_path(
    slug: str,
    body: RepoPathRegister,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Register or update the local path of a repo for one specific host.

    The entry ``{body.host: body.path}`` is merged into ``host_paths``; entries
    for other hosts are left untouched. Use this when the same repo lives at
    different absolute paths on different machines.

    Args:
        slug: Slug of the repo whose path is being registered.
        body: Host name and the path of the repo on that host.
        session: Database session injected by FastAPI.

    Returns:
        The repo with the merged ``host_paths``, refreshed from the database.

    Raises:
        HTTPException: 404 when no repo has that slug.
    """
    target = await _get_repo_by_slug(slug, session)

    # Build a fresh dict and assign it rather than mutating the stored one in
    # place -- presumably so the ORM registers the column as changed.
    existing_paths = target.host_paths or {}
    target.host_paths = {**existing_paths, body.host: body.path}

    await session.commit()
    await session.refresh(target)
    return target
@router.patch("/{slug}/archive", response_model=RepoRead)
async def archive_repo(
    slug: str,
    session: AsyncSession = Depends(get_session),
) -> ManagedRepo:
    """Mark a managed repo as archived.

    The row is kept; only its ``status`` is set to ``"archived"``.

    Args:
        slug: Slug of the repo to archive.
        session: Database session injected by FastAPI.

    Returns:
        The archived repo, refreshed from the database.

    Raises:
        HTTPException: 404 when no repo has that slug.
    """
    target = await _get_repo_by_slug(slug, session)
    target.status = "archived"

    await session.commit()
    await session.refresh(target)
    return target
@router.get("/{slug}/dispatch", response_model=RepoDispatch)
async def get_repo_dispatch(
    slug: str,
    session: AsyncSession = Depends(get_session),
) -> RepoDispatch:
    """Return active workstreams, pending tasks, and goal for a repo.

    This endpoint is the foundation for autonomous agent sessions: an agent can
    call it at session start to discover what work is pending without needing to
    read state-hub summary or scan workplan files manually.

    Args:
        slug: Slug of the repo to build the dispatch view for.
        session: Database session injected by FastAPI.

    Returns:
        A ``RepoDispatch`` holding the first active goal by ascending priority
        (or ``None``), the active workstreams in creation order with their
        ``todo`` / ``in_progress`` tasks in creation order, and the subset of
        those tasks flagged ``needs_human``.

    Raises:
        HTTPException: 404 when no repo has that slug.
    """
    repo = await _get_repo_by_slug(slug, session)

    # Active goal: at most one, the first when sorted by ascending priority.
    goal_result = await session.execute(
        select(RepoGoal)
        .where(RepoGoal.repo_id == repo.id, RepoGoal.status == "active")
        .order_by(RepoGoal.priority)
        .limit(1)
    )
    goal_obj = goal_result.scalar_one_or_none()
    active_goal = None
    if goal_obj:
        active_goal = {
            "id": str(goal_obj.id),
            "title": goal_obj.title,
            "description": goal_obj.description,
            "priority": goal_obj.priority,
        }

    # Active workstreams, oldest first.
    ws_result = await session.execute(
        select(Workstream)
        .where(Workstream.repo_id == repo.id, Workstream.status == "active")
        .order_by(Workstream.created_at)
    )
    workstreams = list(ws_result.scalars().all())

    # Pending tasks for ALL active workstreams in a single query instead of one
    # query per workstream (avoids the N+1 pattern). Ordering by created_at
    # globally keeps each workstream's bucket in creation order.
    pending_by_workstream: dict = {ws.id: [] for ws in workstreams}
    if workstreams:  # nothing to fetch (and no empty IN clause) without workstreams
        task_result = await session.execute(
            select(Task)
            .where(
                Task.workstream_id.in_(list(pending_by_workstream)),
                Task.status.in_(["todo", "in_progress"]),
            )
            .order_by(Task.created_at)
        )
        for t in task_result.scalars().all():
            pending_by_workstream[t.workstream_id].append(
                DispatchTask(
                    id=t.id,
                    title=t.title,
                    priority=t.priority,
                    status=t.status,
                    needs_human=t.needs_human,
                )
            )

    dispatch_workstreams: list[DispatchWorkstream] = []
    all_interventions: list[DispatchTask] = []
    for ws in workstreams:
        pending = pending_by_workstream[ws.id]
        # Tasks flagged needs_human are additionally surfaced in one flat list,
        # grouped by workstream order.
        all_interventions.extend(t for t in pending if t.needs_human)
        dispatch_workstreams.append(
            DispatchWorkstream(
                id=ws.id,
                title=ws.title,
                status=ws.status,
                pending_tasks=pending,
            )
        )

    return RepoDispatch(
        repo_slug=slug,
        active_goal=active_goal,
        active_workstreams=dispatch_workstreams,
        human_interventions=all_interventions,
        last_state_synced_at=repo.last_state_synced_at,
    )
async def _get_repo_by_slug(slug: str, session: AsyncSession) -> ManagedRepo:
    """Load the managed repo with the given slug or fail with a 404.

    Args:
        slug: Slug of the repo to look up.
        session: Database session used for the query.

    Returns:
        The matching repo.

    Raises:
        HTTPException: 404 when no repo has that slug.
    """
    stmt = select(ManagedRepo).where(ManagedRepo.slug == slug)
    found = (await session.execute(stmt)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"Repo '{slug}' not found")