From f6f3116ae78cd06963cc7506f3c66614040078ea Mon Sep 17 00:00:00 2001 From: tegwick Date: Tue, 5 May 2026 20:58:25 +0200 Subject: [PATCH] Testbased performance monitor --- README.md | 5 + docs/test-performance-monitoring.md | 109 +++++ tests/conftest.py | 697 ++++++++++++++++++++++++++++ 3 files changed, 811 insertions(+) create mode 100644 docs/test-performance-monitoring.md create mode 100644 tests/conftest.py diff --git a/README.md b/README.md index 4775a06..9882e1c 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Start here: - `docs/markitect-tool-integration-usecases.md` - `docs/markitect-tool-capacity-risks.md` - `examples/markitect-tool-contract/` +- `docs/test-performance-monitoring.md` - `docs/phase-memory-boundary.md` - `docs/system-layer-extraction-inventory.md` - `docs/system-layer-migration-backlog.md` @@ -33,6 +34,10 @@ This repo uses Python 3.12+, setuptools, a `src/` package layout, and pytest. python3 -m pytest ``` +Pytest records a compact rolling performance history under +`.pytest_cache/kontextual/performance-history.json`; see +`docs/test-performance-monitoring.md`. + The first runtime slice implements artifacts, collections, relationships, in-memory storage, ingestion adapters, query, workflow run manifests, and agent-facing context packages. The current roadmap re-scopes the next work diff --git a/docs/test-performance-monitoring.md b/docs/test-performance-monitoring.md new file mode 100644 index 0000000..e8c7eb3 --- /dev/null +++ b/docs/test-performance-monitoring.md @@ -0,0 +1,109 @@ +# Test Performance Monitoring + +Date: 2026-05-05 + +Status: lightweight pytest performance history for local situational awareness. + +## Purpose + +The test suite records a compact performance history on every pytest run. The +goal is not detailed profiling. It is a small scorekeeping loop that helps us +notice negative drift while the engine grows. + +The monitor captures: + +- run start and finish timestamps, +- total test run duration, +- per-test duration and outcome, +- Python and platform identity, +- logical CPU count, +- load averages and load-per-CPU where available, +- memory total, available memory, and available ratio from `/proc/meminfo` + where available, +- process user/system CPU deltas and peak resident memory. + +## Storage + +Default history path: + +```text +.pytest_cache/kontextual/performance-history.json +``` + +`.pytest_cache/` is ignored by git, so regular test runs do not dirty the +repository. A different path can be supplied with `--perf-history-path` or +`KONTEXTUAL_PERF_HISTORY`. + +## Retention Model + +The JSON file keeps a bounded, compact record: + +- the last `N` raw runs, +- the last `N` rolling averages over the retained runs, +- the average of the last `N` rolling averages, +- one compact daily average record per day, updated on every run, +- daily records retained for a configurable number of days. + +Defaults: + +- `N = 20`, +- daily retention = `730` days, +- drift warning ratio = `35%`, +- minimum duration delta before warning = `0.05s`. + +Skipped tests are recorded in raw runs and aggregate counts, but they are not +used as per-test duration baselines. This keeps optional Markitect and capacity +tests from producing false regressions when they switch from skipped to +executed. + +## Warnings + +At the end of the pytest run, the monitor compares the current run with the +previous average-of-averages. It prints warnings for: + +- total run duration drift, when the executed test count is comparable, +- individual test duration drift, +- materially higher normalized start load, +- materially lower available-memory ratio. + +Warnings do not fail the test run. They are meant to create attention, not gate +development. + +## Configuration + +Disable monitoring: + +```bash +python3 -m pytest --perf-history-disable +``` + +or: + +```bash +KONTEXTUAL_PERF_MONITOR=0 python3 -m pytest +``` + +Override retention and warning thresholds: + +```bash +python3 -m pytest \ + --perf-history-window 30 \ + --perf-history-drift-ratio 0.50 \ + --perf-history-min-delta 0.10 +``` + +Environment equivalents: + +- `KONTEXTUAL_PERF_HISTORY`, +- `KONTEXTUAL_PERF_WINDOW`, +- `KONTEXTUAL_PERF_DAILY_RETENTION_DAYS`, +- `KONTEXTUAL_PERF_DRIFT_RATIO`, +- `KONTEXTUAL_PERF_MIN_DELTA_SECONDS`. + +## When To Profile Instead + +Use this monitor to spot drift and identify candidate tests or areas. If a +warning points to a real bottleneck, create a focused profiling experiment or a +capacity sentinel. Do not add large traces or per-function profiling data to +the rolling history. + diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..3967178 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,697 @@ +from __future__ import annotations + +import json +import os +import platform +import statistics +import sys +import time +from collections import defaultdict +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +try: + import resource +except ImportError: # pragma: no cover - Windows fallback + resource = None + + +HISTORY_SCHEMA_VERSION = 2 +DEFAULT_WINDOW_SIZE = 20 +DEFAULT_DAILY_RETENTION_DAYS = 730 +DEFAULT_DRIFT_RATIO = 0.35 +DEFAULT_MIN_DELTA_SECONDS = 0.05 +_PERF_MONITOR: PerformanceMonitor | None = None + + +def pytest_addoption(parser) -> None: + group = parser.getgroup("kontextual performance") + group.addoption( + "--perf-history-disable", + action="store_true", + help="Disable compact performance history capture for this pytest run.", + ) + group.addoption( + "--perf-history-path", + action="store", + default=None, + help="Write compact performance history to this JSON file.", + ) + group.addoption( + "--perf-history-window", + action="store", + default=None, + help="Number of recent runs and rolling averages to retain.", + ) + group.addoption( + "--perf-history-drift-ratio", + action="store", + default=None, + help="Relative slowdown ratio that emits a performance drift warning.", + ) + group.addoption( + "--perf-history-min-delta", + action="store", + default=None, + help="Minimum absolute slowdown in seconds before warning.", + ) + + +def pytest_configure(config) -> None: + global _PERF_MONITOR + enabled = not config.getoption("--perf-history-disable") and os.environ.get( + "KONTEXTUAL_PERF_MONITOR", "1" + ).lower() not in {"0", "false", "no"} + monitor = PerformanceMonitor(config, enabled=enabled) + config._kontextual_perf_monitor = monitor + _PERF_MONITOR = monitor + if enabled: + monitor.start() + + +def pytest_runtest_logreport(report) -> None: + monitor = _PERF_MONITOR + if monitor is not None and monitor.enabled: + monitor.record_report(report) + + +def pytest_sessionfinish(session, exitstatus) -> None: + monitor = getattr(session.config, "_kontextual_perf_monitor", None) + if monitor is not None and monitor.enabled: + monitor.finish(exitstatus) + + +def pytest_terminal_summary(terminalreporter, exitstatus, config) -> None: + monitor = getattr(config, "_kontextual_perf_monitor", None) + if monitor is None or not monitor.enabled or monitor.summary is None: + return + summary = monitor.summary + terminalreporter.write_sep("-", "kontextual performance") + terminalreporter.write_line( + ( + f"history={summary['history_path']} " + f"run={summary['duration_seconds']:.3f}s " + f"tests={summary['counts']['total']} " + f"executed={summary['counts']['executed']} " + f"window={summary['window_size']}" + ) + ) + for warning in summary.get("warnings", []): + terminalreporter.write_line(f"WARNING: {warning}", yellow=True) + + +class PerformanceMonitor: + def __init__(self, config, *, enabled: bool) -> None: + self.config = config + self.enabled = enabled + self.history_path = _history_path(config) + self.window_size = _positive_int( + config.getoption("--perf-history-window") + or os.environ.get("KONTEXTUAL_PERF_WINDOW"), + DEFAULT_WINDOW_SIZE, + ) + self.daily_retention_days = _positive_int( + os.environ.get("KONTEXTUAL_PERF_DAILY_RETENTION_DAYS"), + DEFAULT_DAILY_RETENTION_DAYS, + ) + self.drift_ratio = _positive_float( + config.getoption("--perf-history-drift-ratio") + or os.environ.get("KONTEXTUAL_PERF_DRIFT_RATIO"), + DEFAULT_DRIFT_RATIO, + ) + self.min_delta_seconds = _positive_float( + config.getoption("--perf-history-min-delta") + or os.environ.get("KONTEXTUAL_PERF_MIN_DELTA_SECONDS"), + DEFAULT_MIN_DELTA_SECONDS, + ) + self.started_at_perf = 0.0 + self.started_at = "" + self.start_resources: dict[str, Any] = {} + self.history_before: dict[str, Any] = {} + self.previous_average: dict[str, Any] | None = None + self.test_durations: dict[str, float] = defaultdict(float) + self.outcomes: dict[str, str] = {} + self.summary: dict[str, Any] | None = None + + def start(self) -> None: + self.started_at_perf = time.perf_counter() + self.started_at = _utc_now() + self.start_resources = _resource_snapshot() + self.history_before = _load_history(self.history_path) + self.previous_average = self.history_before.get("average_of_averages") + + def record_report(self, report) -> None: + nodeid = report.nodeid + duration = max(float(getattr(report, "duration", 0.0) or 0.0), 0.0) + self.test_durations[nodeid] += duration + if report.outcome == "failed": + self.outcomes[nodeid] = "failed" + elif report.outcome == "skipped" and nodeid not in self.outcomes: + self.outcomes[nodeid] = "skipped" + elif report.when == "call" and nodeid not in self.outcomes: + self.outcomes[nodeid] = report.outcome + + def finish(self, exitstatus: int) -> None: + finished_at = _utc_now() + end_resources = _resource_snapshot() + duration_seconds = time.perf_counter() - self.started_at_perf + tests = self._test_records() + run = { + "id": _run_id(self.started_at), + "started_at": self.started_at, + "finished_at": finished_at, + "duration_seconds": round(duration_seconds, 6), + "exitstatus": int(exitstatus), + "counts": _counts(tests), + "environment": _environment_fingerprint(), + "resources": { + "start": self.start_resources, + "end": end_resources, + "process_delta": _process_delta(self.start_resources, end_resources), + }, + "tests": tests, + } + warnings = _drift_warnings( + run, + self.previous_average, + drift_ratio=self.drift_ratio, + min_delta_seconds=self.min_delta_seconds, + ) + history = _update_history( + self.history_before, + run, + window_size=self.window_size, + daily_retention_days=self.daily_retention_days, + drift_ratio=self.drift_ratio, + min_delta_seconds=self.min_delta_seconds, + ) + _save_history(self.history_path, history) + self.summary = { + "history_path": str(self.history_path), + "duration_seconds": duration_seconds, + "counts": run["counts"], + "window_size": self.window_size, + "warnings": warnings, + } + + def _test_records(self) -> dict[str, dict[str, Any]]: + records: dict[str, dict[str, Any]] = {} + for nodeid in sorted(self.test_durations): + outcome = self.outcomes.get(nodeid, "unknown") + records[nodeid] = { + "duration_seconds": round(self.test_durations[nodeid], 6), + "outcome": outcome, + } + return records + + +def _history_path(config) -> Path: + configured = config.getoption("--perf-history-path") or os.environ.get("KONTEXTUAL_PERF_HISTORY") + if configured: + return Path(configured).expanduser().resolve() + return Path(config.rootpath) / ".pytest_cache" / "kontextual" / "performance-history.json" + + +def _load_history(path: Path) -> dict[str, Any]: + if not path.exists(): + return {"schema_version": HISTORY_SCHEMA_VERSION, "runs": [], "averages": [], "daily": []} + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {"schema_version": HISTORY_SCHEMA_VERSION, "runs": [], "averages": [], "daily": []} + if data.get("schema_version") != HISTORY_SCHEMA_VERSION: + return {"schema_version": HISTORY_SCHEMA_VERSION, "runs": [], "averages": [], "daily": []} + data.setdefault("runs", []) + data.setdefault("averages", []) + data.setdefault("daily", []) + return data + + +def _save_history(path: Path, history: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = path.with_suffix(path.suffix + ".tmp") + tmp_path.write_text(json.dumps(history, indent=2, sort_keys=True) + "\n", encoding="utf-8") + tmp_path.replace(path) + + +def _update_history( + history: dict[str, Any], + run: dict[str, Any], + *, + window_size: int, + daily_retention_days: int, + drift_ratio: float, + min_delta_seconds: float, +) -> dict[str, Any]: + runs = (history.get("runs") or []) + [run] + runs = runs[-window_size:] + window_average = _average_runs(runs, run["finished_at"], window_size) + averages = (history.get("averages") or []) + [window_average] + averages = averages[-window_size:] + average_of_averages = _average_average_entries(averages, run["finished_at"], window_size) + daily = _update_daily(history.get("daily") or [], window_average, daily_retention_days) + return { + "schema_version": HISTORY_SCHEMA_VERSION, + "updated_at": run["finished_at"], + "config": { + "window_size": window_size, + "daily_retention_days": daily_retention_days, + "drift_ratio": drift_ratio, + "min_delta_seconds": min_delta_seconds, + }, + "runs": runs, + "averages": averages, + "average_of_averages": average_of_averages, + "daily": daily, + } + + +def _average_runs(runs: list[dict[str, Any]], captured_at: str, window_size: int) -> dict[str, Any]: + return { + "captured_at": captured_at, + "date": captured_at[:10], + "window_size": window_size, + "sample_count": len(runs), + "metrics": _average_metric_dicts([_run_metrics(run) for run in runs]), + "tests": _average_tests(runs), + } + + +def _average_average_entries( + averages: list[dict[str, Any]], captured_at: str, window_size: int +) -> dict[str, Any]: + return { + "captured_at": captured_at, + "date": captured_at[:10], + "window_size": window_size, + "sample_count": len(averages), + "metrics": _average_metric_dicts([entry.get("metrics", {}) for entry in averages]), + "tests": _average_test_average_entries(averages), + } + + +def _update_daily( + daily_entries: list[dict[str, Any]], + window_average: dict[str, Any], + daily_retention_days: int, +) -> list[dict[str, Any]]: + date = window_average["date"] + by_date = {entry["date"]: entry for entry in daily_entries if "date" in entry} + previous = by_date.get(date) + if previous: + sample_count = int(previous.get("sample_count", 0)) + 1 + by_date[date] = { + "date": date, + "updated_at": window_average["captured_at"], + "sample_count": sample_count, + "metrics": _merge_running_average( + previous.get("metrics", {}), window_average.get("metrics", {}), sample_count + ), + "tests": _merge_test_running_average( + previous.get("tests", {}), window_average.get("tests", {}), sample_count + ), + } + else: + by_date[date] = { + "date": date, + "updated_at": window_average["captured_at"], + "sample_count": 1, + "metrics": window_average.get("metrics", {}), + "tests": window_average.get("tests", {}), + } + return [by_date[key] for key in sorted(by_date)[-daily_retention_days:]] + + +def _run_metrics(run: dict[str, Any]) -> dict[str, float]: + counts = run.get("counts", {}) + metrics = { + "run.duration_seconds": _as_float(run.get("duration_seconds")), + "run.total_tests": _as_float(counts.get("total")), + "run.executed_tests": _as_float(counts.get("executed")), + "run.passed_tests": _as_float(counts.get("passed")), + "run.failed_tests": _as_float(counts.get("failed")), + "run.skipped_tests": _as_float(counts.get("skipped")), + "resource.process_user_seconds": _as_float( + run.get("resources", {}).get("process_delta", {}).get("user_seconds") + ), + "resource.process_system_seconds": _as_float( + run.get("resources", {}).get("process_delta", {}).get("system_seconds") + ), + "resource.end_process_max_rss_mib": _as_float( + run.get("resources", {}).get("end", {}).get("process", {}).get("max_rss_mib") + ), + } + for label in ("start", "end"): + snapshot = run.get("resources", {}).get(label, {}) + metrics[f"resource.{label}_load_1"] = _as_float(snapshot.get("load", {}).get("load_1")) + metrics[f"resource.{label}_load_5"] = _as_float(snapshot.get("load", {}).get("load_5")) + metrics[f"resource.{label}_load_15"] = _as_float(snapshot.get("load", {}).get("load_15")) + metrics[f"resource.{label}_load_1_per_cpu"] = _as_float( + snapshot.get("load", {}).get("load_1_per_cpu") + ) + metrics[f"resource.{label}_memory_available_ratio"] = _as_float( + snapshot.get("memory", {}).get("available_ratio") + ) + return {key: value for key, value in metrics.items() if value is not None} + + +def _average_tests(runs: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: + samples: dict[str, list[float]] = defaultdict(list) + outcomes: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) + for run in runs: + for nodeid, test in (run.get("tests") or {}).items(): + outcome = str(test.get("outcome", "unknown")) + outcomes[nodeid][outcome] += 1 + if outcome != "skipped": + duration = _as_float(test.get("duration_seconds")) + if duration is not None: + samples[nodeid].append(duration) + result = {} + for nodeid in sorted(outcomes): + durations = samples.get(nodeid, []) + if not durations: + continue + result[nodeid] = { + "duration_seconds": round(statistics.fmean(durations), 6), + "samples": len(durations), + "outcomes": dict(sorted(outcomes[nodeid].items())), + } + return result + + +def _average_test_average_entries(averages: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: + samples: dict[str, list[float]] = defaultdict(list) + for entry in averages: + for nodeid, test in (entry.get("tests") or {}).items(): + duration = _as_float(test.get("duration_seconds")) + if duration is not None: + samples[nodeid].append(duration) + return { + nodeid: { + "duration_seconds": round(statistics.fmean(durations), 6), + "samples": len(durations), + } + for nodeid, durations in sorted(samples.items()) + } + + +def _average_metric_dicts(metric_dicts: list[dict[str, float]]) -> dict[str, float]: + samples: dict[str, list[float]] = defaultdict(list) + for metrics in metric_dicts: + for key, value in metrics.items(): + if value is not None: + samples[key].append(float(value)) + return { + key: round(statistics.fmean(values), 6) + for key, values in sorted(samples.items()) + if values + } + + +def _merge_running_average( + previous: dict[str, float], + current: dict[str, float], + sample_count: int, +) -> dict[str, float]: + keys = set(previous) | set(current) + merged = {} + for key in sorted(keys): + if key in previous and key in current: + merged[key] = round(((float(previous[key]) * (sample_count - 1)) + float(current[key])) / sample_count, 6) + elif key in current: + merged[key] = round(float(current[key]), 6) + else: + merged[key] = round(float(previous[key]), 6) + return merged + + +def _merge_test_running_average( + previous: dict[str, dict[str, Any]], + current: dict[str, dict[str, Any]], + sample_count: int, +) -> dict[str, dict[str, Any]]: + merged = {} + for nodeid in sorted(set(previous) | set(current)): + prev_duration = _as_float(previous.get(nodeid, {}).get("duration_seconds")) + curr_duration = _as_float(current.get(nodeid, {}).get("duration_seconds")) + if prev_duration is not None and curr_duration is not None: + duration = ((prev_duration * (sample_count - 1)) + curr_duration) / sample_count + samples = int(previous.get(nodeid, {}).get("samples", 0)) + int( + current.get(nodeid, {}).get("samples", 0) + ) + elif curr_duration is not None: + duration = curr_duration + samples = int(current.get(nodeid, {}).get("samples", 0)) + else: + duration = prev_duration + samples = int(previous.get(nodeid, {}).get("samples", 0)) + merged[nodeid] = { + "duration_seconds": round(duration, 6) if duration is not None else None, + "samples": samples, + } + return merged + + +def _drift_warnings( + run: dict[str, Any], + previous_average: dict[str, Any] | None, + *, + drift_ratio: float, + min_delta_seconds: float, +) -> list[str]: + if not previous_average: + return [] + warnings = [] + metrics = previous_average.get("metrics", {}) + previous_duration = _as_float(metrics.get("run.duration_seconds")) + current_duration = _as_float(run.get("duration_seconds")) + previous_executed = _as_float(metrics.get("run.executed_tests")) + current_executed = _as_float(run.get("counts", {}).get("executed")) + if ( + previous_duration is not None + and current_duration is not None + and _similar_sample_size(previous_executed, current_executed) + and _negative_drift(current_duration, previous_duration, drift_ratio, min_delta_seconds) + ): + warnings.append(_format_drift("run duration", current_duration, previous_duration, "s")) + + test_warnings = [] + baseline_tests = previous_average.get("tests", {}) + for nodeid, test in (run.get("tests") or {}).items(): + if test.get("outcome") == "skipped": + continue + baseline_duration = _as_float(baseline_tests.get(nodeid, {}).get("duration_seconds")) + current_test_duration = _as_float(test.get("duration_seconds")) + if ( + baseline_duration is not None + and current_test_duration is not None + and _negative_drift(current_test_duration, baseline_duration, drift_ratio, min_delta_seconds) + ): + test_warnings.append((current_test_duration - baseline_duration, nodeid, current_test_duration, baseline_duration)) + for _delta, nodeid, current_test_duration, baseline_duration in sorted(test_warnings, reverse=True)[:5]: + warnings.append(_format_drift(f"test {nodeid}", current_test_duration, baseline_duration, "s")) + + current_load = _as_float(run.get("resources", {}).get("start", {}).get("load", {}).get("load_1_per_cpu")) + baseline_load = _as_float(metrics.get("resource.start_load_1_per_cpu")) + if ( + current_load is not None + and baseline_load is not None + and current_load > max(baseline_load * (1.0 + drift_ratio), baseline_load + 0.2) + ): + warnings.append(_format_drift("start load per CPU", current_load, baseline_load, "")) + + current_memory = _as_float( + run.get("resources", {}).get("start", {}).get("memory", {}).get("available_ratio") + ) + baseline_memory = _as_float(metrics.get("resource.start_memory_available_ratio")) + if ( + current_memory is not None + and baseline_memory is not None + and current_memory < min(baseline_memory * (1.0 - drift_ratio), baseline_memory - 0.05) + ): + warnings.append( + f"start available memory ratio {current_memory:.3f} is below historical {baseline_memory:.3f}" + ) + return warnings + + +def _counts(tests: dict[str, dict[str, Any]]) -> dict[str, int]: + counts = {"total": len(tests), "executed": 0, "passed": 0, "failed": 0, "skipped": 0, "unknown": 0} + for test in tests.values(): + outcome = test.get("outcome") + if outcome == "passed": + counts["passed"] += 1 + counts["executed"] += 1 + elif outcome == "failed": + counts["failed"] += 1 + counts["executed"] += 1 + elif outcome == "skipped": + counts["skipped"] += 1 + else: + counts["unknown"] += 1 + counts["executed"] += 1 + return counts + + +def _resource_snapshot() -> dict[str, Any]: + return { + "captured_at": _utc_now(), + "cpu": {"logical_count": os.cpu_count() or 1}, + "load": _load_snapshot(), + "memory": _memory_snapshot(), + "process": _process_snapshot(), + } + + +def _environment_fingerprint() -> dict[str, Any]: + uname = platform.uname() + return { + "python": { + "implementation": platform.python_implementation(), + "version": platform.python_version(), + }, + "platform": { + "system": uname.system, + "release": uname.release, + "machine": uname.machine, + }, + "pytest_root": str(Path.cwd()), + "cpu_logical_count": os.cpu_count() or 1, + } + + +def _load_snapshot() -> dict[str, float | None]: + cpu_count = os.cpu_count() or 1 + if not hasattr(os, "getloadavg"): + return { + "load_1": None, + "load_5": None, + "load_15": None, + "load_1_per_cpu": None, + "load_5_per_cpu": None, + "load_15_per_cpu": None, + } + load_1, load_5, load_15 = os.getloadavg() + return { + "load_1": round(load_1, 6), + "load_5": round(load_5, 6), + "load_15": round(load_15, 6), + "load_1_per_cpu": round(load_1 / cpu_count, 6), + "load_5_per_cpu": round(load_5 / cpu_count, 6), + "load_15_per_cpu": round(load_15 / cpu_count, 6), + } + + +def _memory_snapshot() -> dict[str, float | None]: + meminfo = Path("/proc/meminfo") + if not meminfo.exists(): + return {"total_mib": None, "available_mib": None, "available_ratio": None} + values = {} + for line in meminfo.read_text(encoding="utf-8").splitlines(): + if ":" not in line: + continue + key, raw_value = line.split(":", 1) + parts = raw_value.strip().split() + if not parts: + continue + try: + values[key] = float(parts[0]) + except ValueError: + continue + total_kib = values.get("MemTotal") + available_kib = values.get("MemAvailable") + if not total_kib or available_kib is None: + return {"total_mib": None, "available_mib": None, "available_ratio": None} + return { + "total_mib": round(total_kib / 1024.0, 3), + "available_mib": round(available_kib / 1024.0, 3), + "available_ratio": round(available_kib / total_kib, 6), + } + + +def _process_snapshot() -> dict[str, float]: + if resource is None: + return { + "user_seconds": 0.0, + "system_seconds": 0.0, + "max_rss_mib": 0.0, + } + usage = resource.getrusage(resource.RUSAGE_SELF) + return { + "user_seconds": round(float(usage.ru_utime), 6), + "system_seconds": round(float(usage.ru_stime), 6), + "max_rss_mib": round(_rss_to_mib(float(usage.ru_maxrss)), 6), + } + + +def _process_delta(start: dict[str, Any], end: dict[str, Any]) -> dict[str, float | None]: + start_process = start.get("process", {}) + end_process = end.get("process", {}) + user_start = _as_float(start_process.get("user_seconds")) + user_end = _as_float(end_process.get("user_seconds")) + system_start = _as_float(start_process.get("system_seconds")) + system_end = _as_float(end_process.get("system_seconds")) + return { + "user_seconds": round(user_end - user_start, 6) + if user_start is not None and user_end is not None + else None, + "system_seconds": round(system_end - system_start, 6) + if system_start is not None and system_end is not None + else None, + } + + +def _rss_to_mib(value: float) -> float: + if sys.platform == "darwin": + return value / (1024.0 * 1024.0) + return value / 1024.0 + + +def _negative_drift(current: float, baseline: float, ratio: float, min_delta: float) -> bool: + return baseline > 0 and current - baseline >= min_delta and current >= baseline * (1.0 + ratio) + + +def _similar_sample_size(previous: float | None, current: float | None) -> bool: + if previous is None or current is None: + return True + return abs(current - previous) <= max(2.0, previous * 0.1) + + +def _format_drift(name: str, current: float, baseline: float, unit: str) -> str: + ratio = ((current / baseline) - 1.0) * 100.0 if baseline else 0.0 + suffix = unit if unit else "" + return f"{name} {current:.3f}{suffix} is {ratio:.1f}% above historical {baseline:.3f}{suffix}" + + +def _run_id(started_at: str) -> str: + return started_at.replace("-", "").replace(":", "").replace(".", "").replace("+", "Z") + + +def _utc_now() -> str: + return datetime.now(UTC).isoformat(timespec="microseconds").replace("+00:00", "Z") + + +def _positive_int(value: Any, default: int) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + return default + return parsed if parsed > 0 else default + + +def _positive_float(value: Any, default: float) -> float: + try: + parsed = float(value) + except (TypeError, ValueError): + return default + return parsed if parsed > 0 else default + + +def _as_float(value: Any) -> float | None: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None