---
id: custodian-WP-0003a
type: workplan
domain: custodian
repo: activity-core
status: done
state_hub_workstream_id: b4eb45a9-69e3-4ab0-b00c-67a53c3117c5
split_from: custodian-WP-0003
split_part: 1 of 3
tasks:
  - id: T34
    title: Refactor ActivityDefinition model — add rules/instructions fields
    status: done
    priority: high
    state_hub_task_id: ca1bf66f-7094-459c-9abf-5f5c6414c91a
  - id: T35
    title: Create src/activity_core/rules/ module skeleton
    status: done
    priority: high
    state_hub_task_id: 54c25eae-4fad-42d8-bb15-9c1e7532425e
  - id: T36
    title: Implement RuleEvaluator (sandboxed AST walker)
    status: done
    priority: high
    state_hub_task_id: a1ed0d8b-df59-4af1-82a1-d01628919689
  - id: T37
    title: Implement InstructionExecutor
    status: done
    priority: high
    state_hub_task_id: cdd349f1-b7ad-4a3a-afa0-ae671a7addb8
  - id: T38
    title: Alembic migration — add task_spawn_log table
    status: done
    priority: high
    state_hub_task_id: 4cd9833c-fbc4-4b10-b6f2-85c028c8c557
  - id: T39
    title: Implement IssueSink adapter interface and IssueCoreRestSink
    status: done
    priority: high
    state_hub_task_id: 38177fcf-c468-4424-9938-1b01038a386b
  - id: T40
    title: Formalize EventEnvelope model
    status: done
    priority: high
    state_hub_task_id: 65d77939-6972-4450-993e-23ccb25d9454
  - id: T41
    title: Event type registry — file scanner, parser, DB model, sync command
    status: done
    priority: high
    state_hub_task_id: 7a182265-4013-4272-8540-cfb4e2079eb3
  - id: T42
    title: Curator gate configuration
    status: done
    priority: medium
    state_hub_task_id: 229d99ca-2d09-4c96-b3d2-da8b2d14c5b7
  - id: T43
    title: Write first event type definitions
    status: done
    priority: medium
    state_hub_task_id: 78b9d642-17b1-46c7-8e5f-c0a948821993
created: "2026-05-14"
---

# activity-core WP-0003a — Domain Model, Rules Infrastructure & Event Type Registry

- **Split from:** custodian-WP-0003 (part 1 of 3)
- **Hub workstream:** `b4eb45a9-69e3-4ab0-b00c-67a53c3117c5`
- **Next part:** custodian-WP-0003b (parser, workflow wiring, triggers, webhooks)
- **Architecture:** ACT-ADR-001, ACT-ADR-002, ACT-ADR-003

## Purpose

Phases 7 and 8 of the Event Bridge implementation: establish the domain model
with rules/instructions fields, build the sandboxed rule evaluator, implement
the instruction executor with prompt injection protection, add the spawn audit
log, implement the IssueSink adapter, formalize EventEnvelope, build the event
type registry, and write the first event type definition files.

This part produces the foundational building blocks that WP-0003b and WP-0003c
depend on.

## Build Order

```
Phase 7 (parallel tracks):
  T34 (model) → T35 (rules/ skeleton) → T36, T37 (parallel)
  T38 (migration — independent)
  T39 (IssueSink — independent)
  T40 (EventEnvelope — independent)

Phase 8 (needs T40):
  T40 → T41 (registry) → T42, T43 (parallel)
```

---

## Phase 7 — Domain Model & Rules Infrastructure

### T34: Refactor ActivityDefinition model — add rules/instructions fields

`src/activity_core/models.py`

Add the `ActionDef`, `RuleDef`, and `InstructionDef` dataclasses and update `ActivityDefinition`:

```python
from dataclasses import dataclass, field


@dataclass
class ActionDef:
    task_template: str  # path relative to repo root
    target_repo: str | None  # attribute-access expression or literal
    priority: str = "medium"
    labels: list[str] = field(default_factory=list)
    due_in_days: int | None = None


@dataclass
class RuleDef:
    id: str
    condition: str  # expression string; empty = always true
    action: ActionDef


@dataclass
class InstructionDef:
    id: str
    condition: str  # optional pre-filter (Rule DSL); empty = no pre-filter
    trusted_fields: list[str]
    model: str
    prompt: str
    output_schema: str  # path to JSON Schema file
    review_required: bool = False
```

`ActivityDefinition` gains `rules: list[RuleDef]` and
`instructions: list[InstructionDef]`. The existing `task_templates` field is
deprecated and ignored if `rules` is non-empty.
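
The deprecation rule can be expressed as a one-branch check. This is a minimal sketch that assumes a pared-down `ActivityDefinition` carrying only `name`, `task_templates`, and `rules` (the real model has more fields), plus a trimmed copy of `ActionDef`; the helper `legacy_templates_in_effect` and the template paths are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ActionDef:  # trimmed copy of the dataclass above
    task_template: str
    target_repo: str | None
    priority: str = "medium"


@dataclass
class RuleDef:
    id: str
    condition: str
    action: ActionDef


@dataclass
class ActivityDefinition:
    # Hypothetical minimal shape: only the fields the deprecation rule touches.
    name: str
    task_templates: list[str] = field(default_factory=list)  # deprecated
    rules: list[RuleDef] = field(default_factory=list)


def legacy_templates_in_effect(defn: ActivityDefinition) -> list[str]:
    """Return the deprecated task_templates that still apply (hypothetical helper)."""
    if defn.rules:
        return []  # any rule present: task_templates is ignored entirely
    return list(defn.task_templates)


legacy = ActivityDefinition(name="weekly-review", task_templates=["tasks/review.md"])
migrated = ActivityDefinition(
    name="weekly-review",
    task_templates=["tasks/review.md"],
    rules=[RuleDef(id="r1", condition="", action=ActionDef("tasks/review.md", None))],
)
print(legacy_templates_in_effect(legacy))    # ['tasks/review.md']
print(legacy_templates_in_effect(migrated))  # []
```

Ignoring the old field wholesale, rather than merging it with rules, means a definition is never half-migrated: as soon as one rule exists, only rules drive task creation.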

---

### T35: Create src/activity_core/rules/ module skeleton

```
src/activity_core/rules/
  __init__.py    # exports: evaluate_condition, execute_instruction
  models.py      # RuleDef, InstructionDef, TaskSpec, TaskRef (no upstream imports)
  evaluator.py   # RuleEvaluator stub
  executor.py    # InstructionExecutor stub
```

Boundary contract: nothing inside `rules/` may import from `temporalio`,
`sqlalchemy`, `fastapi`, or any `activity_core.*` module outside `rules/`.
A CI test (`tests/rules/test_boundary.py`) verifies this by inspecting imports.
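
The import inspection can be done statically with the standard `ast` module. The sketch below is an assumption about how such a check might work, not the contents of `test_boundary.py`; the names `forbidden_imports` and `_crosses_boundary` are hypothetical. Because `rules/` is a flat package, any relative import that climbs more than one level (`from ..x import y`) is treated as leaving it.

```python
import ast

FORBIDDEN_PACKAGES = {"temporalio", "sqlalchemy", "fastapi"}
RULES_PACKAGE = "activity_core.rules"


def _crosses_boundary(module: str) -> bool:
    """True for third-party framework imports and activity_core.* outside rules/."""
    top = module.split(".")[0]
    if top in FORBIDDEN_PACKAGES:
        return True
    inside_rules = module == RULES_PACKAGE or module.startswith(RULES_PACKAGE + ".")
    return top == "activity_core" and not inside_rules


def forbidden_imports(source: str) -> list[str]:
    """Return every import in `source` that violates the rules/ boundary."""
    violations: list[str] = []
    for node in ast.walk(ast.parse(source)):  # also sees imports nested in if/try blocks
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            if node.level >= 2:  # "from ..x import y" leaves the flat rules/ package
                violations.append("." * node.level + (node.module or ""))
                continue
            if node.level == 1:  # sibling module, e.g. "from .models import RuleDef"
                continue
            modules = [node.module or ""]
        else:
            continue
        violations.extend(m for m in modules if _crosses_boundary(m))
    return violations
```

In a pytest test this would run over every file from `Path("src/activity_core/rules").glob("*.py")` and assert that each yields an empty list. Parsing instead of importing keeps the check free of side effects and catches imports that only execute on rarely-taken code paths.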

---

### T36: Implement RuleEvaluator (sandboxed AST walker)

`src/activity_core/rules/evaluator.py`

```python
from __future__ import annotations  # EventEnvelope below is an annotation only


def evaluate_condition(
    expr: str,
    event: EventEnvelope,  # lives in activity_core.models; not imported (rules/ boundary)
    context: dict,
) -> bool:
    """
    Evaluates a Rule condition expression safely.
    Raises UnsafeExpression at parse time if any non-whitelisted AST node
    is found. Never calls exec() or eval().
    """
```

Whitelisted AST node types: `Expression`, `BoolOp`, `And`, `Or`, `UnaryOp`,
`Not`, `Compare`, `Eq`, `NotEq`, `Lt`, `LtE`, `Gt`, `GtE`, `In`, `NotIn`,
`Name`, `Attribute`, `Constant`, `Call` (only `len`), `List`, `Tuple`.
Any other node raises `UnsafeExpression`; `IfExp` in particular is forbidden.

Attribute resolution: `event.attributes.repo_slug` walks the EventEnvelope
object, and `context.repo_profile.tags` walks the context dict. Missing attributes
return `None` rather than raising, so rules can test optional fields with
`== None` / `!= None` without crashing (`Is` and `IsNot` are not whitelisted).

An empty expression string returns `True`.
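
A minimal sketch of such a walker follows. It is not the repository's `evaluator.py`, and several choices in it are assumptions: only the names `event` and `context` are bound; `ast.Load` is whitelisted as well, because `ast.walk` yields that context object under every `Name`, `Attribute`, `List`, and `Tuple`; attribute names starting with `_` are rejected as extra hardening the spec does not state; and runtime type errors such as `len(None)` on a missing field propagate instead of being mapped to `False`.

```python
from __future__ import annotations

import ast
from typing import Any


class UnsafeExpression(ValueError):
    """Raised when a condition contains a construct outside the whitelist."""


# Whitelisted node types, plus ast.Load (the context object ast.walk also yields).
_ALLOWED = (
    ast.Expression, ast.BoolOp, ast.And, ast.Or, ast.UnaryOp, ast.Not,
    ast.Compare, ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
    ast.In, ast.NotIn, ast.Name, ast.Attribute, ast.Constant, ast.Call,
    ast.List, ast.Tuple, ast.Load,
)

_COMPARE = {
    ast.Eq: lambda a, b: a == b, ast.NotEq: lambda a, b: a != b,
    ast.Lt: lambda a, b: a < b, ast.LtE: lambda a, b: a <= b,
    ast.Gt: lambda a, b: a > b, ast.GtE: lambda a, b: a >= b,
    ast.In: lambda a, b: a in b, ast.NotIn: lambda a, b: a not in b,
}


def _check(tree: ast.AST) -> None:
    """Reject the whole expression before anything is evaluated."""
    for node in ast.walk(tree):
        if not isinstance(node, _ALLOWED):  # IfExp, BinOp, Subscript, USub, keyword, ...
            raise UnsafeExpression(f"forbidden node: {type(node).__name__}")
        if isinstance(node, ast.Call) and not (
            isinstance(node.func, ast.Name) and node.func.id == "len"
            and len(node.args) == 1
        ):
            raise UnsafeExpression("only len(x) may be called")
        if isinstance(node, ast.Attribute) and node.attr.startswith("_"):
            # Extra hardening (assumption): block walks such as event.__class__.
            raise UnsafeExpression(f"private attribute: {node.attr}")


def _get(obj: Any, name: str) -> Any:
    """Dicts are walked by key, objects by attribute; anything missing is None."""
    if isinstance(obj, dict):
        return obj.get(name)
    return getattr(obj, name, None)


def _eval(node: ast.AST, names: dict[str, Any]) -> Any:
    if isinstance(node, ast.Expression):
        return _eval(node.body, names)
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Name):
        return names.get(node.id)  # only "event" and "context" are bound
    if isinstance(node, ast.Attribute):
        return _get(_eval(node.value, names), node.attr)
    if isinstance(node, ast.List):
        return [_eval(e, names) for e in node.elts]
    if isinstance(node, ast.Tuple):
        return tuple(_eval(e, names) for e in node.elts)
    if isinstance(node, ast.UnaryOp):  # _check guarantees the operator is Not
        return not _eval(node.operand, names)
    if isinstance(node, ast.BoolOp):  # all()/any() short-circuit like and/or
        values = (_eval(v, names) for v in node.values)
        return all(values) if isinstance(node.op, ast.And) else any(values)
    if isinstance(node, ast.Call):  # _check guarantees this is len(x)
        return len(_eval(node.args[0], names))
    if isinstance(node, ast.Compare):  # chains such as 1 <= x < 5 work too
        left = _eval(node.left, names)
        for op, right_node in zip(node.ops, node.comparators):
            right = _eval(right_node, names)
            if not _COMPARE[type(op)](left, right):
                return False
            left = right
        return True
    raise UnsafeExpression(f"unhandled node: {type(node).__name__}")


def evaluate_condition(expr: str, event: Any, context: dict) -> bool:
    if not expr.strip():
        return True  # empty condition: always true
    tree = ast.parse(expr, mode="eval")  # parses only; nothing is executed
    _check(tree)
    return bool(_eval(tree, {"event": event, "context": context}))
```

Because `_check` inspects the whole tree before `_eval` runs, a forbidden construct is rejected even when it sits in a branch that short-circuiting would never reach, which is what makes the rejection happen at parse time rather than at evaluation time.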

---

### T37: Implement InstructionExecutor

`src/activity_core/rules/executor.py`

```python
from __future__ import annotations  # EventEnvelope is referenced as an annotation only

from .models import InstructionDef, TaskSpec


def execute_instruction(
    instr: InstructionDef,
    event: EventEnvelope,
    context: dict,
    llm_client,  # injected; implements llm-connect interface
) -> list[TaskSpec]:
    """
    Evaluates an Instruction. Returns [] on any failure; never raises.
    """
```

Steps:

1. If `instr.condition` is non-empty, call `evaluate_condition`; return `[]`
   if it is false.
2. Render the prompt: scan for `{field.path}` placeholders and resolve each against
   the `trusted_fields` allowlist. If any placeholder is not listed, raise
   `UntrustedFieldError`; it is caught inside `execute_instruction`, logged, and
   turned into `[]`. Fields of type `object` in the event schema are always
   rejected, even if listed.
3. Call `llm_client.complete(prompt, model=instr.model)` with structured output.
4. Validate the response against `instr.output_schema` (JSON Schema). On failure,
   append the schema error to the prompt and retry once.
5. If the second attempt also fails, log `instruction_output_error` and return `[]`.
6. Return the validated `list[TaskSpec]`.
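
Steps 2 to 5 can be sketched with the standard library alone. This sketch assumes the event and context have already been flattened into a nested mapping, that placeholder paths are dotted keys into it, and that the event schema's field types are available as a `field_types` dict. `complete` and `validate` are stand-ins for `llm_client.complete(..., model=instr.model)` and a JSON Schema validator; `render_prompt`, `generate_tasks`, `run_instruction`, and the logger name are hypothetical. Step 1 (the condition pre-filter) is omitted.

```python
from __future__ import annotations

import logging
import re
from typing import Any, Callable, Mapping

log = logging.getLogger("rules.executor")  # hypothetical logger name

PLACEHOLDER = re.compile(r"\{([A-Za-z_][A-Za-z0-9_.]*)\}")


class UntrustedFieldError(ValueError):
    """A placeholder is missing from trusted_fields or names an object-typed field."""


def _resolve(data: Mapping[str, Any], path: str) -> Any:
    value: Any = data
    for part in path.split("."):
        value = value.get(part) if isinstance(value, Mapping) else None
    return value


def render_prompt(template: str, data: Mapping[str, Any], trusted_fields: list[str],
                  field_types: Mapping[str, str]) -> str:
    """Step 2: substitute {dotted.path} placeholders, allowlisted ones only."""
    allowed = set(trusted_fields)

    def substitute(match: re.Match) -> str:
        path = match.group(1)
        if path not in allowed:
            raise UntrustedFieldError(f"{{{path}}} is not in trusted_fields")
        if field_types.get(path) == "object":  # rejected even when listed
            raise UntrustedFieldError(f"{{{path}}} is object-typed")
        return str(_resolve(data, path))

    return PLACEHOLDER.sub(substitute, template)


def generate_tasks(prompt: str, complete: Callable[[str], Any],
                   validate: Callable[[Any], str | None]) -> list:
    """Steps 3-5: call the model, validate, retry once with the error appended."""
    output = complete(prompt)
    error = validate(output)
    if error is not None:
        output = complete(f"{prompt}\n\nYour previous output failed validation: {error}")
        error = validate(output)
        if error is not None:
            log.warning("instruction_output_error: %s", error)
            return []
    return list(output)


def run_instruction(template, data, trusted_fields, field_types, complete, validate) -> list:
    """Never raises: every failure is logged and becomes []."""
    try:
        prompt = render_prompt(template, data, trusted_fields, field_types)
        return generate_tasks(prompt, complete, validate)
    except Exception:
        log.exception("instruction failed")
        return []
```

Appending the validation error to the original prompt, instead of rebuilding it, gives the model both its instructions and the reason the first answer was rejected, while the allowlist ensures only trusted, non-object fields ever reach the prompt text.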

---

### T38: Alembic migration — add task_spawn_log table

New migration file in `alembic/versions/`. Table definition:

```sql
CREATE TABLE task_spawn_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    activity_def_id UUID NOT NULL REFERENCES activity_definitions(id),
    source_type VARCHAR(20) NOT NULL,  -- 'rule' | 'instruction'
    source_id TEXT NOT NULL,
    source_version TEXT NOT NULL,
    triggering_event_id TEXT NOT NULL,
    task_ref TEXT,                     -- issue-core external ref
    condition_matched TEXT,
    prompt_hash CHAR(64),              -- SHA-256, nullable
    model TEXT,
    output_validated BOOLEAN,
    review_required BOOLEAN,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_spawn_log_event ON task_spawn_log(triggering_event_id);
CREATE INDEX idx_spawn_log_def ON task_spawn_log(activity_def_id);
```
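
The nullable columns presumably split along the two source types: a rule spawn has no prompt, model, or validation result, while an instruction spawn records them. The sketch below assumes a hypothetical `spawn_log_row` helper that builds the column values for one insert, leaves `id` and `created_at` to the database defaults, and stores only the SHA-256 hex digest of the prompt (always 64 characters, matching `CHAR(64)`). Whether the real code hashes the rendered prompt or the template is not stated here.

```python
from __future__ import annotations

import hashlib
from typing import Any


def prompt_hash(prompt: str) -> str:
    """SHA-256 hex digest: always 64 characters, which fits CHAR(64)."""
    return hashlib.sha256(prompt.encode("utf-8")).hexdigest()


def spawn_log_row(
    *,
    activity_def_id: str,
    source_type: str,
    source_id: str,
    source_version: str,
    triggering_event_id: str,
    task_ref: str | None = None,
    condition_matched: str | None = None,
    prompt: str | None = None,
    model: str | None = None,
    output_validated: bool | None = None,
    review_required: bool | None = None,
) -> dict[str, Any]:
    """Column values for one task_spawn_log insert (hypothetical helper)."""
    if source_type not in ("rule", "instruction"):
        raise ValueError(f"source_type must be 'rule' or 'instruction', got {source_type!r}")
    return {
        "activity_def_id": activity_def_id,
        "source_type": source_type,
        "source_id": source_id,
        "source_version": source_version,
        "triggering_event_id": triggering_event_id,
        "task_ref": task_ref,
        "condition_matched": condition_matched,
        "prompt_hash": prompt_hash(prompt) if prompt is not None else None,
        "model": model,
        "output_validated": output_validated,
        "review_required": review_required,
    }
```

Storing a hash instead of the prompt keeps event data out of the audit table while still letting two spawns be compared for identical inputs.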

---

### T39: Implement IssueSink adapter interface and IssueCoreRestSink

`src/activity_core/issue_sink.py`

```python
from abc import ABC, abstractmethod

from activity_core.rules.models import TaskRef, TaskSpec


class IssueSink(ABC):
    @abstractmethod
    def emit(self, task_spec: TaskSpec) -> TaskRef: ...


class IssueCoreRestSink(IssueSink):
    """POSTs to issue-core REST API. Config: ISSUE_CORE_URL env var."""

    def emit(self, task_spec: TaskSpec) -> TaskRef: ...  # see steps below


class NullSink(IssueSink):
    """Discards tasks and returns synthetic TaskRefs. For testing."""

    def emit(self, task_spec: TaskSpec) -> TaskRef: ...
```

`IssueCoreRestSink.emit()`:

1. POST `{ISSUE_CORE_URL}/issues/` with task_spec serialised as issue fields.
2. Parse response → `TaskRef(external_id, backend_url)`.
3. Write `task_spawn_log` row (via injected db session).
4. Return `TaskRef`.

Active sink is selected by `ISSUE_SINK_TYPE` env var: `rest` (default) | `null`.
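
Sink selection can be a small factory. In this sketch `make_sink`, the `null-` id prefix, and the `null://` URL are hypothetical; `TaskSpec` is reduced to a single placeholder field; and the REST call itself is left out, since the request and response shapes of issue-core are not specified here.

```python
from __future__ import annotations

import os
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Mapping


@dataclass
class TaskSpec:
    title: str  # hypothetical minimal field; the real TaskSpec lives in rules/models.py


@dataclass
class TaskRef:
    external_id: str
    backend_url: str


class IssueSink(ABC):
    @abstractmethod
    def emit(self, task_spec: TaskSpec) -> TaskRef: ...


class NullSink(IssueSink):
    def emit(self, task_spec: TaskSpec) -> TaskRef:
        # Discard the task; hand back a synthetic, unique reference.
        return TaskRef(external_id=f"null-{uuid.uuid4()}", backend_url="null://")


class IssueCoreRestSink(IssueSink):
    def __init__(self, base_url: str) -> None:
        self.base_url = base_url.rstrip("/")

    def emit(self, task_spec: TaskSpec) -> TaskRef:
        # POST to f"{self.base_url}/issues/" and the spawn-log write are omitted here.
        raise NotImplementedError


def make_sink(env: Mapping[str, str] | None = None) -> IssueSink:
    """Pick the active sink from ISSUE_SINK_TYPE: 'rest' (default) or 'null'."""
    env = os.environ if env is None else env
    kind = env.get("ISSUE_SINK_TYPE", "rest")
    if kind == "null":
        return NullSink()
    if kind == "rest":
        return IssueCoreRestSink(env["ISSUE_CORE_URL"])  # KeyError if unset: fail fast
    raise ValueError(f"unknown ISSUE_SINK_TYPE: {kind!r}")
```

Raising on an unknown value, rather than silently falling back to `rest`, is a design choice that surfaces typos in deployment config at startup.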

---

## Phase 8 — Event Type Registry

### T40: Formalize EventEnvelope model

Update `src/activity_core/models.py`:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any

import nats.aio.msg


@dataclass
class EventEnvelope:
    id: str  # UUID v4
    type: str  # e.g. "org.repo.registered"
    version: str  # e.g. "1.0"
    timestamp: datetime
    publisher: str  # e.g. "the-custodian/state-hub"
    attributes: dict[str, Any]  # typed per event type schema

    @classmethod
    def from_nats_message(cls, msg: nats.aio.msg.Msg) -> "EventEnvelope": ...

    @classmethod
    def from_webhook_payload(cls, source: str, payload: dict) -> "EventEnvelope": ...
```

Update `event_router.py` to produce compliant envelopes. If the existing NATS
message format omits `id`, `version`, or `publisher`, a migration shim may be
needed.
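
One place for that shim is `from_nats_message` itself. This sketch assumes message bodies are JSON objects with an ISO-8601 `timestamp` and back-fills only the three fields named above: a fresh UUID v4 for `id`, a default `version` of `"1.0"`, and a caller-supplied `default_publisher`. Those defaults and the keyword argument are hypothetical; `msg` only needs a `.data` bytes attribute, which `nats.aio.msg.Msg` provides, so the sketch does not import `nats`.

```python
from __future__ import annotations

import json
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class EventEnvelope:
    id: str
    type: str
    version: str
    timestamp: datetime
    publisher: str
    attributes: dict[str, Any]

    @classmethod
    def from_nats_message(cls, msg: Any, *, default_publisher: str = "unknown") -> EventEnvelope:
        """Parse msg.data (JSON bytes), back-filling fields older messages may omit."""
        raw = json.loads(msg.data)
        return cls(
            id=raw.get("id") or str(uuid.uuid4()),  # shim: mint a UUID v4
            type=raw["type"],
            version=raw.get("version", "1.0"),  # shim: assumed default version
            timestamp=datetime.fromisoformat(raw["timestamp"]),
            publisher=raw.get("publisher", default_publisher),  # shim
            attributes=raw.get("attributes", {}),
        )
```

One trade-off: a minted `id` differs on every redelivery, so anything keyed on `triggering_event_id` (such as `task_spawn_log` lookups) only becomes reliable once publishers send their own ids.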

---

### T41: Event type registry — file scanner, parser, DB model, sync command

**File scanner:** glob `event-types/*.md`; parse YAML frontmatter block.

**Attribute schema:** parse the `## Attributes` markdown table into a dict of
`{name: {type, required, description}}`. Types map to Python annotations for
validation.
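
A minimal sketch of the Attributes-table parser. It assumes the table has four columns in the order Name, Type, Required, Description, with required-ness written as `yes`/`no`; those column conventions and the `TYPE_MAP` keys (taken from the types used in T43) are assumptions rather than the ACT-ADR-002 format. Frontmatter parsing is left out because it needs a YAML library.

```python
from __future__ import annotations

import re
from datetime import datetime
from uuid import UUID

# Assumed mapping from the table's type names to Python annotations.
TYPE_MAP = {"string": str, "integer": int, "datetime": datetime, "uuid": UUID, "string[]": list[str]}

_SEPARATOR = re.compile(r"^:?-+:?$")  # delimiter-row cells such as --- or :---:


def parse_attributes_table(markdown: str) -> dict[str, dict]:
    """Turn the '## Attributes' table into {name: {type, required, description}}."""
    rows: list[list[str]] = []
    in_section = False
    for line in markdown.splitlines():
        stripped = line.strip()
        if stripped.startswith("## "):  # any level-2 heading opens or closes the section
            in_section = stripped == "## Attributes"
            continue
        if in_section and stripped.startswith("|"):
            cells = [c.strip() for c in stripped.strip("|").split("|")]
            if not all(_SEPARATOR.match(c) for c in cells):
                rows.append(cells)
    schema: dict[str, dict] = {}
    for name, type_name, required, description in (r[:4] for r in rows[1:]):  # rows[0] = header
        schema[name.strip("`")] = {
            "type": type_name.strip("`"),
            "required": required.lower() in {"yes", "true", "required"},
            "description": description,
        }
    return schema
```

Keeping `type` as the raw string in the stored schema (it ends up in the `attribute_schema` JSONB column) and resolving it through `TYPE_MAP` only at validation time keeps the DB representation JSON-serialisable.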

**DB model** (`event_types` table — new Alembic migration):

```sql
CREATE TABLE event_types (
    type_id TEXT PRIMARY KEY,  -- e.g. "org.repo.registered"
    version TEXT NOT NULL,
    publisher TEXT NOT NULL,
    governance TEXT NOT NULL DEFAULT 'publisher-declared',
    status TEXT NOT NULL DEFAULT 'active',
    attribute_schema JSONB NOT NULL,
    raw_md TEXT NOT NULL,
    synced_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```

**Sync command:** `make sync-event-types` (also called at worker startup).

**Registry lookup:** `get_event_type(type_id: str) -> EventTypeDef | None`,
used by the Event Router and webhook receiver to validate incoming events.

---

### T42: Curator gate configuration

`ACTIVITY_CURATOR_GATE` env var: `disabled` (default) | `required`.

- `disabled`: all event types with `status IN ('active', 'pending')` are
  accepted by the Event Router; events of a `pending` type are logged as a warning.
- `required`: only `status = 'active'` types are accepted. Events whose type is
  still `pending` are discarded with a `curator_gate_rejected` log entry.
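
The two modes reduce to a small decision function. This is a sketch under assumptions: the function name `curator_gate_allows`, the logger name, and the log levels are hypothetical; only the env var, its values, the statuses, and the `curator_gate_rejected` entry come from the spec above. Any status other than `active` or `pending` is rejected in both modes.

```python
from __future__ import annotations

import logging
import os

log = logging.getLogger("event_router")  # hypothetical logger name


def curator_gate_allows(type_id: str, status: str, gate: str | None = None) -> bool:
    """Decide whether events of a registered type pass ACTIVITY_CURATOR_GATE."""
    gate = gate or os.environ.get("ACTIVITY_CURATOR_GATE", "disabled")
    if gate == "disabled":
        if status == "pending":
            log.warning("accepting event of pending type %s", type_id)
        return status in ("active", "pending")
    if gate == "required":
        if status == "pending":
            log.info("curator_gate_rejected type_id=%s", type_id)
        return status == "active"
    raise ValueError(f"ACTIVITY_CURATOR_GATE must be 'disabled' or 'required', got {gate!r}")
```

Passing `gate` explicitly in tests avoids mutating the process environment; in the router the default environment lookup applies.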

Curator approval endpoint: `POST /event-types/{type_id}/approve`
sets `status = 'active'`. Requires admin auth (same as existing API auth).

---

### T43: Write first event type definitions

Three files under `event-types/`:

- `org.repo.registered.md` — publisher: state-hub; attrs: repo_slug (string,
  required), domain (string, required), tags (string[], optional),
  registered_at (datetime, required).
- `org.workstream.completed.md` — publisher: state-hub; attrs: workstream_id
  (uuid, required), workstream_slug (string, required), domain (string,
  required), completed_at (datetime, required).
- `org.activity.run.completed.md` — publisher: activity-core; attrs:
  activity_definition_id (uuid, required), run_id (uuid, required),
  tasks_spawned (integer, required), completed_at (datetime, required).

Each file follows the full ACT-ADR-002 structure: Intent, When Published,
Attributes table, Example Payload, Consumer Notes, Debugging.

---

## Completion Criteria for This Part

1. `src/activity_core/rules/` module importable with no upstream deps (`test_boundary.py` passes)
2. `evaluate_condition` handles all whitelisted operators and rejects all forbidden constructs
3. `IssueSink` ABC and `IssueCoreRestSink` / `NullSink` implemented
4. Alembic migrations for `task_spawn_log` and `event_types` tables apply cleanly
5. `make sync-event-types` loads all three org event type definitions
6. EventEnvelope model updated and `from_nats_message` works with existing event_router

## New Files Produced

| Path | Task |
|---|---|
| `src/activity_core/rules/__init__.py` | T35 |
| `src/activity_core/rules/models.py` | T35 |
| `tests/rules/test_boundary.py` | T35 |
| `src/activity_core/rules/evaluator.py` | T36 |
| `src/activity_core/rules/executor.py` | T37 |
| `src/activity_core/issue_sink.py` | T39 |
| `event-types/org.repo.registered.md` | T43 |
| `event-types/org.workstream.completed.md` | T43 |
| `event-types/org.activity.run.completed.md` | T43 |

## Modified Files

| Path | Task | Change |
|---|---|---|
| `src/activity_core/models.py` | T34, T40 | ActionDef, RuleDef, InstructionDef; EventEnvelope update |
| `src/activity_core/event_router.py` | T40 | Produce compliant EventEnvelope |
| `alembic/versions/` | T38, T41 | Two new migrations |
| `src/activity_core/api.py` | T42 | Curator approve endpoint |
| `src/activity_core/worker.py` | T41 | sync-event-types at startup |
| `Makefile` | T41 | sync-event-types target |

## Change History

- v1.0 (2026-05-14): Split from custodian-WP-0003 (phases 7–8).