"""Dependency-light local HTTP API for guide-board.""" from __future__ import annotations import json import threading import uuid from dataclasses import dataclass from datetime import datetime, timezone from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any from urllib.parse import urlparse from guide_board.discovery import discover_extensions from guide_board.errors import GuideBoardError from guide_board.execution import run_assessment from guide_board.io import load_json from guide_board.planning import ( build_run_plan, validate_assessment_profile, validate_target_profile, ) @dataclass(frozen=True) class ServiceHandle: """Background service handle for tests and local embedding.""" server: "GuideBoardHTTPServer" thread: threading.Thread @property def host(self) -> str: return str(self.server.server_address[0]) @property def port(self) -> int: return int(self.server.server_address[1]) @property def url(self) -> str: return f"http://{self.host}:{self.port}" def stop(self) -> None: self.server.shutdown() self.server.server_close() self.thread.join(timeout=5) @dataclass(frozen=True) class ServiceContext: root: Path extension_dirs: list[Path] jobs: "JobStore" class JobStore: def __init__(self) -> None: self._lock = threading.Lock() self._jobs: dict[str, dict[str, Any]] = {} def create(self, request: dict[str, Any]) -> dict[str, Any]: now = _now() job = { "job_id": uuid.uuid4().hex, "status": "queued", "created_at": now, "updated_at": now, "request": request, "result": None, "error": None, } with self._lock: self._jobs[job["job_id"]] = job return dict(job) def update(self, job_id: str, **updates: Any) -> dict[str, Any]: with self._lock: if job_id not in self._jobs: raise HttpProblem(404, f"run job not found: {job_id}") job = dict(self._jobs[job_id]) job.update(updates) job["updated_at"] = _now() self._jobs[job_id] = job return dict(job) def get(self, job_id: str) -> dict[str, Any]: with self._lock: if job_id not in self._jobs: raise HttpProblem(404, f"run job not found: {job_id}") return dict(self._jobs[job_id]) def list(self) -> list[dict[str, Any]]: with self._lock: return [dict(job) for job in self._jobs.values()] def summary(self) -> dict[str, int]: counts: dict[str, int] = {} with self._lock: for job in self._jobs.values(): counts[job["status"]] = counts.get(job["status"], 0) + 1 return counts class HttpProblem(Exception): def __init__(self, status_code: int, message: str) -> None: super().__init__(message) self.status_code = status_code self.message = message class GuideBoardHTTPServer(ThreadingHTTPServer): context: ServiceContext daemon_threads = True class GuideBoardRequestHandler(BaseHTTPRequestHandler): server: GuideBoardHTTPServer server_version = "GuideBoardLocalAPI/0.1" def do_GET(self) -> None: self._handle("GET") def do_POST(self) -> None: self._handle("POST") def log_message(self, format: str, *args: Any) -> None: return def _handle(self, method: str) -> None: parsed = urlparse(self.path) try: response, status_code = self._route(method, parsed.path) except HttpProblem as exc: response = _error_response(exc.message, exc.__class__.__name__, exc.status_code) status_code = exc.status_code except GuideBoardError as exc: response = _error_response(str(exc), exc.__class__.__name__, 400) status_code = 400 except (OSError, ValueError, json.JSONDecodeError) as exc: response = _error_response(str(exc), exc.__class__.__name__, 400) status_code = 400 except Exception as exc: response = _error_response(str(exc), exc.__class__.__name__, 500) status_code = 500 self._send_json(status_code, response) def _route(self, method: str, path: str) -> tuple[dict[str, Any], int]: if method == "GET" and path == "/health": return self._health(), 200 if method == "GET" and path == "/extensions": return self._extensions(), 200 if method == "POST" and path == "/profiles/validate": return self._validate_profile(), 200 if method == "POST" and path == "/assessments/plan": return self._plan_assessment(), 200 if method == "GET" and path == "/runs": return {"runs": self.server.context.jobs.list()}, 200 if method == "POST" and path == "/runs": return self._start_run(), 202 run_match = _match_run_path(path) if method == "GET" and run_match is not None: job_id, suffix = run_match if suffix is None: return self.server.context.jobs.get(job_id), 200 if suffix == "reports": return self._run_reports(job_id), 200 raise HttpProblem(404, f"endpoint not found: {method} {path}") def _health(self) -> dict[str, Any]: return { "status": "ok", "service": "guide-board", "root": str(self.server.context.root), "jobs": self.server.context.jobs.summary(), } def _extensions(self) -> dict[str, Any]: root = self.server.context.root return { "extensions": [ { "id": extension.id, "name": extension.manifest["name"], "version": extension.manifest["version"], "type": extension.manifest["extension_type"], "path": _display_path(root, extension.path), "source": extension.source, } for extension in discover_extensions(root, self.server.context.extension_dirs) ] } def _validate_profile(self) -> dict[str, Any]: payload = self._read_payload() kind = _required_string(payload, "kind") path = _path_from_payload(self.server.context.root, payload, "path") if kind == "target": profile = validate_target_profile(path) return { "status": "valid", "profile_kind": "target", "profile_id": profile["id"], "path": str(path), } if kind == "assessment": profile = validate_assessment_profile(path) return { "status": "valid", "profile_kind": "assessment", "profile_id": profile["id"], "path": str(path), } raise HttpProblem(400, "kind must be 'target' or 'assessment'") def _plan_assessment(self) -> dict[str, Any]: payload = self._read_payload() target = _path_from_payload(self.server.context.root, payload, "target") assessment = _path_from_payload(self.server.context.root, payload, "assessment") extension_dirs = _extension_dirs_from_payload(self.server.context, payload) return build_run_plan(self.server.context.root, target, assessment, extension_dirs) def _start_run(self) -> dict[str, Any]: payload = self._read_payload() target = _path_from_payload(self.server.context.root, payload, "target") assessment = _path_from_payload(self.server.context.root, payload, "assessment") output_dir = _optional_path_from_payload(self.server.context.root, payload, "output_dir") extension_dirs = _extension_dirs_from_payload(self.server.context, payload) request = { "target": str(target), "assessment": str(assessment), "output_dir": str(output_dir) if output_dir is not None else None, "extension_dirs": [str(path) for path in extension_dirs], } job = self.server.context.jobs.create(request) thread = threading.Thread( target=_execute_run_job, args=( self.server.context, job["job_id"], target, assessment, output_dir, extension_dirs, ), daemon=True, ) thread.start() return job def _run_reports(self, job_id: str) -> dict[str, Any]: job = self.server.context.jobs.get(job_id) result = job.get("result") if job["status"] != "succeeded" or not isinstance(result, dict): raise HttpProblem(409, f"reports are not available for job status {job['status']}") report_path = Path(result["report"]) package_path = Path(result["assessment_package"]) retention_path = Path(result["retention_summary"]) try: report_markdown = report_path.read_text(encoding="utf-8") assessment_package = load_json(package_path) retention_summary = load_json(retention_path) except OSError as exc: raise HttpProblem(404, f"run report artifact is missing: {exc}") from exc return { "job_id": job_id, "run_id": result["run_id"], "status": job["status"], "assessment_status": result["status"], "paths": { "report": str(report_path), "assessment_package": str(package_path), "retention_summary": str(retention_path), }, "report": { "path": str(report_path), "markdown": report_markdown, }, "assessment_package": { "path": str(package_path), "json": assessment_package, }, "retention_summary": { "path": str(retention_path), "json": retention_summary, }, } def _read_payload(self) -> dict[str, Any]: length = int(self.headers.get("Content-Length", "0") or "0") if length == 0: return {} payload = json.loads(self.rfile.read(length).decode("utf-8")) if not isinstance(payload, dict): raise HttpProblem(400, "request body must be a JSON object") return payload def _send_json(self, status_code: int, payload: dict[str, Any]) -> None: body = json.dumps(payload, indent=2, sort_keys=True).encode("utf-8") self.send_response(status_code) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.send_header("Access-Control-Allow-Origin", "http://127.0.0.1") self.end_headers() self.wfile.write(body) def build_server( root: Path, extension_dirs: list[Path] | None = None, host: str = "127.0.0.1", port: int = 8080, ) -> GuideBoardHTTPServer: server = GuideBoardHTTPServer((host, port), GuideBoardRequestHandler) server.context = ServiceContext( root=root.expanduser().resolve(), extension_dirs=[path.expanduser().resolve() for path in extension_dirs or []], jobs=JobStore(), ) return server def start_service( root: Path, extension_dirs: list[Path] | None = None, host: str = "127.0.0.1", port: int = 8080, ) -> ServiceHandle: server = build_server(root, extension_dirs, host, port) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() return ServiceHandle(server=server, thread=thread) def serve_forever( root: Path, extension_dirs: list[Path] | None = None, host: str = "127.0.0.1", port: int = 8080, ) -> None: server = build_server(root, extension_dirs, host, port) try: server.serve_forever() finally: server.server_close() def _execute_run_job( context: ServiceContext, job_id: str, target: Path, assessment: Path, output_dir: Path | None, extension_dirs: list[Path], ) -> None: context.jobs.update(job_id, status="running", started_at=_now()) try: result = run_assessment(context.root, target, assessment, output_dir, extension_dirs) except (GuideBoardError, OSError, ValueError) as exc: context.jobs.update( job_id, status="failed", completed_at=_now(), error={"type": exc.__class__.__name__, "message": str(exc)}, ) except Exception as exc: context.jobs.update( job_id, status="failed", completed_at=_now(), error={"type": exc.__class__.__name__, "message": str(exc)}, ) else: context.jobs.update(job_id, status="succeeded", completed_at=_now(), result=result) def _path_from_payload(root: Path, payload: dict[str, Any], field: str) -> Path: return _resolve_path(root, _required_string(payload, field)) def _optional_path_from_payload(root: Path, payload: dict[str, Any], field: str) -> Path | None: value = payload.get(field) if value is None: return None if not isinstance(value, str) or not value: raise HttpProblem(400, f"{field} must be a non-empty string") return _resolve_path(root, value) def _extension_dirs_from_payload( context: ServiceContext, payload: dict[str, Any], ) -> list[Path]: extension_dirs = list(context.extension_dirs) value = payload.get("extension_dirs") if value is None: return extension_dirs if not isinstance(value, list) or not all(isinstance(item, str) and item for item in value): raise HttpProblem(400, "extension_dirs must be a list of non-empty strings") extension_dirs.extend(_resolve_path(context.root, item) for item in value) return extension_dirs def _required_string(payload: dict[str, Any], field: str) -> str: value = payload.get(field) if not isinstance(value, str) or not value: raise HttpProblem(400, f"{field} must be a non-empty string") return value def _resolve_path(root: Path, value: str) -> Path: path = Path(value).expanduser() if not path.is_absolute(): path = root / path return path.resolve() def _match_run_path(path: str) -> tuple[str, str | None] | None: parts = [part for part in path.split("/") if part] if len(parts) == 2 and parts[0] == "runs": return parts[1], None if len(parts) == 3 and parts[0] == "runs": return parts[1], parts[2] return None def _display_path(root: Path, path: Path) -> str: try: return str(path.resolve().relative_to(root.resolve())) except ValueError: return str(path.resolve()) def _error_response(message: str, error_type: str, status_code: int) -> dict[str, Any]: return { "error": { "type": error_type, "status": status_code, "message": message, } } def _now() -> str: return datetime.now(timezone.utc).isoformat()