feat(llm): add embedding adapter with cache and similarity utils (S1.3)

Add OpenAI-compatible embedding support (works with both OpenAI and
OpenRouter), file-based embedding cache with content-digest invalidation,
and pure-Python cosine similarity utilities for downstream redundancy
detection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 01:22:21 +01:00
parent 9031e1162c
commit 267368eb60
7 changed files with 588 additions and 0 deletions

View File

@@ -26,6 +26,15 @@ from markitect.llm.exceptions import (
LLMTimeoutError,
LLMSubprocessError,
)
from markitect.llm.embedding_adapter import EmbeddingAdapter
from markitect.llm.embedding_openai import OpenAICompatibleEmbeddingAdapter
from markitect.llm.embedding_cache import EmbeddingCache
from markitect.llm.embedding_factory import create_embedding_adapter
from markitect.llm.similarity import (
cosine_similarity,
similarity_matrix,
find_similar_pairs,
)
__all__ = [
"create_adapter",
@@ -41,4 +50,11 @@ __all__ = [
"LLMRateLimitError",
"LLMTimeoutError",
"LLMSubprocessError",
"EmbeddingAdapter",
"OpenAICompatibleEmbeddingAdapter",
"EmbeddingCache",
"create_embedding_adapter",
"cosine_similarity",
"similarity_matrix",
"find_similar_pairs",
]

View File

@@ -0,0 +1,34 @@
"""
Abstract base class for embedding adapters.
Embedding adapters convert text into float vectors. This is a separate
hierarchy from :class:`LLMAdapter` (text generation) because the API
contract is fundamentally different: text in, float vectors out.
"""
from abc import ABC, abstractmethod
class EmbeddingAdapter(ABC):
    """Common interface implemented by every embedding backend.

    Concrete subclasses must provide both :meth:`embed` and
    :meth:`validate`; the class itself cannot be instantiated.
    """

    @abstractmethod
    def embed(self, texts: list[str]) -> list[list[float]]:
        """Turn a batch of strings into embedding vectors.

        Args:
            texts: The strings to embed.

        Returns:
            One vector per entry of *texts*, in the same order as
            the inputs.
        """

    @abstractmethod
    def validate(self) -> bool:
        """Report whether the adapter is usably configured.

        Returns:
            ``True`` when the configuration is valid (for example an
            API key is present), ``False`` otherwise.
        """

View File

@@ -0,0 +1,64 @@
"""
File-based embedding cache.
Stores embedding vectors in a single JSON file keyed by entity slug.
Each entry includes a content digest so stale embeddings are
automatically invalidated when entity content changes.
"""
import json
from pathlib import Path
from typing import Optional
class EmbeddingCache:
    """Persistent, file-backed cache of embedding vectors.

    All entries live in a single JSON file, ``embeddings.json``, inside
    the cache directory::

        {
            "division-of-labour": {"digest": "abc123", "vector": [0.1, ...]},
            ...
        }

    An entry is only served while the digest passed to :meth:`get`
    equals the stored one, so a changed digest invalidates it.  Entries
    added with :meth:`put` stay in memory until :meth:`save` is called.
    """

    def __init__(self, cache_dir: Path):
        """Open the cache under *cache_dir* and load any existing file.

        Args:
            cache_dir: Directory that holds (or will hold)
                ``embeddings.json``.  A plain string path is accepted too.
        """
        self._path = Path(cache_dir) / "embeddings.json"
        self._data: dict[str, dict] = {}
        self._hits = 0
        self._misses = 0
        self._load()

    def get(self, slug: str, content_digest: str) -> Optional[list[float]]:
        """Return the cached vector if *content_digest* matches, else ``None``.

        Every call is counted as a hit or a miss for :meth:`stats`.
        Malformed entries (not a mapping, or lacking ``"vector"``) are
        treated as misses instead of raising.
        """
        entry = self._data.get(slug)
        if (
            isinstance(entry, dict)
            and entry.get("digest") == content_digest
            and "vector" in entry
        ):
            self._hits += 1
            return entry["vector"]
        self._misses += 1
        return None

    def put(self, slug: str, content_digest: str, vector: list[float]) -> None:
        """Store or overwrite the embedding for *slug* (in memory only)."""
        self._data[slug] = {"digest": content_digest, "vector": vector}

    def save(self) -> None:
        """Write the cache to disk, creating the directory if needed.

        The JSON is written to a sibling temporary file first and then
        moved over the real file, so an interrupted write cannot leave a
        truncated ``embeddings.json`` behind.
        """
        self._path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(self._data, separators=(",", ":"))
        tmp_path = self._path.with_name(self._path.name + ".tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(self._path)

    def stats(self) -> dict:
        """Return the entry count plus hit/miss counters for this instance."""
        return {
            "entries": len(self._data),
            "hits": self._hits,
            "misses": self._misses,
        }

    def _load(self) -> None:
        """Read the cache file if present; fall back to an empty cache.

        An unreadable, undecodable or syntactically invalid file, as well
        as one whose top-level JSON value is not an object, is ignored.
        """
        if not self._path.is_file():
            return
        try:
            loaded = json.loads(self._path.read_text(encoding="utf-8"))
        except (ValueError, OSError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            loaded = None
        self._data = loaded if isinstance(loaded, dict) else {}

View File

@@ -0,0 +1,50 @@
"""
Factory for creating embedding adapters by provider name.
"""
from typing import Optional, Any
from markitect.llm.embedding_adapter import EmbeddingAdapter
from markitect.llm.exceptions import LLMConfigurationError
# Registry of supported provider names, each mapped to the dotted import
# path of the adapter class that serves it.  Both providers currently
# resolve to the same OpenAI-compatible adapter class.
_EMBEDDING_PROVIDERS = {
"openai": "markitect.llm.embedding_openai.OpenAICompatibleEmbeddingAdapter",
"openrouter": "markitect.llm.embedding_openai.OpenAICompatibleEmbeddingAdapter",
}
def create_embedding_adapter(
    provider: str = "openai",
    model: Optional[str] = None,
    api_key: Optional[str] = None,
    **kwargs: Any,
) -> EmbeddingAdapter:
    """Build the embedding adapter registered under *provider*.

    Args:
        provider: ``"openai"`` or ``"openrouter"``.
        model: Embedding model name (e.g. ``"text-embedding-3-small"``).
        api_key: Explicit API key.
        **kwargs: Additional keyword arguments handed to the adapter
            constructor unchanged.

    Returns:
        An instance of the adapter class registered for *provider*.

    Raises:
        LLMConfigurationError: If *provider* is not a registered name.
    """
    dotted_path = _EMBEDDING_PROVIDERS.get(provider)
    if dotted_path is None:
        known = ", ".join(sorted(_EMBEDDING_PROVIDERS))
        raise LLMConfigurationError(
            f"Unknown embedding provider {provider!r}. Choose from: {known}",
            context={"provider": provider},
        )

    # The adapter module is imported only once a provider has been chosen.
    from importlib import import_module

    module_name, _, attr_name = dotted_path.rpartition(".")
    adapter_cls = getattr(import_module(module_name), attr_name)
    return adapter_cls(model=model, api_key=api_key, provider=provider, **kwargs)

View File

@@ -0,0 +1,125 @@
"""
OpenAI-compatible embedding adapter.
Works with both OpenAI (``/v1/embeddings``) and OpenRouter
(``/api/v1/embeddings``) since they share the same API format.
The *provider* parameter determines the default base URL and
API key environment variable.
"""
import time
from typing import Optional, Dict, Any
from markitect.llm.embedding_adapter import EmbeddingAdapter
from markitect.llm.config import resolve_api_key, find_project_root
from markitect.llm._http import post_json
from markitect.llm.exceptions import (
LLMConfigurationError,
LLMAPIError,
LLMRateLimitError,
)
# Model name used when the caller does not pass one to the adapter.
_DEFAULT_MODEL = "text-embedding-3-small"
# Per-provider defaults: "api_base" is the base URL that "/embeddings" is
# appended to, and "env_var" is the name of the environment variable
# consulted when resolving the API key.
_PROVIDER_DEFAULTS: Dict[str, Dict[str, str]] = {
"openai": {
"api_base": "https://api.openai.com/v1",
"env_var": "OPENAI_API_KEY",
},
"openrouter": {
"api_base": "https://openrouter.ai/api/v1",
"env_var": "OPENROUTER_API_KEY",
},
}
class OpenAICompatibleEmbeddingAdapter(EmbeddingAdapter):
    """Embedding adapter for OpenAI-compatible endpoints.

    A single class handles both OpenAI and OpenRouter because they
    expose the same ``/embeddings`` endpoint format.
    """

    def __init__(
        self,
        model: Optional[str] = None,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        provider: str = "openai",
        max_retries: int = 3,
    ):
        """Configure the adapter for *provider*.

        Args:
            model: Embedding model name; defaults to ``_DEFAULT_MODEL``.
            api_key: Explicit API key.  When omitted, the key is resolved
                from the provider's environment variable or from
                ``apikey-<provider>.txt`` in the project root, if a
                project root is found.
            api_base: Base URL override; a trailing ``/`` is stripped.
            provider: ``"openai"`` or ``"openrouter"``.
            max_retries: Number of retries after the first attempt.
                Negative values are treated as ``0``.

        Raises:
            LLMConfigurationError: If *provider* is not recognised.
        """
        if provider not in _PROVIDER_DEFAULTS:
            known = ", ".join(sorted(_PROVIDER_DEFAULTS))
            raise LLMConfigurationError(
                f"Unknown embedding provider {provider!r}. Choose from: {known}",
                context={"provider": provider},
            )
        defaults = _PROVIDER_DEFAULTS[provider]
        self._model = model or _DEFAULT_MODEL
        self._api_base = (api_base or defaults["api_base"]).rstrip("/")
        # A negative count would skip the request loop entirely and leave
        # nothing to return or raise, so clamp it to "no retries".
        self._max_retries = max(0, max_retries)
        self._provider = provider

        # Resolve API key
        env_var = defaults["env_var"]
        root = find_project_root()
        key_file_paths = [root / f"apikey-{provider}.txt"] if root else []
        self._api_key = resolve_api_key(
            explicit=api_key,
            env_var=env_var,
            key_file_paths=key_file_paths,
        )

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed texts via the OpenAI-compatible ``/embeddings`` endpoint.

        Args:
            texts: Strings to embed.  An empty list returns ``[]``
                without contacting the API.

        Returns:
            One embedding per input text, in input order.

        Raises:
            LLMConfigurationError: If no API key is configured.
            LLMAPIError: On HTTP errors after retries are exhausted.
        """
        if not self._api_key:
            raise LLMConfigurationError(
                "No API key configured for embedding adapter",
                context={"provider": self._provider},
            )
        if not texts:
            # Nothing to embed; avoid a request with an empty input array.
            return []

        url = f"{self._api_base}/embeddings"
        payload: Dict[str, Any] = {
            "model": self._model,
            "input": texts,
        }
        headers = {"Authorization": f"Bearer {self._api_key}"}
        data = self._post_with_retries(url, payload, headers)

        # Response: {"data": [{"embedding": [...], "index": 0}, ...]}
        # Sort by index to guarantee input order.
        items = sorted(data["data"], key=lambda d: d["index"])
        return [item["embedding"] for item in items]

    def validate(self) -> bool:
        """Return ``True`` if a non-empty API key is available.

        Uses the same truthiness test as :meth:`embed`, so an empty key
        is reported as invalid rather than failing later.
        """
        return bool(self._api_key)

    def _post_with_retries(
        self,
        url: str,
        payload: Dict[str, Any],
        headers: Dict[str, str],
    ) -> Dict[str, Any]:
        """POST *payload* to *url*, retrying with exponential backoff.

        Rate-limit errors and API errors with a status code of 500 or
        above are retried up to ``max_retries`` times, sleeping
        ``2 ** attempt`` seconds between attempts.  Any other API error
        is re-raised immediately.

        Raises:
            LLMRateLimitError: If every attempt was rate limited.
            LLMAPIError: On a non-retryable error, or a server error on
                the final attempt.
        """
        last_exc: Optional[Exception] = None
        for attempt in range(self._max_retries + 1):
            attempts_left = attempt < self._max_retries
            try:
                return post_json(url, payload, headers)
            except LLMRateLimitError as exc:
                last_exc = exc
            except LLMAPIError as exc:
                # NOTE(review): assumes status_code may be absent/None for
                # some failures; such errors are not retried.
                status = getattr(exc, "status_code", None)
                if not (isinstance(status, int) and status >= 500 and attempts_left):
                    raise
                last_exc = exc
            if attempts_left:
                time.sleep(2 ** attempt)
        raise last_exc  # type: ignore[misc]

View File

@@ -0,0 +1,64 @@
"""
Pure-Python vector similarity utilities.
No external dependencies — uses :mod:`math` only. Sufficient for the
current entity scale (~100s). NumPy can be substituted later if needed.
"""
import math
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors of equal length.

    Args:
        a: First vector.
        b: Second vector; must have the same number of components as *a*.

    Returns:
        A float in [-1, 1].  Returns 0.0 if either vector has zero
        magnitude (to avoid division by zero).

    Raises:
        ValueError: If the vectors differ in length.  Without this
            check ``zip`` would silently drop the extra components and
            yield a meaningless score.
    """
    if len(a) != len(b):
        raise ValueError(
            f"Vectors must have the same length, got {len(a)} and {len(b)}"
        )
    dot = sum(x * y for x, y in zip(a, b))
    mag_a = math.sqrt(sum(x * x for x in a))
    mag_b = math.sqrt(sum(x * x for x in b))
    if mag_a == 0.0 or mag_b == 0.0:
        return 0.0
    similarity = dot / (mag_a * mag_b)
    # Rounding can push the quotient marginally outside [-1, 1]; clamp it.
    # Explicit comparisons (rather than min/max) let a NaN pass through.
    if similarity > 1.0:
        return 1.0
    if similarity < -1.0:
        return -1.0
    return similarity
def similarity_matrix(embeddings: list[list[float]]) -> list[list[float]]:
    """Compute the symmetric NxN matrix of pairwise cosine similarities.

    Args:
        embeddings: The N vectors to compare.

    Returns:
        A list of N rows where ``matrix[i][j]`` holds the cosine
        similarity of ``embeddings[i]`` and ``embeddings[j]``.  Diagonal
        cells are set to 1.0 directly rather than computed.
    """
    size = len(embeddings)
    # Start from the identity matrix; only the off-diagonal cells are computed.
    matrix: list[list[float]] = [
        [1.0 if row == col else 0.0 for col in range(size)]
        for row in range(size)
    ]
    for row, vector in enumerate(embeddings):
        # Each unordered pair is scored once and mirrored across the diagonal.
        for col in range(row + 1, size):
            score = cosine_similarity(vector, embeddings[col])
            matrix[row][col] = score
            matrix[col][row] = score
    return matrix
def find_similar_pairs(
    embeddings: dict[str, list[float]],
    threshold: float = 0.80,
) -> list[tuple[str, str, float]]:
    """Collect every pair of slugs whose cosine similarity is >= *threshold*.

    Args:
        embeddings: Mapping of slug → embedding vector.
        threshold: Lowest similarity that still counts as a match
            (default 0.80).

    Returns:
        ``(slug_a, slug_b, similarity)`` tuples, most similar first.
        Within a tuple ``slug_a`` sorts before ``slug_b``.
    """
    ordered = sorted(embeddings)
    matches: list[tuple[str, str, float]] = []
    for position, first in enumerate(ordered):
        first_vector = embeddings[first]
        # Only compare with slugs that sort later, so each pair appears once.
        for second in ordered[position + 1:]:
            score = cosine_similarity(first_vector, embeddings[second])
            if score >= threshold:
                matches.append((first, second, score))
    return sorted(matches, key=lambda match: match[2], reverse=True)