generated from coulomb/repo-seed
Ran Railiance01 cluster validation for POST /admin/sync without restarting actcore-worker, added a repeatable smoke script, and closed the workplan.
212 lines
6.8 KiB
Python
Executable File
212 lines
6.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Railiance01 no-restart smoke for POST /admin/sync.

Patches the disabled ops-service-inventory-probes projection in the cluster
ConfigMap, waits for the API pod volume to refresh, runs /admin/sync twice,
verifies that the DB definition and the Temporal schedule show no drift
without restarting actcore-worker, then rolls the ConfigMap back to the
disabled baseline.

Requires:
  - KUBECONFIG pointing at railiance01 (for example ~/.kube/config-hosteurope)
  - kubectl access to the activity-core namespace

Example:
  export KUBECONFIG=~/.kube/config-hosteurope
  python3 scripts/smoke_admin_sync_no_restart.py
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
# Id matched against the API's activity-definition list and embedded in the
# Temporal schedule id ("activity-schedule-<id>").
ACTIVITY_ID = "40d15a87-7ff6-4d8e-992c-37df15f95110"

# ConfigMap holding the definition files, and the data key this smoke patches.
CONFIGMAP = "actcore-external-activity-definitions"
DEFINITION_KEY = "ops-service-inventory-probes.md"

# Path read (via `cat`) inside the actcore-api pod to observe the patched key.
MOUNTED_FILE = "/etc/activity-core/external-definitions/activity-definitions/" + DEFINITION_KEY

# Upper bound, in seconds, on how long wait_for_mount polls for the refresh.
VOLUME_PROPAGATION_SECONDS = 65
|
|
|
|
|
|
def kubectl(*args: str, input_text: str | None = None) -> str:
    """Run ``kubectl -n activity-core <args>`` and return its stdout as text.

    Args:
        *args: Arguments appended after the namespace flag.
        input_text: Optional text fed to the process on stdin.

    Raises:
        subprocess.CalledProcessError: If kubectl exits with a non-zero status.
    """
    argv = ["kubectl", "-n", "activity-core"]
    argv.extend(args)
    return subprocess.check_output(argv, input=input_text, text=True)
|
|
|
|
|
|
def api_json(path: str, *, method: str = "GET") -> dict:
    """Call the API on localhost:8010 from inside the actcore-api pod.

    A short urllib script is executed in the pod with ``kubectl exec``; its
    printed response body is decoded as JSON and returned.

    Args:
        path: Request path (including any query string) appended to the base URL.
        method: HTTP method passed to ``urllib.request.Request``.
    """
    script_lines = [
        "import urllib.request, json",
        f'req = urllib.request.Request("http://localhost:8010{path}", method="{method}")',
        "print(urllib.request.urlopen(req).read().decode())",
    ]
    body = kubectl("exec", "deploy/actcore-api", "--", "python3", "-c", "\n".join(script_lines))
    return json.loads(body)
|
|
|
|
|
|
def worker_lines(script: str) -> list[str]:
    """Run *script* with python3 inside the actcore-worker pod; return stdout split into lines."""
    output = kubectl("exec", "deploy/actcore-worker", "--", "python3", "-c", script)
    return output.splitlines()
|
|
|
|
|
|
def worker_uid() -> str:
    """Return the UID of the first pod labelled ``app.kubernetes.io/name=actcore-worker``."""
    selector = "app.kubernetes.io/name=actcore-worker"
    uid = kubectl("get", "pod", "-l", selector, "-o", "jsonpath={.items[0].metadata.uid}")
    return uid.strip()
|
|
|
|
|
|
def load_configmap() -> dict:
    """Fetch the definitions ConfigMap from the cluster as a parsed JSON object."""
    manifest = kubectl("get", "configmap", CONFIGMAP, "-o", "json")
    return json.loads(manifest)
|
|
|
|
|
|
def apply_configmap(cm: dict) -> None:
    """Serialize *cm* to JSON and pipe it to ``kubectl apply -f -``."""
    manifest = json.dumps(cm)
    kubectl("apply", "-f", "-", input_text=manifest)
|
|
|
|
|
|
def patch_definition(cm: dict, *, enabled: bool, cron: str) -> None:
    """Rewrite the projection's enabled flag and cron, then apply the ConfigMap.

    The definition text stored under ``DEFINITION_KEY`` is edited by literal
    substitution: the first ``enabled: true|false`` and the first
    ``cron_expression: "15 * * * *"`` / ``"25 * * * *"`` are replaced with the
    requested values. ``cm`` is updated in place and sent to the cluster.

    Args:
        cm: ConfigMap object as returned by ``load_configmap``.
        enabled: Desired value of the ``enabled`` field.
        cron: Desired cron expression.

    Raises:
        RuntimeError: If the text has no ``enabled:`` line, or if the requested
            values are not present after substitution (the projection uses a
            spelling or cron this function does not recognise). In that case
            nothing is applied.
    """
    text = cm["data"][DEFINITION_KEY]
    if not any(line.strip().startswith("enabled:") for line in text.splitlines()):
        raise RuntimeError("enabled field not found in projection")

    want_enabled = f"enabled: {'true' if enabled else 'false'}"
    want_cron = f'cron_expression: "{cron}"'

    for known in ("enabled: false", "enabled: true"):
        text = _replace_once(text, known, want_enabled)
    for known in ('cron_expression: "15 * * * *"', 'cron_expression: "25 * * * *"'):
        text = _replace_once(text, known, want_cron)

    # The substitutions above are silent no-ops when the literals do not match.
    # Fail here rather than applying an unchanged ConfigMap and letting the
    # caller wait out the whole propagation window for text that never arrives.
    if want_enabled not in text:
        raise RuntimeError(f"could not set {want_enabled!r} in projection")
    if want_cron not in text:
        raise RuntimeError(f"could not set {want_cron!r} in projection")

    cm["data"][DEFINITION_KEY] = text
    apply_configmap(cm)
|
|
|
|
|
|
def _replace_once(text: str, old: str, new: str) -> str:
|
|
if old not in text:
|
|
return text
|
|
return text.replace(old, new, 1)
|
|
|
|
|
|
def wait_for_mount(*, enabled: bool, cron: str) -> None:
    """Poll the file mounted in the actcore-api pod until it shows the patch.

    Every 5 seconds the mounted definition file is read with ``cat`` and
    checked for both the expected ``enabled`` line and the expected
    ``cron_expression`` line.

    Args:
        enabled: Value the ``enabled`` field is expected to have.
        cron: Cron expression the file is expected to contain.

    Raises:
        RuntimeError: If both strings have not appeared within
            ``VOLUME_PROPAGATION_SECONDS``.
    """
    want_enabled = "enabled: true" if enabled else "enabled: false"
    want_cron = f'cron_expression: "{cron}"'

    # Monotonic clock: a wall-clock adjustment must not shorten or extend the wait.
    deadline = time.monotonic() + VOLUME_PROPAGATION_SECONDS
    while time.monotonic() < deadline:
        content = kubectl("exec", "deploy/actcore-api", "--", "cat", MOUNTED_FILE)
        if want_enabled in content and want_cron in content:
            return
        time.sleep(5)

    raise RuntimeError(
        f"ConfigMap projection did not refresh within {VOLUME_PROPAGATION_SECONDS}s"
    )
|
|
|
|
|
|
def get_definition() -> dict[str, object]:
    """Look up the smoke's activity definition through the API.

    Returns:
        ``{"enabled": ..., "cron": ...}`` taken from the entry whose ``id``
        equals ``ACTIVITY_ID``.

    Raises:
        RuntimeError: If no entry with that id is listed.
    """
    match = next(
        (entry for entry in api_json("/activity-definitions/") if entry["id"] == ACTIVITY_ID),
        None,
    )
    if match is None:
        raise RuntimeError(f"ActivityDefinition {ACTIVITY_ID} not found")
    return {
        "enabled": match["enabled"],
        "cron": match["trigger_config"]["cron_expression"],
    }
|
|
|
|
|
|
def describe_schedule() -> dict[str, object]:
    """Describe the activity's Temporal schedule from inside the worker pod.

    A short script is run in the actcore-worker pod; it connects to
    ``actcore-temporal:7233``, describes ``activity-schedule-<ACTIVITY_ID>``
    and prints two lines: the paused flag and the start minute of the first
    calendar spec (or ``None`` when the schedule has no calendars).

    Returns:
        ``{"paused": bool, "minute": int | None}``. ``minute`` is ``None`` when
        the schedule reports no calendar spec.
    """
    script = f"""
import asyncio
from temporalio.client import Client

async def main() -> None:
    client = await Client.connect("actcore-temporal:7233")
    handle = client.get_schedule_handle("activity-schedule-{ACTIVITY_ID}")
    described = await handle.describe()
    schedule = described.schedule
    minute = schedule.spec.calendars[0].minute[0].start if schedule.spec.calendars else None
    print(schedule.state.paused)
    print(minute)

asyncio.run(main())
"""
    paused, minute = worker_lines(script)
    # The script prints the literal "None" for a schedule without calendars;
    # surface that as None so callers report drift instead of hitting ValueError.
    return {
        "paused": paused == "True",
        "minute": None if minute == "None" else int(minute),
    }
|
|
|
|
|
|
def _post_sync() -> dict:
    """POST /admin/sync for definitions and schedules; return the decoded reply."""
    return api_json("/admin/sync?definitions=true&schedules=true", method="POST")


def _restore_baseline() -> None:
    """Best-effort return of the projection to the disabled ``15 * * * *`` baseline."""
    print("restoring disabled baseline after failed smoke", file=sys.stderr)
    try:
        patch_definition(load_configmap(), enabled=False, cron="15 * * * *")
        wait_for_mount(enabled=False, cron="15 * * * *")
        _post_sync()
    except Exception as exc:
        # Cleanup boundary: report, but never mask the failure that led here.
        print(f"baseline restore failed: {exc}", file=sys.stderr)


def main() -> int:
    """Run the no-restart smoke and return a process exit code.

    Steps: enable the projection with a ``25 * * * *`` cron via the ConfigMap,
    sync and verify definition + schedule, sync again and compare the
    ``schedules`` part of both replies, roll back to disabled / ``15 * * * *``,
    sync and verify again, and finally check that the worker pod UID is
    unchanged.

    Returns:
        0 when every check passes, 1 on the first failed check.

    If a check fails or an exception escapes before the rollback sync has
    succeeded, the baseline is restored on a best-effort basis so the probe is
    not left enabled on the cluster.
    """
    worker_before = worker_uid()
    cm = load_configmap()

    # Stays True from the first ConfigMap mutation until the rollback sync is ok.
    needs_restore = True
    try:
        print("1) enable + cadence change via ConfigMap")
        patch_definition(cm, enabled=True, cron="25 * * * *")
        wait_for_mount(enabled=True, cron="25 * * * *")

        print("2) POST /admin/sync (first pass)")
        sync1 = _post_sync()
        if not sync1.get("ok"):
            print(json.dumps(sync1, indent=2), file=sys.stderr)
            return 1

        defn = get_definition()
        schedule = describe_schedule()
        print("   definition:", defn)
        print("   schedule:", schedule)
        if defn != {"enabled": True, "cron": "25 * * * *"}:
            print("definition drift after sync", file=sys.stderr)
            return 1
        if schedule["paused"] or schedule["minute"] != 25:
            print("schedule drift after enable sync", file=sys.stderr)
            return 1

        print("3) POST /admin/sync (idempotent repeat)")
        sync2 = _post_sync()
        if sync2.get("schedules") != sync1.get("schedules"):
            print("idempotent schedule counts changed", file=sys.stderr)
            print(json.dumps({"sync1": sync1, "sync2": sync2}, indent=2), file=sys.stderr)
            return 1

        print("4) rollback ConfigMap + sync")
        # Reload so the apply is based on the ConfigMap as it is now on the cluster.
        cm = load_configmap()
        patch_definition(cm, enabled=False, cron="15 * * * *")
        wait_for_mount(enabled=False, cron="15 * * * *")
        sync3 = _post_sync()
        if not sync3.get("ok"):
            print(json.dumps(sync3, indent=2), file=sys.stderr)
            return 1
        needs_restore = False

        defn = get_definition()
        schedule = describe_schedule()
        print("   definition:", defn)
        print("   schedule:", schedule)
        if defn != {"enabled": False, "cron": "15 * * * *"}:
            print("rollback definition drift", file=sys.stderr)
            return 1
        if not schedule["paused"] or schedule["minute"] != 15:
            print("rollback schedule drift", file=sys.stderr)
            return 1
    finally:
        if needs_restore:
            _restore_baseline()

    worker_after = worker_uid()
    if worker_before != worker_after:
        print("actcore-worker pod restarted during smoke", file=sys.stderr)
        return 1

    print("smoke passed: admin sync hot-reload without worker restart")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main()) |