# NATS Task Ingestion: Design Stub

**Status:** design stub. Implementation is deferred until activity-core's `IssueSink` migrates from REST to NATS.

**Scope:** describe what the NATS-backed counterpart of `POST /issues/` will look like, so that the activity-core agent and any future emitter can plan against a stable contract.

## Why NATS

Today the ingestion surface is `POST /issues/`, a synchronous REST endpoint authenticated with an API key. That works for activity-core's first cut but has limitations:

- **Coupling**: activity-core needs to know the URL and key of every issue-core instance. With NATS, both sides connect to a shared broker, and routing is by subject.
- **Backpressure**: REST delivery is best-effort. If issue-core is down or slow, the emitter must either block or drop the task. With NATS JetStream, messages are stored durably and can be replayed.
- **Fan-out**: REST has a single consumer. NATS trivially supports multiple consumers (e.g. an audit logger running alongside the actual ingester).
- **Replay**: if tasks are lost because the consumer was offline during an incident, they can be reconstructed from the JetStream log.

## Subject pattern

```
act.tasks.create.{target_repo}
```

- Namespace prefix `act.tasks.`: the `act` reflects activity-core's heritage, but the prefix is now neutral and other emitters can publish on it too.
- `create` is the verb. Future verbs (`act.tasks.update`, `act.tasks.close`) are reserved but out of scope here.
- `{target_repo}` is the same string as the REST `TaskSpec.target_repo` field. It enables subject-based routing in consumers: an issue-core instance responsible for a single repo subscribes to `act.tasks.create.myrepo`, while a multi-tenant instance subscribes to `act.tasks.create.>`.
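To make the routing concrete, here is a minimal stdlib-only sketch of how a publisher might build the subject and how the two subscription filters select it. `task_subject` and `subject_matches` are hypothetical helpers, not part of issue-core or `nats-py`; the matcher mirrors NATS wildcard rules (`.` separates tokens, `*` matches exactly one token, a trailing `>` matches one or more). One assumption worth flagging: a `target_repo` containing `.` would span several subject tokens, so this sketch rejects it. The stub does not yet say how such names should be encoded.

```python
"""Sketch: build and route act.tasks.create.{target_repo} subjects.

Hypothetical helpers (not issue-core or nats-py APIs). The matcher mirrors
NATS wildcard semantics: tokens are separated by '.', '*' matches exactly one
token, and '>' (only as the last token) matches one or more tokens.
"""

SUBJECT_PREFIX = "act.tasks.create"


def task_subject(target_repo: str) -> str:
    """Build the publish subject for a create-task message."""
    # A '.' would split target_repo into several subject tokens; whitespace
    # and wildcard characters are not valid in a published subject.
    if not target_repo or any(c in target_repo for c in ". \t\r\n*>"):
        raise ValueError(f"target_repo is not a single subject token: {target_repo!r}")
    return f"{SUBJECT_PREFIX}.{target_repo}"


def subject_matches(filter_subject: str, subject: str) -> bool:
    """Return True if `subject` is matched by the (possibly wildcarded) filter."""
    f_tokens = filter_subject.split(".")
    s_tokens = subject.split(".")
    for i, f in enumerate(f_tokens):
        if f == ">":
            # '>' must be the last filter token and needs at least one subject token left.
            return i == len(f_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if f != "*" and f != s_tokens[i]:
            return False
    return len(f_tokens) == len(s_tokens)


if __name__ == "__main__":
    subj = task_subject("myrepo")
    print(subj)                                                       # act.tasks.create.myrepo
    print(subject_matches("act.tasks.create.myrepo", subj))           # True  (single-repo instance)
    print(subject_matches("act.tasks.create.>", subj))                # True  (multi-tenant instance)
    print(subject_matches("act.tasks.create.other", subj))            # False
    print(subject_matches("act.tasks.>", "act.tasks.update.myrepo"))  # True  (stream subjects)
```

Because the stream captures `act.tasks.>`, reserved verbs such as `update` land in the same stream, while the `act.tasks.create.>` consumer filter keeps them away from the ingester.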
## Message schema

The payload uses **exactly the same** schema as the REST endpoint: `TaskIngestionRequest` in `issue_core/api/schemas.py`.

```json
{
  "title": "string",
  "description": "string",
  "target_repo": "string",
  "priority": "high | medium | low",
  "labels": ["string"],
  "due_in_days": 7,
  "source_type": "rule | instruction",
  "source_id": "string",
  "triggering_event_id": "uuid",
  "activity_definition_id": "string"
}
```

The body is encoded as **JSON**, with `Content-Type: application/json` set in the message headers.

This intentionally matches the REST schema so that the NATS consumer can reuse the validator and the `_build_issue` logic in `issue_core/api/ingest.py` unchanged.

## JetStream configuration

The publisher (e.g. activity-core's `IssueSink-NATS`) writes to a JetStream stream:

| Field        | Value                                  |
|--------------|----------------------------------------|
| Stream name  | `ACT_TASKS`                            |
| Subjects     | `act.tasks.>`                          |
| Retention    | Limits (max age: 7 days)               |
| Storage      | File                                   |
| Replicas     | 3 in prod, 1 in dev                    |
| Discard      | Old (drop oldest on overflow)          |
| Max msg size | 64 KiB (a `TaskSpec` is small)         |

issue-core consumes via a **durable consumer**:

| Field          | Value                                                            |
|----------------|------------------------------------------------------------------|
| Stream         | `ACT_TASKS`                                                      |
| Consumer name  | `issue-core-ingest`                                              |
| Filter subject | `act.tasks.create.>`                                             |
| Deliver policy | All (catch up from the oldest message on first start)            |
| Ack policy     | Explicit                                                         |
| Max deliver    | 5 (JetStream then stops redelivering; dead-lettering is an open question) |
| Ack wait       | 30 s                                                             |
| Replay policy  | Instant                                                          |

## Idempotency

NATS JetStream provides **at-least-once** delivery, so the consumer must deduplicate retries.

**Idempotency key:** `triggering_event_id` (a UUID included in every payload).

The consumer is responsible for:

1. Deriving the idempotency key from `triggering_event_id`.
2. Checking whether an issue with that key already exists (lookup by `sync_metadata.ingestion.triggering_event_id`).
3. If one exists, acking the message without creating a duplicate.
4. If not, creating the issue and then acking.
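The four dedupe steps can be sketched as a transport-agnostic handler. This is a minimal illustration under stated assumptions: `IssueStore` and `handle_task_message` are hypothetical stand-ins (the real consumer would query the backend by `sync_metadata.ingestion.triggering_event_id` and go through `_build_issue` / `backend.create_issue`), and the required-field check is a crude substitute for the real `TaskIngestionRequest` validator. In a `nats-py` callback, the handler would receive `msg.data` and the caller would `await msg.ack()` for both return values.

```python
"""Sketch of the consumer-side dedupe (hypothetical names, not issue-core's API)."""
import json
import uuid
from dataclasses import dataclass, field

# Field names taken from the TaskIngestionRequest payload above.
REQUIRED_FIELDS = {
    "title", "description", "target_repo", "priority", "labels",
    "due_in_days", "source_type", "source_id",
    "triggering_event_id", "activity_definition_id",
}


@dataclass
class IssueStore:
    """In-memory stand-in for the issue backend, keyed by idempotency key."""
    issues: dict = field(default_factory=dict)

    def find_by_triggering_event(self, key: str):
        return self.issues.get(key)

    def create_issue(self, key: str, payload: dict) -> dict:
        issue = {
            "title": payload["title"],
            "target_repo": payload["target_repo"],
            "sync_metadata": {"ingestion": {"triggering_event_id": key}},
        }
        self.issues[key] = issue
        return issue


def handle_task_message(data: bytes, store: IssueStore) -> str:
    """Process one message body and return "created" or "duplicate".

    The caller should ack in both cases. Malformed payloads raise ValueError
    (json.JSONDecodeError is a subclass); whether those are retried or
    dead-lettered is left to the caller.
    """
    payload = json.loads(data)
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    missing = REQUIRED_FIELDS - set(payload)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")

    # Step 1: derive the key; normalising via uuid.UUID also rejects non-UUIDs.
    key = str(uuid.UUID(payload["triggering_event_id"]))

    # Steps 2-3: an issue with this key already exists, so skip creation.
    if store.find_by_triggering_event(key) is not None:
        return "duplicate"

    # Step 4: first delivery, so create the issue.
    store.create_issue(key, payload)
    return "created"
```

Delivering the same body twice returns `"created"` and then `"duplicate"`, leaving a single issue in the store. Note that check-then-create is not atomic: if several consumer instances could process the same message concurrently (for example, after an ack-wait expiry), a uniqueness constraint on the key in the backend would be needed to close that race.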
Both the REST and NATS paths will share this dedupe logic, so a task can be emitted via either transport without risk of duplicate issues.

## Implementation plan (when activated)

1. Add `nats-py>=2.6` as an optional dependency (`pip install issue-core[nats]`).
2. Add a new module, `issue_core/nats/consumer.py`, that connects to NATS, subscribes via the durable consumer, parses messages, and calls the same `_build_issue` / `backend.create_issue` path as the REST endpoint.
3. Add a new CLI subcommand: `issue subscribe --nats-url ... --stream ACT_TASKS`.
4. Add the idempotency check to both the REST and NATS ingestion paths (a single shared function in `issue_core/api/ingest.py` or in a new `issue_core/ingestion/` module).
5. Add tests using a `nats-py` test harness or a docker-compose NATS instance.

## Open questions

- Should the NATS consumer also write a `progress_event` to the state hub on each successful ingestion, in addition to creating the issue? Probably yes, but this is out of scope until activation.
- Multi-tenant routing: do we run one issue-core consumer per `target_repo`, or one shared consumer with a per-repo backend lookup? Current bias: a shared consumer, since it is simpler to operate.
- Dead-letter handling: where do messages go after 5 failed deliveries? Candidate: an `ACT_TASKS_DLQ` stream with manual replay tooling.

## See also

- `SCOPE.md`: confirms NATS ingestion is in scope as a future surface.
- `issue_core/api/schemas.py`: the canonical `TaskIngestionRequest` schema.
- `issue_core/api/ingest.py`: the REST handler whose logic the NATS consumer will share.
- activity-core `docs/adr/adr-001-event-bridge-architecture.md`: describes activity-core's migration path from REST to NATS.