diff --git a/Makefile b/Makefile index a7f1b7e..5075d05 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,9 @@ dashboard-check: dashboard-install install-cli: install mkdir -p ~/.local/bin ln -sf "$(shell pwd)/.venv/bin/custodian" ~/.local/bin/custodian + ln -sf "$(shell pwd)/.venv/bin/statehub" ~/.local/bin/statehub @echo "Installed: custodian → $$(readlink -f ~/.local/bin/custodian)" + @echo "Installed: statehub → $$(readlink -f ~/.local/bin/statehub)" @echo "Make sure ~/.local/bin is on your PATH:" @echo " echo 'export PATH=\"\$$HOME/.local/bin:\$$PATH\"' >> ~/.bashrc && source ~/.bashrc" diff --git a/custodian_cli.py b/custodian_cli.py index 49ef9eb..70f554b 100644 --- a/custodian_cli.py +++ b/custodian_cli.py @@ -21,6 +21,8 @@ import urllib.error import urllib.request from pathlib import Path +from statehub_register import run_register as run_statehub_register + STATE_HUB_DIR = Path(__file__).resolve().parent API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000") TEMPLATE = STATE_HUB_DIR / "scripts" / "project_claude_md.template" @@ -468,11 +470,51 @@ def cmd_status(_args: argparse.Namespace) -> None: def main() -> None: parser = argparse.ArgumentParser( - prog="custodian", + prog=Path(sys.argv[0]).name, description="Custodian State Hub CLI", ) sub = parser.add_subparsers(dest="command", required=True) + # register + statehub_reg = sub.add_parser( + "register", + help="Register the current repo with State Hub and prime it for Codex", + ) + statehub_reg.add_argument("--path", default=os.getcwd(), help="Repo directory (defaults to cwd)") + statehub_reg.add_argument("--domain", default=None, help="State Hub domain slug") + statehub_reg.add_argument("--repo-slug", default=None, help="Repo slug (auto-detected if omitted)") + statehub_reg.add_argument("--wp-prefix", default=None, help="Workplan prefix, e.g. STATE-WP") + statehub_reg.add_argument("--description", default=None, help="One-sentence repo description") + statehub_reg.add_argument( + "--intent", + default=None, + help="Repo intent text to use when INTENT.md is absent and inference is insufficient", + ) + statehub_reg.add_argument("--api-base", default=API_BASE, help="State Hub API base URL") + statehub_reg.add_argument( + "--llm-provider", + default=os.environ.get("STATEHUB_REGISTER_LLM_PROVIDER", "claude-code"), + help="llm-connect provider: claude-code, openrouter, openai, gemini, or mock", + ) + statehub_reg.add_argument( + "--llm-model", + default=os.environ.get("STATEHUB_REGISTER_LLM_MODEL"), + help="Model name passed to llm-connect", + ) + statehub_reg.add_argument( + "--llm-api-key", + default=os.environ.get("STATEHUB_REGISTER_LLM_API_KEY"), + help="API key for API-backed llm-connect providers", + ) + statehub_reg.add_argument( + "--llm-timeout", + type=int, + default=int(os.environ.get("STATEHUB_REGISTER_LLM_TIMEOUT", "120")), + help="LLM timeout in seconds", + ) + statehub_reg.add_argument("--no-llm", action="store_true", help="Skip LLM inference and use files/prompts") + statehub_reg.add_argument("--force", action="store_true", help="Overwrite generated repo files") + # register-project reg = sub.add_parser("register-project", help="Register a project with the State Hub") reg.add_argument( @@ -513,7 +555,9 @@ def main() -> None: args = parser.parse_args() - if args.command == "register-project": + if args.command == "register": + run_statehub_register(args) + elif args.command == "register-project": cmd_register(args) elif args.command == "ingest-sbom": cmd_ingest_sbom(args) diff --git a/pyproject.toml b/pyproject.toml index a019b0e..5afa228 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ [project.scripts] custodian = "custodian_cli:main" +statehub = "custodian_cli:main" [build-system] requires = ["hatchling"] @@ -29,7 +30,7 @@ build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["api", "mcp_server", "task_flow_engine"] -artifacts = ["custodian_cli.py"] +artifacts = ["custodian_cli.py", "statehub_register.py"] [tool.uv.sources] llm-connect = { path = "/home/worsch/llm-connect", editable = true } diff --git a/statehub_register.py b/statehub_register.py new file mode 100644 index 0000000..309df78 --- /dev/null +++ b/statehub_register.py @@ -0,0 +1,865 @@ +from __future__ import annotations + +import argparse +import json +import os +import re +import socket +import subprocess +import sys +import textwrap +import urllib.error +import urllib.request +from dataclasses import dataclass, field +from datetime import date +from pathlib import Path +from typing import Any + + +STATE_HUB_DIR = Path(__file__).resolve().parent +API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000") +RULES_TEMPLATES_DIR = STATE_HUB_DIR / "scripts" / "project_rules" + +KEY_CONTEXT_FILES = [ + "INTENT.md", + "README.md", + "SCOPE.md", + "AGENTS.md", + "CLAUDE.md", + "pyproject.toml", + "package.json", + "Cargo.toml", + "go.mod", + "Makefile", +] + + +@dataclass +class RepoSnapshot: + path: Path + project_name: str + top_level_files: list[str] + context_files: dict[str, str] + remote_url: str | None = None + git_fingerprint: str | None = None + + +@dataclass +class RegisterInference: + project_description: str | None = None + intent_markdown: str | None = None + domain_slug: str | None = None + topic_slug: str | None = None + topic_title: str | None = None + repo_slug: str | None = None + workplan_prefix: str | None = None + in_scope: list[str] = field(default_factory=list) + out_of_scope: list[str] = field(default_factory=list) + current_state: str | None = None + + +def run_register(args: argparse.Namespace) -> None: + project_path = Path(args.path).expanduser().resolve() + if not project_path.is_dir(): + print(f"ERROR: {project_path} is not a directory.") + sys.exit(1) + + snapshot = collect_repo_snapshot(project_path) + print(f"==> Inspecting repo at {snapshot.path}") + + _check_api(args.api_base) + domains = _api_get("/domains/?status=active", args.api_base) + domain_slugs = [d["slug"] for d in domains] + + inference = infer_registration(snapshot, args, domain_slugs) + repo_slug = args.repo_slug or inference.repo_slug or _slugify(snapshot.project_name) + wp_prefix = args.wp_prefix or inference.workplan_prefix or _default_wp_prefix(repo_slug) + domain = args.domain or inference.domain_slug or _detect_domain_from_files(snapshot) + project_description = ( + args.description + or inference.project_description + or _derive_description_from_files(snapshot) + or f"{snapshot.project_name} repository." + ) + + if domain not in domain_slugs: + domain = _ask_for_domain(domain, domain_slugs) + + intent_markdown = _resolve_intent_markdown(snapshot, inference, args, project_description) + topic = _find_or_create_topic(domain, snapshot.project_name, repo_slug, inference, args.api_base) + topic_id = topic["id"] + topic_slug = topic.get("slug") or domain + + print(f"==> Writing State Hub agent files for '{repo_slug}'") + written = write_registration_files( + project_path=project_path, + project_name=snapshot.project_name, + project_description=project_description, + domain=domain, + topic_id=topic_id, + topic_slug=topic_slug, + repo_slug=repo_slug, + wp_prefix=wp_prefix, + intent_markdown=intent_markdown, + inference=inference, + force=args.force, + ) + for path in written: + print(f" wrote {path}") + if not written: + print(" files already present; nothing overwritten") + + repo = _register_or_update_repo( + domain=domain, + repo_slug=repo_slug, + project_name=snapshot.project_name, + project_description=project_description, + project_path=project_path, + remote_url=snapshot.remote_url, + git_fingerprint=snapshot.git_fingerprint, + topic_id=topic_id, + api_base=args.api_base, + ) + _register_host_path(repo_slug, project_path, args.api_base) + _record_progress(repo_slug, domain, project_path, topic_id, args.api_base) + + print() + print("Registration complete!") + print(f" Project: {snapshot.project_name}") + print(f" Domain: {domain}") + print(f" Repo slug: {repo_slug}") + print(f" Topic ID: {topic_id}") + print(f" Repo ID: {repo.get('id', '(existing)') if isinstance(repo, dict) else '(unknown)'}") + print() + print("Next:") + print(f" cd {STATE_HUB_DIR}") + print(f" make fix-consistency REPO={repo_slug}") + + +def collect_repo_snapshot(project_path: Path) -> RepoSnapshot: + top_level_files = sorted( + p.name for p in project_path.iterdir() + if p.name not in {".git", ".venv", "node_modules"} + )[:160] + context_files: dict[str, str] = {} + for name in KEY_CONTEXT_FILES: + path = project_path / name + if path.is_file(): + context_files[name] = _read_limited(path, 12000) + + return RepoSnapshot( + path=project_path, + project_name=project_path.name, + top_level_files=top_level_files, + context_files=context_files, + remote_url=_git_output(project_path, ["config", "--get", "remote.origin.url"]), + git_fingerprint=_git_output(project_path, ["rev-list", "--max-parents=0", "HEAD"]), + ) + + +def infer_registration( + snapshot: RepoSnapshot, + args: argparse.Namespace, + domain_slugs: list[str], +) -> RegisterInference: + if args.no_llm: + return RegisterInference() + + prompt = build_inference_prompt(snapshot, domain_slugs) + try: + response_text = _invoke_llm(prompt, args) + except Exception as exc: + print(f" LLM inference unavailable ({exc}); falling back to repo files/user prompts.") + return RegisterInference() + + parsed = _parse_json_object(response_text) + if not parsed: + print(" LLM inference did not return JSON; falling back to repo files/user prompts.") + return RegisterInference() + return _normalise_inference(parsed) + + +def build_inference_prompt(snapshot: RepoSnapshot, domain_slugs: list[str]) -> str: + context = { + "project_name": snapshot.project_name, + "top_level_files": snapshot.top_level_files, + "remote_url": snapshot.remote_url, + "git_fingerprint": snapshot.git_fingerprint, + "context_files": { + name: text[:6000] + for name, text in snapshot.context_files.items() + }, + "available_domain_slugs": domain_slugs, + } + return textwrap.dedent( + f""" + You are helping register a local repository with Custodian State Hub. + + Infer the repository identity from the supplied files. Return only + strict JSON with these keys: + - project_description: one precise sentence. + - intent_markdown: complete Markdown for INTENT.md, or null if an + existing INTENT.md is already adequate. + - domain_slug: one available domain slug, or null if not inferable. + - topic_slug: short lowercase topic slug. + - topic_title: human-readable topic title. + - repo_slug: lowercase repo slug. + - workplan_prefix: uppercase prefix ending in -WP. + - in_scope: array of short responsibility bullets. + - out_of_scope: array of short non-responsibility bullets. + - current_state: short maturity/status sentence. + + Use an available domain slug. Do not invent secrets or claim certainty + where the files do not support it. + + Repository context: + {json.dumps(context, indent=2)} + """ + ).strip() + + +def write_registration_files( + *, + project_path: Path, + project_name: str, + project_description: str, + domain: str, + topic_id: str, + topic_slug: str, + repo_slug: str, + wp_prefix: str, + intent_markdown: str, + inference: RegisterInference, + force: bool = False, +) -> list[Path]: + written: list[Path] = [] + + values = { + "{PROJECT_NAME}": project_name, + "{PROJECT_DESCRIPTION}": project_description, + "{DOMAIN}": domain, + "{TOPIC_ID}": topic_id, + "{REPO_SLUG}": repo_slug, + "{WP_PREFIX}": wp_prefix, + } + + intent_path = project_path / "INTENT.md" + if force or not intent_path.exists(): + intent_path.write_text(_ensure_trailing_newline(intent_markdown)) + written.append(intent_path) + + scope_path = project_path / "SCOPE.md" + if force or not scope_path.exists(): + scope_path.write_text( + _render_scope(project_name, project_description, inference), + ) + written.append(scope_path) + + agents_path = project_path / "AGENTS.md" + if force or not agents_path.exists(): + agents_template = (RULES_TEMPLATES_DIR / "agents-codex.template").read_text() + agents = _replace_many(agents_template, values) + agents_path.write_text(_ensure_trailing_newline(agents)) + written.append(agents_path) + + workplans_dir = project_path / "workplans" + workplans_dir.mkdir(exist_ok=True) + first_workplan = workplans_dir / f"{wp_prefix}-0001-statehub-bootstrap.md" + if force or not first_workplan.exists(): + first_workplan.write_text( + _render_first_workplan( + wp_prefix=wp_prefix, + project_name=project_name, + project_description=project_description, + domain=domain, + repo_slug=repo_slug, + topic_slug=topic_slug, + ) + ) + written.append(first_workplan) + + brief_path = project_path / ".custodian-brief.md" + if force or not brief_path.exists(): + brief_path.write_text( + _render_offline_brief( + project_name=project_name, + domain=domain, + repo_slug=repo_slug, + topic_id=topic_id, + wp_prefix=wp_prefix, + ) + ) + written.append(brief_path) + + return written + + +def _invoke_llm(prompt: str, args: argparse.Namespace) -> str: + from llm_connect import MockLLMAdapter, RunConfig, create_adapter + + mock_response = os.environ.get("STATEHUB_REGISTER_MOCK_LLM_RESPONSE") + if args.llm_provider == "mock" or mock_response: + adapter = MockLLMAdapter(mock_response=mock_response or "{}") + else: + adapter = create_adapter( + args.llm_provider, + model=args.llm_model, + api_key=args.llm_api_key, + ) + config = RunConfig( + model_name=args.llm_model or "statehub-register", + temperature=0.1, + max_tokens=1800, + timeout_seconds=args.llm_timeout, + model_params={"json_schema": _inference_json_schema()}, + ) + return adapter.execute_prompt(prompt, config).content + + +def _inference_json_schema() -> dict[str, Any]: + return { + "type": "object", + "properties": { + "project_description": {"type": ["string", "null"]}, + "intent_markdown": {"type": ["string", "null"]}, + "domain_slug": {"type": ["string", "null"]}, + "topic_slug": {"type": ["string", "null"]}, + "topic_title": {"type": ["string", "null"]}, + "repo_slug": {"type": ["string", "null"]}, + "workplan_prefix": {"type": ["string", "null"]}, + "in_scope": {"type": "array", "items": {"type": "string"}}, + "out_of_scope": {"type": "array", "items": {"type": "string"}}, + "current_state": {"type": ["string", "null"]}, + }, + "required": [ + "project_description", + "intent_markdown", + "domain_slug", + "topic_slug", + "topic_title", + "repo_slug", + "workplan_prefix", + "in_scope", + "out_of_scope", + "current_state", + ], + "additionalProperties": False, + } + + +def _normalise_inference(data: dict[str, Any]) -> RegisterInference: + def text(key: str) -> str | None: + value = data.get(key) + return value.strip() if isinstance(value, str) and value.strip() else None + + def items(key: str) -> list[str]: + value = data.get(key) + if not isinstance(value, list): + return [] + return [str(item).strip() for item in value if str(item).strip()] + + repo_slug = text("repo_slug") + wp_prefix = text("workplan_prefix") + return RegisterInference( + project_description=text("project_description"), + intent_markdown=text("intent_markdown"), + domain_slug=_slugify(text("domain_slug") or "") or None, + topic_slug=_slugify(text("topic_slug") or "") or None, + topic_title=text("topic_title"), + repo_slug=_slugify(repo_slug) if repo_slug else None, + workplan_prefix=_normalise_wp_prefix(wp_prefix) if wp_prefix else None, + in_scope=items("in_scope"), + out_of_scope=items("out_of_scope"), + current_state=text("current_state"), + ) + + +def _parse_json_object(text: str) -> dict[str, Any] | None: + stripped = text.strip() + fence = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", stripped, re.DOTALL) + if fence: + stripped = fence.group(1) + else: + start = stripped.find("{") + end = stripped.rfind("}") + if start >= 0 and end > start: + stripped = stripped[start:end + 1] + try: + parsed = json.loads(stripped) + except json.JSONDecodeError: + return None + return parsed if isinstance(parsed, dict) else None + + +def _resolve_intent_markdown( + snapshot: RepoSnapshot, + inference: RegisterInference, + args: argparse.Namespace, + project_description: str, +) -> str: + existing = snapshot.context_files.get("INTENT.md") + if existing: + return existing + if args.intent: + return _intent_from_user_text(args.intent, snapshot.project_name) + if inference.intent_markdown: + return inference.intent_markdown + + derived = _derive_intent_from_files(snapshot, project_description) + if derived: + return derived + + user_intent = _ask_for_intent(snapshot.project_name) + return _intent_from_user_text(user_intent, snapshot.project_name) + + +def _derive_intent_from_files(snapshot: RepoSnapshot, project_description: str) -> str | None: + source = snapshot.context_files.get("README.md") or snapshot.context_files.get("SCOPE.md") + paragraph = _first_meaningful_paragraph(source or "") + if not paragraph: + return None + return _render_intent(snapshot.project_name, project_description, paragraph) + + +def _derive_description_from_files(snapshot: RepoSnapshot) -> str | None: + for name in ("SCOPE.md", "README.md", "INTENT.md"): + paragraph = _first_meaningful_paragraph(snapshot.context_files.get(name, "")) + if paragraph: + return _sentence(paragraph) + return None + + +def _detect_domain_from_files(snapshot: RepoSnapshot) -> str | None: + for text in snapshot.context_files.values(): + match = re.search(r"^domain:\s*([A-Za-z0-9_-]+)", text, re.MULTILINE) + if match: + return match.group(1) + match = re.search(r"\*\*Domain:\*\*\s*([A-Za-z0-9_-]+)", text) + if match: + return match.group(1) + for charter in snapshot.path.rglob("project_charter_v*.md"): + text = _read_limited(charter, 2000) + match = re.search(r"^domain:\s*(\S+)", text, re.MULTILINE) + if match: + return match.group(1).strip("\"'") + return None + + +def _render_intent(project_name: str, project_description: str, source_paragraph: str) -> str: + return _ensure_trailing_newline( + f"""--- +repo: { _slugify(project_name) } +updated: "{date.today().isoformat()}" +--- + +# INTENT + +## Why it exists + +{project_description} + +{source_paragraph} + +## Governing principle + +This repository should stay focused on the purpose above. Work that changes its +authority, ownership boundaries, or operational promises should be captured in a +workplan before implementation. + +## What it enables + +- A coding agent can understand why the repository exists before changing it. +- State Hub can register and coordinate work for this repository. +- Future workplans can stay connected to the repository's intended role. +""" + ) + + +def _intent_from_user_text(intent: str, project_name: str) -> str: + description = _sentence(intent) or f"{project_name} repository." + return _render_intent(project_name, description, intent.strip()) + + +def _render_scope( + project_name: str, + project_description: str, + inference: RegisterInference, +) -> str: + in_scope = inference.in_scope or ["Maintain the repository's primary implementation.", "Keep docs, tests, and operational metadata current."] + out_scope = inference.out_of_scope or ["Own unrelated adjacent systems.", "Make irreversible operational decisions without human approval."] + current_state = inference.current_state or "Status: active; implementation and stability should be verified by the repo agent." + return _ensure_trailing_newline( + f"""# SCOPE + +> This file was generated by `statehub register`. Refine it as the repository +> boundaries become clearer. + +## One-liner + +{project_description} + +## Core Idea + +{project_name} exists to provide the capability described in INTENT.md. + +## In Scope + +{_markdown_bullets(in_scope)} + +## Out of Scope + +{_markdown_bullets(out_scope)} + +## Current State + +- {current_state} + +## Getting Oriented + +- Start with: INTENT.md +- Agent instructions: AGENTS.md +- Workplans: workplans/ +""" + ) + + +def _render_first_workplan( + *, + wp_prefix: str, + project_name: str, + project_description: str, + domain: str, + repo_slug: str, + topic_slug: str, +) -> str: + today = date.today().isoformat() + workplan_id = f"{wp_prefix}-0001" + return _ensure_trailing_newline( + f"""--- +id: {workplan_id} +type: workplan +title: "Bootstrap State Hub integration" +domain: {domain} +repo: {repo_slug} +status: ready +owner: codex +topic_slug: {topic_slug} +created: "{today}" +updated: "{today}" +--- + +# Bootstrap State Hub integration + +{project_description} + +## Review Generated Integration Files + +```task +id: {workplan_id}-T01 +status: todo +priority: high +``` + +Review `INTENT.md`, `SCOPE.md`, `AGENTS.md`, and `.custodian-brief.md`. +Replace generated placeholders with repo-specific facts where needed. + +## Verify Local Developer Workflow + +```task +id: {workplan_id}-T02 +status: todo +priority: high +``` + +Identify the repo's install, test, lint, build, and run commands. Add or refine +those commands in the agent instructions so future coding sessions can verify +changes confidently. + +## Seed First Real Workplan + +```task +id: {workplan_id}-T03 +status: todo +priority: medium +``` + +Create the first implementation workplan for the repository's most important +next change. After workplan file updates, run from `~/state-hub`: + +```bash +make fix-consistency REPO={repo_slug} +``` +""" + ) + + +def _render_offline_brief( + *, + project_name: str, + domain: str, + repo_slug: str, + topic_id: str, + wp_prefix: str, +) -> str: + today = date.today().isoformat() + return _ensure_trailing_newline( + f""" +# Custodian Brief - {repo_slug} + +**Project:** {project_name} +**Domain:** {domain} +**State Hub:** http://127.0.0.1:8000 +**Topic ID:** `{topic_id}` + +## Open Workplans + +### Bootstrap State Hub integration + +Workplan file: `workplans/{wp_prefix}-0001-statehub-bootstrap.md` + +Open tasks: +- T01 - Review generated integration files +- T02 - Verify local developer workflow +- T03 - Seed first real workplan + +## Session Start + +1. Read `INTENT.md`, `SCOPE.md`, and `AGENTS.md`. +2. Check inbox: `GET /messages/?to_agent={repo_slug}&unread_only=true`. +3. Scan `workplans/`. +4. Update task statuses in workplan files as work progresses. + +Last generated: {today} +""" + ) + + +def _find_or_create_topic( + domain: str, + project_name: str, + repo_slug: str, + inference: RegisterInference, + api_base: str, +) -> dict[str, Any]: + topics = _api_get("/topics/?status=active", api_base) + existing = next((t for t in topics if t.get("domain_slug") == domain), None) + if existing: + return existing + slug = inference.topic_slug or repo_slug + title = inference.topic_title or project_name + print(f"==> Creating active topic '{slug}' for domain '{domain}'") + return _api_post( + "/topics/", + {"slug": slug, "title": title, "domain": domain, "status": "active"}, + api_base, + ) + + +def _register_or_update_repo( + *, + domain: str, + repo_slug: str, + project_name: str, + project_description: str, + project_path: Path, + remote_url: str | None, + git_fingerprint: str | None, + topic_id: str, + api_base: str, +) -> dict[str, Any]: + payload = { + "domain_slug": domain, + "slug": repo_slug, + "name": project_name, + "local_path": str(project_path), + "remote_url": remote_url, + "git_fingerprint": git_fingerprint, + "description": project_description, + "topic_id": topic_id, + } + print(f"==> Registering repo '{repo_slug}' with State Hub") + try: + return _api_post("/repos/", payload, api_base) + except urllib.error.HTTPError as exc: + if exc.code != 409: + raise + print(" repo already registered; updating path/metadata") + patch_payload = {k: v for k, v in payload.items() if k != "domain_slug"} + return _api_patch(f"/repos/{repo_slug}", patch_payload, api_base) + + +def _register_host_path(repo_slug: str, project_path: Path, api_base: str) -> None: + host = socket.gethostname() + print(f"==> Registering host path for {host}") + _api_post(f"/repos/{repo_slug}/paths", {"host": host, "path": str(project_path)}, api_base) + + +def _record_progress( + repo_slug: str, + domain: str, + project_path: Path, + topic_id: str, + api_base: str, +) -> None: + try: + _api_post( + "/progress/", + { + "topic_id": topic_id, + "event_type": "milestone", + "summary": f"Repo registered with State Hub: {repo_slug}", + "author": "statehub-register", + "detail": { + "repo_slug": repo_slug, + "domain": domain, + "project_path": str(project_path), + }, + }, + api_base, + ) + except Exception as exc: + print(f" WARNING: Could not record progress event: {exc}") + + +def _api_get(path: str, api_base: str = API_BASE) -> Any: + url = api_base.rstrip("/") + path + try: + with urllib.request.urlopen(url, timeout=10) as response: + return json.loads(response.read()) + except urllib.error.URLError as exc: + print(f"ERROR: Cannot reach State Hub API at {api_base}: {exc}") + print(f" Start it: cd {STATE_HUB_DIR} && make api") + sys.exit(1) + + +def _api_post(path: str, body: dict[str, Any], api_base: str = API_BASE) -> Any: + return _api_request(path, body, "POST", api_base) + + +def _api_patch(path: str, body: dict[str, Any], api_base: str = API_BASE) -> Any: + return _api_request(path, body, "PATCH", api_base) + + +def _api_request(path: str, body: dict[str, Any], method: str, api_base: str) -> Any: + url = api_base.rstrip("/") + path + data = json.dumps({k: v for k, v in body.items() if v is not None}).encode() + request = urllib.request.Request( + url, + data=data, + headers={"Content-Type": "application/json"}, + method=method, + ) + with urllib.request.urlopen(request, timeout=10) as response: + return json.loads(response.read()) + + +def _check_api(api_base: str) -> None: + print(f"==> Checking API at {api_base}") + _api_get("/state/health", api_base) + + +def _ask_for_domain(candidate: str | None, domain_slugs: list[str]) -> str: + if not sys.stdin.isatty(): + valid = ", ".join(domain_slugs) + raise SystemExit(f"ERROR: Could not infer a valid domain. Pass --domain. Valid: {valid}") + if candidate: + print(f" inferred domain '{candidate}' is not active in State Hub") + print("Available domains:") + for slug in domain_slugs: + print(f" - {slug}") + while True: + answer = input("Domain slug: ").strip() + if answer in domain_slugs: + return answer + print("Please enter one of the listed domain slugs.") + + +def _ask_for_intent(project_name: str) -> str: + if not sys.stdin.isatty(): + raise SystemExit("ERROR: Could not derive repo intent. Pass --intent or add INTENT.md.") + print(f"I could not derive the intent for {project_name} from repo files.") + answer = input("What is this repository for? ").strip() + if not answer: + raise SystemExit("ERROR: Intent is required to create INTENT.md.") + return answer + + +def _read_limited(path: Path, limit: int) -> str: + try: + return path.read_text(errors="replace")[:limit] + except OSError: + return "" + + +def _git_output(path: Path, args: list[str]) -> str | None: + try: + result = subprocess.run( + ["git", "-C", str(path), *args], + check=False, + capture_output=True, + text=True, + timeout=5, + ) + except (OSError, subprocess.TimeoutExpired): + return None + output = result.stdout.strip() + return output if result.returncode == 0 and output else None + + +def _first_meaningful_paragraph(text: str) -> str | None: + paragraphs = re.split(r"\n\s*\n", text) + for paragraph in paragraphs: + cleaned = "\n".join( + line.strip() + for line in paragraph.splitlines() + if line.strip() + and not line.lstrip().startswith("#") + and not line.lstrip().startswith("