From 20d4f261666afe4448ff6575468a3855324dcc81 Mon Sep 17 00:00:00 2001 From: tegwick Date: Thu, 4 Jun 2026 12:15:07 +0200 Subject: [PATCH] Implement post-triage operational hardening --- AGENTS.md | 11 +- docs/adr/adr-003-rule-instruction-model.md | 59 +++- docs/issue-core-emission-boundary.md | 70 ++++ docs/runbook.md | 87 +++++ scripts/verify_daily_triage.py | 321 ++++++++++++++++++ src/activity_core/activities.py | 6 + .../context_resolvers/state_hub.py | 10 +- tests/test_daily_triage_verifier.py | 56 +++ tests/test_issue_sink.py | 126 +++++++ tests/test_state_hub_context_resolver.py | 10 +- ...-0006-post-triage-operational-hardening.md | 50 ++- 11 files changed, 775 insertions(+), 31 deletions(-) create mode 100644 docs/issue-core-emission-boundary.md create mode 100644 scripts/verify_daily_triage.py create mode 100644 tests/test_daily_triage_verifier.py create mode 100644 tests/test_issue_sink.py diff --git a/AGENTS.md b/AGENTS.md index ef5eef1..84a82ee 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -63,8 +63,8 @@ Omit `workstream_id` / `task_id` when not applicable. ```bash curl -s -X PATCH "http://127.0.0.1:8000/tasks/" \ -H "Content-Type: application/json" \ - -d '{"status": "in_progress"}' -# values: todo | in_progress | done | blocked + -d '{"status": "progress"}' +# values: wait | todo | progress | done | cancel ``` ### Flag a task for human review @@ -146,7 +146,7 @@ derived health labels, not frontmatter statuses. ` ` `task id: ACTIVITY-WP-NNNN-T01 -status: todo | in_progress | done | blocked +status: wait | todo | progress | done | cancel priority: high | medium | low state_hub_task_id: "" # written by fix-consistency — do not edit ` ` ` @@ -154,7 +154,10 @@ state_hub_task_id: "" # written by fix-consistency — do not edit Task description text. ``` -Status progression: `todo` → `in_progress` → `done` (or `blocked`) +Status progression: `todo` → `progress` → `done`; use `wait` for a task +blocked on external input and `cancel` for intentionally abandoned work. +Workstream/workplan lifecycle status is separate; frontmatter `blocked` remains +valid there. To create a new workplan: 1. Write the file following the format above diff --git a/docs/adr/adr-003-rule-instruction-model.md b/docs/adr/adr-003-rule-instruction-model.md index bd6370e..5b3750f 100644 --- a/docs/adr/adr-003-rule-instruction-model.md +++ b/docs/adr/adr-003-rule-instruction-model.md @@ -101,17 +101,58 @@ A Rule's action block specifies: ```yaml action: - task_template: tasks/{template-slug}.md # required - target_repo: event.attributes.repo_slug # expression — attribute access only - priority: high # high | medium | low | literal - labels: ["onboarding", "security"] # literal list - due_in_days: 7 # optional, integer literal + task_template: "Run SBOM rescan for {context.repo.repo_slug}" + target_repo: context.repo.repo_slug + priority: medium + labels: ["sbom", "security", "{context.repo.repo_slug}"] + due_in_days: 7 ``` -`target_repo` and similar fields accept simple attribute access expressions -(no boolean logic — just path traversal). This allows dynamic routing to the -correct issue-core instance without arbitrary expression evaluation in action -fields. +`action.task_template` is the emitted task title template. It is not a path to a +repo-local file. Older design notes and the legacy `tasks/*.md` directory use +"task template" for materialized task-body templates; that is a separate legacy +surface. To avoid surprise, new rule actions should treat `task_template` as +`title_template` semantics until the field can be renamed in a schema-breaking +revision. + +Action fields accept two deterministic rendering forms: + +- Whole-field paths: if the whole string is a path like + `context.repo.repo_slug` or `event.attributes.repo_slug`, the rendered value + keeps the original scalar/list/object shape from that path. This is the + correct form for `target_repo` and other fields that should not become prose. +- Scalar placeholders: strings may include `{context.foo}` or `{event.foo}` + placeholders. Each placeholder must resolve to a scalar. Lists and objects are + rejected rather than stringified, which prevents accidental JSON blobs or + untrusted text from being embedded into task titles. + +Unsafe action cases are rejected: + +- Any action path outside `context.*` or `event.*`. +- Any path containing calls, indexing, arithmetic, filters, or boolean logic. +- Placeholder values that resolve to lists or objects. +- `for_each` values that are not a whole-field `context.*` or `event.*` path to + a list. +- `bind_as` names that are not simple identifiers. + +Per-item rule expansion is explicit: + +```yaml +for_each: context.repos.repos +bind_as: repo +condition: 'context.repo.sbom_age_days > 30' +action: + task_template: Run SBOM rescan for {context.repo.repo_slug} + target_repo: context.repo.repo_slug + priority: medium + labels: ["sbom", "security", "automated"] +``` + +The weekly SBOM staleness definition is the canonical pattern. The State Hub +bulk resolver exposes all repository entries at `context.repos.repos`, the rule +binds each item as `context.repo`, and the strict staleness definition is +`context.repo.sbom_age_days > 30`. Thirty days exactly is not stale; thirty-one +days is stale. #### Evaluation semantics diff --git a/docs/issue-core-emission-boundary.md b/docs/issue-core-emission-boundary.md new file mode 100644 index 0000000..e2bdb6e --- /dev/null +++ b/docs/issue-core-emission-boundary.md @@ -0,0 +1,70 @@ +# Issue-Core Emission Boundary + +activity-core owns the decision to spawn a task and the audit trail that says +why it spawned. It does not own downstream task lifecycle state after emission. + +## Current authoritative endpoint + +The current authoritative boundary is the issue-core REST API: + +```text +POST {ISSUE_CORE_URL}/issues/ +``` + +`IssueCoreRestSink` sends this payload: + +```json +{ + "title": "Run SBOM rescan for activity-core", + "description": "", + "target_repo": "activity-core", + "priority": "medium", + "labels": ["sbom", "security", "automated"], + "due_in_days": null, + "source_type": "rule", + "source_id": "flag-stale-sbom", + "triggering_event_id": "event-or-schedule-key", + "activity_definition_id": "activity-definition-uuid" +} +``` + +The expected response contains `issue_id` and may include `issue_url` and +`backend`. activity-core stores only the returned task reference in +`task_spawn_log`; issue-core remains authoritative for task status, assignment, +comments, closure, and cancellation. + +## REST versus NATS + +Keep REST as the active emission contract until issue-core publishes and owns a +durable NATS consumer for task-creation commands. NATS is still appropriate for +event intake into activity-core, but task creation needs an acknowledged, +idempotent command boundary. A future NATS sink must return or later correlate a +task reference before it can replace `IssueCoreRestSink`. + +## Safe operating modes + +- `ISSUE_SINK_TYPE=null`: dry-run/audit mode. Task specs are rendered and the + workflow records synthetic `null-*` references. This is the current Railiance + production setting. +- `ISSUE_SINK_TYPE=rest`: live task creation. Sink failures raise out of + `emit_tasks`, so Temporal retries and the workflow history make failures + visible. + +Weekly SBOM staleness is safe to evaluate in dry-run mode because the rule +contract is deterministic and tested. Do not enable it against the real REST sink +until issue-core credentials, endpoint reachability, and duplicate-handling are +verified in the target environment. + +## Verification + +Local contract tests cover the rendered weekly SBOM task path and the REST +payload shape: + +```bash +uv run pytest tests/test_integration_event_bridge.py tests/test_issue_sink.py +``` + +For a live environment, run with `ISSUE_SINK_TYPE=null` first and confirm +`task_spawn_log` contains the expected source id, condition, triggering event id, +and synthetic task reference. Then switch to `ISSUE_SINK_TYPE=rest` only after a +single known-safe rule match creates one issue-core task with the same fields. diff --git a/docs/runbook.md b/docs/runbook.md index a547196..384f87e 100644 --- a/docs/runbook.md +++ b/docs/runbook.md @@ -147,6 +147,55 @@ docker exec temporal-admin-tools temporal workflow list \ --- +## Daily State Hub WSJF triage verification + +Use this when answering: "did today's daily triage run happen?" + +Set the ActivityDefinition id when known. If it is not known, pass the +definition name used in the environment and let the live helper resolve it from +Postgres. + +```bash +export DAILY_TRIAGE_ACTIVITY_ID= + +# Dry-run checklist; safe from any shell because it only prints checks. +uv run python scripts/verify_daily_triage.py \ + --activity-id "$DAILY_TRIAGE_ACTIVITY_ID" \ + --date "$(date -u +%F)" + +# Live check from a shell with Temporal, DB, State Hub, and working-memory access. +ACTCORE_DB_URL=postgresql+asyncpg://actcore:actcore@localhost:5433/actcore \ +TEMPORAL_HOST=localhost:7233 \ +STATE_HUB_URL=http://127.0.0.1:8000 \ + uv run python scripts/verify_daily_triage.py \ + --activity-id "$DAILY_TRIAGE_ACTIVITY_ID" \ + --working-memory-dir /home/worsch/the-custodian/working-memory \ + --live +``` + +The verification is complete when all of these agree: + +- Temporal schedule `activity-schedule-$DAILY_TRIAGE_ACTIVITY_ID` exists, is not + paused, and uses the `skip` overlap policy. +- The latest workflow found with `ActivityId="$DAILY_TRIAGE_ACTIVITY_ID"` either + completed or is visibly retrying a failed activity in history. +- `activity_runs` has a row for the daily triage ActivityDefinition with today's + `scheduled_for` or `fired_at` date. +- State Hub `/progress/` contains a `daily_triage` event whose detail includes + the same `activity_core_run_id`. +- The working-memory sink wrote `daily-triage-YYYY-MM-DD-.md` and its + frontmatter contains the same `activity_core_run_id`. +- The ActivityDefinition's instruction model, token budget, and sink timeouts fit + under `ACTIVITY_TIMEOUT_SECONDS` (default 900 seconds). Temporal retries each + activity up to 10 attempts, so a slow LLM or sink failure should show as + workflow retry history rather than a silent missing report. + +Expected missed-run behavior: the daily triage definition should use +`misfire_policy: skip`. Planned downtime does not catch up missed daily reports; +the next scheduled fire is the next authoritative run. + +--- + ## Scale-out ### Multiple worker replicas @@ -204,6 +253,44 @@ Set the environment variable before running the worker. 2. `curl http://localhost:9090/metrics` should return Temporal SDK metrics. 3. If port 9090 conflicts with Prometheus server, set `PROMETHEUS_BIND_ADDR=0.0.0.0:9091`. +### Production alerting and failure modes + +Kubernetes health expectations: + +```bash +kubectl -n activity-core get deploy actcore-worker actcore-api actcore-event-router +kubectl -n activity-core get pods -l app.kubernetes.io/part-of=activity-core +kubectl -n activity-core port-forward svc/actcore-worker-metrics 9090:9090 +curl -sf http://127.0.0.1:9090/metrics +``` + +Page an operator when: + +- `actcore-worker` has no ready pod, cannot connect to Temporal, or cannot reach + Postgres. +- The daily triage schedule is missing or paused outside an approved maintenance + window. +- The expected daily triage run is absent from Temporal and `activity_runs` + after the retry window. +- Both State Hub progress and working-memory report sinks are missing for a + completed run. +- Report sink or task emission failures repeat across Temporal retries. + +Leave a State Hub progress note, but do not page, when: + +- A planned outage caused one skipped run and the schedule is healthy again. +- A sink idempotency check reports `exists` for the expected run id. +- The report completed but calibration feedback says the recommendations were + noisy, too long, or under-sensitive. + +Handle in the next operator session: + +- Prompt/schema tuning, loose-end sensitivity, and stale-but-parked work + calibration. +- Non-urgent schedule jitter or timeout adjustments. +- Moving a task sink from `ISSUE_SINK_TYPE=null` to the real issue-core endpoint + after a dry-run contract check has passed. + ### DB migration drift ```bash uv run alembic current # show current revision diff --git a/scripts/verify_daily_triage.py b/scripts/verify_daily_triage.py new file mode 100644 index 0000000..a76e5f0 --- /dev/null +++ b/scripts/verify_daily_triage.py @@ -0,0 +1,321 @@ +#!/usr/bin/env python3 +"""Verify the daily State Hub triage activity run. + +The default mode is ``--dry-run`` so operators can see the exact checks without +needing live Temporal, Postgres, or State Hub access from the current shell. +Pass ``--live`` to run the cheap checks directly. +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from uuid import UUID + + +DEFAULT_ACTIVITY_NAME = "Daily State Hub WSJF Triage" +DEFAULT_PROGRESS_EVENT_TYPE = "daily_triage" +DEFAULT_TEMPORAL_HOST = "localhost:7233" +DEFAULT_TEMPORAL_NAMESPACE = "default" +DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" +DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/working-memory" + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Verify whether today's daily State Hub triage run happened.", + ) + parser.add_argument("--activity-id", default=os.environ.get("DAILY_TRIAGE_ACTIVITY_ID")) + parser.add_argument("--activity-name", default=os.environ.get( + "DAILY_TRIAGE_ACTIVITY_NAME", + DEFAULT_ACTIVITY_NAME, + )) + parser.add_argument("--db-url", default=os.environ.get("ACTCORE_DB_URL")) + parser.add_argument("--temporal-host", default=os.environ.get( + "TEMPORAL_HOST", + DEFAULT_TEMPORAL_HOST, + )) + parser.add_argument("--temporal-namespace", default=os.environ.get( + "TEMPORAL_NAMESPACE", + DEFAULT_TEMPORAL_NAMESPACE, + )) + parser.add_argument("--state-hub-url", default=os.environ.get( + "STATE_HUB_URL", + DEFAULT_STATE_HUB_URL, + )) + parser.add_argument("--progress-event-type", default=DEFAULT_PROGRESS_EVENT_TYPE) + parser.add_argument("--working-memory-dir", default=os.environ.get( + "DAILY_TRIAGE_WORKING_MEMORY_DIR", + DEFAULT_WORKING_MEMORY_DIR, + )) + parser.add_argument( + "--date", + default=datetime.now(timezone.utc).date().isoformat(), + help="Local report date to check, formatted YYYY-MM-DD.", + ) + parser.add_argument( + "--live", + action="store_true", + help="Run live checks. Without this flag the script prints a dry-run checklist.", + ) + return parser.parse_args(argv) + + +def build_dry_run_report(args: argparse.Namespace) -> dict[str, Any]: + activity_ref = args.activity_id or ( + f'' + ) + schedule_id = f"activity-schedule-{activity_ref}" + db_filter = ( + f"activity_runs.activity_id = '{args.activity_id}'" + if args.activity_id + else f"activity_definitions.name = '{args.activity_name}'" + ) + activity_def_filter = ( + f"id = '{args.activity_id}'" + if args.activity_id + else f"name = '{args.activity_name}'" + ) + return { + "mode": "dry-run", + "generated_at": datetime.now(timezone.utc).isoformat(), + "activity": { + "id": args.activity_id, + "name": args.activity_name, + "schedule_id": schedule_id, + }, + "checks": [ + { + "name": "temporal_schedule", + "expect": "Schedule exists, is not paused, and uses SKIP overlap for misfire_policy=skip.", + "command": ( + "temporal schedule describe " + f"--schedule-id {schedule_id} " + f"--address {args.temporal_host} " + f"--namespace {args.temporal_namespace}" + ), + }, + { + "name": "latest_workflow_history", + "expect": "Latest workflow has ActivityId search attribute and completed or is retrying visibly.", + "command": ( + "temporal workflow list " + f"--query 'ActivityId=\"{activity_ref}\"' " + f"--address {args.temporal_host} " + f"--namespace {args.temporal_namespace}" + ), + }, + { + "name": "activity_runs_row", + "expect": "Latest activity_runs row exists for today's scheduled_for or fired_at date.", + "sql": ( + "select run_id, scheduled_for, fired_at, tasks_spawned, version_used " + "from activity_runs join activity_definitions on " + "activity_runs.activity_id = activity_definitions.id " + f"where {db_filter} " + "order by fired_at desc limit 5;" + ), + }, + { + "name": "state_hub_progress", + "expect": f"State Hub progress contains event_type={args.progress_event_type!r} with this run id.", + "command": ( + f"curl -s {args.state_hub_url.rstrip('/')}/progress/?limit=100" + ), + }, + { + "name": "working_memory_note", + "expect": "A daily-triage note exists and its frontmatter carries activity_core_run_id.", + "path_glob": str( + Path(args.working_memory_dir) + / f"daily-triage-{args.date}-*.md" + ), + }, + { + "name": "llm_timeout_budget", + "expect": "Instruction model/max_tokens fit within ACTIVITY_TIMEOUT_SECONDS and Temporal retries.", + "sql": ( + "select name, instructions_json, version from activity_definitions " + f"where {activity_def_filter};" + ), + "activity_timeout_seconds": int(os.environ.get( + "ACTIVITY_TIMEOUT_SECONDS", + "900", + )), + "retry_attempts": 10, + }, + ], + } + + +async def build_live_report(args: argparse.Namespace) -> dict[str, Any]: + if not args.db_url: + raise RuntimeError("ACTCORE_DB_URL or --db-url is required for --live") + + activity = await _resolve_activity(args) + activity_id = str(activity["id"]) + args.activity_id = activity_id + dry = build_dry_run_report(args) + dry["mode"] = "live" + dry["results"] = { + "activity_definition": _json_ready(activity), + "temporal": await _check_temporal(args, activity_id), + "activity_runs": await _latest_activity_runs(args, activity_id), + "state_hub_progress": await _state_hub_progress(args), + "working_memory_notes": _working_memory_notes(args), + } + return dry + + +async def _resolve_activity(args: argparse.Namespace) -> dict[str, Any]: + from sqlalchemy import text + + from activity_core.db import make_engine + + engine = make_engine(args.db_url) + try: + async with engine.connect() as conn: + if args.activity_id: + result = await conn.execute( + text( + "select id, name, enabled, trigger_config, instructions_json, version " + "from activity_definitions where id = :activity_id" + ), + {"activity_id": args.activity_id}, + ) + else: + result = await conn.execute( + text( + "select id, name, enabled, trigger_config, instructions_json, version " + "from activity_definitions where name = :activity_name " + "order by updated_at desc limit 1" + ), + {"activity_name": args.activity_name}, + ) + row = result.mappings().first() + if row is None: + raise RuntimeError("daily triage ActivityDefinition was not found") + return dict(row) + finally: + await engine.dispose() + + +async def _latest_activity_runs( + args: argparse.Namespace, + activity_id: str, +) -> list[dict[str, Any]]: + from sqlalchemy import text + + from activity_core.db import make_engine + + engine = make_engine(args.db_url) + try: + async with engine.connect() as conn: + result = await conn.execute( + text( + "select run_id, scheduled_for, fired_at, tasks_spawned, version_used " + "from activity_runs where activity_id = :activity_id " + "order by fired_at desc limit 5" + ), + {"activity_id": activity_id}, + ) + return [_json_ready(dict(row)) for row in result.mappings().all()] + finally: + await engine.dispose() + + +async def _check_temporal(args: argparse.Namespace, activity_id: str) -> dict[str, Any]: + from temporalio.client import Client + + schedule_id = f"activity-schedule-{activity_id}" + client = await Client.connect( + args.temporal_host, + namespace=args.temporal_namespace, + ) + handle = client.get_schedule_handle(schedule_id) + schedule = await handle.describe() + workflows = [] + query = f'ActivityId="{activity_id}"' + async for item in client.list_workflows(query=query): + workflows.append({ + "id": item.id, + "run_id": item.run_id, + "status": str(item.status), + "start_time": _iso(getattr(item, "start_time", None)), + "close_time": _iso(getattr(item, "close_time", None)), + }) + if len(workflows) >= 5: + break + state = getattr(schedule.schedule, "state", None) + policy = getattr(schedule.schedule, "policy", None) + return { + "schedule_id": schedule_id, + "paused": getattr(state, "paused", None), + "overlap_policy": str(getattr(policy, "overlap", "")), + "latest_workflows": workflows, + } + + +async def _state_hub_progress(args: argparse.Namespace) -> list[dict[str, Any]]: + import httpx + + base = args.state_hub_url.rstrip("/") + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.get(f"{base}/progress/", params={"limit": 100}) + response.raise_for_status() + items = response.json() + if not isinstance(items, list): + return [] + return [ + _json_ready(item) + for item in items + if item.get("event_type") == args.progress_event_type + ][:5] + + +def _working_memory_notes(args: argparse.Namespace) -> list[str]: + directory = Path(args.working_memory_dir) + pattern = f"daily-triage-{args.date}-*.md" + if not directory.exists(): + return [] + return [str(path) for path in sorted(directory.glob(pattern))] + + +def _json_ready(value: Any) -> Any: + if isinstance(value, dict): + return {key: _json_ready(item) for key, item in value.items()} + if isinstance(value, list): + return [_json_ready(item) for item in value] + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, UUID): + return str(value) + return value + + +def _iso(value: Any) -> str | None: + return value.isoformat() if hasattr(value, "isoformat") else None + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv) + try: + if args.live: + report = asyncio.run(build_live_report(args)) + else: + report = build_dry_run_report(args) + except Exception as exc: + print(f"verify_daily_triage: {exc}", file=sys.stderr) + return 2 + print(json.dumps(report, indent=2, sort_keys=True)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index 98a65fb..3502061 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -377,6 +377,7 @@ async def emit_tasks(payload: dict) -> list[str]: Session = _get_session_factory() refs: list[str] = [] + errors: list[str] = [] async with Session() as session: async with session.begin(): for spec_dict in task_specs_raw: @@ -411,6 +412,11 @@ async def emit_tasks(payload: dict) -> list[str]: ) session.add(log_row) except Exception as exc: + message = f"{spec.source_type}:{spec.source_id}: {exc}" + errors.append(message) activity.logger.warning("emit_tasks: sink.emit failed — %s", exc) + if errors: + raise RuntimeError(f"task emission sink failure: {errors!r}") + return refs diff --git a/src/activity_core/context_resolvers/state_hub.py b/src/activity_core/context_resolvers/state_hub.py index e1e693e..fbb0a10 100644 --- a/src/activity_core/context_resolvers/state_hub.py +++ b/src/activity_core/context_resolvers/state_hub.py @@ -31,7 +31,7 @@ from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, Cont _DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" _TIMEOUT_SECONDS = 10.0 _OPEN_WORKSTREAM_STATUSES = {"active", "ready", "blocked"} -_OPEN_TASK_STATUSES = {"todo", "in_progress", "blocked"} +_OPEN_TASK_STATUSES = {"wait", "todo", "progress"} # Sentinel age for repos that have never had an SBOM ingested. Large enough # that any threshold-based staleness rule treats them as "very stale" without # forcing the rule expression to special-case None. @@ -260,7 +260,7 @@ def _daily_triage_digest(params: dict[str, Any]) -> str: "status", "open_task_counts", "needs_human_count", - "blocked_task_count", + "wait_task_count", "workplan_health_labels", ], }, @@ -311,14 +311,14 @@ def _open_workstream_digest( def _task_counts(tasks: list[dict[str, Any]]) -> dict[str, int]: - counts = {"todo": 0, "in_progress": 0, "blocked": 0, "needs_human": 0} + counts = {"wait": 0, "todo": 0, "progress": 0, "needs_human": 0} for task in tasks: status = task.get("status") if status in counts: counts[status] += 1 if task.get("needs_human"): counts["needs_human"] += 1 - counts["open_total"] = counts["todo"] + counts["in_progress"] + counts["blocked"] + counts["open_total"] = counts["wait"] + counts["todo"] + counts["progress"] return counts @@ -364,7 +364,7 @@ def _candidate_sort_key(candidate: dict[str, Any]) -> tuple[int, int, int, int]: _priority_rank(candidate.get("planning_priority")), 0 if candidate.get("status") == "active" else 1, -int(counts.get("needs_human", 0)), - -int(counts.get("blocked", 0)), + -int(counts.get("wait", 0)), ) diff --git a/tests/test_daily_triage_verifier.py b/tests/test_daily_triage_verifier.py new file mode 100644 index 0000000..2fb7b42 --- /dev/null +++ b/tests/test_daily_triage_verifier.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import importlib.util +from pathlib import Path + + +def _load_script(): + path = Path(__file__).parent.parent / "scripts" / "verify_daily_triage.py" + spec = importlib.util.spec_from_file_location("verify_daily_triage", path) + assert spec is not None + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + return module + + +def test_daily_triage_verifier_dry_run_names_all_operator_checks() -> None: + script = _load_script() + args = script.parse_args([ + "--activity-id", + "00000000-0000-0000-0000-000000000123", + "--date", + "2026-06-04", + "--working-memory-dir", + "/tmp/wm", + ]) + + report = script.build_dry_run_report(args) + + assert report["mode"] == "dry-run" + names = {check["name"] for check in report["checks"]} + assert names == { + "temporal_schedule", + "latest_workflow_history", + "activity_runs_row", + "state_hub_progress", + "working_memory_note", + "llm_timeout_budget", + } + assert report["activity"]["schedule_id"] == ( + "activity-schedule-00000000-0000-0000-0000-000000000123" + ) + assert any( + check.get("path_glob") == "/tmp/wm/daily-triage-2026-06-04-*.md" + for check in report["checks"] + ) + timeout_check = next( + check for check in report["checks"] if check["name"] == "llm_timeout_budget" + ) + run_check = next( + check for check in report["checks"] if check["name"] == "activity_runs_row" + ) + assert "activity_runs.activity_id" in run_check["sql"] + assert "where id = '00000000-0000-0000-0000-000000000123'" in timeout_check["sql"] + assert timeout_check["activity_timeout_seconds"] == 900 + assert timeout_check["retry_attempts"] == 10 diff --git a/tests/test_issue_sink.py b/tests/test_issue_sink.py new file mode 100644 index 0000000..ae9f3e4 --- /dev/null +++ b/tests/test_issue_sink.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +from typing import Any + +import httpx +import pytest + +from activity_core import activities +from activity_core.issue_sink import IssueCoreRestSink +from activity_core.rules.models import TaskRef, TaskSpec + + +class DummyResponse: + def __init__(self, payload: dict[str, Any]) -> None: + self.payload = payload + + def raise_for_status(self) -> None: + return None + + def json(self) -> dict[str, Any]: + return self.payload + + +def test_issue_core_rest_sink_posts_task_contract(monkeypatch) -> None: + posts: list[dict[str, Any]] = [] + + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + posts.append({"url": url, **kwargs}) + return DummyResponse({ + "issue_id": "issue-123", + "issue_url": "http://issue-core.test/issues/issue-123", + "backend": "issue-core", + }) + + monkeypatch.setattr(httpx, "post", fake_post) + + ref = IssueCoreRestSink("http://issue-core.test/").emit(TaskSpec( + title="Run SBOM rescan for activity-core", + description="SBOM is older than 30 days.", + target_repo="activity-core", + priority="medium", + labels=["sbom", "security", "automated"], + due_in_days=7, + source_type="rule", + source_id="flag-stale-sbom", + triggering_event_id="scheduled", + activity_definition_id="activity-1", + )) + + assert ref == TaskRef( + external_id="issue-123", + backend_url="http://issue-core.test/issues/issue-123", + backend="issue-core", + ) + assert posts == [ + { + "url": "http://issue-core.test/issues/", + "json": { + "title": "Run SBOM rescan for activity-core", + "description": "SBOM is older than 30 days.", + "target_repo": "activity-core", + "priority": "medium", + "labels": ["sbom", "security", "automated"], + "due_in_days": 7, + "source_type": "rule", + "source_id": "flag-stale-sbom", + "triggering_event_id": "scheduled", + "activity_definition_id": "activity-1", + }, + "timeout": 10.0, + } + ] + + +@pytest.mark.asyncio +async def test_emit_tasks_raises_when_sink_fails(monkeypatch) -> None: + class FailingSink: + def emit(self, task_spec: TaskSpec) -> TaskRef: + raise RuntimeError(f"boom for {task_spec.title}") + + class FakeTransaction: + async def __aenter__(self) -> None: + return None + + async def __aexit__(self, *exc_info: object) -> bool: + return False + + class FakeSession: + def begin(self) -> FakeTransaction: + return FakeTransaction() + + async def __aenter__(self) -> "FakeSession": + return self + + async def __aexit__(self, *exc_info: object) -> bool: + return False + + def add(self, row: object) -> None: + raise AssertionError("failed emissions should not write spawn logs") + + class FakeSessionFactory: + def __call__(self) -> FakeSession: + return FakeSession() + + monkeypatch.setattr(activities, "get_issue_sink", lambda: FailingSink()) + monkeypatch.setattr(activities, "_get_session_factory", lambda: FakeSessionFactory()) + + with pytest.raises(RuntimeError, match="task emission sink failure"): + await activities.emit_tasks({ + "activity_id": "00000000-0000-0000-0000-000000000001", + "triggering_event_id": "scheduled", + "run_id": "00000000-0000-0000-0000-000000000002", + "task_specs": [ + { + "title": "Run SBOM rescan for activity-core", + "description": "", + "target_repo": "activity-core", + "priority": "medium", + "labels": ["sbom"], + "due_in_days": None, + "source_type": "rule", + "source_id": "flag-stale-sbom", + "condition": "context.repo.sbom_age_days > 30", + } + ], + }) diff --git a/tests/test_state_hub_context_resolver.py b/tests/test_state_hub_context_resolver.py index 1200195..800ce8d 100644 --- a/tests/test_state_hub_context_resolver.py +++ b/tests/test_state_hub_context_resolver.py @@ -235,7 +235,7 @@ def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None: payloads = { "/state/summary": { "generated_at": "2026-05-19T05:20:00Z", - "totals": {"tasks": {"todo": 4, "blocked": 1}}, + "totals": {"tasks": {"todo": 4, "wait": 1}}, "topics": [ { "slug": "custodian", @@ -306,7 +306,7 @@ def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None: { "id": "task-2", "title": "T06 - Canary Cutover", - "status": "blocked", + "status": "wait", "priority": "medium", "needs_human": True, }, @@ -331,13 +331,13 @@ def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None: import json digest = json.loads(raw_digest) - assert digest["totals"] == {"tasks": {"todo": 4, "blocked": 1}} + assert digest["totals"] == {"tasks": {"todo": 4, "wait": 1}} assert digest["open_workstreams"][0]["slug"] == "cust-wp-0045" assert digest["open_workstreams"][0]["planning_priority"] == "high" assert digest["open_workstreams"][0]["open_task_counts"] == { + "wait": 1, "todo": 1, - "in_progress": 0, - "blocked": 1, + "progress": 0, "needs_human": 1, "open_total": 2, } diff --git a/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md b/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md index 7c00c51..d89c0b6 100644 --- a/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md +++ b/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md @@ -4,11 +4,11 @@ type: workplan title: "Post-triage operational hardening" domain: custodian repo: activity-core -status: ready +status: active owner: codex topic_slug: custodian created: "2026-06-03" -updated: "2026-06-03" +updated: "2026-06-04" state_hub_workstream_id: "5646e13a-13af-4724-bca6-3c0d86f96733" --- @@ -31,7 +31,7 @@ task lifecycle database, a project planner, or an execution worker. ```task id: ACTIVITY-WP-0006-T01 -status: todo +status: done priority: high state_hub_task_id: "5d79e3da-d26d-4cad-9cdf-5e5264bb7019" ``` @@ -50,11 +50,17 @@ Scope: Done when the full test suite passes and activity-core no longer depends on legacy task-status aliases for State Hub API clients or tests. +2026-06-04: Completed. `AGENTS.md` now uses State Hub task statuses +`wait`, `todo`, `progress`, `done`, and `cancel`; workplan/workstream lifecycle +`blocked` remains separate. The State Hub daily triage digest now counts +`wait/todo/progress` open tasks and no longer fixtures task-level +`in_progress` or `blocked`. Full suite passed: 128 passed, 1 skipped. + ## Daily Triage Observability Runbook ```task id: ACTIVITY-WP-0006-T02 -status: todo +status: done priority: high state_hub_task_id: "02c34443-0e8d-4f1a-93d9-6c39f07faad7" ``` @@ -73,11 +79,16 @@ The operator should be able to check: Done when `docs/runbook.md` has a concise daily-triage verification section and any helper command/script is covered by tests or a dry-run path. +2026-06-04: Completed. Added `scripts/verify_daily_triage.py` with dry-run and +live modes, plus `tests/test_daily_triage_verifier.py`. `docs/runbook.md` now +covers Temporal schedule/workflow checks, `activity_runs`, State Hub progress, +working-memory notes, missed-run `skip` behavior, and LLM timeout budget. + ## Three-Run Calibration Feedback ```task id: ACTIVITY-WP-0006-T03 -status: todo +status: wait priority: medium state_hub_task_id: "7cbf0a35-71a1-47ac-afc2-f51ad2180fd0" ``` @@ -96,11 +107,16 @@ Done when the calibration result is recorded in State Hub and the related `CUST-WP-0044` / `CUST-WP-0045` tasks can close based on activity-core runs, not Codex app fallback runs. +2026-06-04: Waiting on real evidence. The repo now has a verification path for +scheduled daily triage runs, but this task still requires three consecutive +actual activity-core scheduled runs and State Hub calibration feedback. Local +tests cannot substitute for that operational evidence. + ## Rule Action Contract Documentation ```task id: ACTIVITY-WP-0006-T04 -status: todo +status: done priority: medium state_hub_task_id: "c9066d2e-0429-4e14-a68a-8418061ffd8d" ``` @@ -116,11 +132,16 @@ Also decide and document the naming/semantics mismatch around Done when ADR-003 or a focused follow-up doc contains examples, unsafe cases, and the weekly SBOM staleness definition is cited as the canonical pattern. +2026-06-04: Completed. Updated ADR-003 with whole-field path rendering, +scalar placeholder rendering, unsafe action cases, explicit `for_each` / +`bind_as` expansion, the `task_template` naming mismatch, and weekly SBOM +staleness as the canonical per-item pattern. + ## Production Alerting And Failure Modes ```task id: ACTIVITY-WP-0006-T05 -status: todo +status: done priority: medium state_hub_task_id: "420ea629-0c20-4d09-9cc1-6b2f32665161" ``` @@ -139,11 +160,17 @@ Cover: Done when the runbook and metrics/health surface make ordinary failures visible without inspecting a Codex Desktop session. +2026-06-04: Completed. `docs/runbook.md` now documents Kubernetes worker/API/ +router health checks, Temporal schedule paused/missing checks, report sink +failure behavior, LLM timeout/retry behavior, and page/note/next-session +classification. Task emission sink failures now raise from `emit_tasks`, making +them visible to Temporal retries instead of warning-only logs. + ## Issue-Core Emission Boundary Verification ```task id: ACTIVITY-WP-0006-T06 -status: todo +status: done priority: medium state_hub_task_id: "78089aef-aba1-42d7-a203-ef80ba6791d9" ``` @@ -163,6 +190,13 @@ Done when there is a tested or dry-run-verified path from a rule match to a downstream task reference, and activity-core still owns only the spawn audit trail, not task lifecycle state. +2026-06-04: Completed. Added `docs/issue-core-emission-boundary.md` documenting +REST `/issues/` as the current authoritative endpoint, NATS as future work, +Railiance `ISSUE_SINK_TYPE=null` dry-run mode, and the fields sent to +issue-core versus retained in `task_spawn_log`. Added REST payload and sink +failure tests in `tests/test_issue_sink.py`; the existing weekly SBOM integration +test remains the dry-run rule-match-to-task-reference proof. + ## Completion Criteria - State Hub task-status canon adaptation is complete.