generated from coulomb/repo-seed
866 lines
26 KiB
Python
866 lines
26 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import textwrap
|
|
import urllib.error
|
|
import urllib.request
|
|
from dataclasses import dataclass, field
|
|
from datetime import date
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
STATE_HUB_DIR = Path(__file__).resolve().parent
|
|
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000")
|
|
RULES_TEMPLATES_DIR = STATE_HUB_DIR / "scripts" / "project_rules"
|
|
|
|
KEY_CONTEXT_FILES = [
|
|
"INTENT.md",
|
|
"README.md",
|
|
"SCOPE.md",
|
|
"AGENTS.md",
|
|
"CLAUDE.md",
|
|
"pyproject.toml",
|
|
"package.json",
|
|
"Cargo.toml",
|
|
"go.mod",
|
|
"Makefile",
|
|
]
|
|
|
|
|
|
@dataclass
|
|
class RepoSnapshot:
|
|
path: Path
|
|
project_name: str
|
|
top_level_files: list[str]
|
|
context_files: dict[str, str]
|
|
remote_url: str | None = None
|
|
git_fingerprint: str | None = None
|
|
|
|
|
|
@dataclass
|
|
class RegisterInference:
|
|
project_description: str | None = None
|
|
intent_markdown: str | None = None
|
|
domain_slug: str | None = None
|
|
topic_slug: str | None = None
|
|
topic_title: str | None = None
|
|
repo_slug: str | None = None
|
|
workplan_prefix: str | None = None
|
|
in_scope: list[str] = field(default_factory=list)
|
|
out_of_scope: list[str] = field(default_factory=list)
|
|
current_state: str | None = None
|
|
|
|
|
|
def run_register(args: argparse.Namespace) -> None:
|
|
project_path = Path(args.path).expanduser().resolve()
|
|
if not project_path.is_dir():
|
|
print(f"ERROR: {project_path} is not a directory.")
|
|
sys.exit(1)
|
|
|
|
snapshot = collect_repo_snapshot(project_path)
|
|
print(f"==> Inspecting repo at {snapshot.path}")
|
|
|
|
_check_api(args.api_base)
|
|
domains = _api_get("/domains/?status=active", args.api_base)
|
|
domain_slugs = [d["slug"] for d in domains]
|
|
|
|
inference = infer_registration(snapshot, args, domain_slugs)
|
|
repo_slug = args.repo_slug or inference.repo_slug or _slugify(snapshot.project_name)
|
|
wp_prefix = args.wp_prefix or inference.workplan_prefix or _default_wp_prefix(repo_slug)
|
|
domain = args.domain or inference.domain_slug or _detect_domain_from_files(snapshot)
|
|
project_description = (
|
|
args.description
|
|
or inference.project_description
|
|
or _derive_description_from_files(snapshot)
|
|
or f"{snapshot.project_name} repository."
|
|
)
|
|
|
|
if domain not in domain_slugs:
|
|
domain = _ask_for_domain(domain, domain_slugs)
|
|
|
|
intent_markdown = _resolve_intent_markdown(snapshot, inference, args, project_description)
|
|
topic = _find_or_create_topic(domain, snapshot.project_name, repo_slug, inference, args.api_base)
|
|
topic_id = topic["id"]
|
|
topic_slug = topic.get("slug") or domain
|
|
|
|
print(f"==> Writing State Hub agent files for '{repo_slug}'")
|
|
written = write_registration_files(
|
|
project_path=project_path,
|
|
project_name=snapshot.project_name,
|
|
project_description=project_description,
|
|
domain=domain,
|
|
topic_id=topic_id,
|
|
topic_slug=topic_slug,
|
|
repo_slug=repo_slug,
|
|
wp_prefix=wp_prefix,
|
|
intent_markdown=intent_markdown,
|
|
inference=inference,
|
|
force=args.force,
|
|
)
|
|
for path in written:
|
|
print(f" wrote {path}")
|
|
if not written:
|
|
print(" files already present; nothing overwritten")
|
|
|
|
repo = _register_or_update_repo(
|
|
domain=domain,
|
|
repo_slug=repo_slug,
|
|
project_name=snapshot.project_name,
|
|
project_description=project_description,
|
|
project_path=project_path,
|
|
remote_url=snapshot.remote_url,
|
|
git_fingerprint=snapshot.git_fingerprint,
|
|
topic_id=topic_id,
|
|
api_base=args.api_base,
|
|
)
|
|
_register_host_path(repo_slug, project_path, args.api_base)
|
|
_record_progress(repo_slug, domain, project_path, topic_id, args.api_base)
|
|
|
|
print()
|
|
print("Registration complete!")
|
|
print(f" Project: {snapshot.project_name}")
|
|
print(f" Domain: {domain}")
|
|
print(f" Repo slug: {repo_slug}")
|
|
print(f" Topic ID: {topic_id}")
|
|
print(f" Repo ID: {repo.get('id', '(existing)') if isinstance(repo, dict) else '(unknown)'}")
|
|
print()
|
|
print("Next:")
|
|
print(f" cd {STATE_HUB_DIR}")
|
|
print(f" make fix-consistency REPO={repo_slug}")
|
|
|
|
|
|
def collect_repo_snapshot(project_path: Path) -> RepoSnapshot:
|
|
top_level_files = sorted(
|
|
p.name for p in project_path.iterdir()
|
|
if p.name not in {".git", ".venv", "node_modules"}
|
|
)[:160]
|
|
context_files: dict[str, str] = {}
|
|
for name in KEY_CONTEXT_FILES:
|
|
path = project_path / name
|
|
if path.is_file():
|
|
context_files[name] = _read_limited(path, 12000)
|
|
|
|
return RepoSnapshot(
|
|
path=project_path,
|
|
project_name=project_path.name,
|
|
top_level_files=top_level_files,
|
|
context_files=context_files,
|
|
remote_url=_git_output(project_path, ["config", "--get", "remote.origin.url"]),
|
|
git_fingerprint=_git_output(project_path, ["rev-list", "--max-parents=0", "HEAD"]),
|
|
)
|
|
|
|
|
|
def infer_registration(
|
|
snapshot: RepoSnapshot,
|
|
args: argparse.Namespace,
|
|
domain_slugs: list[str],
|
|
) -> RegisterInference:
|
|
if args.no_llm:
|
|
return RegisterInference()
|
|
|
|
prompt = build_inference_prompt(snapshot, domain_slugs)
|
|
try:
|
|
response_text = _invoke_llm(prompt, args)
|
|
except Exception as exc:
|
|
print(f" LLM inference unavailable ({exc}); falling back to repo files/user prompts.")
|
|
return RegisterInference()
|
|
|
|
parsed = _parse_json_object(response_text)
|
|
if not parsed:
|
|
print(" LLM inference did not return JSON; falling back to repo files/user prompts.")
|
|
return RegisterInference()
|
|
return _normalise_inference(parsed)
|
|
|
|
|
|
def build_inference_prompt(snapshot: RepoSnapshot, domain_slugs: list[str]) -> str:
|
|
context = {
|
|
"project_name": snapshot.project_name,
|
|
"top_level_files": snapshot.top_level_files,
|
|
"remote_url": snapshot.remote_url,
|
|
"git_fingerprint": snapshot.git_fingerprint,
|
|
"context_files": {
|
|
name: text[:6000]
|
|
for name, text in snapshot.context_files.items()
|
|
},
|
|
"available_domain_slugs": domain_slugs,
|
|
}
|
|
return textwrap.dedent(
|
|
f"""
|
|
You are helping register a local repository with Custodian State Hub.
|
|
|
|
Infer the repository identity from the supplied files. Return only
|
|
strict JSON with these keys:
|
|
- project_description: one precise sentence.
|
|
- intent_markdown: complete Markdown for INTENT.md, or null if an
|
|
existing INTENT.md is already adequate.
|
|
- domain_slug: one available domain slug, or null if not inferable.
|
|
- topic_slug: short lowercase topic slug.
|
|
- topic_title: human-readable topic title.
|
|
- repo_slug: lowercase repo slug.
|
|
- workplan_prefix: uppercase prefix ending in -WP.
|
|
- in_scope: array of short responsibility bullets.
|
|
- out_of_scope: array of short non-responsibility bullets.
|
|
- current_state: short maturity/status sentence.
|
|
|
|
Use an available domain slug. Do not invent secrets or claim certainty
|
|
where the files do not support it.
|
|
|
|
Repository context:
|
|
{json.dumps(context, indent=2)}
|
|
"""
|
|
).strip()
|
|
|
|
|
|
def write_registration_files(
|
|
*,
|
|
project_path: Path,
|
|
project_name: str,
|
|
project_description: str,
|
|
domain: str,
|
|
topic_id: str,
|
|
topic_slug: str,
|
|
repo_slug: str,
|
|
wp_prefix: str,
|
|
intent_markdown: str,
|
|
inference: RegisterInference,
|
|
force: bool = False,
|
|
) -> list[Path]:
|
|
written: list[Path] = []
|
|
|
|
values = {
|
|
"{PROJECT_NAME}": project_name,
|
|
"{PROJECT_DESCRIPTION}": project_description,
|
|
"{DOMAIN}": domain,
|
|
"{TOPIC_ID}": topic_id,
|
|
"{REPO_SLUG}": repo_slug,
|
|
"{WP_PREFIX}": wp_prefix,
|
|
}
|
|
|
|
intent_path = project_path / "INTENT.md"
|
|
if force or not intent_path.exists():
|
|
intent_path.write_text(_ensure_trailing_newline(intent_markdown))
|
|
written.append(intent_path)
|
|
|
|
scope_path = project_path / "SCOPE.md"
|
|
if force or not scope_path.exists():
|
|
scope_path.write_text(
|
|
_render_scope(project_name, project_description, inference),
|
|
)
|
|
written.append(scope_path)
|
|
|
|
agents_path = project_path / "AGENTS.md"
|
|
if force or not agents_path.exists():
|
|
agents_template = (RULES_TEMPLATES_DIR / "agents-codex.template").read_text()
|
|
agents = _replace_many(agents_template, values)
|
|
agents_path.write_text(_ensure_trailing_newline(agents))
|
|
written.append(agents_path)
|
|
|
|
workplans_dir = project_path / "workplans"
|
|
workplans_dir.mkdir(exist_ok=True)
|
|
first_workplan = workplans_dir / f"{wp_prefix}-0001-statehub-bootstrap.md"
|
|
if force or not first_workplan.exists():
|
|
first_workplan.write_text(
|
|
_render_first_workplan(
|
|
wp_prefix=wp_prefix,
|
|
project_name=project_name,
|
|
project_description=project_description,
|
|
domain=domain,
|
|
repo_slug=repo_slug,
|
|
topic_slug=topic_slug,
|
|
)
|
|
)
|
|
written.append(first_workplan)
|
|
|
|
brief_path = project_path / ".custodian-brief.md"
|
|
if force or not brief_path.exists():
|
|
brief_path.write_text(
|
|
_render_offline_brief(
|
|
project_name=project_name,
|
|
domain=domain,
|
|
repo_slug=repo_slug,
|
|
topic_id=topic_id,
|
|
wp_prefix=wp_prefix,
|
|
)
|
|
)
|
|
written.append(brief_path)
|
|
|
|
return written
|
|
|
|
|
|
def _invoke_llm(prompt: str, args: argparse.Namespace) -> str:
|
|
from llm_connect import MockLLMAdapter, RunConfig, create_adapter
|
|
|
|
mock_response = os.environ.get("STATEHUB_REGISTER_MOCK_LLM_RESPONSE")
|
|
if args.llm_provider == "mock" or mock_response:
|
|
adapter = MockLLMAdapter(mock_response=mock_response or "{}")
|
|
else:
|
|
adapter = create_adapter(
|
|
args.llm_provider,
|
|
model=args.llm_model,
|
|
api_key=args.llm_api_key,
|
|
)
|
|
config = RunConfig(
|
|
model_name=args.llm_model or "statehub-register",
|
|
temperature=0.1,
|
|
max_tokens=1800,
|
|
timeout_seconds=args.llm_timeout,
|
|
model_params={"json_schema": _inference_json_schema()},
|
|
)
|
|
return adapter.execute_prompt(prompt, config).content
|
|
|
|
|
|
def _inference_json_schema() -> dict[str, Any]:
|
|
return {
|
|
"type": "object",
|
|
"properties": {
|
|
"project_description": {"type": ["string", "null"]},
|
|
"intent_markdown": {"type": ["string", "null"]},
|
|
"domain_slug": {"type": ["string", "null"]},
|
|
"topic_slug": {"type": ["string", "null"]},
|
|
"topic_title": {"type": ["string", "null"]},
|
|
"repo_slug": {"type": ["string", "null"]},
|
|
"workplan_prefix": {"type": ["string", "null"]},
|
|
"in_scope": {"type": "array", "items": {"type": "string"}},
|
|
"out_of_scope": {"type": "array", "items": {"type": "string"}},
|
|
"current_state": {"type": ["string", "null"]},
|
|
},
|
|
"required": [
|
|
"project_description",
|
|
"intent_markdown",
|
|
"domain_slug",
|
|
"topic_slug",
|
|
"topic_title",
|
|
"repo_slug",
|
|
"workplan_prefix",
|
|
"in_scope",
|
|
"out_of_scope",
|
|
"current_state",
|
|
],
|
|
"additionalProperties": False,
|
|
}
|
|
|
|
|
|
def _normalise_inference(data: dict[str, Any]) -> RegisterInference:
|
|
def text(key: str) -> str | None:
|
|
value = data.get(key)
|
|
return value.strip() if isinstance(value, str) and value.strip() else None
|
|
|
|
def items(key: str) -> list[str]:
|
|
value = data.get(key)
|
|
if not isinstance(value, list):
|
|
return []
|
|
return [str(item).strip() for item in value if str(item).strip()]
|
|
|
|
repo_slug = text("repo_slug")
|
|
wp_prefix = text("workplan_prefix")
|
|
return RegisterInference(
|
|
project_description=text("project_description"),
|
|
intent_markdown=text("intent_markdown"),
|
|
domain_slug=_slugify(text("domain_slug") or "") or None,
|
|
topic_slug=_slugify(text("topic_slug") or "") or None,
|
|
topic_title=text("topic_title"),
|
|
repo_slug=_slugify(repo_slug) if repo_slug else None,
|
|
workplan_prefix=_normalise_wp_prefix(wp_prefix) if wp_prefix else None,
|
|
in_scope=items("in_scope"),
|
|
out_of_scope=items("out_of_scope"),
|
|
current_state=text("current_state"),
|
|
)
|
|
|
|
|
|
def _parse_json_object(text: str) -> dict[str, Any] | None:
|
|
stripped = text.strip()
|
|
fence = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", stripped, re.DOTALL)
|
|
if fence:
|
|
stripped = fence.group(1)
|
|
else:
|
|
start = stripped.find("{")
|
|
end = stripped.rfind("}")
|
|
if start >= 0 and end > start:
|
|
stripped = stripped[start:end + 1]
|
|
try:
|
|
parsed = json.loads(stripped)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
return parsed if isinstance(parsed, dict) else None
|
|
|
|
|
|
def _resolve_intent_markdown(
|
|
snapshot: RepoSnapshot,
|
|
inference: RegisterInference,
|
|
args: argparse.Namespace,
|
|
project_description: str,
|
|
) -> str:
|
|
existing = snapshot.context_files.get("INTENT.md")
|
|
if existing:
|
|
return existing
|
|
if args.intent:
|
|
return _intent_from_user_text(args.intent, snapshot.project_name)
|
|
if inference.intent_markdown:
|
|
return inference.intent_markdown
|
|
|
|
derived = _derive_intent_from_files(snapshot, project_description)
|
|
if derived:
|
|
return derived
|
|
|
|
user_intent = _ask_for_intent(snapshot.project_name)
|
|
return _intent_from_user_text(user_intent, snapshot.project_name)
|
|
|
|
|
|
def _derive_intent_from_files(snapshot: RepoSnapshot, project_description: str) -> str | None:
|
|
source = snapshot.context_files.get("README.md") or snapshot.context_files.get("SCOPE.md")
|
|
paragraph = _first_meaningful_paragraph(source or "")
|
|
if not paragraph:
|
|
return None
|
|
return _render_intent(snapshot.project_name, project_description, paragraph)
|
|
|
|
|
|
def _derive_description_from_files(snapshot: RepoSnapshot) -> str | None:
|
|
for name in ("SCOPE.md", "README.md", "INTENT.md"):
|
|
paragraph = _first_meaningful_paragraph(snapshot.context_files.get(name, ""))
|
|
if paragraph:
|
|
return _sentence(paragraph)
|
|
return None
|
|
|
|
|
|
def _detect_domain_from_files(snapshot: RepoSnapshot) -> str | None:
|
|
for text in snapshot.context_files.values():
|
|
match = re.search(r"^domain:\s*([A-Za-z0-9_-]+)", text, re.MULTILINE)
|
|
if match:
|
|
return match.group(1)
|
|
match = re.search(r"\*\*Domain:\*\*\s*([A-Za-z0-9_-]+)", text)
|
|
if match:
|
|
return match.group(1)
|
|
for charter in snapshot.path.rglob("project_charter_v*.md"):
|
|
text = _read_limited(charter, 2000)
|
|
match = re.search(r"^domain:\s*(\S+)", text, re.MULTILINE)
|
|
if match:
|
|
return match.group(1).strip("\"'")
|
|
return None
|
|
|
|
|
|
def _render_intent(project_name: str, project_description: str, source_paragraph: str) -> str:
|
|
return _ensure_trailing_newline(
|
|
f"""---
|
|
repo: { _slugify(project_name) }
|
|
updated: "{date.today().isoformat()}"
|
|
---
|
|
|
|
# INTENT
|
|
|
|
## Why it exists
|
|
|
|
{project_description}
|
|
|
|
{source_paragraph}
|
|
|
|
## Governing principle
|
|
|
|
This repository should stay focused on the purpose above. Work that changes its
|
|
authority, ownership boundaries, or operational promises should be captured in a
|
|
workplan before implementation.
|
|
|
|
## What it enables
|
|
|
|
- A coding agent can understand why the repository exists before changing it.
|
|
- State Hub can register and coordinate work for this repository.
|
|
- Future workplans can stay connected to the repository's intended role.
|
|
"""
|
|
)
|
|
|
|
|
|
def _intent_from_user_text(intent: str, project_name: str) -> str:
|
|
description = _sentence(intent) or f"{project_name} repository."
|
|
return _render_intent(project_name, description, intent.strip())
|
|
|
|
|
|
def _render_scope(
|
|
project_name: str,
|
|
project_description: str,
|
|
inference: RegisterInference,
|
|
) -> str:
|
|
in_scope = inference.in_scope or ["Maintain the repository's primary implementation.", "Keep docs, tests, and operational metadata current."]
|
|
out_scope = inference.out_of_scope or ["Own unrelated adjacent systems.", "Make irreversible operational decisions without human approval."]
|
|
current_state = inference.current_state or "Status: active; implementation and stability should be verified by the repo agent."
|
|
return _ensure_trailing_newline(
|
|
f"""# SCOPE
|
|
|
|
> This file was generated by `statehub register`. Refine it as the repository
|
|
> boundaries become clearer.
|
|
|
|
## One-liner
|
|
|
|
{project_description}
|
|
|
|
## Core Idea
|
|
|
|
{project_name} exists to provide the capability described in INTENT.md.
|
|
|
|
## In Scope
|
|
|
|
{_markdown_bullets(in_scope)}
|
|
|
|
## Out of Scope
|
|
|
|
{_markdown_bullets(out_scope)}
|
|
|
|
## Current State
|
|
|
|
- {current_state}
|
|
|
|
## Getting Oriented
|
|
|
|
- Start with: INTENT.md
|
|
- Agent instructions: AGENTS.md
|
|
- Workplans: workplans/
|
|
"""
|
|
)
|
|
|
|
|
|
def _render_first_workplan(
|
|
*,
|
|
wp_prefix: str,
|
|
project_name: str,
|
|
project_description: str,
|
|
domain: str,
|
|
repo_slug: str,
|
|
topic_slug: str,
|
|
) -> str:
|
|
today = date.today().isoformat()
|
|
workplan_id = f"{wp_prefix}-0001"
|
|
return _ensure_trailing_newline(
|
|
f"""---
|
|
id: {workplan_id}
|
|
type: workplan
|
|
title: "Bootstrap State Hub integration"
|
|
domain: {domain}
|
|
repo: {repo_slug}
|
|
status: ready
|
|
owner: codex
|
|
topic_slug: {topic_slug}
|
|
created: "{today}"
|
|
updated: "{today}"
|
|
---
|
|
|
|
# Bootstrap State Hub integration
|
|
|
|
{project_description}
|
|
|
|
## Review Generated Integration Files
|
|
|
|
```task
|
|
id: {workplan_id}-T01
|
|
status: todo
|
|
priority: high
|
|
```
|
|
|
|
Review `INTENT.md`, `SCOPE.md`, `AGENTS.md`, and `.custodian-brief.md`.
|
|
Replace generated placeholders with repo-specific facts where needed.
|
|
|
|
## Verify Local Developer Workflow
|
|
|
|
```task
|
|
id: {workplan_id}-T02
|
|
status: todo
|
|
priority: high
|
|
```
|
|
|
|
Identify the repo's install, test, lint, build, and run commands. Add or refine
|
|
those commands in the agent instructions so future coding sessions can verify
|
|
changes confidently.
|
|
|
|
## Seed First Real Workplan
|
|
|
|
```task
|
|
id: {workplan_id}-T03
|
|
status: todo
|
|
priority: medium
|
|
```
|
|
|
|
Create the first implementation workplan for the repository's most important
|
|
next change. After workplan file updates, run from `~/state-hub`:
|
|
|
|
```bash
|
|
make fix-consistency REPO={repo_slug}
|
|
```
|
|
"""
|
|
)
|
|
|
|
|
|
def _render_offline_brief(
|
|
*,
|
|
project_name: str,
|
|
domain: str,
|
|
repo_slug: str,
|
|
topic_id: str,
|
|
wp_prefix: str,
|
|
) -> str:
|
|
today = date.today().isoformat()
|
|
return _ensure_trailing_newline(
|
|
f"""<!-- custodian-brief: generated by statehub register; fix-consistency may replace this file -->
|
|
# Custodian Brief - {repo_slug}
|
|
|
|
**Project:** {project_name}
|
|
**Domain:** {domain}
|
|
**State Hub:** http://127.0.0.1:8000
|
|
**Topic ID:** `{topic_id}`
|
|
|
|
## Open Workplans
|
|
|
|
### Bootstrap State Hub integration
|
|
|
|
Workplan file: `workplans/{wp_prefix}-0001-statehub-bootstrap.md`
|
|
|
|
Open tasks:
|
|
- T01 - Review generated integration files
|
|
- T02 - Verify local developer workflow
|
|
- T03 - Seed first real workplan
|
|
|
|
## Session Start
|
|
|
|
1. Read `INTENT.md`, `SCOPE.md`, and `AGENTS.md`.
|
|
2. Check inbox: `GET /messages/?to_agent={repo_slug}&unread_only=true`.
|
|
3. Scan `workplans/`.
|
|
4. Update task statuses in workplan files as work progresses.
|
|
|
|
Last generated: {today}
|
|
"""
|
|
)
|
|
|
|
|
|
def _find_or_create_topic(
|
|
domain: str,
|
|
project_name: str,
|
|
repo_slug: str,
|
|
inference: RegisterInference,
|
|
api_base: str,
|
|
) -> dict[str, Any]:
|
|
topics = _api_get("/topics/?status=active", api_base)
|
|
existing = next((t for t in topics if t.get("domain_slug") == domain), None)
|
|
if existing:
|
|
return existing
|
|
slug = inference.topic_slug or repo_slug
|
|
title = inference.topic_title or project_name
|
|
print(f"==> Creating active topic '{slug}' for domain '{domain}'")
|
|
return _api_post(
|
|
"/topics/",
|
|
{"slug": slug, "title": title, "domain": domain, "status": "active"},
|
|
api_base,
|
|
)
|
|
|
|
|
|
def _register_or_update_repo(
|
|
*,
|
|
domain: str,
|
|
repo_slug: str,
|
|
project_name: str,
|
|
project_description: str,
|
|
project_path: Path,
|
|
remote_url: str | None,
|
|
git_fingerprint: str | None,
|
|
topic_id: str,
|
|
api_base: str,
|
|
) -> dict[str, Any]:
|
|
payload = {
|
|
"domain_slug": domain,
|
|
"slug": repo_slug,
|
|
"name": project_name,
|
|
"local_path": str(project_path),
|
|
"remote_url": remote_url,
|
|
"git_fingerprint": git_fingerprint,
|
|
"description": project_description,
|
|
"topic_id": topic_id,
|
|
}
|
|
print(f"==> Registering repo '{repo_slug}' with State Hub")
|
|
try:
|
|
return _api_post("/repos/", payload, api_base)
|
|
except urllib.error.HTTPError as exc:
|
|
if exc.code != 409:
|
|
raise
|
|
print(" repo already registered; updating path/metadata")
|
|
patch_payload = {k: v for k, v in payload.items() if k != "domain_slug"}
|
|
return _api_patch(f"/repos/{repo_slug}", patch_payload, api_base)
|
|
|
|
|
|
def _register_host_path(repo_slug: str, project_path: Path, api_base: str) -> None:
|
|
host = socket.gethostname()
|
|
print(f"==> Registering host path for {host}")
|
|
_api_post(f"/repos/{repo_slug}/paths", {"host": host, "path": str(project_path)}, api_base)
|
|
|
|
|
|
def _record_progress(
|
|
repo_slug: str,
|
|
domain: str,
|
|
project_path: Path,
|
|
topic_id: str,
|
|
api_base: str,
|
|
) -> None:
|
|
try:
|
|
_api_post(
|
|
"/progress/",
|
|
{
|
|
"topic_id": topic_id,
|
|
"event_type": "milestone",
|
|
"summary": f"Repo registered with State Hub: {repo_slug}",
|
|
"author": "statehub-register",
|
|
"detail": {
|
|
"repo_slug": repo_slug,
|
|
"domain": domain,
|
|
"project_path": str(project_path),
|
|
},
|
|
},
|
|
api_base,
|
|
)
|
|
except Exception as exc:
|
|
print(f" WARNING: Could not record progress event: {exc}")
|
|
|
|
|
|
def _api_get(path: str, api_base: str = API_BASE) -> Any:
|
|
url = api_base.rstrip("/") + path
|
|
try:
|
|
with urllib.request.urlopen(url, timeout=10) as response:
|
|
return json.loads(response.read())
|
|
except urllib.error.URLError as exc:
|
|
print(f"ERROR: Cannot reach State Hub API at {api_base}: {exc}")
|
|
print(f" Start it: cd {STATE_HUB_DIR} && make api")
|
|
sys.exit(1)
|
|
|
|
|
|
def _api_post(path: str, body: dict[str, Any], api_base: str = API_BASE) -> Any:
|
|
return _api_request(path, body, "POST", api_base)
|
|
|
|
|
|
def _api_patch(path: str, body: dict[str, Any], api_base: str = API_BASE) -> Any:
|
|
return _api_request(path, body, "PATCH", api_base)
|
|
|
|
|
|
def _api_request(path: str, body: dict[str, Any], method: str, api_base: str) -> Any:
|
|
url = api_base.rstrip("/") + path
|
|
data = json.dumps({k: v for k, v in body.items() if v is not None}).encode()
|
|
request = urllib.request.Request(
|
|
url,
|
|
data=data,
|
|
headers={"Content-Type": "application/json"},
|
|
method=method,
|
|
)
|
|
with urllib.request.urlopen(request, timeout=10) as response:
|
|
return json.loads(response.read())
|
|
|
|
|
|
def _check_api(api_base: str) -> None:
|
|
print(f"==> Checking API at {api_base}")
|
|
_api_get("/state/health", api_base)
|
|
|
|
|
|
def _ask_for_domain(candidate: str | None, domain_slugs: list[str]) -> str:
|
|
if not sys.stdin.isatty():
|
|
valid = ", ".join(domain_slugs)
|
|
raise SystemExit(f"ERROR: Could not infer a valid domain. Pass --domain. Valid: {valid}")
|
|
if candidate:
|
|
print(f" inferred domain '{candidate}' is not active in State Hub")
|
|
print("Available domains:")
|
|
for slug in domain_slugs:
|
|
print(f" - {slug}")
|
|
while True:
|
|
answer = input("Domain slug: ").strip()
|
|
if answer in domain_slugs:
|
|
return answer
|
|
print("Please enter one of the listed domain slugs.")
|
|
|
|
|
|
def _ask_for_intent(project_name: str) -> str:
|
|
if not sys.stdin.isatty():
|
|
raise SystemExit("ERROR: Could not derive repo intent. Pass --intent or add INTENT.md.")
|
|
print(f"I could not derive the intent for {project_name} from repo files.")
|
|
answer = input("What is this repository for? ").strip()
|
|
if not answer:
|
|
raise SystemExit("ERROR: Intent is required to create INTENT.md.")
|
|
return answer
|
|
|
|
|
|
def _read_limited(path: Path, limit: int) -> str:
|
|
try:
|
|
return path.read_text(errors="replace")[:limit]
|
|
except OSError:
|
|
return ""
|
|
|
|
|
|
def _git_output(path: Path, args: list[str]) -> str | None:
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "-C", str(path), *args],
|
|
check=False,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return None
|
|
output = result.stdout.strip()
|
|
return output if result.returncode == 0 and output else None
|
|
|
|
|
|
def _first_meaningful_paragraph(text: str) -> str | None:
|
|
paragraphs = re.split(r"\n\s*\n", text)
|
|
for paragraph in paragraphs:
|
|
cleaned = "\n".join(
|
|
line.strip()
|
|
for line in paragraph.splitlines()
|
|
if line.strip()
|
|
and not line.lstrip().startswith("#")
|
|
and not line.lstrip().startswith("<!--")
|
|
and not line.lstrip().startswith("---")
|
|
).strip()
|
|
if cleaned and not cleaned.startswith(">"):
|
|
return re.sub(r"\s+", " ", cleaned)
|
|
return None
|
|
|
|
|
|
def _sentence(text: str) -> str:
|
|
cleaned = re.sub(r"\s+", " ", text.strip())
|
|
if not cleaned:
|
|
return ""
|
|
match = re.search(r"(.+?[.!?])(?:\s|$)", cleaned)
|
|
sentence = match.group(1) if match else cleaned
|
|
if sentence[-1] not in ".!?":
|
|
sentence += "."
|
|
return sentence
|
|
|
|
|
|
def _replace_many(text: str, replacements: dict[str, str]) -> str:
|
|
for needle, value in replacements.items():
|
|
text = text.replace(needle, value)
|
|
return text
|
|
|
|
|
|
def _slugify(value: str) -> str:
|
|
return re.sub(r"-+", "-", re.sub(r"[^a-z0-9]+", "-", value.lower())).strip("-")
|
|
|
|
|
|
def _normalise_wp_prefix(value: str) -> str:
|
|
cleaned = re.sub(r"[^A-Z0-9-]+", "-", value.upper()).strip("-")
|
|
if cleaned.endswith("-WP"):
|
|
return cleaned
|
|
if cleaned.endswith("WP"):
|
|
return cleaned[:-2].rstrip("-") + "-WP"
|
|
return cleaned + "-WP"
|
|
|
|
|
|
def _default_wp_prefix(repo_slug: str) -> str:
|
|
first = (repo_slug.split("-")[0] or "REPO").upper()
|
|
return f"{first}-WP"
|
|
|
|
|
|
def _markdown_bullets(items: list[str]) -> str:
|
|
return "\n".join(f"- {item}" for item in items)
|
|
|
|
|
|
def _ensure_trailing_newline(text: str) -> str:
|
|
return text if text.endswith("\n") else text + "\n"
|