""" Claude Code CLI adapter — runs the ``claude`` CLI as a subprocess. """ import asyncio import json import os import subprocess from pathlib import Path from typing import Optional from llm_connect.adapter import LLMAdapter from llm_connect.models import RunConfig, LLMResponse from llm_connect.config import LLMConfig from llm_connect._token_estimator import estimate_tokens from llm_connect.exceptions import ( LLMSubprocessError, LLMTimeoutError, ) class ClaudeCodeAdapter(LLMAdapter): """LLM adapter that shells out to the ``claude`` CLI with ``--print``. The compiled prompt is piped via **stdin** to avoid shell argument length limits (compiled prompts can exceed 30 KB). """ def __init__( self, cli_path: Optional[str] = None, model: Optional[str] = None, config: Optional[LLMConfig] = None, ): self._config = config or LLMConfig(provider="claude-code") self._cli_path = cli_path or self._resolve_cli_path() self._model = model # ── LLMAdapter interface ──────────────────────────────────────── def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: self._preflight_budget(config) cmd = self._build_command(config) timeout = config.timeout_seconds or self._config.timeout_seconds try: result = subprocess.run( cmd, input=prompt, capture_output=True, text=True, timeout=timeout, ) except subprocess.TimeoutExpired as exc: raise LLMTimeoutError( f"claude CLI timed out after {timeout}s", cause=exc, ) from exc if result.returncode != 0: raise LLMSubprocessError( f"claude CLI exited with code {result.returncode}", return_code=result.returncode, stderr=result.stderr, ) content = _unwrap_cli_json_envelope(result.stdout, config) prompt_tokens = estimate_tokens(prompt) completion_tokens = estimate_tokens(content) response = LLMResponse( content=content, model=self._model or "claude-code-cli", usage={ "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens, }, finish_reason="stop", metadata={ "provider": "claude-code", "cli_path": self._cli_path, }, ) self._consume_budget(config, response) return response async def async_execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse: """Native async implementation using asyncio.create_subprocess_exec.""" self._preflight_budget(config) cmd = self._build_command(config) timeout = config.timeout_seconds or self._config.timeout_seconds try: proc = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout_bytes, stderr_bytes = await asyncio.wait_for( proc.communicate(input=prompt.encode()), timeout=timeout, ) except asyncio.TimeoutError as exc: raise LLMTimeoutError( f"claude CLI timed out after {timeout}s", cause=exc, ) from exc if proc.returncode != 0: raise LLMSubprocessError( f"claude CLI exited with code {proc.returncode}", return_code=proc.returncode, stderr=stderr_bytes.decode(), ) content = _unwrap_cli_json_envelope(stdout_bytes.decode(), config) prompt_tokens = estimate_tokens(prompt) completion_tokens = estimate_tokens(content) response = LLMResponse( content=content, model=self._model or "claude-code-cli", usage={ "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens, }, finish_reason="stop", metadata={ "provider": "claude-code", "cli_path": self._cli_path, "async": True, }, ) self._consume_budget(config, response) return response def validate_config(self, config: RunConfig) -> bool: try: result = subprocess.run( [self._cli_path, "--version"], capture_output=True, text=True, timeout=10, ) return result.returncode == 0 except (subprocess.TimeoutExpired, FileNotFoundError, OSError): return False def _build_command(self, config: RunConfig) -> list[str]: cmd = [self._cli_path, "--print"] if self._model: cmd.extend(["--model", self._model]) json_schema = _json_schema_arg(config) if json_schema: cmd.extend(["--json-schema", json_schema]) # With --json-schema alone the CLI prints conversational text on # stdout while the structured payload ships on a sidecar channel # callers cannot reach. --output-format json forces the structured # response (wrapped in an envelope) onto stdout. cmd.extend(["--output-format", "json"]) return cmd def _resolve_cli_path(self) -> str: configured = ( os.environ.get("LLM_CONNECT_CLAUDE_CLI_PATH") or os.environ.get("CLAUDE_CLI_PATH") or self._config.claude_cli_path ) if configured and configured != "claude": return configured local_cli = Path.home() / ".local" / "bin" / "claude" if local_cli.exists(): return str(local_cli) return configured or "claude" def _json_schema_arg(config: RunConfig) -> str | None: schema = (config.model_params or {}).get("json_schema") if not schema: return None if isinstance(schema, str): return schema if isinstance(schema, dict): return json.dumps(schema, separators=(",", ":")) return None # Envelope field names Claude Code's `--output-format json` is known to use # for the model's primary textual response. Used as a fall-back when no field # carries a JSON-parseable payload (e.g. plain prose generation). _ENVELOPE_TEXT_FIELDS = ("result", "result_text", "content", "text", "output") def _unwrap_cli_json_envelope(stdout: str, config: RunConfig) -> str: """Extract the model's payload from Claude CLI's --output-format json envelope. Only runs when --json-schema was set (the only code path that adds --output-format json to the CLI invocation). Other callers keep the raw stdout behavior unchanged. Strategy: when --json-schema is set the caller wants JSON back, so prefer any envelope field whose value is itself valid JSON (dict, list, or a string that parses as JSON). This handles two observed envelope shapes: 1. Short prompts where the model emits the structured payload directly in the `result` field as a JSON-encoded string. 2. Longer prompts where the model emits a conversational preamble in `result` and the schema-enforced JSON in a separate field (the exact field name varies across CLI versions). Fall back to the first text field only when no JSON-bearing field exists, so non-schema callers via this code path still see the model's prose. Surface the raw envelope as a last resort so the operator can see what shape arrived and extend the strategy. """ if not _json_schema_arg(config): return stdout text = stdout.strip() if not text: return stdout try: envelope = json.loads(text) except json.JSONDecodeError: return stdout if not isinstance(envelope, dict): return stdout json_payload = _find_json_payload(envelope) if json_payload is not None: return json_payload for key in _ENVELOPE_TEXT_FIELDS: value = envelope.get(key) if isinstance(value, str): return value if isinstance(value, (dict, list)): return json.dumps(value) return stdout def _find_json_payload(envelope: dict) -> str | None: """Return the first envelope value that represents valid JSON. Insertion order is preserved by Python dicts, so this prefers fields the CLI lists earliest in its envelope. Skips obvious metadata keys (cost, usage, timing) so we never accidentally pick a numeric or telemetry value. """ for key, value in envelope.items(): if key in _ENVELOPE_METADATA_KEYS: continue if isinstance(value, (dict, list)): return json.dumps(value) if isinstance(value, str): stripped = value.strip() if stripped.startswith(("{", "[")): try: json.loads(stripped) except json.JSONDecodeError: continue return stripped return None # Envelope keys that carry telemetry, never the model payload. _ENVELOPE_METADATA_KEYS = frozenset({ "type", "subtype", "model", "usage", "total_cost_usd", "cost_usd", "duration_ms", "duration_api_ms", "num_turns", "session_id", "is_error", "stop_reason", "permission_denials", "uuid", })