@activity.defn
async def evaluate_instructions(payload: dict) -> dict:
    """Evaluate instruction blocks and return task specs/reports with audit fields.

    Expected keys in payload:
      instructions  list[dict]  — InstructionDef serialised dicts
      event         dict        — EventEnvelope attributes (or empty for cron)
      context       dict        — context snapshot from resolve_context

    Returns a dict with two lists:
      task_specs  one dict per TaskSpec produced by an instruction, carrying
                  the task fields plus ``source_type``/``source_id`` and the
                  audit fields (``condition``, ``prompt_hash``, ``model``,
                  ``output_validated``, ``review_required``).
      reports     one dict per instruction that produced a report, carrying
                  ``instruction_id``, ``report`` and the same audit fields.

    Instructions that fail ``InstructionDef`` validation are logged and
    skipped; they never abort the remaining instructions.
    """
    # Local import keeps this block self-contained; asyncio is stdlib.
    import asyncio

    # ``or`` (not a .get default) so an explicit None in the payload is also
    # treated as "nothing supplied" instead of crashing below.
    instructions = payload.get("instructions") or []
    event_attrs = payload.get("event") or {}
    context = payload.get("context") or {}
    llm_client = get_llm_client()

    class _DictObj:
        # Exposes the top-level keys of a dict as attributes. Nested dicts are
        # stored as-is (they stay plain dicts).
        def __init__(self, d: dict) -> None:
            self.__dict__.update(d)

    class _Env:
        # Minimal stand-in exposing ``.attributes`` for the executor.
        def __init__(self, attrs: dict) -> None:
            self.attributes = _DictObj(attrs)

    event_obj = _Env(event_attrs)

    task_specs: list[dict] = []
    reports: list[dict] = []
    for raw_instruction in instructions:
        try:
            instruction = InstructionDef.model_validate(raw_instruction)
        except Exception as exc:
            activity.logger.warning("instruction definition invalid — %s", exc)
            continue

        # execute_instruction_with_audit is synchronous and ends up in
        # llm_client.complete(), which may block on a network round-trip for
        # minutes. Running it in a worker thread keeps this coroutine from
        # stalling the event loop it shares with other activities.
        result = await asyncio.to_thread(
            execute_instruction_with_audit,
            instruction,
            event_obj,
            context,
            llm_client,
        )

        # Audit fields are identical for the report and for every task that
        # came out of the same instruction run, so build them once.
        audit = {
            "condition": result.condition_matched,
            "prompt_hash": result.prompt_hash,
            "model": result.model,
            "output_validated": result.output_validated,
            "review_required": result.review_required,
        }

        if result.report is not None:
            reports.append({
                "instruction_id": instruction.id,
                "report": result.report,
                **audit,
            })

        task_specs.extend(
            {
                "title": spec.title,
                "description": spec.description,
                "target_repo": spec.target_repo,
                "priority": spec.priority,
                "labels": spec.labels,
                "due_in_days": spec.due_in_days,
                "source_type": "instruction",
                "source_id": instruction.id,
                **audit,
            }
            for spec in result.tasks
        )

    return {"task_specs": task_specs, "reports": reports}
"""Emit TaskSpecs to IssueSink and write task_spawn_log rows. @@ -316,6 +388,10 @@ async def emit_tasks(payload: dict) -> list[str]: triggering_event_id=triggering_event_id, task_ref=ref.external_id, condition_matched=spec_dict.get("condition"), + prompt_hash=spec_dict.get("prompt_hash"), + model=spec_dict.get("model"), + output_validated=spec_dict.get("output_validated"), + review_required=spec_dict.get("review_required"), ) session.add(log_row) except Exception as exc: diff --git a/src/activity_core/llm_client.py b/src/activity_core/llm_client.py new file mode 100644 index 0000000..076c2c6 --- /dev/null +++ b/src/activity_core/llm_client.py @@ -0,0 +1,57 @@ +"""llm-connect adapter for instruction execution. + +activity-core deliberately talks to llm-connect over its small HTTP surface +instead of importing provider-specific SDKs. This keeps the activity worker on +owned infrastructure while leaving provider selection, API keys, and model +routing behind the existing llm-connect boundary. 
+""" + +from __future__ import annotations + +import os +from typing import Any + +import httpx + + +class DisabledLLMClient: + """LLM client used when no llm-connect endpoint is configured.""" + + def complete(self, prompt: str, model: str = "") -> str: # noqa: ARG002 + raise RuntimeError("LLM_CONNECT_URL is not configured") + + +class LLMConnectClient: + """Small synchronous client for llm-connect server mode.""" + + def __init__(self, base_url: str, timeout_seconds: float = 300.0) -> None: + self.base_url = base_url.rstrip("/") + self.timeout_seconds = timeout_seconds + + def complete(self, prompt: str, model: str = "") -> str: + payload: dict[str, Any] = { + "prompt": prompt, + "config": { + "model_name": model, + "timeout_seconds": int(self.timeout_seconds), + }, + } + resp = httpx.post( + f"{self.base_url}/execute", + json=payload, + timeout=self.timeout_seconds, + ) + resp.raise_for_status() + data = resp.json() + content = data.get("content") + if not isinstance(content, str): + raise ValueError("llm-connect response missing string content") + return content + + +def get_llm_client() -> DisabledLLMClient | LLMConnectClient: + base_url = os.environ.get("LLM_CONNECT_URL", "").strip() + if not base_url: + return DisabledLLMClient() + timeout = float(os.environ.get("LLM_CONNECT_TIMEOUT_SECONDS", "300")) + return LLMConnectClient(base_url, timeout) diff --git a/src/activity_core/rules/executor.py b/src/activity_core/rules/executor.py index e1d5cb9..7561601 100644 --- a/src/activity_core/rules/executor.py +++ b/src/activity_core/rules/executor.py @@ -11,6 +11,8 @@ import hashlib import json import logging import re +from dataclasses import dataclass +from pathlib import Path from typing import Any from activity_core.rules.evaluator import UnsafeExpression, evaluate_condition @@ -26,6 +28,19 @@ class UntrustedFieldError(ValueError): """Raised when a prompt placeholder references a field not in trusted_fields.""" +@dataclass +class InstructionResult: + 
def execute_instruction_with_audit(
    instr: Any,
    event: Any,
    context: dict,
    llm_client: Any,
) -> InstructionResult:
    """Run one instruction and return its tasks together with audit metadata.

    Delegates to ``_execute``. An exception raised by ``_execute`` is logged
    as a warning and replaced by ``_empty_result(instr)``, so callers always
    receive an ``InstructionResult``.
    """
    try:
        outcome = _execute(instr, event, context, llm_client)
    except Exception as exc:
        # A trusted-field policy violation gets its own wording; anything
        # else is reported as a generic failure. Both yield an empty result.
        if isinstance(exc, UntrustedFieldError):
            logger.warning("instruction %r rejected — %s", instr.id, exc)
        else:
            logger.warning("instruction %r failed — %s", instr.id, exc)
        return _empty_result(instr)
    return outcome
def _empty_result(instr: Any, prompt_hash: str | None = None) -> InstructionResult:
    """Build an ``InstructionResult`` with no tasks and ``output_validated=False``.

    Audit fields are copied from ``instr`` where present: ``model`` falls back
    to ``None``, ``review_required`` is coerced to ``bool`` (default
    ``False``), and an empty or missing ``condition`` becomes ``None``.
    ``prompt_hash`` is passed through unchanged.
    """
    model_name = getattr(instr, "model", None)
    needs_review = bool(getattr(instr, "review_required", False))
    # Normalise "" (no condition configured) to None.
    condition = getattr(instr, "condition", "") or None

    return InstructionResult(
        tasks=[],
        prompt_hash=prompt_hash,
        model=model_name,
        output_validated=False,
        review_required=needs_review,
        condition_matched=condition,
    )
+ """ try: if isinstance(raw_output, str): data = json.loads(raw_output) else: data = raw_output - if not isinstance(data, list): - data = [data] + schema_error = _validate_against_schema(data, getattr(instr, "output_schema", "")) + if schema_error: + return [], None, schema_error + + report: dict[str, Any] | None = None + task_items: list[Any] + if isinstance(data, dict) and ("tasks" in data or "report" in data): + maybe_report = data.get("report") + if maybe_report is not None and not isinstance(maybe_report, dict): + return [], None, "report must be a JSON object" + report = maybe_report + tasks = data.get("tasks", []) + if not isinstance(tasks, list): + return [], None, "tasks must be a JSON array" + task_items = tasks + elif isinstance(data, dict) and "title" not in data: + report = data + task_items = [] + elif isinstance(data, list): + task_items = data + else: + task_items = [data] specs = [] - for item in data: + for item in task_items: + if not isinstance(item, dict): + return [], None, "each task must be a JSON object" specs.append(TaskSpec( title=item.get("title", ""), description=item.get("description", ""), @@ -162,6 +240,70 @@ def _validate_output(raw_output: Any, instr: Any) -> tuple[list[TaskSpec], str | source_type="instruction", source_id=instr.id, )) - return specs, None + return specs, report, None except (json.JSONDecodeError, AttributeError, KeyError, TypeError) as exc: - return [], str(exc) + return [], None, str(exc) + + +def _validate_against_schema(data: Any, schema_path: str) -> str | None: + if not schema_path: + return None + + path = Path(schema_path) + if not path.exists(): + return None + + try: + schema = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as exc: + return f"could not read output schema: {exc}" + + return _validate_schema_node(data, schema, "$") + + +def _validate_schema_node(data: Any, schema: dict[str, Any], path: str) -> str | None: + expected_type = schema.get("type") + if 
expected_type and not _matches_type(data, expected_type): + return f"{path}: expected {expected_type}" + + if expected_type == "object": + required = schema.get("required", []) + if isinstance(required, list): + for key in required: + if isinstance(key, str) and key not in data: + return f"{path}: missing required property {key!r}" + properties = schema.get("properties", {}) + if isinstance(properties, dict): + for key, child_schema in properties.items(): + if key in data and isinstance(child_schema, dict): + error = _validate_schema_node(data[key], child_schema, f"{path}.{key}") + if error: + return error + + if expected_type == "array": + item_schema = schema.get("items") + if isinstance(item_schema, dict): + for index, item in enumerate(data): + error = _validate_schema_node(item, item_schema, f"{path}[{index}]") + if error: + return error + + return None + + +def _matches_type(data: Any, expected_type: str) -> bool: + if expected_type == "object": + return isinstance(data, dict) + if expected_type == "array": + return isinstance(data, list) + if expected_type == "string": + return isinstance(data, str) + if expected_type == "integer": + return isinstance(data, int) and not isinstance(data, bool) + if expected_type == "number": + return isinstance(data, (int, float)) and not isinstance(data, bool) + if expected_type == "boolean": + return isinstance(data, bool) + if expected_type == "null": + return data is None + return True diff --git a/src/activity_core/workflows.py b/src/activity_core/workflows.py index b56d0f1..e56efe6 100644 --- a/src/activity_core/workflows.py +++ b/src/activity_core/workflows.py @@ -21,6 +21,7 @@ with workflow.unsafe.imports_passed_through(): from activity_core.activities import ( emit_tasks, evaluate_rules, + evaluate_instructions, load_activity_definition, log_run, persist_task_instance, @@ -136,6 +137,19 @@ class RunActivityWorkflow: "condition": rule.get("condition", ""), }) + if defn.get("instructions"): + instruction_result: dict = 
await workflow.execute_activity( + evaluate_instructions, + { + "instructions": defn.get("instructions", []), + "event": event_attrs, + "context": context_snapshot, + }, + start_to_close_timeout=_ACTIVITY_TIMEOUT, + retry_policy=_RETRY_POLICY, + ) + task_spec_dicts.extend(instruction_result.get("task_specs", [])) + # ── 4. Emit tasks via IssueSink ─────────────────────────────────────── if trigger_key == SCHEDULED_TRIGGER_KEY: dedup_source = workflow.info().workflow_id diff --git a/tests/rules/test_executor.py b/tests/rules/test_executor.py index b035b05..e21637a 100644 --- a/tests/rules/test_executor.py +++ b/tests/rules/test_executor.py @@ -21,6 +21,7 @@ from activity_core.rules.executor import ( UntrustedFieldError, _render_prompt, execute_instruction, + execute_instruction_with_audit, ) @@ -201,6 +202,82 @@ def test_valid_llm_output_returns_task_spec(): assert result[0].source_type == "instruction" +def test_execute_instruction_with_audit_returns_metadata(): + task_data = [{"title": "Run triage", "priority": "high"}] + llm = _CountingLLM([json.dumps(task_data)]) + instr = _instr( + id="daily-triage", + condition="", + prompt="Check State Hub.", + trusted_fields=[], + model="test-model", + review_required=True, + ) + + result = execute_instruction_with_audit(instr, _Event(), {}, llm) + + assert len(result.tasks) == 1 + assert result.tasks[0].source_id == "daily-triage" + assert result.prompt_hash is not None + assert len(result.prompt_hash) == 64 + assert result.model == "test-model" + assert result.output_validated is True + assert result.review_required is True + + +def test_execute_instruction_with_audit_accepts_report_payload(): + report_data = { + "summary": "State Hub has loose ends.", + "recommendations": [{"action": "revisit", "candidate": "CUST-WP-0045"}], + } + llm = _CountingLLM([json.dumps(report_data)]) + instr = _instr( + id="daily-triage-report", + prompt="Report.", + trusted_fields=[], + output_schema="schemas/daily-triage-report.json", + ) + + 
result = execute_instruction_with_audit(instr, _Event(), {}, llm) + + assert result.tasks == [] + assert result.report == report_data + assert result.output_validated is True + + +def test_execute_instruction_with_audit_rejects_invalid_report_schema(): + report_data = {"summary": "Missing recommendations."} + llm = _CountingLLM([json.dumps(report_data), json.dumps(report_data)]) + instr = _instr( + id="daily-triage-report", + prompt="Report.", + trusted_fields=[], + output_schema="schemas/daily-triage-report.json", + ) + + result = execute_instruction_with_audit(instr, _Event(), {}, llm) + + assert result.tasks == [] + assert result.report is None + assert result.output_validated is False + assert llm.call_count == 2 + + +def test_execute_instruction_with_audit_accepts_report_and_tasks_envelope(): + envelope = { + "report": {"summary": "Review needed."}, + "tasks": [{"title": "Inspect CUST-WP-0045"}], + } + llm = _CountingLLM([json.dumps(envelope)]) + instr = _instr(id="daily-triage-report", prompt="Report.", trusted_fields=[]) + + result = execute_instruction_with_audit(instr, _Event(), {}, llm) + + assert result.report == {"summary": "Review needed."} + assert len(result.tasks) == 1 + assert result.tasks[0].title == "Inspect CUST-WP-0045" + + # ── Condition pre-filter ─────────────────────────────────────────────────────── def test_condition_false_skips_llm(): diff --git a/tests/test_instruction_evaluation.py b/tests/test_instruction_evaluation.py new file mode 100644 index 0000000..5afb713 --- /dev/null +++ b/tests/test_instruction_evaluation.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +import json + +import pytest + +from activity_core import activities + + +class FakeLLMClient: + def __init__(self, response: str) -> None: + self.response = response + self.calls: list[tuple[str, str]] = [] + + def complete(self, prompt: str, model: str = "") -> str: + self.calls.append((prompt, model)) + return self.response + + +@pytest.mark.asyncio +async def 
test_evaluate_instructions_returns_task_specs_with_audit(monkeypatch) -> None: + llm = FakeLLMClient(json.dumps([ + { + "title": "Run daily triage", + "description": "Review State Hub loose ends.", + "priority": "high", + "labels": ["triage"], + } + ])) + monkeypatch.setattr(activities, "get_llm_client", lambda: llm) + + result = await activities.evaluate_instructions({ + "instructions": [ + { + "id": "daily-triage", + "trusted_fields": ["context.summary.open_tasks"], + "model": "test-model", + "prompt": "Open tasks: {context.summary.open_tasks}", + "output_schema": "", + "review_required": False, + } + ], + "event": {}, + "context": {"summary": {"open_tasks": 3}}, + }) + + task_specs = result["task_specs"] + assert len(task_specs) == 1 + spec = task_specs[0] + assert spec["title"] == "Run daily triage" + assert spec["source_type"] == "instruction" + assert spec["source_id"] == "daily-triage" + assert spec["model"] == "test-model" + assert spec["output_validated"] is True + assert spec["review_required"] is False + assert spec["prompt_hash"] is not None + assert len(spec["prompt_hash"]) == 64 + assert result["reports"] == [] + assert llm.calls == [("Open tasks: 3", "test-model")] + + +@pytest.mark.asyncio +async def test_evaluate_instructions_returns_report_payload(monkeypatch) -> None: + llm = FakeLLMClient(json.dumps({ + "summary": "State Hub has open loose ends.", + "recommendations": [{"candidate": "CUST-WP-0045", "action": "work-next"}], + })) + monkeypatch.setattr(activities, "get_llm_client", lambda: llm) + + result = await activities.evaluate_instructions({ + "instructions": [ + { + "id": "daily-triage-report", + "trusted_fields": [], + "model": "test-model", + "prompt": "Run report.", + "output_schema": "schemas/daily-triage-report.json", + "review_required": False, + } + ], + "event": {}, + "context": {}, + }) + + assert result["task_specs"] == [] + assert len(result["reports"]) == 1 + report = result["reports"][0] + assert report["instruction_id"] == 
"daily-triage-report" + assert report["report"]["summary"] == "State Hub has open loose ends." + assert report["output_validated"] is True + assert report["prompt_hash"] is not None + + +@pytest.mark.asyncio +async def test_evaluate_instructions_without_llm_client_returns_no_tasks(monkeypatch) -> None: + class RaisingClient: + def complete(self, prompt: str, model: str = "") -> str: # noqa: ARG002 + raise RuntimeError("not configured") + + monkeypatch.setattr(activities, "get_llm_client", lambda: RaisingClient()) + + result = await activities.evaluate_instructions({ + "instructions": [ + { + "id": "daily-triage", + "trusted_fields": [], + "model": "test-model", + "prompt": "Run triage.", + "output_schema": "schemas/daily-triage-report.json", + } + ], + "event": {}, + "context": {}, + }) + + assert result == {"task_specs": [], "reports": []}