"""artifact-store publish adapter for optimizer evidence (WP-0004 Part 3).""" from __future__ import annotations import json import os import uuid from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional from urllib import error, parse, request from ..metrics import OptimizerStore ENV_API_URL = "ARTIFACTSTORE_API_URL" ENV_API_TOKEN = "ARTIFACTSTORE_API_TOKEN" DEFAULT_RETENTION_CLASS = "raw-evidence" PRODUCER = "kaizen-agentic" @dataclass class PublishResult: package_id: str manifest_digest: Optional[str] files_uploaded: int retention_class: str def build_optimizer_manifest( project_root: Path, *, agents: Optional[List[str]] = None, ) -> Dict[str, Any]: """Manifest metadata for an optimizer evidence package.""" store = OptimizerStore(project_root) analysis = {} if store.analysis_path.exists(): analysis = json.loads(store.analysis_path.read_text(encoding="utf-8")) return { "schema": "kaizen-agentic/optimizer-evidence/v1", "project": project_root.name, "project_root": str(project_root.resolve()), "producer": PRODUCER, "retention_class": DEFAULT_RETENTION_CLASS, "retention_days": 180, "optimized_at": analysis.get("optimized_at"), "agents": agents or [item.get("agent") for item in analysis.get("agents", [])], "files": [ "optimizer/analysis.json", "optimizer/recommendations.jsonl", ], } def publish_optimizer_evidence( project_root: Path, *, api_url: str, token: str, subject: Optional[str] = None, retention_class: str = DEFAULT_RETENTION_CLASS, ) -> PublishResult: """Register optimizer outputs as an artifact-store package.""" store = OptimizerStore(project_root) if not store.analysis_path.exists(): raise FileNotFoundError( f"No optimizer analysis at {store.analysis_path}. " "Run: kaizen-agentic metrics optimize" ) manifest = build_optimizer_manifest(project_root) package_name = f"kaizen-optimizer-{project_root.name}" package_subject = subject or project_root.name created = _http_json( "POST", api_url, "/packages", token, { "name": package_name, "producer": PRODUCER, "subject": package_subject, "retention_class": retention_class, "metadata": manifest, }, ) package_id = created["id"] uploads = [ ( store.analysis_path, "optimizer/analysis.json", "application/json", ), ] if store.recommendations_path.exists(): uploads.append( ( store.recommendations_path, "optimizer/recommendations.jsonl", "application/x-ndjson", ) ) for path, relative_path, media_type in uploads: _http_multipart( api_url, f"/packages/{package_id}/files", token, fields={"relative_path": relative_path, "media_type": media_type}, file_field="file", file_name=path.name, file_content_type=media_type, file_bytes=path.read_bytes(), ) finalized = _http_json( "POST", api_url, f"/packages/{package_id}/finalize", token, {}, ) return PublishResult( package_id=package_id, manifest_digest=finalized.get("manifest_digest"), files_uploaded=len(uploads), retention_class=retention_class, ) def default_api_url() -> str: return os.environ.get(ENV_API_URL, "http://127.0.0.1:8000").rstrip("/") def default_api_token() -> str: return os.environ.get(ENV_API_TOKEN, "") def _http_json( method: str, base_url: str, path: str, token: str, payload: Dict[str, Any], ) -> Dict[str, Any]: body = json.dumps(payload).encode("utf-8") if payload else None headers = {"Accept": "application/json"} if body is not None: headers["Content-Type"] = "application/json" response = _http_bytes(method, base_url, path, token, body=body, headers=headers) decoded = json.loads(response) if not isinstance(decoded, dict): raise ValueError(f"expected JSON object from {path}") return decoded def _http_multipart( base_url: str, path: str, token: str, *, fields: Dict[str, str], file_field: str, file_name: str, file_content_type: str, file_bytes: bytes, ) -> Dict[str, Any]: boundary = f"kaizen-{uuid.uuid4().hex}" body = bytearray() for name, value in fields.items(): body.extend(f"--{boundary}\r\n".encode("ascii")) body.extend( f'Content-Disposition: form-data; name="{_quote(name)}"\r\n\r\n'.encode() ) body.extend(value.encode()) body.extend(b"\r\n") body.extend(f"--{boundary}\r\n".encode("ascii")) body.extend( ( f'Content-Disposition: form-data; name="{_quote(file_field)}"; ' f'filename="{_quote(file_name)}"\r\n' f"Content-Type: {file_content_type}\r\n\r\n" ).encode() ) body.extend(file_bytes) body.extend(b"\r\n") body.extend(f"--{boundary}--\r\n".encode("ascii")) response = _http_bytes( "POST", base_url, path, token, body=bytes(body), headers={ "Content-Type": f"multipart/form-data; boundary={boundary}", "Accept": "application/json", }, ) decoded = json.loads(response) if not isinstance(decoded, dict): raise ValueError(f"expected JSON object from {path}") return decoded def _http_bytes( method: str, base_url: str, path: str, token: str, *, body: Optional[bytes] = None, headers: Optional[Dict[str, str]] = None, ) -> bytes: url = f"{base_url.rstrip('/')}/{path.lstrip('/')}" effective_headers = dict(headers or {}) if token: effective_headers["Authorization"] = f"Bearer {token}" req = request.Request(url, data=body, headers=effective_headers, method=method) try: with request.urlopen(req, timeout=30) as resp: return resp.read() except error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace") raise RuntimeError(f"HTTP {exc.code} from {path}: {detail}") from exc def _quote(value: str) -> str: return parse.quote(value, safe="")