Files
artifact-store/src/artifactstore/app.py

89 lines
3.2 KiB
Python

"""Composition root: build the runtime registry from settings.
The HTTP server, the CLI, and any future host all instantiate the
:class:`Registry` through :func:`build_registry`. Wiring lives here so the
control-plane consumers stay thin (per ADR-0004).
"""
from __future__ import annotations
from collections.abc import Callable
from artifactstore.config import Settings, get_settings, resolve_secret_ref
from artifactstore.dataplane import InProcessDataPlane
from artifactstore.db.engine import create_engine
from artifactstore.events import RegistryViewWriter
from artifactstore.registry import Registry
from artifactstore.retention import RetentionPolicy
from artifactstore.storage import LocalBackend, S3Backend, S3BackendConfig, StorageBackend
__all__ = ["build_registry"]
def build_registry(settings: Settings | None = None) -> Registry:
"""Wire engine, local FS backend, in-process data plane, and registry."""
effective = settings or get_settings()
engine = create_engine(effective)
backends = _build_backends(effective)
dataplane = InProcessDataPlane(
backends,
default_backend_id=effective.storage_default_backend,
)
view_writer = RegistryViewWriter()
retention_policy = RetentionPolicy.from_toml(effective.retention_config_path)
return Registry(
engine,
dataplane,
view_writer,
retention_policy,
backend_selector=_backend_selector(effective),
)
def _build_backends(settings: Settings) -> dict[str, StorageBackend]:
configured = settings.configured_backend_ids or ("local",)
backends: dict[str, StorageBackend] = {}
if "local" in configured:
backends["local"] = LocalBackend(settings.storage_local_root, backend_id="local")
if "s3" in configured:
access_key = (
resolve_secret_ref(settings.s3_access_key_ref)
if settings.s3_access_key_ref
else None
)
secret_key = (
resolve_secret_ref(settings.s3_secret_key_ref)
if settings.s3_secret_key_ref
else None
)
backends["s3"] = S3Backend(
S3BackendConfig(
endpoint_url=settings.s3_endpoint_url,
region=settings.s3_region,
bucket=settings.s3_bucket,
key_prefix=settings.s3_key_prefix,
access_key_id=access_key,
secret_access_key=secret_key,
storage_class=settings.s3_storage_class or None,
sse=settings.s3_sse or None,
multipart_threshold_bytes=settings.s3_multipart_threshold_bytes,
multipart_chunk_bytes=settings.s3_multipart_chunk_bytes,
)
)
unknown = set(configured) - set(backends)
if unknown:
raise ValueError(f"unknown storage backend ids: {sorted(unknown)}")
return backends
def _backend_selector(settings: Settings) -> Callable[[str, str], str | None]:
routes = settings.backend_routes
def select(producer: str, retention_class: str) -> str | None:
for route in routes:
if route.matches(producer=producer, retention_class=retention_class):
return route.backend_id
return settings.storage_default_backend
return select