---
id: custodian-WP-0003a
type: workplan
domain: custodian
repo: activity-core
status: done
state_hub_workstream_id: b4eb45a9-69e3-4ab0-b00c-67a53c3117c5
split_from: custodian-WP-0003
split_part: 1 of 3
tasks:
  - id: T34
    title: Refactor ActivityDefinition model — add rules/instructions fields
    status: done
    priority: high
    state_hub_task_id: ca1bf66f-7094-459c-9abf-5f5c6414c91a
  - id: T35
    title: Create src/activity_core/rules/ module skeleton
    status: done
    priority: high
    state_hub_task_id: 54c25eae-4fad-42d8-bb15-9c1e7532425e
  - id: T36
    title: Implement RuleEvaluator (sandboxed AST walker)
    status: done
    priority: high
    state_hub_task_id: a1ed0d8b-df59-4af1-82a1-d01628919689
  - id: T37
    title: Implement InstructionExecutor
    status: done
    priority: high
    state_hub_task_id: cdd349f1-b7ad-4a3a-afa0-ae671a7addb8
  - id: T38
    title: Alembic migration — add task_spawn_log table
    status: done
    priority: high
    state_hub_task_id: 4cd9833c-fbc4-4b10-b6f2-85c028c8c557
  - id: T39
    title: Implement IssueSink adapter interface and IssueCoreRestSink
    status: done
    priority: high
    state_hub_task_id: 38177fcf-c468-4424-9938-1b01038a386b
  - id: T40
    title: Formalize EventEnvelope model
    status: done
    priority: high
    state_hub_task_id: 65d77939-6972-4450-993e-23ccb25d9454
  - id: T41
    title: Event type registry — file scanner, parser, DB model, sync command
    status: done
    priority: high
    state_hub_task_id: 7a182265-4013-4272-8540-cfb4e2079eb3
  - id: T42
    title: Curator gate configuration
    status: done
    priority: medium
    state_hub_task_id: 229d99ca-2d09-4c96-b3d2-da8b2d14c5b7
  - id: T43
    title: Write first event type definitions
    status: done
    priority: medium
    state_hub_task_id: 78b9d642-17b1-46c7-8e5f-c0a948821993
created: "2026-05-14"
---

# activity-core WP-0003a — Domain Model, Rules Infrastructure & Event Type Registry

**Split from:** custodian-WP-0003 (part 1 of 3)
**Hub workstream:** `b4eb45a9-69e3-4ab0-b00c-67a53c3117c5`
**Next part:** custodian-WP-0003b (parser, workflow wiring, triggers, webhooks)
**Architecture:** ACT-ADR-001, ACT-ADR-002, ACT-ADR-003

## Purpose

Phases 7 and 8 of the Event Bridge implementation: establish the domain model with
rules/instructions fields, build the sandboxed rule evaluator, implement the instruction
executor with prompt injection protection, add the spawn audit log, implement the
IssueSink adapter, formalize EventEnvelope, build the event type registry, and write the
first event type definition files.

This part produces the foundational building blocks that WP-0003b and WP-0003c depend on.

## Build Order

```
Phase 7 (parallel tracks):
  T34 (model) → T35 (rules/ skeleton) → T36, T37 (parallel)
  T38 (migration — independent)
  T39 (IssueSink — independent)
  T40 (EventEnvelope — independent)

Phase 8 (needs T40):
  T40 → T41 (registry) → T42, T43 (parallel)
```

---

## Phase 7 — Domain Model & Rules Infrastructure

### T34: Refactor ActivityDefinition model — add rules/instructions fields

`src/activity_core/models.py`

Add `RuleDef` and `InstructionDef` dataclasses and update `ActivityDefinition`:

```python
@dataclass
class ActionDef:
    task_template: str          # path relative to repo root
    target_repo: str | None     # attribute-access expression or literal
    priority: str = "medium"
    labels: list[str] = field(default_factory=list)
    due_in_days: int | None = None

@dataclass
class RuleDef:
    id: str
    condition: str              # expression string; empty = always true
    action: ActionDef

@dataclass
class InstructionDef:
    id: str
    condition: str              # optional pre-filter (Rule DSL)
    trusted_fields: list[str]
    model: str
    prompt: str
    output_schema: str          # path to JSON Schema file
    review_required: bool = False
```

`ActivityDefinition` gains `rules: list[RuleDef]` and `instructions: list[InstructionDef]`.
The existing `task_templates` field is deprecated and ignored if `rules` is non-empty.

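
The deprecation rule for `task_templates` can be sketched as follows. This is a minimal illustration, not the actual model: the `name` field, the `list[str]` type of `task_templates`, and the `effective_task_templates` helper are all assumptions made for the example, and `InstructionDef` is left untyped to keep the block short.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ActionDef:
    task_template: str
    target_repo: str | None
    priority: str = "medium"
    labels: list[str] = field(default_factory=list)
    due_in_days: int | None = None


@dataclass
class RuleDef:
    id: str
    condition: str  # empty string means "always true"
    action: ActionDef


@dataclass
class ActivityDefinition:
    name: str  # placeholder; the real model's other fields are not shown in this plan
    rules: list[RuleDef] = field(default_factory=list)
    instructions: list = field(default_factory=list)  # list[InstructionDef] in the plan
    task_templates: list[str] = field(default_factory=list)  # deprecated; type assumed

    def effective_task_templates(self) -> list[str]:
        """Hypothetical helper: legacy templates apply only when no rules are defined."""
        return [] if self.rules else list(self.task_templates)


# Usage: a legacy definition keeps its templates; a migrated one ignores them.
legacy = ActivityDefinition(name="legacy", task_templates=["templates/onboard.md"])
migrated = ActivityDefinition(
    name="migrated",
    task_templates=["templates/onboard.md"],
    rules=[
        RuleDef(
            id="r1",
            condition="",
            action=ActionDef(task_template="templates/onboard.md", target_repo=None),
        )
    ],
)
```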

---

### T35: Create src/activity_core/rules/ module skeleton

```
src/activity_core/rules/
  __init__.py        # exports: evaluate_condition, execute_instruction
  models.py          # RuleDef, InstructionDef, TaskSpec, TaskRef (no upstream imports)
  evaluator.py       # RuleEvaluator stub
  executor.py        # InstructionExecutor stub
```

Boundary contract: nothing inside `rules/` may import from `temporalio`, `sqlalchemy`,
`fastapi`, or any `activity_core.*` module outside `rules/`. A CI test
(`tests/rules/test_boundary.py`) verifies this by inspecting imports.

---

### T36: Implement RuleEvaluator (sandboxed AST walker)

`src/activity_core/rules/evaluator.py`

```python
def evaluate_condition(
    expr: str,
    event: EventEnvelope,
    context: dict,
) -> bool:
    """
    Evaluates a Rule condition expression safely.
    Raises UnsafeExpression at parse time if any non-whitelisted AST node is found.
    Never calls exec() or eval().
    """
```

Whitelisted AST node types: `Expression`, `BoolOp`, `And`, `Or`, `UnaryOp`, `Not`,
`Compare`, `Eq`, `NotEq`, `Lt`, `LtE`, `Gt`, `GtE`, `In`, `NotIn`, `Name`, `Attribute`,
`Constant`, `Call` (only `len`), `List`, `Tuple`. `IfExp` — forbidden, raise
`UnsafeExpression`.

Attribute resolution: `event.attributes.repo_slug` walks the EventEnvelope object.
`context.repo_profile.tags` walks the context dict. Missing attributes return `None`
rather than raising — this lets rules use `is None` checks without crashing on optional
fields.

Empty expression string → returns `True`.

---

### T37: Implement InstructionExecutor

`src/activity_core/rules/executor.py`

```python
def execute_instruction(
    instr: InstructionDef,
    event: EventEnvelope,
    context: dict,
    llm_client,         # injected; implements llm-connect interface
) -> list[TaskSpec]:
    """
    Evaluates an Instruction. Returns [] on any failure; never raises.
    """
```

Steps:

1. If `instr.condition` is non-empty, call `evaluate_condition` — return `[]` if false.
2. Render prompt: scan for `{field.path}` placeholders; resolve each against
   `trusted_fields` allowlist. Raise `UntrustedFieldError` (caught by caller, logged,
   returns `[]`) if any placeholder is not listed. Fields of type `object` in the event
   schema are always rejected even if listed.
3. Call `llm_client.complete(prompt, model=instr.model)` with structured output.
4. Validate response against `instr.output_schema` (JSON Schema). On failure: append
   schema error to prompt, retry once.
5. If second attempt fails: log `instruction_output_error`, return `[]`.
6. Return validated `list[TaskSpec]`.

---

### T38: Alembic migration — add task_spawn_log table

New migration file in `alembic/versions/`. Table definition:

```sql
CREATE TABLE task_spawn_log (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    activity_def_id     UUID NOT NULL REFERENCES activity_definitions(id),
    source_type         VARCHAR(20) NOT NULL,   -- 'rule' | 'instruction'
    source_id           TEXT NOT NULL,
    source_version      TEXT NOT NULL,
    triggering_event_id TEXT NOT NULL,
    task_ref            TEXT,                   -- issue-core external ref
    condition_matched   TEXT,
    prompt_hash         CHAR(64),               -- SHA-256, nullable
    model               TEXT,
    output_validated    BOOLEAN,
    review_required     BOOLEAN,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_spawn_log_event ON task_spawn_log(triggering_event_id);
CREATE INDEX idx_spawn_log_def ON task_spawn_log(activity_def_id);
```

---

### T39: Implement IssueSink adapter interface and IssueCoreRestSink

`src/activity_core/issue_sink.py`

```python
class IssueSink(ABC):
    @abstractmethod
    def emit(self, task_spec: TaskSpec) -> TaskRef: ...

class IssueCoreRestSink(IssueSink):
    """POSTs to issue-core REST API. Config: ISSUE_CORE_URL env var."""

class NullSink(IssueSink):
    """Discards tasks and returns synthetic TaskRefs. For testing."""
```

`IssueCoreRestSink.emit()`:

1. POST `{ISSUE_CORE_URL}/issues/` with task_spec serialised as issue fields.
2. Parse response → `TaskRef(external_id, backend_url)`.
3. Write `task_spawn_log` row (via injected db session).
4. Return `TaskRef`.

Active sink is selected by `ISSUE_SINK_TYPE` env var: `rest` (default) | `null`.

---

## Phase 8 — Event Type Registry

### T40: Formalize EventEnvelope model

Update `src/activity_core/models.py`:

```python
@dataclass
class EventEnvelope:
    id: str                     # UUID v4
    type: str                   # e.g. "org.repo.registered"
    version: str                # e.g. "1.0"
    timestamp: datetime
    publisher: str              # e.g. "the-custodian/state-hub"
    attributes: dict[str, Any]  # typed per event type schema

    @classmethod
    def from_nats_message(cls, msg: nats.aio.msg.Msg) -> "EventEnvelope": ...

    @classmethod
    def from_webhook_payload(cls, source: str, payload: dict) -> "EventEnvelope": ...
```

Update `event_router.py` to produce compliant envelopes. Existing NATS message format
may require a migration shim if the current format omits `id`, `version`, or `publisher`.

---

### T41: Event type registry — file scanner, parser, DB model, sync command

**File scanner:** glob `event-types/*.md`; parse YAML frontmatter block.

**Attribute schema:** parse the `## Attributes` markdown table into a dict of
`{name: {type, required, description}}`. Types map to Python annotations for validation.

**DB model** (`event_types` table — new Alembic migration):

```sql
CREATE TABLE event_types (
    type_id          TEXT PRIMARY KEY,   -- e.g. "org.repo.registered"
    version          TEXT NOT NULL,
    publisher        TEXT NOT NULL,
    governance       TEXT NOT NULL DEFAULT 'publisher-declared',
    status           TEXT NOT NULL DEFAULT 'active',
    attribute_schema JSONB NOT NULL,
    raw_md           TEXT NOT NULL,
    synced_at        TIMESTAMPTZ NOT NULL DEFAULT now()
);
```

**Sync command:** `make sync-event-types` (also called at worker startup).

**Registry lookup:** `get_event_type(type_id: str) -> EventTypeDef | None` — used by
Event Router and webhook receiver to validate incoming events.

---

### T42: Curator gate configuration

`ACTIVITY_CURATOR_GATE` env var: `disabled` (default) | `required`.

- `disabled`: all event types with `status IN ('active', 'pending')` are accepted by the
  Event Router. `pending` types are logged as a warning.
- `required`: only `status = 'active'` types are accepted. Events whose type has status
  `pending` are discarded with a `curator_gate_rejected` log entry.

Curator approval endpoint: `POST /event-types/{type_id}/approve` sets `status = 'active'`.
Requires admin auth (same as existing API auth).

---

### T43: Write first event type definitions

Three files under `event-types/`:

- `org.repo.registered.md` — publisher: state-hub; attrs: repo_slug (string, required),
  domain (string, required), tags (string[], optional), registered_at (datetime, required).
- `org.workstream.completed.md` — publisher: state-hub; attrs: workstream_id (uuid,
  required), workstream_slug (string, required), domain (string, required), completed_at
  (datetime, required).
- `org.activity.run.completed.md` — publisher: activity-core; attrs:
  activity_definition_id (uuid, required), run_id (uuid, required), tasks_spawned
  (integer, required), completed_at (datetime, required).

Each file follows the full ACT-ADR-002 structure: Intent, When Published, Attributes
table, Example Payload, Consumer Notes, Debugging.

---

## Completion Criteria for This Part

1. `src/activity_core/rules/` module importable with no upstream deps (`test_boundary.py` passes)
2. `evaluate_condition` handles all whitelisted operators and rejects all forbidden constructs
3. `IssueSink` ABC and `IssueCoreRestSink` / `NullSink` implemented
4. Alembic migrations for `task_spawn_log` and `event_types` tables apply cleanly
5. `make sync-event-types` loads all three org event type definitions
6. EventEnvelope model updated and `from_nats_message` works with existing event_router

## New Files Produced

| Path | Task |
|---|---|
| `src/activity_core/rules/__init__.py` | T35 |
| `src/activity_core/rules/models.py` | T35 |
| `src/activity_core/rules/evaluator.py` | T36 |
| `src/activity_core/rules/executor.py` | T37 |
| `src/activity_core/issue_sink.py` | T39 |
| `event-types/org.repo.registered.md` | T43 |
| `event-types/org.workstream.completed.md` | T43 |
| `event-types/org.activity.run.completed.md` | T43 |

## Modified Files

| Path | Task | Change |
|---|---|---|
| `src/activity_core/models.py` | T34, T40 | RuleDef, InstructionDef, EventEnvelope update |
| `src/activity_core/event_router.py` | T40 | Produce compliant EventEnvelope |
| `alembic/versions/` | T38, T41 | Two new migrations |
| `src/activity_core/api.py` | T42 | Curator approve endpoint |
| `src/activity_core/worker.py` | T41 | sync-event-types at startup |
| `Makefile` | T41 | sync-event-types target |

## Change History

- v1.0 (2026-05-14): Split from custodian-WP-0003 (phases 7–8).