---
id: custodian-WP-0003b
type: workplan
domain: custodian
repo: activity-core
status: done
state_hub_workstream_id: b4eb45a9-69e3-4ab0-b00c-67a53c3117c5
split_from: custodian-WP-0003
split_part: 2 of 3
depends_on:
  - custodian-WP-0003a  # model, rules, registry must be done first
tasks:
  - id: T44
    title: ActivityDefinition file parser
    status: done
    priority: high
    state_hub_task_id: a86dcffe-789a-482e-bc5f-a5ac9db9608e
  - id: T45
    title: ActivityDefinition sync command
    status: done
    priority: medium
    state_hub_task_id: a714c511-9f93-4594-b745-9330bc645384
  - id: T46
    title: Wire rule/instruction evaluator into RunActivityWorkflow
    status: done
    priority: high
    state_hub_task_id: 615dec10-7e63-46b3-80ad-83ff2e51d6ba
  - id: T47
    title: Add ScheduledTriggerConfig (one-off future datetime)
    status: done
    priority: medium
    state_hub_task_id: 1e06cb43-d0e3-4af5-9216-0877cac2082d
  - id: T48
    title: Implement one-off Temporal Schedule
    status: done
    priority: medium
    state_hub_task_id: 01b507e3-6420-422f-83ed-90c7b4d49bb7
  - id: T49
    title: Implement webhook receiver
    status: done
    priority: medium
    state_hub_task_id: 673ca7b2-49cc-4314-8be2-9848f686da37
  - id: T50
    title: Write Gitea webhook event type definitions
    status: done
    priority: medium
    state_hub_task_id: 7ba289e9-fef6-469a-9504-c3664227a2f7
created: "2026-05-14"
---

# activity-core WP-0003b — Parser, Workflow Wiring, Triggers & Webhooks

**Split from:** custodian-WP-0003 (part 2 of 3)
**Hub workstream:** `b4eb45a9-69e3-4ab0-b00c-67a53c3117c5`
**Depends on:** custodian-WP-0003a (model, rules, registry)
**Next part:** custodian-WP-0003c (context adapters, integration)
**Architecture:** ACT-ADR-001, ACT-ADR-002, ACT-ADR-003

## Purpose

Phases 9 and 10 of the Event Bridge implementation.
With the rules module, IssueSink, and event type registry in place (WP-0003a), this part wires everything into the workflow: markdown-format ActivityDefinition parser, sync command, rule/instruction pipeline in RunActivityWorkflow, one-off scheduled trigger type, and Gitea webhook receiver.

## Prerequisites

All tasks from custodian-WP-0003a must be done:

- `src/activity_core/rules/` module with `evaluate_condition` and `execute_instruction`
- `IssueSink` ABC + `NullSink` + `IssueCoreRestSink`
- `EventEnvelope` model finalized
- `event_types` table and `sync-event-types` command working

## Build Order

```
Phase 9:
  T44 (parser) → T45 (sync) → T46 (wire workflow)
  T46 also needs: T36 (RuleEvaluator), T37 (InstructionExecutor), T39 (IssueSink) from 0003a

Phase 10 (parallel with phase 9):
  T47 (model) → T48 (Temporal schedule)
  T49, T50 (parallel — need T40, T41 from 0003a)
```

---

## Phase 9 — ActivityDefinition as Markdown

### T44: ActivityDefinition file parser

`src/activity_core/definition_parser.py`

Scans `activity-definitions/*.md` in the local repo and in any registered domain repos listed in the `ACTIVITY_DEFINITION_DIRS` env var (colon-separated paths).

Parses:

- YAML frontmatter → trigger config, context_sources, governance, owner, status.
- Fenced ` ```rule ` blocks → `RuleDef` list (YAML body).
- Fenced ` ```instruction ` blocks → `InstructionDef` list (YAML body).

Returns `ActivityDefinitionDef`. Raises `ParseError(file, line, message)` on malformed input — never silently ignores a broken definition.

---

### T45: ActivityDefinition sync command

`make sync-activity-definitions` (also in `worker.py` startup sequence, before the Temporal run loop).

Extend `activity_definitions` table: add `rules_json` (JSONB) and `instructions_json` (JSONB) columns.

On sync: parse all definition files, upsert rows. Definitions absent from the filesystem are set to `status = 'inactive'` (soft delete — Temporal Schedules for those definitions are paused, not deleted).
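A minimal sketch of the reconciliation step in the sync described above, assuming the parser output is reduced to an id, a status, and rule/instruction lists. `ParsedDefinition` and `plan_sync` are hypothetical names, not part of activity-core; the real command would apply the result as database upserts and pause the matching Temporal Schedules.

```python
import json
from dataclasses import dataclass, field


@dataclass
class ParsedDefinition:
    """Hypothetical stand-in for the parser's ActivityDefinitionDef."""
    id: str
    status: str = "active"  # assumed default; the real value comes from frontmatter
    rules: list[dict] = field(default_factory=list)
    instructions: list[dict] = field(default_factory=list)


def plan_sync(parsed: list[ParsedDefinition], existing_ids: set[str]):
    """Return (rows_to_upsert, ids_to_deactivate) for one sync pass."""
    rows = [
        {
            "id": d.id,
            "status": d.status,
            # Serialised for the rules_json / instructions_json columns.
            "rules_json": json.dumps(d.rules),
            "instructions_json": json.dumps(d.instructions),
        }
        for d in parsed
    ]
    seen = {d.id for d in parsed}
    # Rows with no file on disk are soft-deleted (status = 'inactive'),
    # never removed, so their schedules can be paused rather than deleted.
    to_deactivate = sorted(existing_ids - seen)
    return rows, to_deactivate
```

Keeping the plan as plain data separates the filesystem scan from the database write, which makes the soft-delete rule easy to test without Postgres or Temporal.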
The Alembic migration for the new columns is included in this task.

---

### T46: Wire rule/instruction evaluator into RunActivityWorkflow

Replace `RunActivityWorkflow`'s current unconditional task-spawning logic with this pipeline:

```
load_activity_definition()
  → resolve_context()        [calls each context_source adapter]
  → evaluate_rules()         [RuleEvaluator — all rules, all-match]
  → execute_instructions()   [InstructionExecutor — all passing pre-filters]
  → emit_tasks()             [IssueSink.emit() for each TaskSpec]
  → log_run()                [activity_runs row + task_spawn_log rows]
```

A rule that raises `UnsafeExpression` or any other error is skipped and logged as `rule_eval_error`; other rules in the same run still execute. The run is not failed by a single bad rule.

---

## Phase 10 — One-off Scheduled Trigger and Webhook Receiver

### T47: Add ScheduledTriggerConfig

`src/activity_core/models.py`

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class ScheduledTriggerConfig:
    at: datetime  # UTC; warns if in the past
    timezone: str = "UTC"


# CronTriggerConfig and EventTriggerConfig are assumed to be defined earlier in this module.
TriggerConfig = CronTriggerConfig | EventTriggerConfig | ScheduledTriggerConfig
```

Add to ActivityDefinition frontmatter schema:

```yaml
trigger:
  type: scheduled
  at: "2026-09-01T09:00:00Z"
  timezone: "Europe/Berlin"
```

---

### T48: Implement one-off Temporal Schedule

`src/activity_core/schedule_manager.py`

```python
async def upsert_schedule(client, defn: ActivityDefinition):
    if isinstance(defn.trigger_config, ScheduledTriggerConfig):
        # Create Schedule with remaining_actions=1
        # Schedule ID: activity-schedule-{defn.id}-once
        # start_at = defn.trigger_config.at
        ...
```

`sync_schedules.py`: detect `ScheduledTriggerConfig`, use one-off path.

Add `cancel_scheduled(client, activity_id)` for admin cancellation of pending one-off definitions.

---

### T49: Implement webhook receiver

`src/activity_core/webhook_receiver.py`

FastAPI `APIRouter` mounted at `/webhooks/{source}` in `api.py`.
```
POST /webhooks/gitea   → validate X-Gitea-Signature (HMAC-SHA256)
POST /webhooks/github  → validate X-Hub-Signature-256 (HMAC-SHA256)
```

Per-source normalisers convert raw webhook payload to `EventEnvelope`:

- Gitea `repository` event → `gitea.repo.created`
- Gitea `push` event → `gitea.push`
- Gitea `issues` event (action=closed) → `gitea.issue.closed`

After normalisation: validate type against event type registry; publish to NATS subject `activity.events`. Discard unknown event types with HTTP 422 (not 500 — unknown type is not a server error).

Config: `WEBHOOK_SECRET_GITEA` and `WEBHOOK_SECRET_GITHUB` env vars.

---

### T50: Write Gitea webhook event type definitions

Three files under `event-types/`:

- `gitea.repo.created.md` — publisher: gitea/webhook; attrs: repo_full_name (string), repo_slug (string, derived from repo name), owner (string), html_url (string), created_at (datetime).
- `gitea.push.md` — publisher: gitea/webhook; attrs: repo_full_name (string), branch (string), pusher (string), commits_count (integer), compare_url (string), pushed_at (datetime).
- `gitea.issue.closed.md` — publisher: gitea/webhook; attrs: repo_full_name (string), issue_number (integer), issue_title (string), closer (string), closed_at (datetime).

Each includes a "Normaliser mapping" subsection in Consumer Notes showing the raw Gitea webhook field → EventEnvelope attribute mapping.

---

## Completion Criteria for This Part

1. `make sync-activity-definitions` loads definitions from `activity-definitions/` and upserts them (with `rules_json`, `instructions_json`) cleanly
2. `RunActivityWorkflow` uses the rule/instruction pipeline end-to-end (no unconditional task spawning)
3. One-off `scheduled` trigger type creates a Temporal Schedule with `remaining_actions=1`
4. `POST /webhooks/gitea` validates HMAC, normalises payload, publishes to NATS
5. Three Gitea event type definitions committed and loaded by `sync-event-types`

## New Files Produced

| Path | Task |
|---|---|
| `src/activity_core/definition_parser.py` | T44 |
| `src/activity_core/webhook_receiver.py` | T49 |
| `event-types/gitea.repo.created.md` | T50 |
| `event-types/gitea.push.md` | T50 |
| `event-types/gitea.issue.closed.md` | T50 |

## Modified Files

| Path | Task | Change |
|---|---|---|
| `src/activity_core/models.py` | T47 | ScheduledTriggerConfig, TriggerConfig union |
| `src/activity_core/workflows.py` | T46 | Rule/instruction pipeline replaces unconditional spawn |
| `src/activity_core/activities.py` | T46 | resolve_context calls adapter chain |
| `src/activity_core/schedule_manager.py` | T48 | One-off schedule path |
| `src/activity_core/sync_schedules.py` | T48 | Detect ScheduledTriggerConfig |
| `src/activity_core/api.py` | T49 | Mount webhook router |
| `src/activity_core/worker.py` | T45 | sync-activity-definitions at startup |
| `alembic/versions/` | T45 | Migration: rules_json, instructions_json columns |
| `Makefile` | T45 | sync-activity-definitions target |

## Change History

- v1.0 (2026-05-14): Split from custodian-WP-0003 (phases 9–10).