Files
issue-core/issue_core/api/auth.py
tegwick b605d970e3 feat: rename to issue-core and add task ingestion endpoint
Renames the package, distribution, CLI alias, Makefile targets, and
working directory from issue-facade to issue-core, signalling its
role as the authoritative task lifecycle manager for the Coulomb org
(peer to activity-core, rules-core, project-core).

Adds POST /issues/ ingestion endpoint for activity-core's IssueSink,
under a new optional [api] extra. The endpoint is served by `issue
serve`, authenticates via the ISSUE_CORE_API_KEY env var (Bearer or
X-API-Key header), and routes the TaskSpec payload to the configured
default backend with full traceability metadata embedded in
sync_metadata.

- T01: Python package issue_tracker -> issue_core, dir rename
- T02: registered in state hub under custodian domain
- T03: INTENT.md (what it is, what it isn't, how it fits)
- T04: SCOPE.md (in/out-of-scope, integration boundaries)
- T05: POST /issues/ via FastAPI + Uvicorn, 9 unit tests
- T06: docs/nats-task-ingestion.md design stub

Closes ISSC-WP-0001.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 05:16:27 +02:00

67 lines
2.0 KiB
Python

"""
API key authentication for the issue-core REST API.
A single shared key is read from the ISSUE_CORE_API_KEY environment variable.
Clients send it either as `Authorization: Bearer <key>` or as `X-API-Key: <key>`.
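If ISSUE_CORE_API_KEY is unset, the server refuses to start — the workplan
explicitly forbids an unauthenticated ingestion surface. If the key is
nevertheless missing when a request arrives, `require_api_key` rejects the
request with 503 Service Unavailable instead of letting it through.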
"""
import os
import secrets
from typing import Optional
from fastapi import Header, HTTPException, status
# Name of the environment variable that holds the shared API key.
API_KEY_ENV_VAR = "ISSUE_CORE_API_KEY"
class AuthConfigError(RuntimeError):
    """Signal that the API key required by the REST API is not configured.

    Raised by ``get_configured_api_key`` when the key environment variable is
    missing or blank.
    """
def get_configured_api_key() -> str:
    """Return the shared API key read from the environment.

    The value of the ``API_KEY_ENV_VAR`` environment variable is stripped of
    surrounding whitespace before use.

    Returns:
        The non-empty, stripped API key.

    Raises:
        AuthConfigError: If the variable is unset or blank after stripping.
    """
    raw_value = os.environ.get(API_KEY_ENV_VAR)
    api_key = "" if raw_value is None else raw_value.strip()
    if api_key:
        return api_key
    raise AuthConfigError(
        f"{API_KEY_ENV_VAR} is not set. The ingestion endpoint requires an API key. "
        f"Generate one with: python -c 'import secrets; print(secrets.token_urlsafe(32))'"
    )
def _extract_token(
authorization: Optional[str],
x_api_key: Optional[str],
) -> Optional[str]:
if x_api_key:
return x_api_key.strip()
if authorization:
scheme, _, value = authorization.partition(" ")
if scheme.lower() == "bearer" and value:
return value.strip()
return None
async def require_api_key(
    authorization: Optional[str] = Header(default=None),
    x_api_key: Optional[str] = Header(default=None, alias="X-API-Key"),
) -> None:
    """FastAPI dependency enforcing the shared API key.

    Args:
        authorization: Raw ``Authorization`` header, if the client sent one.
        x_api_key: Raw ``X-API-Key`` header, if the client sent one.

    Raises:
        HTTPException: 503 when no API key is configured on the server;
            401 (with ``WWW-Authenticate: Bearer``) when the request carries
            no key or a key that does not match the configured one.
    """
    try:
        expected = get_configured_api_key()
    except AuthConfigError as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=str(exc),
        ) from exc

    presented = _extract_token(authorization, x_api_key)
    # secrets.compare_digest() raises TypeError when given a str containing
    # non-ASCII characters. A header value may contain such characters, which
    # would surface as a 500 instead of a 401, so compare encoded bytes.
    # "surrogatepass" keeps the encoding from failing on any input string.
    if not presented or not secrets.compare_digest(
        presented.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing or invalid API key.",
            headers={"WWW-Authenticate": "Bearer"},
        )