generated from coulomb/repo-seed
134 lines
4.4 KiB
Python
134 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Record guide-board artifact package linkage in State Hub."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
def main() -> None:
|
|
state_hub_url = _env("STATE_HUB_URL", "http://127.0.0.1:8000").rstrip("/")
|
|
artifact_api_url = _env("ARTIFACTSTORE_API_URL", "http://127.0.0.1:8000").rstrip("/")
|
|
run_dir = Path(_required("GUIDE_BOARD_RUN_DIR"))
|
|
run_json = _read_json(run_dir / "run.json")
|
|
retention_summary = _read_json(run_dir / "retention-summary.json")
|
|
ingest_result = _ingest_result()
|
|
|
|
package_id = _env("ARTIFACTSTORE_PACKAGE_ID") or _required_from(
|
|
ingest_result,
|
|
"package_id",
|
|
"ARTIFACTSTORE_PACKAGE_ID",
|
|
)
|
|
manifest_digest = _env("ARTIFACTSTORE_MANIFEST_DIGEST") or _required_from(
|
|
ingest_result,
|
|
"manifest_digest",
|
|
"ARTIFACTSTORE_MANIFEST_DIGEST",
|
|
)
|
|
run_id = _env("GUIDE_BOARD_RUN_ID") or str(
|
|
run_json.get("run_id") or run_json.get("id") or retention_summary.get("run_id")
|
|
)
|
|
summary = retention_summary.get("summary", {})
|
|
if not isinstance(summary, dict):
|
|
summary = {}
|
|
result_status = _env("GUIDE_BOARD_RESULT_STATUS") or str(
|
|
run_json.get("result_status") or run_json.get("status") or summary.get("status")
|
|
)
|
|
|
|
detail: dict[str, Any] = {
|
|
"producer": "guide-board",
|
|
"artifact_store_api_url": artifact_api_url,
|
|
"run_dir": str(run_dir),
|
|
"run_id": run_id,
|
|
"target_profile_ref": str(run_json["target_profile_ref"]),
|
|
"assessment_profile_ref": str(run_json["assessment_profile_ref"]),
|
|
"result_status": result_status,
|
|
"package_id": package_id,
|
|
"manifest_digest": manifest_digest,
|
|
}
|
|
if "file_count" in ingest_result:
|
|
detail["file_count"] = ingest_result["file_count"]
|
|
retention_class = _env("ARTIFACTSTORE_RETENTION_CLASS")
|
|
if retention_class:
|
|
detail["retention_class"] = retention_class
|
|
|
|
payload: dict[str, Any] = {
|
|
"event_type": _env("STATE_HUB_EVENT_TYPE", "artifact_link"),
|
|
"author": _env("STATE_HUB_AUTHOR", "artifact-store"),
|
|
"summary": _env(
|
|
"STATE_HUB_SUMMARY",
|
|
f"guide-board run {run_id} artifacts stored in artifact-store package {package_id}",
|
|
),
|
|
"detail": detail,
|
|
}
|
|
for field, env_name in (
|
|
("topic_id", "STATE_HUB_TOPIC_ID"),
|
|
("workstream_id", "STATE_HUB_WORKSTREAM_ID"),
|
|
("task_id", "STATE_HUB_TASK_ID"),
|
|
("session_id", "STATE_HUB_SESSION_ID"),
|
|
):
|
|
value = _env(env_name)
|
|
if value:
|
|
payload[field] = value
|
|
|
|
request = urllib.request.Request(
|
|
f"{state_hub_url}/progress/",
|
|
data=json.dumps(payload).encode("utf-8"),
|
|
headers={"Content-Type": "application/json", "Accept": "application/json"},
|
|
method="POST",
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=30) as response:
|
|
print(response.read().decode("utf-8"))
|
|
except urllib.error.HTTPError as exc:
|
|
detail_text = exc.read().decode("utf-8", errors="replace")
|
|
raise SystemExit(f"HTTP {exc.code}: {detail_text}") from exc
|
|
|
|
|
|
def _env(name: str, default: str = "") -> str:
|
|
return os.environ.get(name, default)
|
|
|
|
|
|
def _required(name: str) -> str:
|
|
value = _env(name)
|
|
if not value:
|
|
raise SystemExit(f"missing required environment variable: {name}")
|
|
return value
|
|
|
|
|
|
def _required_from(payload: dict[str, Any], key: str, env_name: str) -> str:
|
|
value = payload.get(key)
|
|
if isinstance(value, str) and value:
|
|
return value
|
|
raise SystemExit(f"missing {key!r}; set {env_name} or ARTIFACTSTORE_INGEST_RESULT_PATH")
|
|
|
|
|
|
def _ingest_result() -> dict[str, Any]:
|
|
raw_json = _env("ARTIFACTSTORE_INGEST_RESULT_JSON")
|
|
if raw_json:
|
|
payload = json.loads(raw_json)
|
|
if not isinstance(payload, dict):
|
|
raise SystemExit("ARTIFACTSTORE_INGEST_RESULT_JSON must be a JSON object")
|
|
return payload
|
|
|
|
result_path = _env("ARTIFACTSTORE_INGEST_RESULT_PATH")
|
|
if result_path:
|
|
return _read_json(Path(result_path))
|
|
return {}
|
|
|
|
|
|
def _read_json(path: Path) -> dict[str, Any]:
|
|
with path.open("r", encoding="utf-8") as fh:
|
|
payload = json.load(fh)
|
|
if not isinstance(payload, dict):
|
|
raise SystemExit(f"{path} must contain a JSON object")
|
|
return payload
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|