"""Form state and deterministic runtime rule evaluation.""" from __future__ import annotations import re from dataclasses import asdict, dataclass, field, replace from typing import Any from markitect_tool.contract.model import DocumentContract, FieldSpec, SectionSpec from markitect_tool.core import Document from markitect_tool.diagnostics import ( Diagnostic, SourceLocation, has_error, valid_severity, ) from markitect_tool.runtime.context import RuntimeContext from markitect_tool.runtime.paths import comparable_value, resolve_path from markitect_tool.runtime.rules import evaluate_condition @dataclass(frozen=True) class FieldState: """UI-neutral runtime state for one contract field.""" id: str path: str | None = None source: str | None = None type: str | None = None label: str | None = None description: str | None = None value: Any = None exists: bool = False origin: str = "missing" required: bool = False visible: bool = True enabled: bool = True allowed_values: list[Any] | None = None diagnostics: list[Diagnostic] = field(default_factory=list) metadata: dict[str, Any] = field(default_factory=dict) @property def valid(self) -> bool: return not has_error(self.diagnostics) def to_dict(self) -> dict[str, Any]: data = asdict(self) data["diagnostics"] = [diagnostic.to_dict() for diagnostic in self.diagnostics] return _drop_empty(data) @dataclass(frozen=True) class FormState: """A complete runtime form state derived from a document contract.""" contract_id: str | None document_path: str | None = None context_path: str | None = None fields: list[FieldState] = field(default_factory=list) diagnostics: list[Diagnostic] = field(default_factory=list) rules_applied: list[str] = field(default_factory=list) metadata: dict[str, Any] = field(default_factory=dict) @property def valid(self) -> bool: return not has_error(self.diagnostics) @property def field_values(self) -> dict[str, Any]: return {field.id: field.value for field in self.fields if field.exists} def to_dict(self) -> dict[str, Any]: data = { "valid": self.valid, "contract_id": self.contract_id, "document_path": self.document_path, "context_path": self.context_path, "field_values": self.field_values, "fields": [field.to_dict() for field in self.fields], "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], "rules_applied": self.rules_applied, "metadata": self.metadata, } return _drop_empty(data) def evaluate_form_state( document: Document, contract: DocumentContract, runtime_context: RuntimeContext | None = None, ) -> FormState: """Evaluate contract fields, prefill sources, and dynamic runtime rules.""" context = runtime_context or RuntimeContext.empty() bindings = build_runtime_bindings(document, contract, context) fields = [ _resolve_field_state(field_spec, document, contract, bindings) for field_spec in contract.fields ] diagnostics: list[Diagnostic] = list(context.diagnostics) rules_applied: list[str] = [] fields_by_id = {field.id: field for field in fields} bindings = build_runtime_bindings(document, contract, context, fields_by_id) fields_by_id, rule_diagnostics, rules_applied = _apply_dynamic_rules( fields_by_id, document, contract, context, bindings, ) diagnostics.extend(rule_diagnostics) validated_fields: list[FieldState] = [] for field_spec in contract.fields: field_state = fields_by_id[_field_key(field_spec)] field_diagnostics = [ *field_state.diagnostics, *_validate_field_state(field_spec, field_state, document, contract), ] validated_fields.append(replace(field_state, diagnostics=field_diagnostics)) diagnostics.extend(field_diagnostics) return FormState( contract_id=contract.id, document_path=document.source_path, context_path=context.source_path, fields=validated_fields, diagnostics=diagnostics, rules_applied=rules_applied, ) def build_runtime_bindings( document: Document, contract: DocumentContract, context: RuntimeContext, fields_by_id: dict[str, FieldState] | None = None, ) -> dict[str, Any]: """Build deterministic bindings used by field sources and rule conditions.""" field_states = fields_by_id or {} return { "document": document.to_dict(), "frontmatter": document.frontmatter, "context": context.binding(), "contract": contract.to_dict(), "fields": { field_id: field.to_dict() | {"value": field.value} for field_id, field in field_states.items() }, "field_values": { field_id: field.value for field_id, field in field_states.items() if field.exists }, } def _resolve_field_state( field_spec: FieldSpec, document: Document, contract: DocumentContract, bindings: dict[str, Any], ) -> FieldState: field_id = field_spec.id or "" path = field_spec.path or (f"frontmatter.{field_id}" if field_spec.id else None) source_candidates = _source_candidates(field_spec) diagnostics: list[Diagnostic] = [] document_value, document_exists = resolve_path(bindings, path) source_values = [ (source, value) for source in source_candidates for value, exists in [resolve_path(bindings, source)] if exists ] if document_exists: origin = "manual" value = document_value exists = True for source, source_value in source_values: if comparable_value(source_value) != comparable_value(document_value): diagnostics.append( _diagnostic( "runtime.field.conflict", ( f"Field `{field_id}` is provided by the document and " f"context source `{source}` with different values." ), document=document, contract=contract, rule_id=field_id, severity=_conflict_severity(field_spec), details={ "path": path, "source": source, "document_value": comparable_value(document_value), "context_value": comparable_value(source_value), }, ) ) elif source_values: origin = "prefilled" value = source_values[0][1] exists = True distinct = { repr(comparable_value(item_value)) for _source, item_value in source_values } if len(distinct) > 1: diagnostics.append( _diagnostic( "runtime.field.ambiguous", f"Field `{field_id}` has multiple source values.", document=document, contract=contract, rule_id=field_id, details={"sources": [source for source, _value in source_values]}, ) ) elif field_spec.default is not None: origin = "defaulted" value = field_spec.default exists = True else: origin = "missing" value = None exists = False return FieldState( id=field_id, path=path, source=source_candidates[0] if source_candidates else field_spec.source, type=field_spec.type, label=field_spec.label, description=field_spec.description, value=value, exists=exists, origin=origin, required=field_spec.required, allowed_values=field_spec.enum, diagnostics=diagnostics, metadata={ "sources": source_candidates, "coerce": bool(field_spec.raw.get("coerce", False)) if isinstance(field_spec.raw, dict) else False, }, ) def _source_candidates(field_spec: FieldSpec) -> list[str]: raw = field_spec.raw if isinstance(field_spec.raw, dict) else {} candidates: list[str] = [] source = raw.get("source", field_spec.source) if isinstance(source, list): candidates.extend(str(item) for item in source if item) elif source: candidates.append(str(source)) sources = raw.get("sources") if isinstance(sources, list): candidates.extend(str(item) for item in sources if item) return list(dict.fromkeys(candidates)) def _field_key(field_spec: FieldSpec) -> str: return field_spec.id or "" def _apply_dynamic_rules( fields_by_id: dict[str, FieldState], document: Document, contract: DocumentContract, context: RuntimeContext, bindings: dict[str, Any], ) -> tuple[dict[str, FieldState], list[Diagnostic], list[str]]: diagnostics: list[Diagnostic] = [] applied: list[str] = [] rules = contract.rules for index, rule in enumerate(rules): if not isinstance(rule, dict): diagnostics.append( _diagnostic( "runtime.rule.invalid", "Dynamic rule must be a mapping.", document=document, contract=contract, ) ) continue rule_id = str(rule.get("id") or f"rule-{index + 1}") result = evaluate_condition(rule.get("if"), bindings, rule_id=rule_id) diagnostics.extend( _with_contract_location(item, document=document, contract=contract) for item in result.diagnostics ) action = rule.get("then") if result.matched else rule.get("else") if result.matched: applied.append(rule_id) if action is not None: fields_by_id, action_diagnostics = _apply_rule_action( fields_by_id, action, document, contract, context, rule_id, ) diagnostics.extend(action_diagnostics) bindings = build_runtime_bindings(document, contract, context, fields_by_id) if "assert" in rule and result.matched: diagnostics.extend( _evaluate_runtime_assertions( rule["assert"], bindings, document, contract, rule_id, rule, ) ) return fields_by_id, diagnostics, applied def _apply_rule_action( fields_by_id: dict[str, FieldState], action: Any, document: Document, contract: DocumentContract, context: RuntimeContext, rule_id: str, ) -> tuple[dict[str, FieldState], list[Diagnostic]]: if not isinstance(action, dict): return fields_by_id, [ _diagnostic( "runtime.rule.action_invalid", f"Dynamic rule `{rule_id}` action must be a mapping.", document=document, contract=contract, rule_id=rule_id, ) ] diagnostics: list[Diagnostic] = [] updated = dict(fields_by_id) bindings = build_runtime_bindings(document, contract, context, updated) for field_id in _field_id_list(action.get("required")): updated, diagnostics = _set_field_attr( updated, diagnostics, field_id, {"required": True}, document, contract, rule_id ) for field_id in _field_id_list(action.get("optional")): updated, diagnostics = _set_field_attr( updated, diagnostics, field_id, {"required": False}, document, contract, rule_id ) for field_id, visible in _field_bool_mapping(action.get("visible")).items(): updated, diagnostics = _set_field_attr( updated, diagnostics, field_id, {"visible": visible}, document, contract, rule_id ) for field_id in _field_id_list(action.get("hidden")): updated, diagnostics = _set_field_attr( updated, diagnostics, field_id, {"visible": False}, document, contract, rule_id ) for field_id, enabled in _field_bool_mapping(action.get("enabled")).items(): updated, diagnostics = _set_field_attr( updated, diagnostics, field_id, {"enabled": enabled}, document, contract, rule_id ) for field_id in _field_id_list(action.get("disabled")): updated, diagnostics = _set_field_attr( updated, diagnostics, field_id, {"enabled": False}, document, contract, rule_id ) if isinstance(action.get("allowed_values"), dict): for field_id, allowed in action["allowed_values"].items(): updated, diagnostics = _set_field_attr( updated, diagnostics, str(field_id), {"allowed_values": allowed if isinstance(allowed, list) else [allowed]}, document, contract, rule_id, ) if isinstance(action.get("set"), dict): for field_id, raw_value in action["set"].items(): value = _resolve_template_value(raw_value, bindings) updated, diagnostics = _set_field_attr( updated, diagnostics, str(field_id), {"value": value, "exists": True, "origin": "calculated"}, document, contract, rule_id, ) if "assert" in action: diagnostics.extend( _evaluate_runtime_assertions( action["assert"], build_runtime_bindings(document, contract, context, updated), document, contract, rule_id, action, ) ) if "sections" in action: diagnostics.extend(_check_dynamic_sections(action["sections"], document, contract, rule_id)) return updated, diagnostics def _set_field_attr( fields_by_id: dict[str, FieldState], diagnostics: list[Diagnostic], field_id: str, updates: dict[str, Any], document: Document, contract: DocumentContract, rule_id: str, ) -> tuple[dict[str, FieldState], list[Diagnostic]]: if field_id not in fields_by_id: diagnostics.append( _diagnostic( "runtime.rule.unknown_field", f"Dynamic rule `{rule_id}` references unknown field `{field_id}`.", document=document, contract=contract, rule_id=rule_id, ) ) return fields_by_id, diagnostics fields_by_id[field_id] = replace(fields_by_id[field_id], **updates) return fields_by_id, diagnostics def _evaluate_runtime_assertions( raw_assertions: Any, bindings: dict[str, Any], document: Document, contract: DocumentContract, rule_id: str, rule: dict[str, Any], ) -> list[Diagnostic]: assertions = raw_assertions if isinstance(raw_assertions, list) else [raw_assertions] diagnostics: list[Diagnostic] = [] for assertion in assertions: result = evaluate_condition(assertion, bindings, rule_id=rule_id) diagnostics.extend( _with_contract_location(item, document=document, contract=contract) for item in result.diagnostics ) if not result.matched: severity = _severity_from_mapping(assertion, rule) message = ( assertion.get("message") if isinstance(assertion, dict) and assertion.get("message") else rule.get("message") or f"Runtime assertion `{rule_id}` was not satisfied." ) diagnostics.append( _diagnostic( "runtime.rule.assertion_failed", str(message), document=document, contract=contract, rule_id=rule_id, severity=severity, details={"assertion": assertion if isinstance(assertion, dict) else None}, ) ) return diagnostics def _check_dynamic_sections( raw_sections: Any, document: Document, contract: DocumentContract, rule_id: str, ) -> list[Diagnostic]: if not isinstance(raw_sections, dict): return [ _diagnostic( "runtime.rule.sections_invalid", "`sections` action must be a mapping.", document=document, contract=contract, rule_id=rule_id, ) ] section_specs = {section.id: section for section in contract.sections if section.id} diagnostics: list[Diagnostic] = [] for section_id, raw in raw_sections.items(): section_spec = section_specs.get(str(section_id)) if section_spec is None: diagnostics.append( _diagnostic( "runtime.rule.unknown_section", f"Dynamic rule `{rule_id}` references unknown section `{section_id}`.", document=document, contract=contract, rule_id=rule_id, ) ) continue presence = raw.get("presence") if isinstance(raw, dict) else raw diagnostics.extend(_check_dynamic_section_presence(document, contract, section_spec, str(presence), rule_id)) return diagnostics def _check_dynamic_section_presence( document: Document, contract: DocumentContract, section_spec: SectionSpec, presence: str, rule_id: str, ) -> list[Diagnostic]: matches = _matching_section_lines(document, section_spec) if presence == "required" and not matches: return [ _diagnostic( "runtime.section.missing", f"Dynamic rule requires section `{section_spec.id}`.", document=document, contract=contract, rule_id=rule_id, guidance=f"Add a {'#' * (section_spec.level or 2)} {section_spec.title or section_spec.id} section.", ) ] if presence == "recommended" and not matches: return [ _diagnostic( "runtime.section.recommended_missing", f"Dynamic rule recommends section `{section_spec.id}`.", document=document, contract=contract, rule_id=rule_id, severity="warning", ) ] if presence == "forbidden" and matches: return [ _diagnostic( "runtime.section.forbidden", f"Dynamic rule forbids section `{section_spec.id}`.", document=document, contract=contract, rule_id=rule_id, source_line=matches[0], ) ] if presence == "discouraged" and matches: return [ _diagnostic( "runtime.section.discouraged", f"Dynamic rule discourages section `{section_spec.id}`.", document=document, contract=contract, rule_id=rule_id, source_line=matches[0], severity="warning", ) ] return [] def _validate_field_state( field_spec: FieldSpec, field_state: FieldState, document: Document, contract: DocumentContract, ) -> list[Diagnostic]: diagnostics: list[Diagnostic] = [] if field_state.required and field_state.visible and not field_state.exists: diagnostics.append( _diagnostic( "contract.field.missing", f"Required field `{field_state.id}` is missing.", document=document, contract=contract, rule_id=field_state.id, guidance=f"Provide `{field_state.path}` in the document or context.", ) ) return diagnostics if not field_state.exists: return diagnostics value = field_state.value if field_state.metadata.get("coerce"): value, coerced, coercion_error = _coerce_value(value, field_state.type) if coercion_error: diagnostics.append( _diagnostic( "runtime.field.coercion_failed", f"Field `{field_state.id}` could not be coerced to `{field_state.type}`.", document=document, contract=contract, rule_id=field_state.id, ) ) elif coerced: object.__setattr__(field_state, "value", value) if field_state.type and not _value_matches_type(value, field_state.type): diagnostics.append( _diagnostic( "contract.field.type_mismatch", ( f"Field `{field_state.id}` must be `{field_state.type}`, " f"got `{type(value).__name__}`." ), document=document, contract=contract, rule_id=field_state.id, ) ) allowed_values = field_state.allowed_values if allowed_values is not None and value not in allowed_values: diagnostics.append( _diagnostic( "contract.field.enum", f"Field `{field_state.id}` must be one of {allowed_values}.", document=document, contract=contract, rule_id=field_state.id, ) ) if field_spec.pattern and isinstance(value, str) and not re.search(field_spec.pattern, value): diagnostics.append( _diagnostic( "contract.field.pattern", f"Field `{field_state.id}` does not match its required pattern.", document=document, contract=contract, rule_id=field_state.id, ) ) if field_spec.min_length is not None and hasattr(value, "__len__") and len(value) < field_spec.min_length: diagnostics.append( _diagnostic( "contract.field.min_length", f"Field `{field_state.id}` is shorter than {field_spec.min_length}.", document=document, contract=contract, rule_id=field_state.id, ) ) if field_spec.max_length is not None and hasattr(value, "__len__") and len(value) > field_spec.max_length: diagnostics.append( _diagnostic( "contract.field.max_length", f"Field `{field_state.id}` is longer than {field_spec.max_length}.", document=document, contract=contract, rule_id=field_state.id, ) ) if field_spec.min is not None and isinstance(value, int | float) and value < field_spec.min: diagnostics.append( _diagnostic( "contract.field.min", f"Field `{field_state.id}` is below {field_spec.min}.", document=document, contract=contract, rule_id=field_state.id, ) ) if field_spec.max is not None and isinstance(value, int | float) and value > field_spec.max: diagnostics.append( _diagnostic( "contract.field.max", f"Field `{field_state.id}` is above {field_spec.max}.", document=document, contract=contract, rule_id=field_state.id, ) ) return diagnostics def _coerce_value(value: Any, expected_type: str | None) -> tuple[Any, bool, bool]: if expected_type == "string" and not isinstance(value, str): return str(value), True, False if expected_type == "integer" and isinstance(value, str): try: return int(value), True, False except ValueError: return value, False, True if expected_type == "number" and isinstance(value, str): try: return float(value), True, False except ValueError: return value, False, True if expected_type == "boolean" and isinstance(value, str): normalized = value.strip().lower() if normalized in {"true", "yes", "1"}: return True, True, False if normalized in {"false", "no", "0"}: return False, True, False return value, False, True return value, False, False def _value_matches_type(value: Any, expected_type: str) -> bool: if expected_type == "string": return isinstance(value, str) if expected_type == "number": return isinstance(value, int | float) and not isinstance(value, bool) if expected_type == "integer": return isinstance(value, int) and not isinstance(value, bool) if expected_type == "boolean": return isinstance(value, bool) if expected_type == "array": return isinstance(value, list) if expected_type == "object": return isinstance(value, dict) if expected_type == "date": return isinstance(value, str) return True def _field_id_list(value: Any) -> list[str]: if value is None: return [] if isinstance(value, str): return [value] if isinstance(value, list): return [str(item) for item in value if item] if isinstance(value, dict): return [str(key) for key, enabled in value.items() if enabled] return [] def _field_bool_mapping(value: Any) -> dict[str, bool]: if value is None: return {} if isinstance(value, dict): return {str(key): bool(item) for key, item in value.items()} if isinstance(value, str): return {value: True} if isinstance(value, list): return {str(item): True for item in value if item} return {} def _resolve_template_value(value: Any, bindings: dict[str, Any]) -> Any: if isinstance(value, str): full = re.fullmatch(r"\$\{([^}]+)\}", value.strip()) if full: resolved, exists = resolve_path(bindings, full.group(1)) return resolved if exists else value def replace_match(match: re.Match[str]) -> str: resolved, exists = resolve_path(bindings, match.group(1)) return str(resolved) if exists else match.group(0) return re.sub(r"\$\{([^}]+)\}", replace_match, value) if isinstance(value, list): return [_resolve_template_value(item, bindings) for item in value] if isinstance(value, dict): return {key: _resolve_template_value(item, bindings) for key, item in value.items()} return value def _conflict_severity(field_spec: FieldSpec) -> str: raw = field_spec.raw if isinstance(field_spec.raw, dict) else {} severity = raw.get("conflict") or raw.get("conflict_severity") or "warning" return str(severity) if valid_severity(str(severity)) else "warning" def _severity_from_mapping(*items: Any) -> str: for item in items: if isinstance(item, dict) and valid_severity(str(item.get("severity"))): return str(item["severity"]) return "error" def _matching_section_lines(document: Document, section_spec: SectionSpec) -> list[int]: expected = {_normalize_heading(value) for value in section_spec.headings} return [ section.heading.line for section in document.sections if _normalize_heading(section.heading.text) in expected ] def _normalize_heading(text: str) -> str: return re.sub(r"\s+", " ", text.strip().lower()) def _with_contract_location( diagnostic: Diagnostic, *, document: Document, contract: DocumentContract, ) -> Diagnostic: return Diagnostic( severity=diagnostic.severity, code=diagnostic.code, message=diagnostic.message, source=diagnostic.source or SourceLocation(path=document.source_path), contract=diagnostic.contract or SourceLocation(path=contract.source_path, line=contract.source_line), rule_id=diagnostic.rule_id, guidance=diagnostic.guidance, details=diagnostic.details, ) def _diagnostic( code: str, message: str, *, document: Document, contract: DocumentContract, rule_id: str | None = None, severity: str = "error", guidance: str | None = None, details: dict[str, Any] | None = None, source_line: int | None = 1, ) -> Diagnostic: return Diagnostic( severity=severity, code=code, message=message, source=SourceLocation(path=document.source_path, line=source_line), contract=SourceLocation(path=contract.source_path, line=contract.source_line), rule_id=rule_id, guidance=guidance, details=details or {}, ) def _drop_empty(data: dict[str, Any]) -> dict[str, Any]: return { key: value for key, value in data.items() if value not in (None, [], {}, "") }