feat: initial llm-connect package scaffold

Copy markitect.llm module into standalone llm_connect package.
All markitect.* imports replaced with llm_connect.* equivalents.
LLMError base class inlined (no markitect.exceptions dependency).
Verified: from llm_connect import create_adapter works.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-27 07:54:42 +01:00
commit e499edba90
36 changed files with 1783 additions and 0 deletions

67
llm_connect/__init__.py Normal file
View File

@@ -0,0 +1,67 @@
"""
llm-connect — Pluggable LLM adapters.
Provides concrete :class:`LLMAdapter` implementations backed by
OpenRouter (HTTP), Gemini, OpenAI, and Claude Code CLI (subprocess).
Quick start::
from llm_connect import create_adapter
adapter = create_adapter("openrouter", model="anthropic/claude-sonnet-4")
response = adapter.execute_prompt(prompt, run_config)
"""
from llm_connect.models import RunConfig, LLMResponse
from llm_connect.adapter import LLMAdapter, MockLLMAdapter, ErrorLLMAdapter
from llm_connect.factory import create_adapter
from llm_connect.openrouter import OpenRouterAdapter
from llm_connect.claude_code import ClaudeCodeAdapter
from llm_connect.gemini import GeminiAdapter
from llm_connect.openai import OpenAIAdapter
from llm_connect.config import LLMConfig, load_config
from llm_connect.exceptions import (
LLMError,
LLMConfigurationError,
LLMAPIError,
LLMRateLimitError,
LLMTimeoutError,
LLMSubprocessError,
)
from llm_connect.embedding_adapter import EmbeddingAdapter
from llm_connect.embedding_openai import OpenAICompatibleEmbeddingAdapter
from llm_connect.embedding_cache import EmbeddingCache
from llm_connect.embedding_factory import create_embedding_adapter
from llm_connect.similarity import (
cosine_similarity,
similarity_matrix,
find_similar_pairs,
)
# Public API of the package: every name listed here is imported above.
__all__ = [
    # Data models
    "RunConfig",
    "LLMResponse",
    # Adapter interface and test doubles
    "LLMAdapter",
    "MockLLMAdapter",
    "ErrorLLMAdapter",
    # Adapter factory and concrete text-generation adapters
    "create_adapter",
    "OpenRouterAdapter",
    "ClaudeCodeAdapter",
    "GeminiAdapter",
    "OpenAIAdapter",
    # Configuration
    "LLMConfig",
    "load_config",
    # Exception hierarchy
    "LLMError",
    "LLMConfigurationError",
    "LLMAPIError",
    "LLMRateLimitError",
    "LLMTimeoutError",
    "LLMSubprocessError",
    # Embeddings
    "EmbeddingAdapter",
    "OpenAICompatibleEmbeddingAdapter",
    "EmbeddingCache",
    "create_embedding_adapter",
    # Vector similarity helpers
    "cosine_similarity",
    "similarity_matrix",
    "find_similar_pairs",
]

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

86
llm_connect/_http.py Normal file
View File

@@ -0,0 +1,86 @@
"""
Thin synchronous HTTP helper built on :mod:`urllib.request`.
Translates HTTP errors into the typed exceptions of :mod:`llm_connect.exceptions`.
"""
import json
import urllib.request
import urllib.error
from typing import Dict, Any, Optional
from llm_connect.exceptions import (
LLMAPIError,
LLMRateLimitError,
LLMTimeoutError,
)
def post_json(
    url: str,
    payload: Dict[str, Any],
    headers: Optional[Dict[str, str]] = None,
    timeout: int = 300,
) -> Dict[str, Any]:
    """POST *payload* as JSON to *url* and return the parsed response body.

    Args:
        url: Endpoint to call. It is embedded in error messages, so callers
            should not put secrets in it.
        payload: JSON-serialisable request body.
        headers: Extra request headers; merged over the default
            ``Content-Type: application/json``.
        timeout: Socket timeout in seconds, passed to ``urlopen``.

    Returns:
        The decoded JSON document.

    Raises:
        LLMRateLimitError: on HTTP 429
        LLMAPIError: on other non-2xx responses, on URL-level failures and
            when the response body is not valid JSON
        LLMTimeoutError: on socket / read timeout
    """
    # Local import: on Python < 3.10 ``socket.timeout`` is not an alias of
    # ``TimeoutError`` and has to be caught explicitly.
    import socket

    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", **(headers or {})},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            # ``errors="replace"`` keeps a badly encoded body from raising
            # UnicodeDecodeError; it then surfaces as a JSON error below.
            body = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        # HTTPError must be handled before URLError (it is a subclass).
        try:
            error_body = exc.read().decode("utf-8", errors="replace")
        except Exception:
            # Deliberately best-effort: the body only enriches the error,
            # a failure to read it must not mask the HTTP status.
            error_body = ""
        if exc.code == 429:
            raise LLMRateLimitError(
                f"Rate limited (429) from {url}",
                status_code=429,
                response_body=error_body,
                cause=exc,
            ) from exc
        raise LLMAPIError(
            f"HTTP {exc.code} from {url}",
            status_code=exc.code,
            response_body=error_body,
            cause=exc,
        ) from exc
    except urllib.error.URLError as exc:
        reason = exc.reason
        # Prefer the exception type; keep the message check as a fallback
        # for reasons that are plain strings.
        timed_out = (
            isinstance(reason, (TimeoutError, socket.timeout))
            or "timed out" in str(reason)
        )
        if timed_out:
            raise LLMTimeoutError(
                f"Request to {url} timed out after {timeout}s",
                cause=exc,
            ) from exc
        raise LLMAPIError(
            f"URL error for {url}: {reason}",
            cause=exc,
        ) from exc
    except (TimeoutError, socket.timeout) as exc:
        # Raised directly (not wrapped in URLError) when the timeout hits
        # while reading the response body.
        raise LLMTimeoutError(
            f"Request to {url} timed out after {timeout}s",
            cause=exc,
        ) from exc

    try:
        return json.loads(body)
    except json.JSONDecodeError as exc:
        preview = body[:300].replace("\n", "\\n")
        raise LLMAPIError(
            f"Invalid JSON response from {url}: {exc} — body preview: {preview!r}",
            cause=exc,
        ) from exc

View File

@@ -0,0 +1,16 @@
"""
Rough token estimation for backends that don't return usage data.
Uses the ~4 characters per token heuristic common across English LLM tokenizers.
"""
def estimate_tokens(text: str) -> int:
    """Return a coarse token count for *text*.

    One token is assumed per four characters, with a floor of one token
    for any non-empty text; empty text counts as zero tokens.
    """
    return max(1, len(text) // 4) if text else 0

169
llm_connect/adapter.py Normal file
View File

@@ -0,0 +1,169 @@
"""
LLM adapter interface for pluggable model providers.
Implements abstraction layer for LLM integration, supporting
multiple providers (OpenAI, Anthropic, local models, etc.).
"""
from abc import ABC, abstractmethod
from typing import Dict, Any
from llm_connect.models import RunConfig, LLMResponse
class LLMAdapter(ABC):
    """Interface every LLM backend has to implement.

    Concrete subclasses decide how a prompt is executed; callers only
    depend on the two methods declared here.
    """

    @abstractmethod
    def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
        """Run *prompt* against the backend.

        Args:
            prompt: Compiled prompt text.
            config: Execution configuration.

        Returns:
            The backend's answer wrapped in an :class:`LLMResponse`.

        Raises:
            Exception: On LLM API errors.
        """

    @abstractmethod
    def validate_config(self, config: RunConfig) -> bool:
        """Report whether *config* is supported by this adapter.

        Args:
            config: Configuration to validate.

        Returns:
            ``True`` if valid, ``False`` otherwise.
        """
class MockLLMAdapter(LLMAdapter):
    """Test double that answers every prompt with a canned response.

    No external API is called; the last prompt/config and the number of
    calls are recorded for inspection.
    """

    def __init__(self, mock_response: str = "Mock LLM response"):
        """Create the mock.

        Args:
            mock_response: Text returned by every :meth:`execute_prompt` call.
        """
        self.mock_response = mock_response
        self.call_count = 0
        self.last_prompt, self.last_config = None, None

    def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
        """Record the call and return the canned response.

        Token usage is approximated by whitespace-separated word counts.

        Args:
            prompt: Prompt (stored for inspection).
            config: Config (stored for inspection).

        Returns:
            An :class:`LLMResponse` flagged with ``metadata={"mock": True}``.
        """
        self.call_count += 1
        self.last_prompt, self.last_config = prompt, config

        prompt_words = len(prompt.split())
        reply_words = len(self.mock_response.split())
        word_usage = {
            "prompt_tokens": prompt_words,
            "completion_tokens": reply_words,
            "total_tokens": prompt_words + reply_words,
        }
        return LLMResponse(
            content=self.mock_response,
            model=config.model_name,
            usage=word_usage,
            finish_reason="stop",
            metadata={"mock": True},
        )

    def validate_config(self, config: RunConfig) -> bool:
        """Accept any configuration.

        Args:
            config: Configuration (ignored).

        Returns:
            Always ``True``.
        """
        return True

    def reset(self) -> None:
        """Forget all recorded calls."""
        self.call_count = 0
        self.last_prompt, self.last_config = None, None
class ErrorLLMAdapter(LLMAdapter):
    """Test double whose :meth:`execute_prompt` always fails.

    Handy for exercising error-handling paths.
    """

    def __init__(self, error_message: str = "Mock LLM error"):
        """Create the adapter.

        Args:
            error_message: Message carried by the raised error.
        """
        self.error_message = error_message

    def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
        """Fail unconditionally.

        Args:
            prompt: Prompt (ignored).
            config: Config (ignored).

        Raises:
            RuntimeError: Always, with the configured message.
        """
        message = self.error_message
        raise RuntimeError(message)

    def validate_config(self, config: RunConfig) -> bool:
        """Accept any configuration.

        Args:
            config: Configuration (ignored).

        Returns:
            Always ``True``.
        """
        return True

View File

@@ -0,0 +1,94 @@
"""
Claude Code CLI adapter — runs the ``claude`` CLI as a subprocess.
"""
import subprocess
from typing import Optional
from llm_connect.adapter import LLMAdapter
from llm_connect.models import RunConfig, LLMResponse
from llm_connect.config import LLMConfig
from llm_connect._token_estimator import estimate_tokens
from llm_connect.exceptions import (
LLMSubprocessError,
LLMTimeoutError,
)
class ClaudeCodeAdapter(LLMAdapter):
    """LLM adapter that shells out to the ``claude`` CLI with ``--print``.

    The compiled prompt is piped via **stdin** to avoid shell argument
    length limits (compiled prompts can exceed 30 KB).
    """

    def __init__(
        self,
        cli_path: Optional[str] = None,
        model: Optional[str] = None,
        config: Optional[LLMConfig] = None,
    ):
        """Create the adapter.

        Args:
            cli_path: Path to the CLI binary. When omitted, the value of
                ``config.claude_cli_path`` is used (``"claude"`` for the
                default config).
            model: Optional model name forwarded via ``--model``.
            config: Adapter configuration; a default ``claude-code``
                :class:`LLMConfig` is created when omitted.
        """
        self._config = config or LLMConfig(provider="claude-code")
        # The parameter used to default to "claude", which made this
        # fallback unreachable and silently ignored a configured path.
        self._cli_path = cli_path or self._config.claude_cli_path
        self._model = model

    # ── LLMAdapter interface ────────────────────────────────────────
    def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
        """Run the CLI once with *prompt* on stdin and return its stdout.

        Token usage is estimated because the CLI reports no counts.

        Raises:
            LLMTimeoutError: If the process exceeds the timeout.
            LLMSubprocessError: If the process exits with a non-zero code.
        """
        cmd = [self._cli_path, "--print"]
        if self._model:
            cmd.extend(["--model", self._model])

        # The per-run timeout wins; the adapter config is the fallback.
        timeout = config.timeout_seconds or self._config.timeout_seconds
        try:
            result = subprocess.run(
                cmd,
                input=prompt,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired as exc:
            raise LLMTimeoutError(
                f"claude CLI timed out after {timeout}s",
                cause=exc,
            ) from exc

        if result.returncode != 0:
            raise LLMSubprocessError(
                f"claude CLI exited with code {result.returncode}",
                return_code=result.returncode,
                stderr=result.stderr,
            )

        content = result.stdout
        prompt_tokens = estimate_tokens(prompt)
        completion_tokens = estimate_tokens(content)
        return LLMResponse(
            content=content,
            model=self._model or "claude-code-cli",
            usage={
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
            finish_reason="stop",
            metadata={
                "provider": "claude-code",
                "cli_path": self._cli_path,
            },
        )

    def validate_config(self, config: RunConfig) -> bool:
        """Return ``True`` if ``<cli> --version`` exits with code 0.

        A missing binary, an OS-level launch failure or a 10 s timeout
        all count as "not valid" rather than raising.
        """
        try:
            result = subprocess.run(
                [self._cli_path, "--version"],
                capture_output=True,
                text=True,
                timeout=10,
            )
            return result.returncode == 0
        except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
            return False

108
llm_connect/config.py Normal file
View File

@@ -0,0 +1,108 @@
"""
LLM configuration and API key resolution.
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, Dict, Any
import os
@dataclass
class LLMConfig:
    """Configuration for an LLM adapter.

    All fields have defaults, so ``LLMConfig()`` describes an OpenRouter
    setup without an API key.

    Attributes:
        provider: Backend identifier (``"openrouter"`` or ``"claude-code"``).
        model: Model name / path sent to the provider.
        api_key: Resolved API key (may be ``None`` for CLI backends).
        api_base: Base URL for HTTP-based providers.
        claude_cli_path: Path to the ``claude`` CLI binary.
        timeout_seconds: Per-request timeout.
        max_retries: Number of retry attempts on transient errors.
        extra: Arbitrary provider-specific overrides.
    """

    provider: str = "openrouter"
    model: str = "anthropic/claude-sonnet-4"
    api_key: Optional[str] = None
    # Default points at OpenRouter's endpoint.
    api_base: str = "https://openrouter.ai/api/v1"
    claude_cli_path: str = "claude"
    timeout_seconds: int = 300
    max_retries: int = 3
    # default_factory gives every instance its own dict.
    extra: Dict[str, Any] = field(default_factory=dict)
def resolve_api_key(
    explicit: Optional[str] = None,
    env_var: str = "OPENROUTER_API_KEY",
    key_file_paths: Optional[list[Path]] = None,
) -> Optional[str]:
    """Return an API key from the first available source.

    Resolution order:

    1. *explicit* argument (passed directly by caller)
    2. Environment variable *env_var*, if non-empty after stripping
    3. First readable file in *key_file_paths* whose stripped content is
       non-empty

    Args:
        explicit: Key supplied by the caller; returned unchanged when truthy.
        env_var: Name of the environment variable to consult.
        key_file_paths: Candidate key files, tried in order.

    Returns:
        The key, or ``None`` if no source yields a non-empty value.
    """
    if explicit:
        return explicit

    # Strip before testing: a whitespace-only variable must not win over
    # the key files by being returned as an empty string.
    from_env = os.environ.get(env_var, "").strip()
    if from_env:
        return from_env

    for path in key_file_paths or []:
        try:
            # utf-8-sig also accepts files saved with a UTF-8 BOM.
            text = path.read_text(encoding="utf-8-sig").strip()
        except (OSError, UnicodeDecodeError):
            # Unreadable or undecodable file: try the next candidate.
            continue
        if text:
            return text
    return None
def find_project_root(start: Optional[Path] = None) -> Optional[Path]:
    """Locate the nearest ancestor directory that holds ``pyproject.toml``.

    The search begins at *start* (the current working directory when
    omitted), resolved to an absolute path, and climbs towards the
    filesystem root.

    Returns:
        The directory containing the marker file, or ``None`` if no
        ancestor has one.
    """
    candidate = (start or Path.cwd()).resolve()
    while True:
        if (candidate / "pyproject.toml").is_file():
            return candidate
        parent = candidate.parent
        if parent == candidate:
            # Reached the filesystem root without finding the marker.
            return None
        candidate = parent
def load_config(
    provider: str = "openrouter",
    model: Optional[str] = None,
    api_key: Optional[str] = None,
    **overrides: Any,
) -> LLMConfig:
    """Build an :class:`LLMConfig` with sensible defaults.

    For the ``openrouter`` provider the API key is resolved via
    :func:`resolve_api_key` (env var → project-root key file) unless
    *api_key* is given. For other providers *api_key* is used as passed.

    Args:
        provider: Backend identifier stored in the config.
        model: Model name; defaults to ``"anthropic/claude-sonnet-4"``.
        api_key: Explicit API key.
        **overrides: Additional :class:`LLMConfig` fields; they take
            precedence over the values derived above.

    Returns:
        The assembled configuration.
    """
    resolved_key = api_key
    if provider == "openrouter" and not resolved_key:
        # Only look for the project root when a key file may be needed;
        # this skips the filesystem walk on every other path.
        root = find_project_root()
        key_file_paths = [root / "apikey-openrouter.txt"] if root else []
        resolved_key = resolve_api_key(
            explicit=None,
            env_var="OPENROUTER_API_KEY",
            key_file_paths=key_file_paths,
        )

    defaults: Dict[str, Any] = {
        "provider": provider,
        "model": model or "anthropic/claude-sonnet-4",
        "api_key": resolved_key,
    }
    defaults.update(overrides)
    return LLMConfig(**defaults)

View File

@@ -0,0 +1,34 @@
"""
Abstract base class for embedding adapters.
Embedding adapters convert text into float vectors. This is a separate
hierarchy from :class:`LLMAdapter` (text generation) because the API
contract is fundamentally different: text in, float vectors out.
"""
from abc import ABC, abstractmethod
class EmbeddingAdapter(ABC):
    """Interface implemented by every embedding backend."""

    @abstractmethod
    def embed(self, texts: list[str]) -> list[list[float]]:
        """Turn a batch of texts into embedding vectors.

        Args:
            texts: One or more strings to embed.

        Returns:
            One vector per input text, in the same order as *texts*.
        """
        ...

    @abstractmethod
    def validate(self) -> bool:
        """Report whether the adapter is usable as configured.

        Returns:
            ``True`` if the adapter has a valid configuration
            (e.g. API key present), ``False`` otherwise.
        """
        ...

View File

@@ -0,0 +1,64 @@
"""
File-based embedding cache.
Stores embedding vectors in a single JSON file keyed by entity slug.
Each entry includes a content digest so stale embeddings are
automatically invalidated when entity content changes.
"""
import json
from pathlib import Path
from typing import Optional
class EmbeddingCache:
    """Persistent cache for embedding vectors.

    Structure on disk (``embeddings.json``)::

        {
            "division-of-labour": {"digest": "abc123", "vector": [0.1, ...]},
            ...
        }

    Changes made with :meth:`put` live in memory until :meth:`save` is
    called.
    """

    def __init__(self, cache_dir: Path):
        """Open the cache stored in ``cache_dir / "embeddings.json"``.

        A missing, unreadable or malformed file yields an empty cache.
        """
        self._path = cache_dir / "embeddings.json"
        self._data: dict[str, dict] = {}
        self._hits = 0
        self._misses = 0
        self._load()

    def get(self, slug: str, content_digest: str) -> Optional[list[float]]:
        """Return the cached vector if *content_digest* matches, else ``None``.

        Entries that are not well-formed (not a mapping, or lacking a
        ``"vector"``) are treated as misses instead of raising.
        """
        entry = self._data.get(slug)
        if (
            isinstance(entry, dict)
            and entry.get("digest") == content_digest
            and "vector" in entry
        ):
            self._hits += 1
            return entry["vector"]
        self._misses += 1
        return None

    def put(self, slug: str, content_digest: str, vector: list[float]) -> None:
        """Store or overwrite the embedding for *slug*."""
        self._data[slug] = {"digest": content_digest, "vector": vector}

    def save(self) -> None:
        """Write cache to disk.

        The JSON is written to a sibling temporary file and then moved
        into place, so an interrupted write cannot truncate the cache.
        """
        self._path.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps(self._data, separators=(",", ":"))
        tmp_path = self._path.with_name(self._path.name + ".tmp")
        tmp_path.write_text(serialized, encoding="utf-8")
        tmp_path.replace(self._path)

    def stats(self) -> dict:
        """Return cache statistics (entry count, hits and misses)."""
        return {
            "entries": len(self._data),
            "hits": self._hits,
            "misses": self._misses,
        }

    def _load(self) -> None:
        """Read cache from disk if it exists."""
        if not self._path.is_file():
            return
        try:
            loaded = json.loads(self._path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            loaded = None
        # Valid JSON that is not an object (e.g. a list) is unusable as a
        # slug → entry mapping; start empty rather than crash in get().
        self._data = loaded if isinstance(loaded, dict) else {}

View File

@@ -0,0 +1,50 @@
"""
Factory for creating embedding adapters by provider name.
"""
from typing import Optional, Any
from llm_connect.embedding_adapter import EmbeddingAdapter
from llm_connect.exceptions import LLMConfigurationError
# Provider name → dotted path of the adapter class, imported on demand.
# Both providers currently resolve to the same OpenAI-compatible class.
_EMBEDDING_PROVIDERS = {
    "openai": "llm_connect.embedding_openai.OpenAICompatibleEmbeddingAdapter",
    "openrouter": "llm_connect.embedding_openai.OpenAICompatibleEmbeddingAdapter",
}
def create_embedding_adapter(
    provider: str = "openai",
    model: Optional[str] = None,
    api_key: Optional[str] = None,
    **kwargs: Any,
) -> EmbeddingAdapter:
    """Build the :class:`EmbeddingAdapter` registered for *provider*.

    Args:
        provider: ``"openai"`` or ``"openrouter"``.
        model: Embedding model name (e.g. ``"text-embedding-3-small"``).
        api_key: Explicit API key.
        **kwargs: Extra keyword arguments forwarded to the adapter.

    Returns:
        A ready-to-use :class:`EmbeddingAdapter` instance.

    Raises:
        LLMConfigurationError: If *provider* is not recognised.
    """
    dotted_path = _EMBEDDING_PROVIDERS.get(provider)
    if dotted_path is None:
        known = ", ".join(sorted(_EMBEDDING_PROVIDERS))
        raise LLMConfigurationError(
            f"Unknown embedding provider {provider!r}. Choose from: {known}",
            context={"provider": provider},
        )

    # The adapter module is imported only once a provider was chosen.
    import importlib

    module_name, _, attr_name = dotted_path.rpartition(".")
    adapter_cls = getattr(importlib.import_module(module_name), attr_name)
    return adapter_cls(model=model, api_key=api_key, provider=provider, **kwargs)

View File

@@ -0,0 +1,125 @@
"""
OpenAI-compatible embedding adapter.
Works with both OpenAI (``/v1/embeddings``) and OpenRouter
(``/api/v1/embeddings``) since they share the same API format.
The *provider* parameter determines the default base URL and
API key environment variable.
"""
import time
from typing import Optional, Dict, Any
from llm_connect.embedding_adapter import EmbeddingAdapter
from llm_connect.config import resolve_api_key, find_project_root
from llm_connect._http import post_json
from llm_connect.exceptions import (
LLMConfigurationError,
LLMAPIError,
LLMRateLimitError,
)
# Embedding model used when the caller does not name one.
_DEFAULT_MODEL = "text-embedding-3-small"

# Per-provider defaults: the base URL to which "/embeddings" is appended
# and the environment variable consulted for the API key.
_PROVIDER_DEFAULTS: Dict[str, Dict[str, str]] = {
    "openai": {
        "api_base": "https://api.openai.com/v1",
        "env_var": "OPENAI_API_KEY",
    },
    "openrouter": {
        "api_base": "https://openrouter.ai/api/v1",
        "env_var": "OPENROUTER_API_KEY",
    },
}
class OpenAICompatibleEmbeddingAdapter(EmbeddingAdapter):
    """Embedding adapter for OpenAI-compatible endpoints.

    A single class handles both OpenAI and OpenRouter because they
    expose the same ``/embeddings`` endpoint format.
    """

    def __init__(
        self,
        model: Optional[str] = None,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        provider: str = "openai",
        max_retries: int = 3,
    ):
        """Create the adapter.

        Args:
            model: Embedding model; defaults to ``text-embedding-3-small``.
            api_key: Explicit API key. Otherwise the provider's environment
                variable and ``apikey-<provider>.txt`` in the project root
                are consulted.
            api_base: Base URL override; defaults per *provider*.
            provider: ``"openai"`` or ``"openrouter"``.
            max_retries: Retries after the first attempt for rate limits
                and 5xx responses.

        Raises:
            LLMConfigurationError: If *provider* is not recognised.
        """
        if provider not in _PROVIDER_DEFAULTS:
            known = ", ".join(sorted(_PROVIDER_DEFAULTS))
            raise LLMConfigurationError(
                f"Unknown embedding provider {provider!r}. Choose from: {known}",
                context={"provider": provider},
            )
        defaults = _PROVIDER_DEFAULTS[provider]
        self._model = model or _DEFAULT_MODEL
        self._api_base = (api_base or defaults["api_base"]).rstrip("/")
        self._max_retries = max_retries
        self._provider = provider

        # Resolve API key
        env_var = defaults["env_var"]
        root = find_project_root()
        key_file_paths = [root / f"apikey-{provider}.txt"] if root else []
        self._api_key = resolve_api_key(
            explicit=api_key,
            env_var=env_var,
            key_file_paths=key_file_paths,
        )

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed texts via the OpenAI-compatible ``/embeddings`` endpoint.

        An empty *texts* list returns ``[]`` without contacting the API.

        Raises:
            LLMConfigurationError: If no API key is configured.
            LLMAPIError: On HTTP errors after retries are exhausted, or
                when the response lacks the expected ``data`` items.
        """
        if not self._api_key:
            raise LLMConfigurationError(
                "No API key configured for embedding adapter",
                context={"provider": self._provider},
            )
        if not texts:
            # Nothing to embed; avoid a request that can only fail.
            return []

        url = f"{self._api_base}/embeddings"
        payload: Dict[str, Any] = {
            "model": self._model,
            "input": texts,
        }
        headers = {"Authorization": f"Bearer {self._api_key}"}
        data = self._post_with_retries(url, payload, headers)

        # Response: {"data": [{"embedding": [...], "index": 0}, ...]}
        # Sort by index to guarantee input order.
        try:
            items = sorted(data["data"], key=lambda d: d["index"])
            return [item["embedding"] for item in items]
        except (KeyError, TypeError) as exc:
            # Surface an unexpected payload shape as a typed API error
            # instead of a bare KeyError/TypeError.
            raise LLMAPIError(
                f"Malformed embeddings response from {url}",
                cause=exc,
                context={"provider": self._provider},
            ) from exc

    def validate(self) -> bool:
        """Return ``True`` if a non-empty API key is available."""
        # Same truthiness test embed() applies, so both always agree.
        return bool(self._api_key)

    def _post_with_retries(
        self,
        url: str,
        payload: Dict[str, Any],
        headers: Dict[str, str],
    ) -> Dict[str, Any]:
        """POST with exponential backoff (1 s, 2 s, 4 s, ...).

        Rate limits and responses with status >= 500 are retried up to
        ``max_retries`` times; every other error is raised immediately.
        """
        # Clamp so a negative setting still performs one attempt.
        max_retries = max(0, self._max_retries)
        attempt = 0
        while True:
            try:
                return post_json(url, payload, headers)
            except LLMAPIError as exc:
                retryable = (
                    isinstance(exc, LLMRateLimitError) or exc.status_code >= 500
                )
                if not retryable or attempt >= max_retries:
                    raise
            time.sleep(2 ** attempt)
            attempt += 1

85
llm_connect/exceptions.py Normal file
View File

@@ -0,0 +1,85 @@
"""
LLM-specific exceptions.
"""
from typing import Optional, Dict, Any
class LLMError(Exception):
    """Root of the package's exception hierarchy.

    Besides the message an instance carries the optional originating
    exception (``cause``) and a ``context`` mapping that is appended to
    the string form.
    """

    def __init__(
        self,
        message: str,
        cause: Optional[Exception] = None,
        context: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(message)
        self.context = context or {}
        self.cause = cause
        if cause:
            # Mirror ``raise ... from cause`` for errors built without it.
            self.__cause__ = cause

    def __str__(self) -> str:
        text = super().__str__()
        if not self.context:
            return text
        details = ", ".join(
            f"{key}={value}" for key, value in self.context.items()
        )
        return f"{text} [Context: {details}]"
class LLMConfigurationError(LLMError):
    """Raised for a missing API key, invalid model name or bad provider config."""
class LLMAPIError(LLMError):
    """Failure reported at the HTTP level by an LLM provider API.

    Attributes:
        status_code: HTTP status code (e.g. 500, 502); ``0`` when unset.
        response_body: Raw response body text, if available.
    """

    def __init__(
        self,
        message: str,
        status_code: int = 0,
        response_body: str = "",
        cause: Optional[Exception] = None,
        context: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(message, cause=cause, context=context)
        self.response_body = response_body
        self.status_code = status_code
class LLMRateLimitError(LLMAPIError):
    """Raised when the provider answers 429 Too Many Requests."""
class LLMTimeoutError(LLMError):
    """Raised when a request or subprocess exceeds the configured timeout."""
class LLMSubprocessError(LLMError):
    """Failure of the Claude Code CLI subprocess.

    Attributes:
        return_code: Process exit code.
        stderr: Captured stderr text.
    """

    def __init__(
        self,
        message: str,
        return_code: int = 1,
        stderr: str = "",
        cause: Optional[Exception] = None,
        context: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(message, cause=cause, context=context)
        self.stderr = stderr
        self.return_code = return_code

60
llm_connect/factory.py Normal file
View File

@@ -0,0 +1,60 @@
"""
Factory for creating LLM adapters by provider name.
"""
from typing import Optional, Dict, Any
from llm_connect.adapter import LLMAdapter
from llm_connect.exceptions import LLMConfigurationError
# Lazy imports to avoid pulling in every adapter at module load time.
# Maps each provider name to the dotted path of its adapter class; the
# module is imported only when that provider is requested.
_PROVIDERS: Dict[str, str] = {
    "openrouter": "llm_connect.openrouter.OpenRouterAdapter",
    "claude-code": "llm_connect.claude_code.ClaudeCodeAdapter",
    "gemini": "llm_connect.gemini.GeminiAdapter",
    "openai": "llm_connect.openai.OpenAIAdapter",
}
def create_adapter(
    provider: str = "openrouter",
    model: Optional[str] = None,
    api_key: Optional[str] = None,
    system_prompt: Optional[str] = None,
    **kwargs: Any,
) -> LLMAdapter:
    """Build the :class:`LLMAdapter` registered for *provider*.

    Args:
        provider: ``"openrouter"``, ``"claude-code"``, ``"gemini"``, or ``"openai"``.
        model: Model name (passed to the adapter constructor).
        api_key: Explicit API key (OpenRouter / Gemini / OpenAI).
        system_prompt: Optional system prompt (OpenRouter / Gemini / OpenAI).
        **kwargs: Extra keyword arguments forwarded to the adapter.

    Returns:
        A ready-to-use :class:`LLMAdapter` instance.

    Raises:
        LLMConfigurationError: If *provider* is not recognised.
    """
    dotted_path = _PROVIDERS.get(provider)
    if dotted_path is None:
        known = ", ".join(sorted(_PROVIDERS))
        raise LLMConfigurationError(
            f"Unknown LLM provider {provider!r}. Choose from: {known}",
            context={"provider": provider},
        )

    # The adapter module is imported only once a provider was chosen.
    import importlib

    module_name, _, attr_name = dotted_path.rpartition(".")
    adapter_cls = getattr(importlib.import_module(module_name), attr_name)

    # HTTP adapters take key and system prompt; the CLI adapter takes
    # neither, so those two arguments are not forwarded to it.
    if provider in ("openrouter", "gemini", "openai"):
        init_args = {
            "model": model,
            "api_key": api_key,
            "system_prompt": system_prompt,
            **kwargs,
        }
    elif provider == "claude-code":
        init_args = {"model": model, **kwargs}
    else:
        init_args = kwargs  # pragma: no cover
    return adapter_cls(**init_args)

115
llm_connect/gemini.py Normal file
View File

@@ -0,0 +1,115 @@
"""
Google Gemini adapter — calls the Generative Language REST API directly.
"""
import time
from typing import Optional, Dict, Any
from llm_connect.adapter import LLMAdapter
from llm_connect.models import RunConfig, LLMResponse
from llm_connect.config import resolve_api_key, find_project_root
from llm_connect._http import post_json
from llm_connect.exceptions import LLMConfigurationError
# Model used when the constructor is not given one.
_DEFAULT_MODEL = "gemini-2.5-flash"
# Base URL to which "/models/<model>:generateContent" is appended.
_API_BASE = "https://generativelanguage.googleapis.com/v1beta"
class GeminiAdapter(LLMAdapter):
    """LLM adapter that calls the Google Generative Language API.

    Supports the free tier of Gemini models via a Google AI Studio API key.
    """

    def __init__(
        self,
        model: Optional[str] = None,
        api_key: Optional[str] = None,
        system_prompt: Optional[str] = None,
        **_kwargs: Any,
    ):
        """Create the adapter.

        Args:
            model: Gemini model name; defaults to ``gemini-2.5-flash``.
            api_key: Explicit API key. Otherwise ``GEMINI_API_KEY`` and
                ``apikey-geminifree.txt`` in the project root are tried.
            system_prompt: Optional instructions sent ahead of each prompt.
            **_kwargs: Accepted and ignored.

        Raises:
            LLMConfigurationError: If no API key can be resolved.
        """
        self._model = model or _DEFAULT_MODEL
        self._system_prompt = system_prompt
        root = find_project_root()
        key_file_paths = [root / "apikey-geminifree.txt"] if root else []
        self._api_key = resolve_api_key(
            explicit=api_key,
            env_var="GEMINI_API_KEY",
            key_file_paths=key_file_paths,
        )
        if not self._api_key:
            raise LLMConfigurationError(
                "No Gemini API key found. Set GEMINI_API_KEY or create "
                "apikey-geminifree.txt in the project root.",
                context={"provider": "gemini"},
            )

    # ── LLMAdapter interface ────────────────────────────────────────
    def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
        """Send *prompt* to ``generateContent`` and wrap the first candidate.

        A response without candidates yields empty content with
        ``finish_reason="error"``. Errors raised by ``post_json``
        propagate unchanged.
        """
        model = self._model

        # Build Gemini request. The system prompt is sent as an initial
        # user turn, acknowledged by a canned model turn.
        contents: list[Dict[str, Any]] = []
        if self._system_prompt:
            contents.append({
                "role": "user",
                "parts": [{"text": self._system_prompt}],
            })
            contents.append({
                "role": "model",
                "parts": [{"text": "Understood."}],
            })
        contents.append({
            "role": "user",
            "parts": [{"text": prompt}],
        })
        payload: Dict[str, Any] = {
            "contents": contents,
            "generationConfig": {
                "temperature": config.temperature,
                "maxOutputTokens": config.max_tokens,
            },
        }

        # Authenticate via header rather than a "?key=" query parameter:
        # post_json embeds the URL in its error messages, so a key in the
        # URL would leak into exceptions and logs.
        url = f"{_API_BASE}/models/{model}:generateContent"
        headers = {"x-goog-api-key": self._api_key}

        start = time.time()
        data = post_json(url, payload, headers, timeout=config.timeout_seconds)
        latency = time.time() - start

        # Parse Gemini response
        candidates = data.get("candidates", [])
        if not candidates:
            content = ""
            finish_reason = "error"
        else:
            parts = candidates[0].get("content", {}).get("parts", [])
            content = "".join(p.get("text", "") for p in parts)
            finish_reason = candidates[0].get("finishReason", "STOP").lower()

        usage_meta = data.get("usageMetadata", {})
        return LLMResponse(
            content=content,
            model=model,
            usage={
                "prompt_tokens": usage_meta.get("promptTokenCount", 0),
                "completion_tokens": usage_meta.get("candidatesTokenCount", 0),
                "total_tokens": usage_meta.get("totalTokenCount", 0),
            },
            finish_reason=finish_reason,
            metadata={
                "provider": "gemini",
                "latency_seconds": round(latency, 3),
            },
        )

    def validate_config(self, config: RunConfig) -> bool:
        """Return ``True`` if a key is set and temperature is within 0.0–2.0."""
        if not self._api_key:
            return False
        if not (0.0 <= config.temperature <= 2.0):
            return False
        return True

86
llm_connect/models.py Normal file
View File

@@ -0,0 +1,86 @@
"""
Shared data models for LLM execution.
These classes are the canonical definitions; they are re-exported by
markitect.prompts.execution.models for backward compatibility.
"""
from dataclasses import dataclass, field
from typing import Dict, Any
@dataclass
class RunConfig:
    """Settings for a single prompt execution.

    Attributes:
        model_name: LLM model to use.
        temperature: Sampling temperature.
        max_tokens: Maximum tokens to generate.
        model_params: Additional model parameters.
        max_depth: Maximum generation depth for nested runs.
        skip_if_exists: Skip if identical InputBundleHash exists.
        timeout_seconds: Execution timeout.
    """

    model_name: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 2000
    model_params: Dict[str, Any] = field(default_factory=dict)
    max_depth: int = 3
    skip_if_exists: bool = True
    timeout_seconds: int = 300

    def to_dict(self) -> Dict[str, Any]:
        """Return the settings as a plain dictionary (values not copied)."""
        names = (
            "model_name",
            "temperature",
            "max_tokens",
            "model_params",
            "max_depth",
            "skip_if_exists",
            "timeout_seconds",
        )
        return {name: getattr(self, name) for name in names}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "RunConfig":
        """Build an instance from *data*, defaulting every missing key."""
        fallbacks: Dict[str, Any] = {
            "model_name": "gpt-4",
            "temperature": 0.7,
            "max_tokens": 2000,
            "model_params": {},
            "max_depth": 3,
            "skip_if_exists": True,
            "timeout_seconds": 300,
        }
        return cls(
            **{key: data.get(key, default) for key, default in fallbacks.items()}
        )
@dataclass
class LLMResponse:
    """Result of one LLM execution.

    Attributes:
        content: Generated content.
        model: Model used.
        usage: Token usage statistics.
        finish_reason: Why generation stopped.
        metadata: Additional response metadata.
    """

    content: str
    model: str
    usage: Dict[str, int] = field(default_factory=dict)
    finish_reason: str = "stop"
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the response as a plain dictionary (values not copied)."""
        names = ("content", "model", "usage", "finish_reason", "metadata")
        return {name: getattr(self, name) for name in names}

129
llm_connect/openai.py Normal file
View File

@@ -0,0 +1,129 @@
"""
OpenAI (ChatGPT) adapter — calls the OpenAI chat completions API.
"""
import time
from typing import Optional, Dict, Any
from llm_connect.adapter import LLMAdapter
from llm_connect.models import RunConfig, LLMResponse
from llm_connect.config import resolve_api_key, find_project_root
from llm_connect._http import post_json
from llm_connect.exceptions import (
LLMConfigurationError,
LLMAPIError,
LLMRateLimitError,
)
# Model used when the constructor is not given one.
_DEFAULT_MODEL = "gpt-4.1-mini"
# Base URL to which "/chat/completions" is appended.
_API_BASE = "https://api.openai.com/v1"
class OpenAIAdapter(LLMAdapter):
"""LLM adapter that calls the OpenAI chat completions endpoint."""
def __init__(
self,
model: Optional[str] = None,
api_key: Optional[str] = None,
system_prompt: Optional[str] = None,
max_retries: int = 3,
**_kwargs: Any,
):
self._model = model or _DEFAULT_MODEL
self._system_prompt = system_prompt
self._max_retries = max_retries
root = find_project_root()
key_file_paths = [root / "apikey-chatgpt.txt"] if root else []
self._api_key = resolve_api_key(
explicit=api_key,
env_var="OPENAI_API_KEY",
key_file_paths=key_file_paths,
)
if not self._api_key:
raise LLMConfigurationError(
"No OpenAI API key found. Set OPENAI_API_KEY or create "
"apikey-chatgpt.txt in the project root.",
context={"provider": "openai"},
)
# ── LLMAdapter interface ────────────────────────────────────────
def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
model = self._model
messages: list[Dict[str, str]] = []
if self._system_prompt:
messages.append({"role": "system", "content": self._system_prompt})
messages.append({"role": "user", "content": prompt})
payload: Dict[str, Any] = {
"model": model,
"messages": messages,
"temperature": config.temperature,
"max_tokens": config.max_tokens,
}
headers = {
"Authorization": f"Bearer {self._api_key}",
}
url = f"{_API_BASE}/chat/completions"
start = time.time()
data = self._post_with_retries(url, payload, headers, config.timeout_seconds)
latency = time.time() - start
# Parse response (OpenAI chat completions format)
choice = data.get("choices", [{}])[0]
content = choice.get("message", {}).get("content", "")
finish_reason = choice.get("finish_reason", "stop")
usage = data.get("usage", {})
return LLMResponse(
content=content,
model=data.get("model", model),
usage={
"prompt_tokens": usage.get("prompt_tokens", 0),
"completion_tokens": usage.get("completion_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
},
finish_reason=finish_reason,
metadata={
"provider": "openai",
"latency_seconds": round(latency, 3),
"response_id": data.get("id", ""),
},
)
def validate_config(self, config: RunConfig) -> bool:
if not self._api_key:
return False
if not (0.0 <= config.temperature <= 2.0):
return False
return True
# ── Internals ───────────────────────────────────────────────────
def _post_with_retries(
    self,
    url: str,
    payload: Dict[str, Any],
    headers: Dict[str, str],
    timeout: int,
) -> Dict[str, Any]:
    """POST *payload* as JSON, retrying transient failures with backoff.

    Rate-limit errors and API errors with a status code of 500 or above
    are retried up to ``self._max_retries`` times, sleeping
    ``2 ** attempt`` seconds between attempts.  Any other API error is
    re-raised immediately.

    Returns:
        The value returned by ``post_json`` for the first successful call.

    Raises:
        LLMRateLimitError: Still rate limited on the final attempt.
        LLMAPIError: Non-retryable error, or a 5xx error on the final
            attempt.
    """
    # Always make at least one attempt.  With a negative retry count the
    # loop would never run and ``raise last_exc`` would try to raise None.
    retries = max(0, self._max_retries)
    last_exc: Optional[Exception] = None
    for attempt in range(retries + 1):
        try:
            return post_json(url, payload, headers, timeout=timeout)
        except LLMRateLimitError as exc:
            last_exc = exc
            if attempt < retries:
                time.sleep(2 ** attempt)
        except LLMAPIError as exc:
            # An error without a status code is treated as non-retryable
            # rather than failing on the ``>=`` comparison.
            status = getattr(exc, "status_code", None)
            retryable = status is not None and status >= 500
            if not (retryable and attempt < retries):
                raise
            last_exc = exc
            time.sleep(2 ** attempt)
    raise last_exc  # type: ignore[misc]

139
llm_connect/openrouter.py Normal file
View File

@@ -0,0 +1,139 @@
"""
OpenRouter adapter — calls the OpenAI-compatible chat completions API.
"""
import time
from typing import Optional, Dict, Any
from llm_connect.adapter import LLMAdapter
from llm_connect.models import RunConfig, LLMResponse
from llm_connect.config import LLMConfig, resolve_api_key, find_project_root
from llm_connect._http import post_json
from llm_connect.exceptions import (
LLMConfigurationError,
LLMAPIError,
LLMRateLimitError,
)
_DEFAULT_MODEL = "anthropic/claude-sonnet-4"


class OpenRouterAdapter(LLMAdapter):
    """LLM adapter that calls the OpenRouter chat completions endpoint.

    Constructor args override values from *config*; *config* overrides
    global defaults.  The model used for a given call is resolved as:
    ``constructor model > LLMConfig.model > RunConfig.model_name > default``.
    An ``LLMConfig.model`` equal to the built-in default counts as unset,
    so ``RunConfig.model_name`` may still replace it.
    """

    def __init__(
        self,
        model: Optional[str] = None,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        config: Optional[LLMConfig] = None,
        system_prompt: Optional[str] = None,
        extra_headers: Optional[Dict[str, str]] = None,
        max_retries: Optional[int] = None,
    ):
        """Store settings and resolve the API key.

        Args:
            model: Model id; pins the model for every call when given.
            api_key: Explicit API key, preferred over ``config.api_key``.
            api_base: Base URL; trailing slashes are stripped.
            config: Fallback settings; a fresh ``LLMConfig()`` when omitted.
            system_prompt: Optional system message prepended to each call.
            extra_headers: Headers merged into every request after the
                ``Authorization`` header.
            max_retries: Retry count; ``config.max_retries`` when ``None``.

        No error is raised here when no API key can be resolved;
        :meth:`validate_config` reports that case by returning False.
        """
        self._config = config or LLMConfig()
        # Remember whether the caller chose the model explicitly, so that an
        # explicit choice equal to the default is not mistaken for "unset".
        self._model_pinned = bool(model)
        self._model = model or self._config.model or _DEFAULT_MODEL
        self._api_base = (api_base or self._config.api_base).rstrip("/")
        self._system_prompt = system_prompt
        self._extra_headers = extra_headers or {}
        self._max_retries = max_retries if max_retries is not None else self._config.max_retries

        # Resolve API key
        root = find_project_root()
        key_file_paths = [root / "apikey-openrouter.txt"] if root else []
        self._api_key = resolve_api_key(
            explicit=api_key or self._config.api_key,
            env_var="OPENROUTER_API_KEY",
            key_file_paths=key_file_paths,
        )

    # ── LLMAdapter interface ────────────────────────────────────────

    def execute_prompt(self, prompt: str, config: RunConfig) -> LLMResponse:
        """Send *prompt* to the chat completions endpoint and wrap the reply.

        Args:
            prompt: Text sent as the single user message (preceded by the
                adapter's system prompt when one is set).
            config: Per-call settings.  ``model_params`` entries are merged
                into the request payload last and may override its keys.

        Returns:
            An :class:`LLMResponse` built from the first choice of the
            reply.  Missing or null ``choices``, ``message``, ``content``
            and ``usage`` fields fall back to empty values.

        Raises:
            Whatever :meth:`_post_with_retries` raises when the request
            fails.
        """
        # A model passed to the constructor always wins.  Otherwise a
        # non-default model from LLMConfig wins, then RunConfig.model_name,
        # then the built-in default.
        if self._model_pinned or self._model != _DEFAULT_MODEL:
            model = self._model
        else:
            model = config.model_name or self._model

        messages: list[Dict[str, str]] = []
        if self._system_prompt:
            messages.append({"role": "system", "content": self._system_prompt})
        messages.append({"role": "user", "content": prompt})

        payload: Dict[str, Any] = {
            "model": model,
            "messages": messages,
            "temperature": config.temperature,
            "max_tokens": config.max_tokens,
        }
        # Merge extra model_params from RunConfig
        if config.model_params:
            payload.update(config.model_params)

        headers = {
            "Authorization": f"Bearer {self._api_key}",
            **self._extra_headers,
        }
        url = f"{self._api_base}/chat/completions"

        start = time.time()
        data = self._post_with_retries(url, payload, headers, config.timeout_seconds)
        latency = time.time() - start

        # Parse response.  ``dict.get`` defaults only apply when a key is
        # absent, so use ``or`` to also cover keys that are present but
        # empty or null (e.g. ``"choices": []``).
        choices = data.get("choices") or [{}]
        choice = choices[0] or {}
        message = choice.get("message") or {}
        content = message.get("content") or ""
        finish_reason = choice.get("finish_reason", "stop")
        usage = data.get("usage") or {}

        return LLMResponse(
            content=content,
            model=data.get("model", model),
            usage={
                "prompt_tokens": usage.get("prompt_tokens", 0),
                "completion_tokens": usage.get("completion_tokens", 0),
                "total_tokens": usage.get("total_tokens", 0),
            },
            finish_reason=finish_reason,
            metadata={
                "provider": "openrouter",
                "latency_seconds": round(latency, 3),
                "response_id": data.get("id", ""),
            },
        )

    def validate_config(self, config: RunConfig) -> bool:
        """Return True when a key and model are available and the
        temperature lies in [0, 2]."""
        if not self._api_key:
            return False
        if not (self._model or config.model_name):
            return False
        if not (0.0 <= config.temperature <= 2.0):
            return False
        return True

    # ── Internals ───────────────────────────────────────────────────

    def _post_with_retries(
        self,
        url: str,
        payload: Dict[str, Any],
        headers: Dict[str, str],
        timeout: int,
    ) -> Dict[str, Any]:
        """POST *payload* as JSON, retrying transient failures with backoff.

        Rate-limit errors and API errors with a status code of 500 or above
        are retried up to ``self._max_retries`` times, sleeping
        ``2 ** attempt`` seconds between attempts.  Any other API error is
        re-raised immediately.

        Raises:
            LLMRateLimitError: Still rate limited on the final attempt.
            LLMAPIError: Non-retryable error, or a 5xx error on the final
                attempt.
        """
        # Always make at least one attempt.  With a negative retry count
        # the loop would never run and ``raise last_exc`` would raise None.
        retries = max(0, self._max_retries)
        last_exc: Optional[Exception] = None
        for attempt in range(retries + 1):
            try:
                return post_json(url, payload, headers, timeout=timeout)
            except LLMRateLimitError as exc:
                last_exc = exc
                if attempt < retries:
                    time.sleep(2 ** attempt)
            except LLMAPIError as exc:
                # An error without a status code is treated as
                # non-retryable rather than failing on the comparison.
                status = getattr(exc, "status_code", None)
                retryable = status is not None and status >= 500
                if not (retryable and attempt < retries):
                    raise
                last_exc = exc
                time.sleep(2 ** attempt)
        raise last_exc  # type: ignore[misc]

64
llm_connect/similarity.py Normal file
View File

@@ -0,0 +1,64 @@
"""
Pure-Python vector similarity utilities.
No external dependencies — uses :mod:`math` only. Sufficient for the
current entity scale (~100s). numpy can be substituted later if needed.
"""
import math
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors.

    Returns a float in [-1, 1].  Returns 0.0 if either vector has
    zero magnitude (to avoid division by zero).

    Raises:
        ValueError: If *a* and *b* have different lengths.
    """
    # ``zip`` would silently drop the extra components of the longer vector
    # while the magnitudes still used all of them, giving a meaningless score.
    if len(a) != len(b):
        raise ValueError(
            f"vectors must have the same length, got {len(a)} and {len(b)}"
        )
    dot = sum(x * y for x, y in zip(a, b))
    mag_a = math.sqrt(sum(x * x for x in a))
    mag_b = math.sqrt(sum(x * x for x in b))
    if mag_a == 0.0 or mag_b == 0.0:
        return 0.0
    sim = dot / (mag_a * mag_b)
    # Rounding can push the quotient slightly outside [-1, 1].  Explicit
    # comparisons (rather than min/max) let a NaN result pass through.
    if sim > 1.0:
        return 1.0
    if sim < -1.0:
        return -1.0
    return sim
def similarity_matrix(embeddings: list[list[float]]) -> list[list[float]]:
    """Return the symmetric NxN matrix of pairwise cosine similarities.

    Entry ``[i][j]`` holds the cosine similarity of ``embeddings[i]`` and
    ``embeddings[j]``.  Diagonal entries are set to 1.0 directly, and each
    off-diagonal pair is computed once and mirrored.
    """
    size = len(embeddings)
    matrix: list[list[float]] = [[0.0] * size for _ in range(size)]
    for row, vector in enumerate(embeddings):
        matrix[row][row] = 1.0
        for col in range(row + 1, size):
            score = cosine_similarity(vector, embeddings[col])
            matrix[row][col] = score
            matrix[col][row] = score
    return matrix
def find_similar_pairs(
    embeddings: dict[str, list[float]],
    threshold: float = 0.80,
) -> list[tuple[str, str, float]]:
    """Return every pair whose cosine similarity is at least *threshold*.

    Slugs are visited in sorted order, so within each tuple ``slug_a``
    sorts before ``slug_b`` and every unordered pair is compared once.

    Args:
        embeddings: Mapping of slug → embedding vector.
        threshold: Minimum similarity to include (default 0.80).

    Returns:
        List of ``(slug_a, slug_b, similarity)`` tuples, highest
        similarity first.
    """
    ordered = sorted(embeddings)
    matches: list[tuple[str, str, float]] = []
    for offset, left in enumerate(ordered, start=1):
        left_vector = embeddings[left]
        for right in ordered[offset:]:
            score = cosine_similarity(left_vector, embeddings[right])
            if score >= threshold:
                matches.append((left, right, score))
    return sorted(matches, key=lambda match: match[2], reverse=True)

271
llm_connect/toml_config.py Normal file
View File

@@ -0,0 +1,271 @@
"""
TOML-based LLM configuration: defaults, preferences, and resolution.
Config files:
- Directory: ``<dir-with-pyproject.toml>/.markitect.toml``
- User: ``~/.config/markitect/config.toml``
Resolution order (highest → lowest):
1. CLI flags (``--provider``, ``--model``)
2. ``<APP_NAME>_HELPER_MODEL`` env var, i.e. ``MARKITECT_HELPER_MODEL`` for the default app name (model only)
3. User preference (``[llm.preference]`` in user config)
4. Directory preference (``[llm.preference]`` in directory config)
5. Directory default (``[llm.default]`` in directory config)
6. User default (``[llm.default]`` in user config)
7. Hardcoded fallback
"""
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import toml
from llm_connect.config import find_project_root
# ── Constants ─────────────────────────────────────────────────────────────
# Last-resort provider/model: the lowest-priority ("hardcoded") layer in
# resolve_llm, used when no CLI flag, env var or config file supplies a value.
HARDCODED_PROVIDER = "gemini"
HARDCODED_MODEL = "gemini-2.5-flash"
# Default (markitect) values kept for backward compatibility.  resolve_llm
# and the layer helpers below do not read these; they derive the equivalent
# names from their ``app_name`` argument instead.
MODEL_ENV_VAR = "MARKITECT_HELPER_MODEL"
USER_CONFIG_DIR = Path.home() / ".config" / "markitect"
USER_CONFIG_PATH = USER_CONFIG_DIR / "config.toml"
DIR_CONFIG_NAME = ".markitect.toml"
# ── App-name helpers ───────────────────────────────────────────────────────
def _model_env_var(app_name: str) -> str:
return f"{app_name.upper()}_HELPER_MODEL"
def _user_config_path(app_name: str) -> Path:
return Path.home() / ".config" / app_name / "config.toml"
def _dir_config_name(app_name: str) -> str:
return f".{app_name}.toml"
# ── Data classes ──────────────────────────────────────────────────────────
@dataclass
class LLMLayer:
    """A single configuration layer; either field may be left unset.

    Attributes:
        provider: Provider name, or ``None`` when this layer sets none.
        model: Model name, or ``None`` when this layer sets none.
    """

    provider: Optional[str] = None
    model: Optional[str] = None
@dataclass
class ResolvedLLM:
    """The final provider and model, plus where each value came from.

    Attributes:
        provider: Selected provider name.
        model: Selected model name.
        provider_source: Label of the layer that supplied ``provider``.
        model_source: Label of the layer that supplied ``model``.
    """

    provider: str
    model: str
    provider_source: str
    model_source: str
# ── Read / Write / Clear ─────────────────────────────────────────────────
def _read_llm_section(path: Path, section: str) -> LLMLayer:
    """Read ``[llm.<section>]`` from a TOML file.

    Returns an empty layer when the file is missing, unreadable, not valid
    UTF-8 or TOML, or when ``llm`` / ``llm.<section>`` is not a table.
    """
    try:
        data = toml.load(path)
    except (OSError, UnicodeDecodeError, toml.TomlDecodeError):
        return LLMLayer()
    # Valid TOML may still define ``llm`` or the section as a scalar or
    # array; treat that like a missing section instead of crashing on .get().
    llm = data.get("llm")
    sec = llm.get(section) if isinstance(llm, dict) else None
    if not isinstance(sec, dict):
        return LLMLayer()
    return LLMLayer(
        provider=sec.get("provider"),
        model=sec.get("model"),
    )
def _write_llm_section(path: Path, section: str, layer: LLMLayer) -> None:
    """Merge ``[llm.<section>]`` into a TOML file.  Creates dirs as needed.

    Only the fields of *layer* that are not ``None`` are written; other keys
    already in the file are kept.  A file that cannot be read or parsed is
    treated as empty and overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    try:
        data = toml.load(path)
    except (OSError, toml.TomlDecodeError):
        data = {}
    # Replace a non-table ``llm`` or section value with a fresh table so the
    # merge below cannot fail on a scalar.
    llm = data.get("llm")
    if not isinstance(llm, dict):
        llm = data["llm"] = {}
    sec = llm.get(section)
    if not isinstance(sec, dict):
        sec = llm[section] = {}
    if layer.provider is not None:
        sec["provider"] = layer.provider
    if layer.model is not None:
        sec["model"] = layer.model
    # toml.load reads files as UTF-8, so write UTF-8 explicitly rather than
    # relying on the platform's default encoding.
    with open(path, "w", encoding="utf-8") as f:
        toml.dump(data, f)
def _clear_llm_section(path: Path, section: str) -> bool:
    """Remove ``[llm.<section>]``.  Returns True if something was cleared.

    Returns False, leaving the file untouched, when it cannot be read or
    parsed or when the section is absent.
    """
    try:
        data = toml.load(path)
    except (OSError, toml.TomlDecodeError):
        return False
    llm = data.get("llm")
    if not isinstance(llm, dict) or section not in llm:
        return False
    del llm[section]
    # Clean up empty [llm] table.
    if not llm:
        del data["llm"]
    # toml.load reads files as UTF-8, so write UTF-8 explicitly rather than
    # relying on the platform's default encoding.
    with open(path, "w", encoding="utf-8") as f:
        toml.dump(data, f)
    return True
# ── Directory config path helper ─────────────────────────────────────────
def _dir_config_path(app_name: str = "markitect") -> Optional[Path]:
    """Return the directory-level config path, or ``None`` without a root.

    The path is the project root reported by ``find_project_root`` joined
    with the app's per-directory config file name.
    """
    project_root = find_project_root()
    return None if project_root is None else project_root / _dir_config_name(app_name)
# ── Resolution ───────────────────────────────────────────────────────────
def resolve_llm(
    cli_provider: Optional[str] = None,
    cli_model: Optional[str] = None,
    app_name: str = "markitect",
) -> ResolvedLLM:
    """Resolve provider and model through the 7-level priority chain.

    Provider and model are picked independently: each comes from the
    highest-priority layer that sets a non-empty value for it.  Directory
    layers are skipped when no directory config path is available.

    Args:
        cli_provider: Provider override from CLI.
        cli_model: Model override from CLI.
        app_name: Application name used to derive config paths and the
            env-var prefix (e.g. ``"railiance"`` → ``RAILIANCE_HELPER_MODEL``
            and ``~/.config/railiance/config.toml``).

    Returns:
        The selected provider and model, each with the label of the layer
        that supplied it.
    """
    dir_path = _dir_config_path(app_name)
    user_cfg = _user_config_path(app_name)
    env_var = _model_env_var(app_name)

    # Layers 1-3: CLI flags, env var (model only), user preference.
    layers: list[tuple[str, LLMLayer]] = [
        ("CLI flag", LLMLayer(provider=cli_provider, model=cli_model)),
        (f"env {env_var}", LLMLayer(model=os.environ.get(env_var) or None)),
        ("user preference", _read_llm_section(user_cfg, "preference")),
    ]
    # Layers 4-5: directory preference, then directory default.
    if dir_path:
        layers.extend([
            ("directory preference", _read_llm_section(dir_path, "preference")),
            ("directory default", _read_llm_section(dir_path, "default")),
        ])
    # Layers 6-7: user default, then the hardcoded fallback.
    layers.extend([
        ("user default", _read_llm_section(user_cfg, "default")),
        ("hardcoded", LLMLayer(provider=HARDCODED_PROVIDER, model=HARDCODED_MODEL)),
    ])

    # First non-empty value wins, independently for provider and model.
    provider_source, provider = next(
        ((source, layer.provider) for source, layer in layers if layer.provider),
        ("hardcoded", HARDCODED_PROVIDER),
    )
    model_source, model = next(
        ((source, layer.model) for source, layer in layers if layer.model),
        ("hardcoded", HARDCODED_MODEL),
    )

    return ResolvedLLM(
        provider=provider,
        model=model,
        provider_source=provider_source,
        model_source=model_source,
    )
def get_default_layers(app_name: str = "markitect") -> list[tuple[str, LLMLayer]]:
    """Return the labelled default layers for display.

    Order: directory default (only when a directory config path exists),
    user default, then the hardcoded fallback.
    """
    project_cfg = _dir_config_path(app_name)
    home_cfg = _user_config_path(app_name)
    project_cfg_name = _dir_config_name(app_name)

    directory_entries: list[tuple[str, LLMLayer]] = (
        [(
            f"Directory default ({project_cfg_name})",
            _read_llm_section(project_cfg, "default"),
        )]
        if project_cfg
        else []
    )
    return [
        *directory_entries,
        (f"User default ({home_cfg})", _read_llm_section(home_cfg, "default")),
        ("Hardcoded", LLMLayer(provider=HARDCODED_PROVIDER, model=HARDCODED_MODEL)),
    ]
def get_preference_layers(app_name: str = "markitect") -> list[tuple[str, LLMLayer]]:
    """Return the labelled preference layers for display.

    Order: user preference, then directory preference (only when a
    directory config path exists).
    """
    project_cfg = _dir_config_path(app_name)
    home_cfg = _user_config_path(app_name)
    project_cfg_name = _dir_config_name(app_name)

    result: list[tuple[str, LLMLayer]] = [
        (f"User preference ({home_cfg})", _read_llm_section(home_cfg, "preference")),
    ]
    if project_cfg:
        result.append((
            f"Directory preference ({project_cfg_name})",
            _read_llm_section(project_cfg, "preference"),
        ))
    return result

21
pyproject.toml Normal file
View File

@@ -0,0 +1,21 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
# setuptools' PEP 517 backend is the ``setuptools.build_meta`` module.
# ``setuptools.backends.legacy:build`` is not a module setuptools provides,
# so build frontends cannot import it.
build-backend = "setuptools.build_meta"

[project]
name = "llm-connect"
version = "0.1.0"
description = "Pluggable LLM adapters for OpenRouter, Gemini, OpenAI and Claude Code CLI"
requires-python = ">=3.10"
dependencies = [
    # Imported by llm_connect/toml_config.py to read and write config files.
    "toml",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0",
]

[tool.setuptools.packages.find]
where = ["."]
include = ["llm_connect*"]