"""FastAPI application — HTTP surface for the registry.""" from __future__ import annotations import asyncio import re import secrets from collections.abc import AsyncIterator, Mapping from contextlib import asynccontextmanager, suppress from dataclasses import dataclass from datetime import UTC, datetime from email import policy from email.parser import BytesParser from http import HTTPStatus from typing import Any, NoReturn, cast from uuid import UUID, uuid4 import cbor2 import jcs from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request, Response, status from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel, Field from artifactstore import __version__ from artifactstore.app import build_registry from artifactstore.config import Settings, get_settings from artifactstore.events.model import Event from artifactstore.registry import ( DuplicateRelativePathError, FileNotFoundError, FileRecord, IllegalPackageStateError, PackageNotFoundError, PackageRecord, Registry, RetentionClassRecord, RetentionStateError, RetentionStateRecord, ) __all__ = ["app", "create_app"] _RANGE_RE = re.compile(r"^bytes=(?P\d*)-(?P\d*)$") _CONTENT_RANGE_RE = re.compile(r"^bytes (?P\d+)-(?P\d+)/(?P\d+|\*)$") _MAX_EVENT_LIMIT = 500 class PackageCreate(BaseModel): name: str = Field(min_length=1) producer: str = Field(min_length=1) subject: str = Field(min_length=1) retention_class: str = Field(min_length=1) metadata: dict[str, Any] = Field(default_factory=dict) metadata_schema_slug: str | None = None class MetadataSchemaCreate(BaseModel): slug: str = Field(min_length=1) json_schema: dict[str, Any] class UploadCreate(BaseModel): expected_size_bytes: int | None = Field(default=None, ge=0) media_type: str | None = None class UploadComplete(BaseModel): package_id: UUID relative_path: str = Field(min_length=1) media_type: str | None = None class RetentionExtensionCreate(BaseModel): new_expires_at: datetime reason: str = Field(min_length=1) class RetentionHoldCreate(BaseModel): reason: str = Field(min_length=1) class RetentionHoldRelease(BaseModel): reason: str = Field(min_length=1) @dataclass(slots=True) class MultipartFile: filename: str content_type: str content: bytes fields: dict[str, str] @dataclass(slots=True) class UploadSession: upload_id: UUID buffer: bytearray expected_size_bytes: int | None media_type: str | None status: str = "open" def get_registry(request: Request) -> Registry: return request.app.state.registry # type: ignore[no-any-return] def get_http_settings(request: Request) -> Settings: return request.app.state.settings # type: ignore[no-any-return] async def require_read_auth( request: Request, authorization: str | None = Header(default=None, alias="Authorization"), ) -> str: settings = get_http_settings(request) if settings.anon_read: return "anonymous" return _require_bearer(settings, authorization) async def require_write_auth( request: Request, authorization: str | None = Header(default=None, alias="Authorization"), ) -> str: settings = get_http_settings(request) return _require_bearer(settings, authorization) def create_app(settings: Settings | None = None) -> FastAPI: """Build the FastAPI app. Lifespan owns the registry instance.""" effective_settings = settings or get_settings() @asynccontextmanager async def lifespan(application: FastAPI) -> AsyncIterator[None]: registry = build_registry(effective_settings) application.state.registry = registry application.state.settings = effective_settings application.state.upload_sessions = {} application.state.upload_lock = asyncio.Lock() try: yield finally: await registry.dispose() application = FastAPI( title="artifact-store", version=__version__, lifespan=lifespan, ) @application.exception_handler(HTTPException) async def http_exception_handler(_request: Request, exc: HTTPException) -> JSONResponse: return _problem_response( status_code=exc.status_code, title=_status_phrase(exc.status_code), detail=str(exc.detail), headers=exc.headers, ) @application.exception_handler(RequestValidationError) async def validation_exception_handler( _request: Request, exc: RequestValidationError, ) -> JSONResponse: return _problem_response( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, title="Validation error", detail=str(exc.errors()), ) @application.get("/") def root() -> dict[str, str]: return { "service": "artifact-store", "version": __version__, "status": "scaffold", } @application.get("/health") async def health(registry: Registry = Depends(get_registry)) -> dict[str, Any]: db_ok, db_detail = await registry.db_health() backend_status = await registry.backend_health() failed_storage_locations = await registry.failed_storage_locations_count() overall = ( "ok" if db_ok and backend_status.healthy and failed_storage_locations == 0 else "degraded" ) return { "service": "artifact-store", "version": __version__, "status": overall, "db": {"healthy": db_ok, "detail": db_detail}, "backend": { "backend_id": backend_status.backend_id, "healthy": backend_status.healthy, "detail": backend_status.detail, "free_bytes": backend_status.free_bytes, "total_bytes": backend_status.total_bytes, }, "storage": {"failed_locations": failed_storage_locations}, } @application.get("/backends") async def backends( _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: statuses = await registry.backend_health_all() return { "backends": [ { "backend_id": backend_status.backend_id, "healthy": backend_status.healthy, "detail": backend_status.detail, "free_bytes": backend_status.free_bytes, "total_bytes": backend_status.total_bytes, } for backend_status in statuses ] } @application.get("/retention-classes") async def retention_classes( _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: classes = await registry.list_retention_classes() return {"retention_classes": [_retention_class_dict(c) for c in classes]} @application.post("/metadata-schemas", status_code=status.HTTP_201_CREATED) async def register_metadata_schema( body: MetadataSchemaCreate, _actor: str = Depends(require_write_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: schema_id = await registry.register_metadata_schema( slug=body.slug, json_schema=body.json_schema, ) schema = await registry.get_metadata_schema(body.slug) return { "id": str(schema_id), "slug": schema.slug, "json_schema": schema.json_schema, "created_at": _iso(schema.created_at), } @application.post("/packages", status_code=status.HTTP_201_CREATED) async def create_package( body: PackageCreate, actor: str = Depends(require_write_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: try: package_id = await registry.create_package( name=body.name, producer=body.producer, subject=body.subject, retention_class=body.retention_class, actor=actor, metadata=body.metadata, metadata_schema_slug=body.metadata_schema_slug, ) return _package_dict(await registry.get_package(package_id)) except ValueError as exc: _raise_problem(status.HTTP_400_BAD_REQUEST, str(exc)) @application.get("/packages") async def list_packages( producer: str | None = None, subject: str | None = None, retention_class: str | None = None, metadata_key: str | None = None, metadata_value: str | None = None, _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: packages = await registry.list_packages( producer=producer, subject=subject, retention_class=retention_class, metadata_key=metadata_key, metadata_value=metadata_value, ) return {"packages": [_package_dict(p) for p in packages]} @application.get("/packages/{package_id}") async def get_package( package_id: UUID, _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: try: return _package_dict(await registry.get_package(package_id)) except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) @application.post("/packages/{package_id}/files", status_code=status.HTTP_201_CREATED) async def upload_package_file( package_id: UUID, request: Request, actor: str = Depends(require_write_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: multipart = await _read_single_file_multipart(request) relative_path = multipart.fields.get("relative_path") or multipart.filename media_type = multipart.fields.get("media_type") or multipart.content_type try: file_id = await registry.ingest_file( package_id, relative_path=relative_path, media_type=media_type or "application/octet-stream", stream=_bytes_stream(multipart.content), actor=actor, ) return _file_dict(await registry.get_file_metadata(file_id)) except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) except DuplicateRelativePathError as exc: _raise_problem(status.HTTP_409_CONFLICT, str(exc)) except IllegalPackageStateError as exc: _raise_problem(status.HTTP_409_CONFLICT, str(exc)) @application.post("/packages/{package_id}/finalize") async def finalize_package( package_id: UUID, actor: str = Depends(require_write_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: try: await registry.finalize_package(package_id, actor=actor) return _package_dict(await registry.get_package(package_id)) except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) except IllegalPackageStateError as exc: _raise_problem(status.HTTP_409_CONFLICT, str(exc)) @application.post("/packages/{package_id}/retention/extensions") async def extend_retention( package_id: UUID, body: RetentionExtensionCreate, actor: str = Depends(require_write_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: try: state = await registry.extend_retention( package_id, new_expires_at=body.new_expires_at, reason=body.reason, actor=actor, ) return _retention_state_dict(state) except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) except RetentionStateError as exc: _raise_problem(status.HTTP_409_CONFLICT, str(exc)) @application.post("/packages/{package_id}/retention/holds", status_code=status.HTTP_201_CREATED) async def apply_hold( package_id: UUID, body: RetentionHoldCreate, actor: str = Depends(require_write_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: try: hold_id = await registry.apply_retention_hold( package_id, reason=body.reason, actor=actor, ) state = await registry.get_retention_state(package_id) return {"hold_id": str(hold_id), "retention": _retention_state_dict(state)} except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) except RetentionStateError as exc: _raise_problem(status.HTTP_409_CONFLICT, str(exc)) @application.post("/packages/{package_id}/retention/holds/{hold_id}/release") async def release_hold( package_id: UUID, hold_id: UUID, body: RetentionHoldRelease, actor: str = Depends(require_write_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: try: state = await registry.release_retention_hold( package_id, hold_id, reason=body.reason, actor=actor, ) return _retention_state_dict(state) except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) except RetentionStateError as exc: _raise_problem(status.HTTP_409_CONFLICT, str(exc)) @application.get("/packages/{package_id}/retention/history") async def retention_history( package_id: UUID, request: Request, _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> Response: try: history = await registry.retention_history(package_id) except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) payload = {"events": [_event_dict(event) for event in history]} accept = request.headers.get("accept", "") if "application/cbor" in accept: return Response( content=cbor2.dumps(payload, canonical=True), media_type="application/cbor", ) return Response(content=jcs.canonicalize(payload), media_type="application/json") @application.get("/packages/{package_id}/manifest") async def get_manifest_cbor( package_id: UUID, _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> Response: try: payload = await registry.get_manifest_bytes(package_id, format="cbor") except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) return Response(content=payload, media_type="application/cbor") @application.get("/packages/{package_id}/manifest.json") async def get_manifest_json( package_id: UUID, _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> Response: try: payload = await registry.get_manifest_bytes(package_id, format="json") except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) return Response(content=payload, media_type="application/json") @application.get("/files/{file_id}") async def get_file_metadata( file_id: UUID, _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: try: return _file_dict(await registry.get_file_metadata(file_id)) except FileNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) @application.get("/files/{file_id}/download") async def download_file( file_id: UUID, range_header: str | None = Header(default=None, alias="Range"), if_none_match: str | None = Header(default=None, alias="If-None-Match"), _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> Response: try: file_record = await registry.get_file_metadata(file_id) except FileNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) etag = _etag(file_record.content_address) headers = { "Accept-Ranges": "bytes", "ETag": etag, "Content-Type": file_record.media_type, } if if_none_match and _etag_matches(if_none_match, file_record.content_address): return Response(status_code=status.HTTP_304_NOT_MODIFIED, headers=headers) byte_range = _parse_range(range_header, file_record.size_bytes) status_code = status.HTTP_200_OK if byte_range is None: headers["Content-Length"] = str(file_record.size_bytes) else: start, end = byte_range headers["Content-Range"] = f"bytes {start}-{end}/{file_record.size_bytes}" headers["Content-Length"] = str(end - start + 1) status_code = status.HTTP_206_PARTIAL_CONTENT stream = await registry.get_file(file_id, byte_range=byte_range) return StreamingResponse(stream, status_code=status_code, headers=headers) @application.post("/uploads", status_code=status.HTTP_201_CREATED) async def open_upload( body: UploadCreate, request: Request, _actor: str = Depends(require_write_auth), ) -> dict[str, Any]: upload_id = uuid4() session = UploadSession( upload_id=upload_id, buffer=bytearray(), expected_size_bytes=body.expected_size_bytes, media_type=body.media_type, ) async with request.app.state.upload_lock: request.app.state.upload_sessions[str(upload_id)] = session return { "upload_id": str(upload_id), "status": session.status, "content_upload_url": f"/uploads/{upload_id}", } @application.patch("/uploads/{upload_id}") async def patch_upload( upload_id: UUID, request: Request, content_range: str | None = Header(default=None, alias="Content-Range"), _actor: str = Depends(require_write_auth), ) -> dict[str, Any]: if content_range is None: _raise_problem(status.HTTP_400_BAD_REQUEST, "Content-Range header is required") body = await request.body() start, end, total = _parse_content_range(content_range) if end - start + 1 != len(body): _raise_problem( status.HTTP_400_BAD_REQUEST, "Content-Range length does not match request body length", ) async with request.app.state.upload_lock: session = _upload_session(request, upload_id) if start != len(session.buffer): _raise_problem( status.HTTP_409_CONFLICT, f"upload offset mismatch: expected {len(session.buffer)}, got {start}", ) session.buffer.extend(body) if total is not None and len(session.buffer) > total: _raise_problem(status.HTTP_400_BAD_REQUEST, "upload exceeds declared total size") session.status = ( "uploaded" if total is not None and len(session.buffer) == total else "open" ) return { "upload_id": str(upload_id), "status": session.status, "received_bytes": len(session.buffer), "expected_size_bytes": total, } @application.post("/uploads/{upload_id}/complete", status_code=status.HTTP_201_CREATED) async def complete_upload( upload_id: UUID, body: UploadComplete, request: Request, actor: str = Depends(require_write_auth), registry: Registry = Depends(get_registry), ) -> dict[str, Any]: async with request.app.state.upload_lock: session = _upload_session(request, upload_id) payload = bytes(session.buffer) media_type = body.media_type or session.media_type or "application/octet-stream" try: file_id = await registry.ingest_file( body.package_id, relative_path=body.relative_path, media_type=media_type, stream=_bytes_stream(payload), actor=actor, ) except PackageNotFoundError as exc: _raise_problem(status.HTTP_404_NOT_FOUND, str(exc)) except DuplicateRelativePathError as exc: _raise_problem(status.HTTP_409_CONFLICT, str(exc)) except IllegalPackageStateError as exc: _raise_problem(status.HTTP_409_CONFLICT, str(exc)) async with request.app.state.upload_lock: request.app.state.upload_sessions.pop(str(upload_id), None) return _file_dict(await registry.get_file_metadata(file_id)) @application.get("/events") async def events( request: Request, since: int = Query(default=0, ge=0), limit: int = Query(default=100, ge=1, le=_MAX_EVENT_LIMIT), wait_seconds: float = Query(default=1.0, ge=0.0, le=30.0), _actor: str = Depends(require_read_auth), registry: Registry = Depends(get_registry), ) -> Response: batch = await _long_poll_events( registry, since_sequence=since, limit=limit, wait_seconds=wait_seconds, ) payload = {"events": [_event_dict(e) for e in batch]} accept = request.headers.get("accept", "") if "application/json" in accept: return Response(content=jcs.canonicalize(payload), media_type="application/json") return Response(content=cbor2.dumps(payload, canonical=True), media_type="application/cbor") return application def _require_bearer(settings: Settings, authorization: str | None) -> str: tokens = settings.bearer_tokens if not tokens: _raise_problem( status.HTTP_401_UNAUTHORIZED, "No bearer tokens are configured", headers={"WWW-Authenticate": "Bearer"}, ) if authorization is None or not authorization.lower().startswith("bearer "): _raise_problem( status.HTTP_401_UNAUTHORIZED, "Bearer token required", headers={"WWW-Authenticate": "Bearer"}, ) supplied = authorization.partition(" ")[2].strip() if not any(secrets.compare_digest(supplied, token) for token in tokens): _raise_problem( status.HTTP_401_UNAUTHORIZED, "Invalid bearer token", headers={"WWW-Authenticate": "Bearer"}, ) return "bearer" def _raise_problem( status_code: int, detail: str, *, headers: Mapping[str, str] | None = None, ) -> NoReturn: raise HTTPException(status_code=status_code, detail=detail, headers=headers) def _problem_response( *, status_code: int, title: str, detail: str, headers: Mapping[str, str] | None = None, ) -> JSONResponse: return JSONResponse( status_code=status_code, content={ "type": "about:blank", "title": title, "status": status_code, "detail": detail, }, media_type="application/problem+json", headers=headers, ) def _status_phrase(status_code: int) -> str: try: return HTTPStatus(status_code).phrase except ValueError: return "HTTP error" def _package_dict(record: PackageRecord) -> dict[str, Any]: return { "id": str(record.id), "name": record.name, "producer": record.producer, "subject": record.subject, "retention_class": record.retention_class, "metadata_schema_id": str(record.metadata_schema_id) if record.metadata_schema_id else None, "metadata": record.metadata, "status": record.status, "manifest_digest": f"blake3:{record.manifest_digest_hex}" if record.manifest_digest_hex else None, "created_at": _iso(record.created_at), "finalized_at": _iso(record.finalized_at), "expires_at": _iso(record.expires_at), "last_event_sequence": record.last_event_sequence, } def _file_dict(record: FileRecord) -> dict[str, Any]: return { "id": str(record.id), "package_id": str(record.package_id), "relative_path": record.relative_path, "media_type": record.media_type, "size_bytes": record.size_bytes, "digest_algorithm": record.digest_algorithm, "digest_primary_hex": record.digest_primary_hex, "digest_sha256_hex": record.digest_sha256_hex, "content_address": record.content_address, "created_at": _iso(record.created_at), "storage": { "backend_id": record.backend_id, "object_key": record.object_key, "retrieval_tier": record.retrieval_tier, "status": record.storage_status, }, } def _retention_class_dict(record: RetentionClassRecord) -> dict[str, Any]: return { "class_id": record.class_id, "default_duration_seconds": record.default_duration_seconds, "deletion_strategy": record.deletion_strategy, } def _retention_state_dict(record: RetentionStateRecord) -> dict[str, Any]: return { "package_id": str(record.package_id), "current_expires_at": _iso(record.current_expires_at), "effective_class": record.effective_class, "active_hold_id": str(record.active_hold_id) if record.active_hold_id else None, "eligible_for_deletion": record.eligible_for_deletion, } def _event_dict(event: Event) -> dict[str, Any]: payload = cbor2.loads(event.payload) return { "sequence": event.sequence, "created_at": _iso(event.created_at), "event_type": event.event_type, "subject_kind": event.subject_kind, "subject_id": str(event.subject_id) if event.subject_id else None, "actor": event.actor, "payload_digest_hex": event.payload_digest.hex(), "payload": payload, } def _iso(value: datetime | None) -> str | None: if value is None: return None if value.tzinfo is None: value = value.replace(tzinfo=UTC) return value.isoformat() async def _read_single_file_multipart(request: Request) -> MultipartFile: content_type = request.headers.get("content-type", "") if not content_type.lower().startswith("multipart/form-data"): _raise_problem(status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, "multipart/form-data is required") raw = await request.body() message = cast( Any, BytesParser(policy=policy.default).parsebytes( f"Content-Type: {content_type}\r\nMIME-Version: 1.0\r\n\r\n".encode() + raw ), ) if not message.is_multipart(): _raise_problem(status.HTTP_400_BAD_REQUEST, "request body is not multipart") fields: dict[str, str] = {} chosen: tuple[str, str, bytes] | None = None for part in message.iter_parts(): if part.get_content_disposition() != "form-data": continue name = cast(str | None, part.get_param("name", header="content-disposition")) filename = part.get_filename() payload = cast(bytes, part.get_payload(decode=True) or b"") if filename is None: if name is not None: fields[name] = payload.decode(part.get_content_charset() or "utf-8") continue if chosen is None or name == "file": chosen = (filename, part.get_content_type(), payload) if chosen is None: _raise_problem(status.HTTP_400_BAD_REQUEST, "multipart file part is required") filename, part_content_type, payload = chosen return MultipartFile( filename=filename, content_type=part_content_type or "application/octet-stream", content=payload, fields=fields, ) async def _bytes_stream(data: bytes) -> AsyncIterator[bytes]: yield data def _etag(content_address: str) -> str: return f'"{content_address}"' def _etag_matches(header_value: str, content_address: str) -> bool: candidates = [v.strip() for v in header_value.split(",")] return ( "*" in candidates or content_address in candidates or _etag(content_address) in candidates ) def _parse_range(header_value: str | None, size_bytes: int) -> tuple[int, int] | None: if header_value is None: return None match = _RANGE_RE.match(header_value.strip()) if match is None: _raise_problem( status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "only a single bytes range is supported", ) start_raw = match.group("start") end_raw = match.group("end") if size_bytes <= 0: _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "empty file has no range") if start_raw == "": if end_raw == "": _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "invalid range") suffix = int(end_raw) if suffix <= 0: _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "invalid suffix range") start = max(size_bytes - suffix, 0) end = size_bytes - 1 else: start = int(start_raw) end = int(end_raw) if end_raw else size_bytes - 1 if start >= size_bytes or end < start: _raise_problem(status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE, "range not satisfiable") return start, min(end, size_bytes - 1) def _parse_content_range(header_value: str) -> tuple[int, int, int | None]: match = _CONTENT_RANGE_RE.match(header_value.strip()) if match is None: _raise_problem(status.HTTP_400_BAD_REQUEST, "invalid Content-Range header") start = int(match.group("start")) end = int(match.group("end")) total_raw = match.group("total") total = None if total_raw == "*" else int(total_raw) if end < start: _raise_problem(status.HTTP_400_BAD_REQUEST, "invalid Content-Range bounds") if total is not None and end >= total: _raise_problem(status.HTTP_400_BAD_REQUEST, "Content-Range exceeds total size") return start, end, total def _upload_session(request: Request, upload_id: UUID) -> UploadSession: session = request.app.state.upload_sessions.get(str(upload_id)) if session is None: _raise_problem(status.HTTP_404_NOT_FOUND, f"upload not found: {upload_id}") return session # type: ignore[no-any-return] async def _long_poll_events( registry: Registry, *, since_sequence: int, limit: int, wait_seconds: float, ) -> list[Event]: batch = await registry.fetch_events(since_sequence=since_sequence, limit=limit) if batch or wait_seconds == 0: return batch collected: list[Event] = [] async def collect() -> None: async for event in registry.tail_events( since_sequence=since_sequence, poll_interval_seconds=0.05, ): collected.append(event) if len(collected) >= limit: break with suppress(TimeoutError): await asyncio.wait_for(collect(), timeout=wait_seconds) return collected app = create_app()