diff --git a/state-hub/mcp_server/server.py b/state-hub/mcp_server/server.py index 5579f6b..a0edc32 100644 --- a/state-hub/mcp_server/server.py +++ b/state-hub/mcp_server/server.py @@ -2222,6 +2222,113 @@ def get_doi_summary() -> str: return json.dumps(_get("/repos/doi/summary"), indent=2) +# --------------------------------------------------------------------------- +# Interactive / ad-hoc task recording +# --------------------------------------------------------------------------- + + +@mcp.tool() +def record_interactive_task( + title: str, + repo_slug: str, + tokens_in: Optional[int] = None, + tokens_out: Optional[int] = None, + model: Optional[str] = None, + agent: Optional[str] = None, + description: Optional[str] = None, + session_id: Optional[str] = None, +) -> str: + """Record ad-hoc interactive work as a task with token consumption. + + Finds or creates a persistent 'interactive-' workstream for the repo, + creates the task, marks it done immediately, and records a token event using + the three-tier logic (exact > heuristic). + + Use this for work done outside a formal workplan: quick fixes, config changes, + code reviews, one-off investigations, or any session work worth tracking. + + Args: + title: Short description of the work done + repo_slug: Registered repo slug, e.g. 'the-custodian', 'inter-hub' + tokens_in: Exact input token count for this task (Tier 1 — best practice) + tokens_out: Exact output token count for this task (Tier 1) + model: Model identifier, e.g. 'claude-sonnet-4-6' + agent: Agent name, e.g. 'custodian', 'ralph' + description: Optional longer description of what was done + session_id: Agent session identifier + """ + # Resolve repo + repos = _get("/repos/") + if isinstance(repos, dict) and "error" in repos: + return json.dumps(repos) + repo = next((r for r in (repos or []) if r.get("slug") == repo_slug), None) + if not repo: + return json.dumps({"error": f"Repo not found: {repo_slug!r}. Register it first with register_repo()."}) + + repo_id = repo["id"] + domain_slug = repo.get("domain_slug") or repo.get("domain") + ws_slug = f"interactive-{repo_slug}" + + # Find or create the interactive workstream + existing = _get("/workstreams/", {"slug": ws_slug}) + ws = existing[0] if isinstance(existing, list) and existing else None + + if not ws: + # Find a topic for this domain to satisfy the FK + topics = _get("/topics/") + topic = next( + (t for t in (topics if isinstance(topics, list) else []) + if t.get("domain_slug") == domain_slug or t.get("domain") == domain_slug), + None, + ) + if not topic: + return json.dumps({"error": f"No topic found for domain {domain_slug!r} — cannot create workstream."}) + + ws = _post("/workstreams", { + "topic_id": topic["id"], + "slug": ws_slug, + "title": f"Interactive — {repo_slug}", + "description": "Ad-hoc tasks created outside a formal workplan.", + "owner": "custodian", + "repo_id": repo_id, + }) + if "error" in ws: + return json.dumps(ws) + + # Create task + task = _post("/tasks", { + "workstream_id": ws["id"], + "title": title, + "description": description, + "priority": "medium", + }) + if "error" in task: + return json.dumps(task) + + # Mark done — triggers three-tier token recording in the router + body: dict[str, Any] = { + "status": "done", + "model": model, + "agent": agent, + "session_id": session_id, + } + if tokens_in is not None: + body["tokens_in"] = tokens_in + if tokens_out is not None: + body["tokens_out"] = tokens_out + + _patch(f"/tasks/{task['id']}", body) + + tier = "exact" if tokens_in is not None else "heuristic" + return json.dumps({ + "task_id": task["id"], + "workstream_id": ws["id"], + "workstream_slug": ws_slug, + "title": title, + "token_tier": tier, + }, indent=2) + + # --------------------------------------------------------------------------- # Token events # ---------------------------------------------------------------------------