Files
kontextual-engine/tests/conftest.py
2026-05-05 20:58:25 +02:00

698 lines
25 KiB
Python

from __future__ import annotations
import json
import os
import platform
import statistics
import sys
import time
from collections import defaultdict
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
try:
import resource
except ImportError: # pragma: no cover - Windows fallback
resource = None
HISTORY_SCHEMA_VERSION = 2
DEFAULT_WINDOW_SIZE = 20
DEFAULT_DAILY_RETENTION_DAYS = 730
DEFAULT_DRIFT_RATIO = 0.35
DEFAULT_MIN_DELTA_SECONDS = 0.05
_PERF_MONITOR: PerformanceMonitor | None = None
def pytest_addoption(parser) -> None:
group = parser.getgroup("kontextual performance")
group.addoption(
"--perf-history-disable",
action="store_true",
help="Disable compact performance history capture for this pytest run.",
)
group.addoption(
"--perf-history-path",
action="store",
default=None,
help="Write compact performance history to this JSON file.",
)
group.addoption(
"--perf-history-window",
action="store",
default=None,
help="Number of recent runs and rolling averages to retain.",
)
group.addoption(
"--perf-history-drift-ratio",
action="store",
default=None,
help="Relative slowdown ratio that emits a performance drift warning.",
)
group.addoption(
"--perf-history-min-delta",
action="store",
default=None,
help="Minimum absolute slowdown in seconds before warning.",
)
def pytest_configure(config) -> None:
global _PERF_MONITOR
enabled = not config.getoption("--perf-history-disable") and os.environ.get(
"KONTEXTUAL_PERF_MONITOR", "1"
).lower() not in {"0", "false", "no"}
monitor = PerformanceMonitor(config, enabled=enabled)
config._kontextual_perf_monitor = monitor
_PERF_MONITOR = monitor
if enabled:
monitor.start()
def pytest_runtest_logreport(report) -> None:
monitor = _PERF_MONITOR
if monitor is not None and monitor.enabled:
monitor.record_report(report)
def pytest_sessionfinish(session, exitstatus) -> None:
monitor = getattr(session.config, "_kontextual_perf_monitor", None)
if monitor is not None and monitor.enabled:
monitor.finish(exitstatus)
def pytest_terminal_summary(terminalreporter, exitstatus, config) -> None:
monitor = getattr(config, "_kontextual_perf_monitor", None)
if monitor is None or not monitor.enabled or monitor.summary is None:
return
summary = monitor.summary
terminalreporter.write_sep("-", "kontextual performance")
terminalreporter.write_line(
(
f"history={summary['history_path']} "
f"run={summary['duration_seconds']:.3f}s "
f"tests={summary['counts']['total']} "
f"executed={summary['counts']['executed']} "
f"window={summary['window_size']}"
)
)
for warning in summary.get("warnings", []):
terminalreporter.write_line(f"WARNING: {warning}", yellow=True)
class PerformanceMonitor:
def __init__(self, config, *, enabled: bool) -> None:
self.config = config
self.enabled = enabled
self.history_path = _history_path(config)
self.window_size = _positive_int(
config.getoption("--perf-history-window")
or os.environ.get("KONTEXTUAL_PERF_WINDOW"),
DEFAULT_WINDOW_SIZE,
)
self.daily_retention_days = _positive_int(
os.environ.get("KONTEXTUAL_PERF_DAILY_RETENTION_DAYS"),
DEFAULT_DAILY_RETENTION_DAYS,
)
self.drift_ratio = _positive_float(
config.getoption("--perf-history-drift-ratio")
or os.environ.get("KONTEXTUAL_PERF_DRIFT_RATIO"),
DEFAULT_DRIFT_RATIO,
)
self.min_delta_seconds = _positive_float(
config.getoption("--perf-history-min-delta")
or os.environ.get("KONTEXTUAL_PERF_MIN_DELTA_SECONDS"),
DEFAULT_MIN_DELTA_SECONDS,
)
self.started_at_perf = 0.0
self.started_at = ""
self.start_resources: dict[str, Any] = {}
self.history_before: dict[str, Any] = {}
self.previous_average: dict[str, Any] | None = None
self.test_durations: dict[str, float] = defaultdict(float)
self.outcomes: dict[str, str] = {}
self.summary: dict[str, Any] | None = None
def start(self) -> None:
self.started_at_perf = time.perf_counter()
self.started_at = _utc_now()
self.start_resources = _resource_snapshot()
self.history_before = _load_history(self.history_path)
self.previous_average = self.history_before.get("average_of_averages")
def record_report(self, report) -> None:
nodeid = report.nodeid
duration = max(float(getattr(report, "duration", 0.0) or 0.0), 0.0)
self.test_durations[nodeid] += duration
if report.outcome == "failed":
self.outcomes[nodeid] = "failed"
elif report.outcome == "skipped" and nodeid not in self.outcomes:
self.outcomes[nodeid] = "skipped"
elif report.when == "call" and nodeid not in self.outcomes:
self.outcomes[nodeid] = report.outcome
def finish(self, exitstatus: int) -> None:
finished_at = _utc_now()
end_resources = _resource_snapshot()
duration_seconds = time.perf_counter() - self.started_at_perf
tests = self._test_records()
run = {
"id": _run_id(self.started_at),
"started_at": self.started_at,
"finished_at": finished_at,
"duration_seconds": round(duration_seconds, 6),
"exitstatus": int(exitstatus),
"counts": _counts(tests),
"environment": _environment_fingerprint(),
"resources": {
"start": self.start_resources,
"end": end_resources,
"process_delta": _process_delta(self.start_resources, end_resources),
},
"tests": tests,
}
warnings = _drift_warnings(
run,
self.previous_average,
drift_ratio=self.drift_ratio,
min_delta_seconds=self.min_delta_seconds,
)
history = _update_history(
self.history_before,
run,
window_size=self.window_size,
daily_retention_days=self.daily_retention_days,
drift_ratio=self.drift_ratio,
min_delta_seconds=self.min_delta_seconds,
)
_save_history(self.history_path, history)
self.summary = {
"history_path": str(self.history_path),
"duration_seconds": duration_seconds,
"counts": run["counts"],
"window_size": self.window_size,
"warnings": warnings,
}
def _test_records(self) -> dict[str, dict[str, Any]]:
records: dict[str, dict[str, Any]] = {}
for nodeid in sorted(self.test_durations):
outcome = self.outcomes.get(nodeid, "unknown")
records[nodeid] = {
"duration_seconds": round(self.test_durations[nodeid], 6),
"outcome": outcome,
}
return records
def _history_path(config) -> Path:
configured = config.getoption("--perf-history-path") or os.environ.get("KONTEXTUAL_PERF_HISTORY")
if configured:
return Path(configured).expanduser().resolve()
return Path(config.rootpath) / ".pytest_cache" / "kontextual" / "performance-history.json"
def _load_history(path: Path) -> dict[str, Any]:
if not path.exists():
return {"schema_version": HISTORY_SCHEMA_VERSION, "runs": [], "averages": [], "daily": []}
try:
data = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return {"schema_version": HISTORY_SCHEMA_VERSION, "runs": [], "averages": [], "daily": []}
if data.get("schema_version") != HISTORY_SCHEMA_VERSION:
return {"schema_version": HISTORY_SCHEMA_VERSION, "runs": [], "averages": [], "daily": []}
data.setdefault("runs", [])
data.setdefault("averages", [])
data.setdefault("daily", [])
return data
def _save_history(path: Path, history: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_suffix(path.suffix + ".tmp")
tmp_path.write_text(json.dumps(history, indent=2, sort_keys=True) + "\n", encoding="utf-8")
tmp_path.replace(path)
def _update_history(
history: dict[str, Any],
run: dict[str, Any],
*,
window_size: int,
daily_retention_days: int,
drift_ratio: float,
min_delta_seconds: float,
) -> dict[str, Any]:
runs = (history.get("runs") or []) + [run]
runs = runs[-window_size:]
window_average = _average_runs(runs, run["finished_at"], window_size)
averages = (history.get("averages") or []) + [window_average]
averages = averages[-window_size:]
average_of_averages = _average_average_entries(averages, run["finished_at"], window_size)
daily = _update_daily(history.get("daily") or [], window_average, daily_retention_days)
return {
"schema_version": HISTORY_SCHEMA_VERSION,
"updated_at": run["finished_at"],
"config": {
"window_size": window_size,
"daily_retention_days": daily_retention_days,
"drift_ratio": drift_ratio,
"min_delta_seconds": min_delta_seconds,
},
"runs": runs,
"averages": averages,
"average_of_averages": average_of_averages,
"daily": daily,
}
def _average_runs(runs: list[dict[str, Any]], captured_at: str, window_size: int) -> dict[str, Any]:
return {
"captured_at": captured_at,
"date": captured_at[:10],
"window_size": window_size,
"sample_count": len(runs),
"metrics": _average_metric_dicts([_run_metrics(run) for run in runs]),
"tests": _average_tests(runs),
}
def _average_average_entries(
averages: list[dict[str, Any]], captured_at: str, window_size: int
) -> dict[str, Any]:
return {
"captured_at": captured_at,
"date": captured_at[:10],
"window_size": window_size,
"sample_count": len(averages),
"metrics": _average_metric_dicts([entry.get("metrics", {}) for entry in averages]),
"tests": _average_test_average_entries(averages),
}
def _update_daily(
daily_entries: list[dict[str, Any]],
window_average: dict[str, Any],
daily_retention_days: int,
) -> list[dict[str, Any]]:
date = window_average["date"]
by_date = {entry["date"]: entry for entry in daily_entries if "date" in entry}
previous = by_date.get(date)
if previous:
sample_count = int(previous.get("sample_count", 0)) + 1
by_date[date] = {
"date": date,
"updated_at": window_average["captured_at"],
"sample_count": sample_count,
"metrics": _merge_running_average(
previous.get("metrics", {}), window_average.get("metrics", {}), sample_count
),
"tests": _merge_test_running_average(
previous.get("tests", {}), window_average.get("tests", {}), sample_count
),
}
else:
by_date[date] = {
"date": date,
"updated_at": window_average["captured_at"],
"sample_count": 1,
"metrics": window_average.get("metrics", {}),
"tests": window_average.get("tests", {}),
}
return [by_date[key] for key in sorted(by_date)[-daily_retention_days:]]
def _run_metrics(run: dict[str, Any]) -> dict[str, float]:
counts = run.get("counts", {})
metrics = {
"run.duration_seconds": _as_float(run.get("duration_seconds")),
"run.total_tests": _as_float(counts.get("total")),
"run.executed_tests": _as_float(counts.get("executed")),
"run.passed_tests": _as_float(counts.get("passed")),
"run.failed_tests": _as_float(counts.get("failed")),
"run.skipped_tests": _as_float(counts.get("skipped")),
"resource.process_user_seconds": _as_float(
run.get("resources", {}).get("process_delta", {}).get("user_seconds")
),
"resource.process_system_seconds": _as_float(
run.get("resources", {}).get("process_delta", {}).get("system_seconds")
),
"resource.end_process_max_rss_mib": _as_float(
run.get("resources", {}).get("end", {}).get("process", {}).get("max_rss_mib")
),
}
for label in ("start", "end"):
snapshot = run.get("resources", {}).get(label, {})
metrics[f"resource.{label}_load_1"] = _as_float(snapshot.get("load", {}).get("load_1"))
metrics[f"resource.{label}_load_5"] = _as_float(snapshot.get("load", {}).get("load_5"))
metrics[f"resource.{label}_load_15"] = _as_float(snapshot.get("load", {}).get("load_15"))
metrics[f"resource.{label}_load_1_per_cpu"] = _as_float(
snapshot.get("load", {}).get("load_1_per_cpu")
)
metrics[f"resource.{label}_memory_available_ratio"] = _as_float(
snapshot.get("memory", {}).get("available_ratio")
)
return {key: value for key, value in metrics.items() if value is not None}
def _average_tests(runs: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
samples: dict[str, list[float]] = defaultdict(list)
outcomes: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
for run in runs:
for nodeid, test in (run.get("tests") or {}).items():
outcome = str(test.get("outcome", "unknown"))
outcomes[nodeid][outcome] += 1
if outcome != "skipped":
duration = _as_float(test.get("duration_seconds"))
if duration is not None:
samples[nodeid].append(duration)
result = {}
for nodeid in sorted(outcomes):
durations = samples.get(nodeid, [])
if not durations:
continue
result[nodeid] = {
"duration_seconds": round(statistics.fmean(durations), 6),
"samples": len(durations),
"outcomes": dict(sorted(outcomes[nodeid].items())),
}
return result
def _average_test_average_entries(averages: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
samples: dict[str, list[float]] = defaultdict(list)
for entry in averages:
for nodeid, test in (entry.get("tests") or {}).items():
duration = _as_float(test.get("duration_seconds"))
if duration is not None:
samples[nodeid].append(duration)
return {
nodeid: {
"duration_seconds": round(statistics.fmean(durations), 6),
"samples": len(durations),
}
for nodeid, durations in sorted(samples.items())
}
def _average_metric_dicts(metric_dicts: list[dict[str, float]]) -> dict[str, float]:
samples: dict[str, list[float]] = defaultdict(list)
for metrics in metric_dicts:
for key, value in metrics.items():
if value is not None:
samples[key].append(float(value))
return {
key: round(statistics.fmean(values), 6)
for key, values in sorted(samples.items())
if values
}
def _merge_running_average(
previous: dict[str, float],
current: dict[str, float],
sample_count: int,
) -> dict[str, float]:
keys = set(previous) | set(current)
merged = {}
for key in sorted(keys):
if key in previous and key in current:
merged[key] = round(((float(previous[key]) * (sample_count - 1)) + float(current[key])) / sample_count, 6)
elif key in current:
merged[key] = round(float(current[key]), 6)
else:
merged[key] = round(float(previous[key]), 6)
return merged
def _merge_test_running_average(
previous: dict[str, dict[str, Any]],
current: dict[str, dict[str, Any]],
sample_count: int,
) -> dict[str, dict[str, Any]]:
merged = {}
for nodeid in sorted(set(previous) | set(current)):
prev_duration = _as_float(previous.get(nodeid, {}).get("duration_seconds"))
curr_duration = _as_float(current.get(nodeid, {}).get("duration_seconds"))
if prev_duration is not None and curr_duration is not None:
duration = ((prev_duration * (sample_count - 1)) + curr_duration) / sample_count
samples = int(previous.get(nodeid, {}).get("samples", 0)) + int(
current.get(nodeid, {}).get("samples", 0)
)
elif curr_duration is not None:
duration = curr_duration
samples = int(current.get(nodeid, {}).get("samples", 0))
else:
duration = prev_duration
samples = int(previous.get(nodeid, {}).get("samples", 0))
merged[nodeid] = {
"duration_seconds": round(duration, 6) if duration is not None else None,
"samples": samples,
}
return merged
def _drift_warnings(
run: dict[str, Any],
previous_average: dict[str, Any] | None,
*,
drift_ratio: float,
min_delta_seconds: float,
) -> list[str]:
if not previous_average:
return []
warnings = []
metrics = previous_average.get("metrics", {})
previous_duration = _as_float(metrics.get("run.duration_seconds"))
current_duration = _as_float(run.get("duration_seconds"))
previous_executed = _as_float(metrics.get("run.executed_tests"))
current_executed = _as_float(run.get("counts", {}).get("executed"))
if (
previous_duration is not None
and current_duration is not None
and _similar_sample_size(previous_executed, current_executed)
and _negative_drift(current_duration, previous_duration, drift_ratio, min_delta_seconds)
):
warnings.append(_format_drift("run duration", current_duration, previous_duration, "s"))
test_warnings = []
baseline_tests = previous_average.get("tests", {})
for nodeid, test in (run.get("tests") or {}).items():
if test.get("outcome") == "skipped":
continue
baseline_duration = _as_float(baseline_tests.get(nodeid, {}).get("duration_seconds"))
current_test_duration = _as_float(test.get("duration_seconds"))
if (
baseline_duration is not None
and current_test_duration is not None
and _negative_drift(current_test_duration, baseline_duration, drift_ratio, min_delta_seconds)
):
test_warnings.append((current_test_duration - baseline_duration, nodeid, current_test_duration, baseline_duration))
for _delta, nodeid, current_test_duration, baseline_duration in sorted(test_warnings, reverse=True)[:5]:
warnings.append(_format_drift(f"test {nodeid}", current_test_duration, baseline_duration, "s"))
current_load = _as_float(run.get("resources", {}).get("start", {}).get("load", {}).get("load_1_per_cpu"))
baseline_load = _as_float(metrics.get("resource.start_load_1_per_cpu"))
if (
current_load is not None
and baseline_load is not None
and current_load > max(baseline_load * (1.0 + drift_ratio), baseline_load + 0.2)
):
warnings.append(_format_drift("start load per CPU", current_load, baseline_load, ""))
current_memory = _as_float(
run.get("resources", {}).get("start", {}).get("memory", {}).get("available_ratio")
)
baseline_memory = _as_float(metrics.get("resource.start_memory_available_ratio"))
if (
current_memory is not None
and baseline_memory is not None
and current_memory < min(baseline_memory * (1.0 - drift_ratio), baseline_memory - 0.05)
):
warnings.append(
f"start available memory ratio {current_memory:.3f} is below historical {baseline_memory:.3f}"
)
return warnings
def _counts(tests: dict[str, dict[str, Any]]) -> dict[str, int]:
counts = {"total": len(tests), "executed": 0, "passed": 0, "failed": 0, "skipped": 0, "unknown": 0}
for test in tests.values():
outcome = test.get("outcome")
if outcome == "passed":
counts["passed"] += 1
counts["executed"] += 1
elif outcome == "failed":
counts["failed"] += 1
counts["executed"] += 1
elif outcome == "skipped":
counts["skipped"] += 1
else:
counts["unknown"] += 1
counts["executed"] += 1
return counts
def _resource_snapshot() -> dict[str, Any]:
return {
"captured_at": _utc_now(),
"cpu": {"logical_count": os.cpu_count() or 1},
"load": _load_snapshot(),
"memory": _memory_snapshot(),
"process": _process_snapshot(),
}
def _environment_fingerprint() -> dict[str, Any]:
uname = platform.uname()
return {
"python": {
"implementation": platform.python_implementation(),
"version": platform.python_version(),
},
"platform": {
"system": uname.system,
"release": uname.release,
"machine": uname.machine,
},
"pytest_root": str(Path.cwd()),
"cpu_logical_count": os.cpu_count() or 1,
}
def _load_snapshot() -> dict[str, float | None]:
cpu_count = os.cpu_count() or 1
if not hasattr(os, "getloadavg"):
return {
"load_1": None,
"load_5": None,
"load_15": None,
"load_1_per_cpu": None,
"load_5_per_cpu": None,
"load_15_per_cpu": None,
}
load_1, load_5, load_15 = os.getloadavg()
return {
"load_1": round(load_1, 6),
"load_5": round(load_5, 6),
"load_15": round(load_15, 6),
"load_1_per_cpu": round(load_1 / cpu_count, 6),
"load_5_per_cpu": round(load_5 / cpu_count, 6),
"load_15_per_cpu": round(load_15 / cpu_count, 6),
}
def _memory_snapshot() -> dict[str, float | None]:
meminfo = Path("/proc/meminfo")
if not meminfo.exists():
return {"total_mib": None, "available_mib": None, "available_ratio": None}
values = {}
for line in meminfo.read_text(encoding="utf-8").splitlines():
if ":" not in line:
continue
key, raw_value = line.split(":", 1)
parts = raw_value.strip().split()
if not parts:
continue
try:
values[key] = float(parts[0])
except ValueError:
continue
total_kib = values.get("MemTotal")
available_kib = values.get("MemAvailable")
if not total_kib or available_kib is None:
return {"total_mib": None, "available_mib": None, "available_ratio": None}
return {
"total_mib": round(total_kib / 1024.0, 3),
"available_mib": round(available_kib / 1024.0, 3),
"available_ratio": round(available_kib / total_kib, 6),
}
def _process_snapshot() -> dict[str, float]:
if resource is None:
return {
"user_seconds": 0.0,
"system_seconds": 0.0,
"max_rss_mib": 0.0,
}
usage = resource.getrusage(resource.RUSAGE_SELF)
return {
"user_seconds": round(float(usage.ru_utime), 6),
"system_seconds": round(float(usage.ru_stime), 6),
"max_rss_mib": round(_rss_to_mib(float(usage.ru_maxrss)), 6),
}
def _process_delta(start: dict[str, Any], end: dict[str, Any]) -> dict[str, float | None]:
start_process = start.get("process", {})
end_process = end.get("process", {})
user_start = _as_float(start_process.get("user_seconds"))
user_end = _as_float(end_process.get("user_seconds"))
system_start = _as_float(start_process.get("system_seconds"))
system_end = _as_float(end_process.get("system_seconds"))
return {
"user_seconds": round(user_end - user_start, 6)
if user_start is not None and user_end is not None
else None,
"system_seconds": round(system_end - system_start, 6)
if system_start is not None and system_end is not None
else None,
}
def _rss_to_mib(value: float) -> float:
if sys.platform == "darwin":
return value / (1024.0 * 1024.0)
return value / 1024.0
def _negative_drift(current: float, baseline: float, ratio: float, min_delta: float) -> bool:
return baseline > 0 and current - baseline >= min_delta and current >= baseline * (1.0 + ratio)
def _similar_sample_size(previous: float | None, current: float | None) -> bool:
if previous is None or current is None:
return True
return abs(current - previous) <= max(2.0, previous * 0.1)
def _format_drift(name: str, current: float, baseline: float, unit: str) -> str:
ratio = ((current / baseline) - 1.0) * 100.0 if baseline else 0.0
suffix = unit if unit else ""
return f"{name} {current:.3f}{suffix} is {ratio:.1f}% above historical {baseline:.3f}{suffix}"
def _run_id(started_at: str) -> str:
return started_at.replace("-", "").replace(":", "").replace(".", "").replace("+", "Z")
def _utc_now() -> str:
return datetime.now(UTC).isoformat(timespec="microseconds").replace("+00:00", "Z")
def _positive_int(value: Any, default: int) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
return default
return parsed if parsed > 0 else default
def _positive_float(value: Any, default: float) -> float:
try:
parsed = float(value)
except (TypeError, ValueError):
return default
return parsed if parsed > 0 else default
def _as_float(value: Any) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None