Files
issue-core/issue_core/api/ingest.py
tegwick b605d970e3 feat: rename to issue-core and add task ingestion endpoint
Renames the package, distribution, CLI alias, Makefile targets, and
working directory from issue-facade to issue-core, signalling its
role as the authoritative task lifecycle manager for the Coulomb org
(peer to activity-core, rules-core, project-core).

Adds POST /issues/ ingestion endpoint for activity-core's IssueSink,
under a new optional [api] extra. The endpoint is served by `issue
serve`, authenticates via the ISSUE_CORE_API_KEY env var (Bearer or
X-API-Key header), and routes the TaskSpec payload to the configured
default backend with full traceability metadata embedded in
sync_metadata.

- T01: Python package issue_tracker -> issue_core, dir rename
- T02: registered in state hub under custodian domain
- T03: INTENT.md (what it is, what it isn't, how it fits)
- T04: SCOPE.md (in/out-of-scope, integration boundaries)
- T05: POST /issues/ via FastAPI + Uvicorn, 9 unit tests
- T06: docs/nats-task-ingestion.md design stub

Closes ISSC-WP-0001.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 05:16:27 +02:00

140 lines
4.7 KiB
Python

"""
POST /issues/ — task ingestion endpoint.
Receives a TaskSpec payload (see schemas.TaskIngestionRequest) from an
authorized emitter, routes it to the configured backend, and returns the
created issue's id and (optional) URL.
Routing strategy (v1):
- Single default backend, looked up via cli.utils.get_default_backend().
- target_repo, triggering_event_id, source_*, activity_definition_id are
stored on the issue's sync_metadata for traceability back to the emitter.
- Per-target-repo routing is a planned follow-up; see SCOPE.md.
"""
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple
from fastapi import APIRouter, Depends, HTTPException, status
from ..backends.gitea import GiteaBackend
from ..backends.local import LocalSQLiteBackend
from ..cli.utils import get_config_dir, load_backend_configs
from ..core.interfaces import BackendFactory, IssueBackend
from ..core.models import Issue, IssueState, Label
from .auth import require_api_key
from .schemas import BackendName, TaskIngestionRequest, TaskIngestionResponse
router = APIRouter()
BackendFactory.register_backend("local", LocalSQLiteBackend)
BackendFactory.register_backend("gitea", GiteaBackend)
_BACKEND_TYPE_TO_NAME: Dict[str, str] = {
"local": "sqlite",
"gitea": "gitea",
"github": "github",
}
def _resolve_backend() -> Tuple[IssueBackend, str]:
configs = load_backend_configs()
default_name = configs.get("default", "local")
if default_name not in configs:
if default_name == "local":
configs["local"] = {
"type": "local",
"db_path": str(get_config_dir() / "issues.db"),
}
else:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Default backend '{default_name}' is not configured.",
)
backend_config = configs[default_name]
backend_type = backend_config["type"]
try:
backend = BackendFactory.create_backend(backend_type)
backend.connect(backend_config)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Failed to connect to backend '{default_name}': {exc}",
)
return backend, backend_type
def _build_issue(payload: TaskIngestionRequest, backend_type: str) -> Issue:
now = datetime.now(timezone.utc)
labels = [Label(name=name) for name in payload.labels]
labels.append(Label(name=f"priority:{payload.priority}"))
labels.append(Label(name=f"source:{payload.source_type}"))
if payload.target_repo:
labels.append(Label(name=f"repo:{payload.target_repo}"))
ingestion_meta: Dict[str, Any] = {
"target_repo": payload.target_repo,
"source_type": payload.source_type,
"source_id": payload.source_id,
"triggering_event_id": str(payload.triggering_event_id),
"activity_definition_id": payload.activity_definition_id,
"ingested_at": now.isoformat(),
}
if payload.due_in_days is not None:
ingestion_meta["due_at"] = (now + timedelta(days=payload.due_in_days)).isoformat()
sync_metadata: Dict[str, Any] = {"ingestion": ingestion_meta}
return Issue(
id="",
number=0,
title=payload.title,
description=payload.description,
state=IssueState.OPEN,
created_at=now,
updated_at=now,
labels=labels,
backend_type=backend_type,
sync_metadata=sync_metadata,
)
@router.post(
"/issues/",
response_model=TaskIngestionResponse,
status_code=status.HTTP_201_CREATED,
dependencies=[Depends(require_api_key)],
summary="Ingest a task from an external emitter (e.g. activity-core).",
)
async def ingest_task(payload: TaskIngestionRequest) -> TaskIngestionResponse:
backend, backend_type = _resolve_backend()
draft = _build_issue(payload, backend_type)
try:
created: Issue = backend.create_issue(draft)
except HTTPException:
raise
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Backend rejected the issue: {exc}",
)
finally:
try:
backend.disconnect()
except Exception:
pass
issue_id = created.id or str(created.number)
issue_url: Optional[str] = None
if created.sync_metadata:
issue_url = created.sync_metadata.get("url") or created.sync_metadata.get("html_url")
backend_name: BackendName = _BACKEND_TYPE_TO_NAME.get(backend_type, backend_type) # type: ignore[assignment]
return TaskIngestionResponse(
issue_id=issue_id,
issue_url=issue_url,
backend=backend_name,
)