"""Replay llm-connect audit records without making provider calls."""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

from llm_connect.claude_code import _unwrap_cli_json_envelope
from llm_connect.models import RunConfig


def parse_audit_record(record: dict[str, Any]) -> dict[str, Any]:
    """Parse the recorded provider response and compare it to saved content.

    Args:
        record: A decoded audit record. The keys read here are ``config``,
            ``provider``, ``provider_request`` (only for provider inference),
            ``provider_response`` (its ``body``) and ``parsed_content``.

    Returns:
        A report dict with the keys ``provider``, ``parsed_content``,
        ``matches_recorded_content`` and ``structured_output``.
    """
    # `or {}` (rather than a .get default) also covers an explicit JSON null,
    # matching how provider_response is handled below.
    config = RunConfig.from_dict(record.get("config") or {})
    provider = record.get("provider") or _infer_provider(record)
    provider_response = record.get("provider_response") or {}
    body = provider_response.get("body")

    parsed_content = _parse_provider_response(provider, body, config)
    recorded_content = record.get("parsed_content")
    schema_check = _check_structured_output(
        parsed_content,
        config.model_params.get("json_schema"),
    )
    return {
        "provider": provider,
        "parsed_content": parsed_content,
        "matches_recorded_content": parsed_content == recorded_content,
        "structured_output": schema_check,
    }


def main(argv: list[str] | None = None) -> None:
    """Command-line entry point: replay parsing for one audit JSON file.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Prints the full report as JSON when ``--json`` is given, otherwise only
    the re-parsed content.
    """
    parser = argparse.ArgumentParser(
        prog="python -m llm_connect.replay",
        description="Replay parsing for a llm-connect audit JSON file.",
    )
    parser.add_argument("audit_file", help="Path to an audit JSON file")
    parser.add_argument(
        "--json",
        action="store_true",
        help="Print the full replay report",
    )
    args = parser.parse_args(argv)

    record = json.loads(Path(args.audit_file).read_text(encoding="utf-8"))
    if not isinstance(record, dict):
        # A list/scalar at the top level would otherwise surface as an
        # AttributeError deep inside parse_audit_record.
        parser.error("audit file must contain a JSON object")

    report = parse_audit_record(record)
    if args.json:
        print(json.dumps(report, indent=2, sort_keys=True))
    else:
        print(report["parsed_content"])


def _openai_message_content(body: dict[str, Any]) -> Any:
    """Return the first choice's message content from a chat-style body.

    Missing, null, or wrongly-typed intermediate values yield ``""`` instead
    of raising.
    """
    choices = body.get("choices")
    if not isinstance(choices, list) or not choices:
        return ""
    first = choices[0]
    message = first.get("message") if isinstance(first, dict) else None
    if not isinstance(message, dict):
        return ""
    content = message.get("content")
    # A null content is treated like an absent one so that downstream code
    # receives a string.
    return "" if content is None else content


def _gemini_text(body: dict[str, Any]) -> str:
    """Concatenate the text of every part of the first candidate.

    Parts that are not dicts, or whose ``text`` is not a string, are skipped.
    """
    candidates = body.get("candidates")
    if not isinstance(candidates, list) or not candidates:
        return ""
    first = candidates[0]
    content = first.get("content") if isinstance(first, dict) else None
    parts = content.get("parts") if isinstance(content, dict) else None
    if not isinstance(parts, list):
        return ""
    texts = (part.get("text") for part in parts if isinstance(part, dict))
    return "".join(text for text in texts if isinstance(text, str))


def _parse_provider_response(provider: str | None, body: Any, config: RunConfig) -> str:
    """Extract the response text from a recorded provider response body.

    Args:
        provider: Provider name; ``"openai"``, ``"openrouter"``, ``"gemini"``
            and ``"claude-code"`` get provider-specific handling.
        body: The recorded response body.
        config: Run configuration, forwarded to the claude-code unwrapping
            helper.

    Returns:
        The extracted content. For a known provider whose body is not a dict
        the result is ``""``. For any other provider a string body is
        returned as is, ``None`` becomes ``""`` and anything else is
        JSON-encoded.
    """
    if provider in {"openai", "openrouter"}:
        return _openai_message_content(body) if isinstance(body, dict) else ""

    if provider == "gemini":
        return _gemini_text(body) if isinstance(body, dict) else ""

    if provider == "claude-code":
        if isinstance(body, dict):
            # NOTE(review): the helper's handling of the recorded stdout is
            # defined in llm_connect.claude_code and is not visible here.
            return _unwrap_cli_json_envelope(body.get("stdout", ""), config)
        return ""

    if isinstance(body, str):
        return body
    if body is None:
        return ""
    return json.dumps(body)


def _infer_provider(record: dict[str, Any]) -> str | None:
    """Guess the provider from the recorded request.

    The request URL is matched against known hosts (OpenRouter first, then
    OpenAI, then Gemini); a request carrying a ``command`` is taken to be
    claude-code.

    Returns:
        The inferred provider name, or ``None`` when nothing matches.
    """
    request = record.get("provider_request") or {}
    if not isinstance(request, dict):
        return None

    url = request.get("url")
    if not isinstance(url, str):
        # A null or non-string URL cannot match any host; avoid a TypeError
        # from the substring tests below.
        url = ""

    if "openrouter.ai" in url:
        return "openrouter"
    if "api.openai.com" in url:
        return "openai"
    if "generativelanguage.googleapis.com" in url:
        return "gemini"
    if request.get("command"):
        return "claude-code"
    return None


def _check_structured_output(content: str, schema: Any) -> dict[str, Any]:
    """Run a shallow check of ``content`` against a JSON schema.

    Only two things are verified: that ``content`` is valid JSON and, when
    the schema's ``type`` is ``"object"``, that the output is an object
    containing every key listed under ``required``. This is not a full JSON
    Schema validation.

    Args:
        content: The parsed provider output, expected to be JSON text.
        schema: The schema as a dict or as a JSON string; a falsy value
            disables the check.

    Returns:
        ``{"checked": False}`` when there is no schema; otherwise a dict with
        ``checked`` and ``valid`` plus either ``error`` or
        ``missing_required`` on failure.
    """
    if not schema:
        return {"checked": False}

    if isinstance(schema, str):
        try:
            schema = json.loads(schema)
        except ValueError as exc:
            return {"checked": True, "valid": False, "error": f"invalid schema JSON: {exc}"}
    if not isinstance(schema, dict):
        return {"checked": True, "valid": False, "error": "schema must be an object"}

    try:
        parsed = json.loads(content)
    except (TypeError, ValueError) as exc:
        # TypeError covers non-string content (e.g. None), which json.loads
        # rejects before it ever attempts to decode.
        return {"checked": True, "valid": False, "error": f"invalid output JSON: {exc}"}

    if schema.get("type") == "object":
        if not isinstance(parsed, dict):
            return {"checked": True, "valid": False, "error": "output is not an object"}
        required = schema.get("required")
        if not isinstance(required, (list, tuple)):
            # A null or malformed "required" is treated as "nothing required".
            required = []
        missing = [key for key in required if key not in parsed]
        if missing:
            return {"checked": True, "valid": False, "missing_required": missing}

    return {"checked": True, "valid": True}


if __name__ == "__main__":
    main()