Implement post-triage operational hardening

This commit is contained in:
2026-06-04 12:15:07 +02:00
parent 8a33ec44b6
commit 20d4f26166
11 changed files with 775 additions and 31 deletions

View File

@@ -63,8 +63,8 @@ Omit `workstream_id` / `task_id` when not applicable.
```bash
curl -s -X PATCH "http://127.0.0.1:8000/tasks/<task_id>" \
-H "Content-Type: application/json" \
-d '{"status": "in_progress"}'
# values: todo | in_progress | done | blocked
-d '{"status": "progress"}'
# values: wait | todo | progress | done | cancel
```
### Flag a task for human review
@@ -146,7 +146,7 @@ derived health labels, not frontmatter statuses.
` ` `task
id: ACTIVITY-WP-NNNN-T01
status: todo | in_progress | done | blocked
status: wait | todo | progress | done | cancel
priority: high | medium | low
state_hub_task_id: "<uuid>" # written by fix-consistency — do not edit
` ` `
@@ -154,7 +154,10 @@ state_hub_task_id: "<uuid>" # written by fix-consistency — do not edit
Task description text.
```
Status progression: `todo` → `in_progress` → `done` (or `blocked`)
Status progression: `todo` → `progress` → `done`; use `wait` for a task
blocked on external input and `cancel` for intentionally abandoned work.
Workstream/workplan lifecycle status is separate; frontmatter `blocked` remains
valid there.
To create a new workplan:
1. Write the file following the format above

View File

@@ -101,17 +101,58 @@ A Rule's action block specifies:
```yaml
action:
task_template: tasks/{template-slug}.md # required
target_repo: event.attributes.repo_slug # expression — attribute access only
priority: high # high | medium | low | literal
labels: ["onboarding", "security"] # literal list
due_in_days: 7 # optional, integer literal
task_template: "Run SBOM rescan for {context.repo.repo_slug}"
target_repo: context.repo.repo_slug
priority: medium
labels: ["sbom", "security", "{context.repo.repo_slug}"]
due_in_days: 7
```
`target_repo` and similar fields accept simple attribute access expressions
(no boolean logic — just path traversal). This allows dynamic routing to the
correct issue-core instance without arbitrary expression evaluation in action
fields.
`action.task_template` is the emitted task title template. It is not a path to a
repo-local file. Older design notes and the legacy `tasks/*.md` directory use
"task template" for materialized task-body templates; that is a separate legacy
surface. To avoid surprise, new rule actions should treat `task_template` as
`title_template` semantics until the field can be renamed in a schema-breaking
revision.
Action fields accept two deterministic rendering forms:
- Whole-field paths: if the whole string is a path like
`context.repo.repo_slug` or `event.attributes.repo_slug`, the rendered value
keeps the original scalar/list/object shape from that path. This is the
correct form for `target_repo` and other fields that should not become prose.
- Scalar placeholders: strings may include `{context.foo}` or `{event.foo}`
placeholders. Each placeholder must resolve to a scalar. Lists and objects are
rejected rather than stringified, which prevents accidental JSON blobs or
untrusted text from being embedded into task titles.
Unsafe action cases are rejected:
- Any action path outside `context.*` or `event.*`.
- Any path containing calls, indexing, arithmetic, filters, or boolean logic.
- Placeholder values that resolve to lists or objects.
- `for_each` values that are not a whole-field `context.*` or `event.*` path to
a list.
- `bind_as` names that are not simple identifiers.
Per-item rule expansion is explicit:
```yaml
for_each: context.repos.repos
bind_as: repo
condition: 'context.repo.sbom_age_days > 30'
action:
task_template: Run SBOM rescan for {context.repo.repo_slug}
target_repo: context.repo.repo_slug
priority: medium
labels: ["sbom", "security", "automated"]
```
The weekly SBOM staleness definition is the canonical pattern. The State Hub
bulk resolver exposes all repository entries at `context.repos.repos`, the rule
binds each item as `context.repo`, and the strict staleness definition is
`context.repo.sbom_age_days > 30`. Thirty days exactly is not stale; thirty-one
days is stale.
#### Evaluation semantics

View File

@@ -0,0 +1,70 @@
# Issue-Core Emission Boundary
activity-core owns the decision to spawn a task and the audit trail that says
why it spawned. It does not own downstream task lifecycle state after emission.
## Current authoritative endpoint
The current authoritative boundary is the issue-core REST API:
```text
POST {ISSUE_CORE_URL}/issues/
```
`IssueCoreRestSink` sends this payload:
```json
{
"title": "Run SBOM rescan for activity-core",
"description": "",
"target_repo": "activity-core",
"priority": "medium",
"labels": ["sbom", "security", "automated"],
"due_in_days": null,
"source_type": "rule",
"source_id": "flag-stale-sbom",
"triggering_event_id": "event-or-schedule-key",
"activity_definition_id": "activity-definition-uuid"
}
```
The expected response contains `issue_id` and may include `issue_url` and
`backend`. activity-core stores only the returned task reference in
`task_spawn_log`; issue-core remains authoritative for task status, assignment,
comments, closure, and cancellation.
## REST versus NATS
Keep REST as the active emission contract until issue-core publishes and owns a
durable NATS consumer for task-creation commands. NATS is still appropriate for
event intake into activity-core, but task creation needs an acknowledged,
idempotent command boundary. A future NATS sink must return or later correlate a
task reference before it can replace `IssueCoreRestSink`.
## Safe operating modes
- `ISSUE_SINK_TYPE=null`: dry-run/audit mode. Task specs are rendered and the
workflow records synthetic `null-*` references. This is the current Railiance
production setting.
- `ISSUE_SINK_TYPE=rest`: live task creation. Sink failures raise out of
`emit_tasks`, so Temporal retries and the workflow history make failures
visible.
Weekly SBOM staleness is safe to evaluate in dry-run mode because the rule
contract is deterministic and tested. Do not enable it against the real REST sink
until issue-core credentials, endpoint reachability, and duplicate-handling are
verified in the target environment.
## Verification
Local contract tests cover the rendered weekly SBOM task path and the REST
payload shape:
```bash
uv run pytest tests/test_integration_event_bridge.py tests/test_issue_sink.py
```
For a live environment, run with `ISSUE_SINK_TYPE=null` first and confirm
`task_spawn_log` contains the expected source id, condition, triggering event id,
and synthetic task reference. Then switch to `ISSUE_SINK_TYPE=rest` only after a
single known-safe rule match creates one issue-core task with the same fields.

View File

@@ -147,6 +147,55 @@ docker exec temporal-admin-tools temporal workflow list \
---
## Daily State Hub WSJF triage verification
Use this when answering: "did today's daily triage run happen?"
Set the ActivityDefinition id when known. If it is not known, pass the
definition name used in the environment and let the live helper resolve it from
Postgres.
```bash
export DAILY_TRIAGE_ACTIVITY_ID=<daily-triage-activity-definition-uuid>
# Dry-run checklist; safe from any shell because it only prints checks.
uv run python scripts/verify_daily_triage.py \
--activity-id "$DAILY_TRIAGE_ACTIVITY_ID" \
--date "$(date -u +%F)"
# Live check from a shell with Temporal, DB, State Hub, and working-memory access.
ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \
TEMPORAL_HOST=localhost:7233 \
STATE_HUB_URL=http://127.0.0.1:8000 \
uv run python scripts/verify_daily_triage.py \
--activity-id "$DAILY_TRIAGE_ACTIVITY_ID" \
--working-memory-dir /home/worsch/the-custodian/working-memory \
--live
```
The verification is complete when all of these agree:
- Temporal schedule `activity-schedule-$DAILY_TRIAGE_ACTIVITY_ID` exists, is not
paused, and uses the `skip` overlap policy.
- The latest workflow found with `ActivityId="$DAILY_TRIAGE_ACTIVITY_ID"` either
completed or is visibly retrying a failed activity in history.
- `activity_runs` has a row for the daily triage ActivityDefinition with today's
`scheduled_for` or `fired_at` date.
- State Hub `/progress/` contains a `daily_triage` event whose detail includes
the same `activity_core_run_id`.
- The working-memory sink wrote `daily-triage-YYYY-MM-DD-<run>.md` and its
frontmatter contains the same `activity_core_run_id`.
- The ActivityDefinition's instruction model, token budget, and sink timeouts fit
under `ACTIVITY_TIMEOUT_SECONDS` (default 900 seconds). Temporal retries each
activity up to 10 attempts, so a slow LLM or sink failure should show as
workflow retry history rather than a silent missing report.
Expected missed-run behavior: the daily triage definition should use
`misfire_policy: skip`. Planned downtime does not catch up missed daily reports;
the next scheduled fire is the next authoritative run.
---
## Scale-out
### Multiple worker replicas
@@ -204,6 +253,44 @@ Set the environment variable before running the worker.
2. `curl http://localhost:9090/metrics` should return Temporal SDK metrics.
3. If port 9090 conflicts with Prometheus server, set `PROMETHEUS_BIND_ADDR=0.0.0.0:9091`.
### Production alerting and failure modes
Kubernetes health expectations:
```bash
kubectl -n activity-core get deploy actcore-worker actcore-api actcore-event-router
kubectl -n activity-core get pods -l app.kubernetes.io/part-of=activity-core
kubectl -n activity-core port-forward svc/actcore-worker-metrics 9090:9090
curl -sf http://127.0.0.1:9090/metrics
```
Page an operator when:
- `actcore-worker` has no ready pod, cannot connect to Temporal, or cannot reach
Postgres.
- The daily triage schedule is missing or paused outside an approved maintenance
window.
- The expected daily triage run is absent from Temporal and `activity_runs`
after the retry window.
- Both State Hub progress and working-memory report sinks are missing for a
completed run.
- Report sink or task emission failures repeat across Temporal retries.
Leave a State Hub progress note, but do not page, when:
- A planned outage caused one skipped run and the schedule is healthy again.
- A sink idempotency check reports `exists` for the expected run id.
- The report completed but calibration feedback says the recommendations were
noisy, too long, or under-sensitive.
Handle in the next operator session:
- Prompt/schema tuning, loose-end sensitivity, and stale-but-parked work
calibration.
- Non-urgent schedule jitter or timeout adjustments.
- Moving a task sink from `ISSUE_SINK_TYPE=null` to the real issue-core endpoint
after a dry-run contract check has passed.
### DB migration drift
```bash
uv run alembic current # show current revision

View File

@@ -0,0 +1,321 @@
#!/usr/bin/env python3
"""Verify the daily State Hub triage activity run.
The default mode is ``--dry-run`` so operators can see the exact checks without
needing live Temporal, Postgres, or State Hub access from the current shell.
Pass ``--live`` to run the cheap checks directly.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from uuid import UUID
DEFAULT_ACTIVITY_NAME = "Daily State Hub WSJF Triage"
DEFAULT_PROGRESS_EVENT_TYPE = "daily_triage"
DEFAULT_TEMPORAL_HOST = "localhost:7233"
DEFAULT_TEMPORAL_NAMESPACE = "default"
DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/working-memory"
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Verify whether today's daily State Hub triage run happened.",
)
parser.add_argument("--activity-id", default=os.environ.get("DAILY_TRIAGE_ACTIVITY_ID"))
parser.add_argument("--activity-name", default=os.environ.get(
"DAILY_TRIAGE_ACTIVITY_NAME",
DEFAULT_ACTIVITY_NAME,
))
parser.add_argument("--db-url", default=os.environ.get("ACTCORE_DB_URL"))
parser.add_argument("--temporal-host", default=os.environ.get(
"TEMPORAL_HOST",
DEFAULT_TEMPORAL_HOST,
))
parser.add_argument("--temporal-namespace", default=os.environ.get(
"TEMPORAL_NAMESPACE",
DEFAULT_TEMPORAL_NAMESPACE,
))
parser.add_argument("--state-hub-url", default=os.environ.get(
"STATE_HUB_URL",
DEFAULT_STATE_HUB_URL,
))
parser.add_argument("--progress-event-type", default=DEFAULT_PROGRESS_EVENT_TYPE)
parser.add_argument("--working-memory-dir", default=os.environ.get(
"DAILY_TRIAGE_WORKING_MEMORY_DIR",
DEFAULT_WORKING_MEMORY_DIR,
))
parser.add_argument(
"--date",
default=datetime.now(timezone.utc).date().isoformat(),
help="Local report date to check, formatted YYYY-MM-DD.",
)
parser.add_argument(
"--live",
action="store_true",
help="Run live checks. Without this flag the script prints a dry-run checklist.",
)
return parser.parse_args(argv)
def build_dry_run_report(args: argparse.Namespace) -> dict[str, Any]:
activity_ref = args.activity_id or (
f'<activity id for ActivityDefinition named "{args.activity_name}">'
)
schedule_id = f"activity-schedule-{activity_ref}"
db_filter = (
f"activity_runs.activity_id = '{args.activity_id}'"
if args.activity_id
else f"activity_definitions.name = '{args.activity_name}'"
)
activity_def_filter = (
f"id = '{args.activity_id}'"
if args.activity_id
else f"name = '{args.activity_name}'"
)
return {
"mode": "dry-run",
"generated_at": datetime.now(timezone.utc).isoformat(),
"activity": {
"id": args.activity_id,
"name": args.activity_name,
"schedule_id": schedule_id,
},
"checks": [
{
"name": "temporal_schedule",
"expect": "Schedule exists, is not paused, and uses SKIP overlap for misfire_policy=skip.",
"command": (
"temporal schedule describe "
f"--schedule-id {schedule_id} "
f"--address {args.temporal_host} "
f"--namespace {args.temporal_namespace}"
),
},
{
"name": "latest_workflow_history",
"expect": "Latest workflow has ActivityId search attribute and completed or is retrying visibly.",
"command": (
"temporal workflow list "
f"--query 'ActivityId=\"{activity_ref}\"' "
f"--address {args.temporal_host} "
f"--namespace {args.temporal_namespace}"
),
},
{
"name": "activity_runs_row",
"expect": "Latest activity_runs row exists for today's scheduled_for or fired_at date.",
"sql": (
"select run_id, scheduled_for, fired_at, tasks_spawned, version_used "
"from activity_runs join activity_definitions on "
"activity_runs.activity_id = activity_definitions.id "
f"where {db_filter} "
"order by fired_at desc limit 5;"
),
},
{
"name": "state_hub_progress",
"expect": f"State Hub progress contains event_type={args.progress_event_type!r} with this run id.",
"command": (
f"curl -s {args.state_hub_url.rstrip('/')}/progress/?limit=100"
),
},
{
"name": "working_memory_note",
"expect": "A daily-triage note exists and its frontmatter carries activity_core_run_id.",
"path_glob": str(
Path(args.working_memory_dir)
/ f"daily-triage-{args.date}-*.md"
),
},
{
"name": "llm_timeout_budget",
"expect": "Instruction model/max_tokens fit within ACTIVITY_TIMEOUT_SECONDS and Temporal retries.",
"sql": (
"select name, instructions_json, version from activity_definitions "
f"where {activity_def_filter};"
),
"activity_timeout_seconds": int(os.environ.get(
"ACTIVITY_TIMEOUT_SECONDS",
"900",
)),
"retry_attempts": 10,
},
],
}
async def build_live_report(args: argparse.Namespace) -> dict[str, Any]:
if not args.db_url:
raise RuntimeError("ACTCORE_DB_URL or --db-url is required for --live")
activity = await _resolve_activity(args)
activity_id = str(activity["id"])
args.activity_id = activity_id
dry = build_dry_run_report(args)
dry["mode"] = "live"
dry["results"] = {
"activity_definition": _json_ready(activity),
"temporal": await _check_temporal(args, activity_id),
"activity_runs": await _latest_activity_runs(args, activity_id),
"state_hub_progress": await _state_hub_progress(args),
"working_memory_notes": _working_memory_notes(args),
}
return dry
async def _resolve_activity(args: argparse.Namespace) -> dict[str, Any]:
from sqlalchemy import text
from activity_core.db import make_engine
engine = make_engine(args.db_url)
try:
async with engine.connect() as conn:
if args.activity_id:
result = await conn.execute(
text(
"select id, name, enabled, trigger_config, instructions_json, version "
"from activity_definitions where id = :activity_id"
),
{"activity_id": args.activity_id},
)
else:
result = await conn.execute(
text(
"select id, name, enabled, trigger_config, instructions_json, version "
"from activity_definitions where name = :activity_name "
"order by updated_at desc limit 1"
),
{"activity_name": args.activity_name},
)
row = result.mappings().first()
if row is None:
raise RuntimeError("daily triage ActivityDefinition was not found")
return dict(row)
finally:
await engine.dispose()
async def _latest_activity_runs(
args: argparse.Namespace,
activity_id: str,
) -> list[dict[str, Any]]:
from sqlalchemy import text
from activity_core.db import make_engine
engine = make_engine(args.db_url)
try:
async with engine.connect() as conn:
result = await conn.execute(
text(
"select run_id, scheduled_for, fired_at, tasks_spawned, version_used "
"from activity_runs where activity_id = :activity_id "
"order by fired_at desc limit 5"
),
{"activity_id": activity_id},
)
return [_json_ready(dict(row)) for row in result.mappings().all()]
finally:
await engine.dispose()
async def _check_temporal(args: argparse.Namespace, activity_id: str) -> dict[str, Any]:
from temporalio.client import Client
schedule_id = f"activity-schedule-{activity_id}"
client = await Client.connect(
args.temporal_host,
namespace=args.temporal_namespace,
)
handle = client.get_schedule_handle(schedule_id)
schedule = await handle.describe()
workflows = []
query = f'ActivityId="{activity_id}"'
async for item in client.list_workflows(query=query):
workflows.append({
"id": item.id,
"run_id": item.run_id,
"status": str(item.status),
"start_time": _iso(getattr(item, "start_time", None)),
"close_time": _iso(getattr(item, "close_time", None)),
})
if len(workflows) >= 5:
break
state = getattr(schedule.schedule, "state", None)
policy = getattr(schedule.schedule, "policy", None)
return {
"schedule_id": schedule_id,
"paused": getattr(state, "paused", None),
"overlap_policy": str(getattr(policy, "overlap", "")),
"latest_workflows": workflows,
}
async def _state_hub_progress(args: argparse.Namespace) -> list[dict[str, Any]]:
import httpx
base = args.state_hub_url.rstrip("/")
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"{base}/progress/", params={"limit": 100})
response.raise_for_status()
items = response.json()
if not isinstance(items, list):
return []
return [
_json_ready(item)
for item in items
if item.get("event_type") == args.progress_event_type
][:5]
def _working_memory_notes(args: argparse.Namespace) -> list[str]:
directory = Path(args.working_memory_dir)
pattern = f"daily-triage-{args.date}-*.md"
if not directory.exists():
return []
return [str(path) for path in sorted(directory.glob(pattern))]
def _json_ready(value: Any) -> Any:
if isinstance(value, dict):
return {key: _json_ready(item) for key, item in value.items()}
if isinstance(value, list):
return [_json_ready(item) for item in value]
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, UUID):
return str(value)
return value
def _iso(value: Any) -> str | None:
return value.isoformat() if hasattr(value, "isoformat") else None
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv)
try:
if args.live:
report = asyncio.run(build_live_report(args))
else:
report = build_dry_run_report(args)
except Exception as exc:
print(f"verify_daily_triage: {exc}", file=sys.stderr)
return 2
print(json.dumps(report, indent=2, sort_keys=True))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -377,6 +377,7 @@ async def emit_tasks(payload: dict) -> list[str]:
Session = _get_session_factory()
refs: list[str] = []
errors: list[str] = []
async with Session() as session:
async with session.begin():
for spec_dict in task_specs_raw:
@@ -411,6 +412,11 @@ async def emit_tasks(payload: dict) -> list[str]:
)
session.add(log_row)
except Exception as exc:
message = f"{spec.source_type}:{spec.source_id}: {exc}"
errors.append(message)
activity.logger.warning("emit_tasks: sink.emit failed — %s", exc)
if errors:
raise RuntimeError(f"task emission sink failure: {errors!r}")
return refs

View File

@@ -31,7 +31,7 @@ from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, Cont
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
_TIMEOUT_SECONDS = 10.0
_OPEN_WORKSTREAM_STATUSES = {"active", "ready", "blocked"}
_OPEN_TASK_STATUSES = {"todo", "in_progress", "blocked"}
_OPEN_TASK_STATUSES = {"wait", "todo", "progress"}
# Sentinel age for repos that have never had an SBOM ingested. Large enough
# that any threshold-based staleness rule treats them as "very stale" without
# forcing the rule expression to special-case None.
@@ -260,7 +260,7 @@ def _daily_triage_digest(params: dict[str, Any]) -> str:
"status",
"open_task_counts",
"needs_human_count",
"blocked_task_count",
"wait_task_count",
"workplan_health_labels",
],
},
@@ -311,14 +311,14 @@ def _open_workstream_digest(
def _task_counts(tasks: list[dict[str, Any]]) -> dict[str, int]:
counts = {"todo": 0, "in_progress": 0, "blocked": 0, "needs_human": 0}
counts = {"wait": 0, "todo": 0, "progress": 0, "needs_human": 0}
for task in tasks:
status = task.get("status")
if status in counts:
counts[status] += 1
if task.get("needs_human"):
counts["needs_human"] += 1
counts["open_total"] = counts["todo"] + counts["in_progress"] + counts["blocked"]
counts["open_total"] = counts["wait"] + counts["todo"] + counts["progress"]
return counts
@@ -364,7 +364,7 @@ def _candidate_sort_key(candidate: dict[str, Any]) -> tuple[int, int, int, int]:
_priority_rank(candidate.get("planning_priority")),
0 if candidate.get("status") == "active" else 1,
-int(counts.get("needs_human", 0)),
-int(counts.get("blocked", 0)),
-int(counts.get("wait", 0)),
)

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
import importlib.util
from pathlib import Path
def _load_script():
path = Path(__file__).parent.parent / "scripts" / "verify_daily_triage.py"
spec = importlib.util.spec_from_file_location("verify_daily_triage", path)
assert spec is not None
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def test_daily_triage_verifier_dry_run_names_all_operator_checks() -> None:
script = _load_script()
args = script.parse_args([
"--activity-id",
"00000000-0000-0000-0000-000000000123",
"--date",
"2026-06-04",
"--working-memory-dir",
"/tmp/wm",
])
report = script.build_dry_run_report(args)
assert report["mode"] == "dry-run"
names = {check["name"] for check in report["checks"]}
assert names == {
"temporal_schedule",
"latest_workflow_history",
"activity_runs_row",
"state_hub_progress",
"working_memory_note",
"llm_timeout_budget",
}
assert report["activity"]["schedule_id"] == (
"activity-schedule-00000000-0000-0000-0000-000000000123"
)
assert any(
check.get("path_glob") == "/tmp/wm/daily-triage-2026-06-04-*.md"
for check in report["checks"]
)
timeout_check = next(
check for check in report["checks"] if check["name"] == "llm_timeout_budget"
)
run_check = next(
check for check in report["checks"] if check["name"] == "activity_runs_row"
)
assert "activity_runs.activity_id" in run_check["sql"]
assert "where id = '00000000-0000-0000-0000-000000000123'" in timeout_check["sql"]
assert timeout_check["activity_timeout_seconds"] == 900
assert timeout_check["retry_attempts"] == 10

126
tests/test_issue_sink.py Normal file
View File

@@ -0,0 +1,126 @@
from __future__ import annotations
from typing import Any
import httpx
import pytest
from activity_core import activities
from activity_core.issue_sink import IssueCoreRestSink
from activity_core.rules.models import TaskRef, TaskSpec
class DummyResponse:
def __init__(self, payload: dict[str, Any]) -> None:
self.payload = payload
def raise_for_status(self) -> None:
return None
def json(self) -> dict[str, Any]:
return self.payload
def test_issue_core_rest_sink_posts_task_contract(monkeypatch) -> None:
posts: list[dict[str, Any]] = []
def fake_post(url: str, **kwargs: Any) -> DummyResponse:
posts.append({"url": url, **kwargs})
return DummyResponse({
"issue_id": "issue-123",
"issue_url": "http://issue-core.test/issues/issue-123",
"backend": "issue-core",
})
monkeypatch.setattr(httpx, "post", fake_post)
ref = IssueCoreRestSink("http://issue-core.test/").emit(TaskSpec(
title="Run SBOM rescan for activity-core",
description="SBOM is older than 30 days.",
target_repo="activity-core",
priority="medium",
labels=["sbom", "security", "automated"],
due_in_days=7,
source_type="rule",
source_id="flag-stale-sbom",
triggering_event_id="scheduled",
activity_definition_id="activity-1",
))
assert ref == TaskRef(
external_id="issue-123",
backend_url="http://issue-core.test/issues/issue-123",
backend="issue-core",
)
assert posts == [
{
"url": "http://issue-core.test/issues/",
"json": {
"title": "Run SBOM rescan for activity-core",
"description": "SBOM is older than 30 days.",
"target_repo": "activity-core",
"priority": "medium",
"labels": ["sbom", "security", "automated"],
"due_in_days": 7,
"source_type": "rule",
"source_id": "flag-stale-sbom",
"triggering_event_id": "scheduled",
"activity_definition_id": "activity-1",
},
"timeout": 10.0,
}
]
@pytest.mark.asyncio
async def test_emit_tasks_raises_when_sink_fails(monkeypatch) -> None:
class FailingSink:
def emit(self, task_spec: TaskSpec) -> TaskRef:
raise RuntimeError(f"boom for {task_spec.title}")
class FakeTransaction:
async def __aenter__(self) -> None:
return None
async def __aexit__(self, *exc_info: object) -> bool:
return False
class FakeSession:
def begin(self) -> FakeTransaction:
return FakeTransaction()
async def __aenter__(self) -> "FakeSession":
return self
async def __aexit__(self, *exc_info: object) -> bool:
return False
def add(self, row: object) -> None:
raise AssertionError("failed emissions should not write spawn logs")
class FakeSessionFactory:
def __call__(self) -> FakeSession:
return FakeSession()
monkeypatch.setattr(activities, "get_issue_sink", lambda: FailingSink())
monkeypatch.setattr(activities, "_get_session_factory", lambda: FakeSessionFactory())
with pytest.raises(RuntimeError, match="task emission sink failure"):
await activities.emit_tasks({
"activity_id": "00000000-0000-0000-0000-000000000001",
"triggering_event_id": "scheduled",
"run_id": "00000000-0000-0000-0000-000000000002",
"task_specs": [
{
"title": "Run SBOM rescan for activity-core",
"description": "",
"target_repo": "activity-core",
"priority": "medium",
"labels": ["sbom"],
"due_in_days": None,
"source_type": "rule",
"source_id": "flag-stale-sbom",
"condition": "context.repo.sbom_age_days > 30",
}
],
})

View File

@@ -235,7 +235,7 @@ def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None:
payloads = {
"/state/summary": {
"generated_at": "2026-05-19T05:20:00Z",
"totals": {"tasks": {"todo": 4, "blocked": 1}},
"totals": {"tasks": {"todo": 4, "wait": 1}},
"topics": [
{
"slug": "custodian",
@@ -306,7 +306,7 @@ def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None:
{
"id": "task-2",
"title": "T06 - Canary Cutover",
"status": "blocked",
"status": "wait",
"priority": "medium",
"needs_human": True,
},
@@ -331,13 +331,13 @@ def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None:
import json
digest = json.loads(raw_digest)
assert digest["totals"] == {"tasks": {"todo": 4, "blocked": 1}}
assert digest["totals"] == {"tasks": {"todo": 4, "wait": 1}}
assert digest["open_workstreams"][0]["slug"] == "cust-wp-0045"
assert digest["open_workstreams"][0]["planning_priority"] == "high"
assert digest["open_workstreams"][0]["open_task_counts"] == {
"wait": 1,
"todo": 1,
"in_progress": 0,
"blocked": 1,
"progress": 0,
"needs_human": 1,
"open_total": 2,
}

View File

@@ -4,11 +4,11 @@ type: workplan
title: "Post-triage operational hardening"
domain: custodian
repo: activity-core
status: ready
status: active
owner: codex
topic_slug: custodian
created: "2026-06-03"
updated: "2026-06-03"
updated: "2026-06-04"
state_hub_workstream_id: "5646e13a-13af-4724-bca6-3c0d86f96733"
---
@@ -31,7 +31,7 @@ task lifecycle database, a project planner, or an execution worker.
```task
id: ACTIVITY-WP-0006-T01
status: todo
status: done
priority: high
state_hub_task_id: "5d79e3da-d26d-4cad-9cdf-5e5264bb7019"
```
@@ -50,11 +50,17 @@ Scope:
Done when the full test suite passes and activity-core no longer depends on
legacy task-status aliases for State Hub API clients or tests.
2026-06-04: Completed. `AGENTS.md` now uses State Hub task statuses
`wait`, `todo`, `progress`, `done`, and `cancel`; workplan/workstream lifecycle
`blocked` remains separate. The State Hub daily triage digest now counts
`wait/todo/progress` open tasks and no longer fixtures task-level
`in_progress` or `blocked`. Full suite passed: 128 passed, 1 skipped.
## Daily Triage Observability Runbook
```task
id: ACTIVITY-WP-0006-T02
status: todo
status: done
priority: high
state_hub_task_id: "02c34443-0e8d-4f1a-93d9-6c39f07faad7"
```
@@ -73,11 +79,16 @@ The operator should be able to check:
Done when `docs/runbook.md` has a concise daily-triage verification section
and any helper command/script is covered by tests or a dry-run path.
2026-06-04: Completed. Added `scripts/verify_daily_triage.py` with dry-run and
live modes, plus `tests/test_daily_triage_verifier.py`. `docs/runbook.md` now
covers Temporal schedule/workflow checks, `activity_runs`, State Hub progress,
working-memory notes, missed-run `skip` behavior, and LLM timeout budget.
## Three-Run Calibration Feedback
```task
id: ACTIVITY-WP-0006-T03
status: todo
status: wait
priority: medium
state_hub_task_id: "7cbf0a35-71a1-47ac-afc2-f51ad2180fd0"
```
@@ -96,11 +107,16 @@ Done when the calibration result is recorded in State Hub and the related
`CUST-WP-0044` / `CUST-WP-0045` tasks can close based on activity-core runs,
not Codex app fallback runs.
2026-06-04: Waiting on real evidence. The repo now has a verification path for
scheduled daily triage runs, but this task still requires three consecutive
actual activity-core scheduled runs and State Hub calibration feedback. Local
tests cannot substitute for that operational evidence.
## Rule Action Contract Documentation
```task
id: ACTIVITY-WP-0006-T04
status: todo
status: done
priority: medium
state_hub_task_id: "c9066d2e-0429-4e14-a68a-8418061ffd8d"
```
@@ -116,11 +132,16 @@ Also decide and document the naming/semantics mismatch around
Done when ADR-003 or a focused follow-up doc contains examples, unsafe cases,
and the weekly SBOM staleness definition is cited as the canonical pattern.
2026-06-04: Completed. Updated ADR-003 with whole-field path rendering,
scalar placeholder rendering, unsafe action cases, explicit `for_each` /
`bind_as` expansion, the `task_template` naming mismatch, and weekly SBOM
staleness as the canonical per-item pattern.
## Production Alerting And Failure Modes
```task
id: ACTIVITY-WP-0006-T05
status: todo
status: done
priority: medium
state_hub_task_id: "420ea629-0c20-4d09-9cc1-6b2f32665161"
```
@@ -139,11 +160,17 @@ Cover:
Done when the runbook and metrics/health surface make ordinary failures visible
without inspecting a Codex Desktop session.
2026-06-04: Completed. `docs/runbook.md` now documents Kubernetes worker/API/
router health checks, Temporal schedule paused/missing checks, report sink
failure behavior, LLM timeout/retry behavior, and page/note/next-session
classification. Task emission sink failures now raise from `emit_tasks`, making
them visible to Temporal retries instead of warning-only logs.
## Issue-Core Emission Boundary Verification
```task
id: ACTIVITY-WP-0006-T06
status: todo
status: done
priority: medium
state_hub_task_id: "78089aef-aba1-42d7-a203-ef80ba6791d9"
```
@@ -163,6 +190,13 @@ Done when there is a tested or dry-run-verified path from a rule match to a
downstream task reference, and activity-core still owns only the spawn audit
trail, not task lifecycle state.
2026-06-04: Completed. Added `docs/issue-core-emission-boundary.md` documenting
REST `/issues/` as the current authoritative endpoint, NATS as future work,
Railiance `ISSUE_SINK_TYPE=null` dry-run mode, and the fields sent to
issue-core versus retained in `task_spawn_log`. Added REST payload and sink
failure tests in `tests/test_issue_sink.py`; the existing weekly SBOM integration
test remains the dry-run rule-match-to-task-reference proof.
## Completion Criteria
- State Hub task-status canon adaptation is complete.