"""ActivityDefinition markdown file parser (T44). Scans activity-definitions/*.md in the local repo and any directories listed in ACTIVITY_DEFINITION_DIRS (colon-separated). Returns parsed ActivityDefinitionDef objects; raises ParseError on malformed input. Never silently ignores a broken definition file. """ from __future__ import annotations import os import re from dataclasses import dataclass, field from pathlib import Path from typing import Any import yaml class ParseError(Exception): """Raised when a definition file cannot be parsed.""" def __init__(self, file: Path, line: int | None, message: str) -> None: self.file = file self.line = line self.message = message loc = f"{file}:{line}" if line else str(file) super().__init__(f"{loc}: {message}") @dataclass class ActivityDefinitionDef: """Parsed in-memory representation of an activity-definitions/*.md file.""" id: str name: str enabled: bool trigger_config: dict[str, Any] context_sources: list[dict[str, Any]] rules: list[dict[str, Any]] instructions: list[dict[str, Any]] governance: str owner: str status: str source_file: Path _FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL) _FENCED_BLOCK_RE = re.compile(r"^```(\w+)\s*\n(.*?)\n```", re.DOTALL | re.MULTILINE) def _scan_dirs() -> list[Path]: dirs: list[Path] = [] default_dir = Path("activity-definitions") if default_dir.is_dir(): dirs.append(default_dir) extra = os.environ.get("ACTIVITY_DEFINITION_DIRS", "") for part in extra.split(":"): part = part.strip() if not part: continue p = Path(part) / "activity-definitions" if p.is_dir(): dirs.append(p) return dirs def _parse_trigger(trigger_dict: dict, file: Path) -> dict[str, Any]: """Normalise the trigger section into a trigger_config dict.""" trigger_type = trigger_dict.get("type") if trigger_type == "cron": return { "trigger_type": "cron", "cron_expression": trigger_dict.get("cron_expression", ""), "timezone": trigger_dict.get("timezone", "UTC"), "jitter_seconds": trigger_dict.get("jitter_seconds", 0), "misfire_policy": trigger_dict.get("misfire_policy", "skip"), } elif trigger_type == "event": return { "trigger_type": "event", "event_type": trigger_dict.get("event_type", ""), "filters": trigger_dict.get("filters", {}), } elif trigger_type == "scheduled": at = trigger_dict.get("at") if at is None: raise ParseError(file, None, "trigger.at is required for type=scheduled") at_str = at.isoformat() if hasattr(at, "isoformat") else str(at) return { "trigger_type": "scheduled", "at": at_str, "timezone": trigger_dict.get("timezone", "UTC"), } else: raise ParseError(file, None, f"unknown trigger type {trigger_type!r}") def parse_file(path: Path) -> ActivityDefinitionDef: """Parse one ActivityDefinition markdown file. Raises ParseError on malformed or missing required fields. """ try: text = path.read_text(encoding="utf-8") except OSError as exc: raise ParseError(path, None, f"cannot read file: {exc}") from exc fm_match = _FRONTMATTER_RE.match(text) if not fm_match: raise ParseError(path, 1, "missing or malformed YAML frontmatter") try: fm = yaml.safe_load(fm_match.group(1)) except yaml.YAMLError as exc: raise ParseError(path, None, f"YAML parse error in frontmatter: {exc}") from exc if not isinstance(fm, dict): raise ParseError(path, 1, "frontmatter must be a YAML mapping") for req in ("id", "name", "trigger"): if req not in fm: raise ParseError(path, None, f"missing required frontmatter field: {req!r}") trigger_section = fm["trigger"] if not isinstance(trigger_section, dict): raise ParseError(path, None, "trigger must be a YAML mapping") trigger_config = _parse_trigger(trigger_section, path) raw_sources = fm.get("context_sources", []) or [] context_sources: list[dict[str, Any]] = [] for cs in raw_sources: if not isinstance(cs, dict): raise ParseError(path, None, "each context_source must be a mapping") context_sources.append(cs) # Parse fenced rule/instruction blocks from the body body = text[fm_match.end():] rules: list[dict[str, Any]] = [] instructions: list[dict[str, Any]] = [] for block_match in _FENCED_BLOCK_RE.finditer(body): lang = block_match.group(1).strip() block_body = block_match.group(2) try: block_data = yaml.safe_load(block_body) except yaml.YAMLError as exc: raise ParseError(path, None, f"YAML parse error in {lang!r} block: {exc}") from exc if lang == "rule": if not isinstance(block_data, dict): raise ParseError(path, None, "rule block must be a YAML mapping") if "id" not in block_data: raise ParseError(path, None, "rule block missing required field 'id'") if "action" not in block_data: raise ParseError(path, None, f"rule {block_data['id']!r} missing 'action'") rules.append(block_data) elif lang == "instruction": if not isinstance(block_data, dict): raise ParseError(path, None, "instruction block must be a YAML mapping") if "id" not in block_data: raise ParseError(path, None, "instruction block missing required field 'id'") instructions.append(block_data) return ActivityDefinitionDef( id=str(fm["id"]), name=str(fm["name"]), enabled=bool(fm.get("enabled", True)), trigger_config=trigger_config, context_sources=context_sources, rules=rules, instructions=instructions, governance=str(fm.get("governance", "publisher-declared")), owner=str(fm.get("owner", "")), status=str(fm.get("status", "active")), source_file=path, ) def scan_and_parse() -> list[ActivityDefinitionDef]: """Scan all definition directories and parse all .md files. Raises ParseError on any broken file — never silently skips. """ dirs = _scan_dirs() defs: list[ActivityDefinitionDef] = [] for d in dirs: for path in sorted(d.glob("*.md")): defs.append(parse_file(path)) return defs