Files
andros de00642b04 Replace perf_counter timing with MutationObserver browser clock
Eliminates ~10ms raf quantization (16ms polling default). Observer
watches performance.now() at the instant the DOM condition is met.
Falls back to #alerts-tbody for Phoenix which has no #alerts-container.
2026-05-15 21:24:50 +02:00

269 lines
10 KiB
Python

"""
Low-level benchmark logic for a single framework instance.
Each PerformanceTest navigates to the app, waits for the WebSocket to be
ready, then runs the test scenarios defined in SCENARIOS.
"""
import time
import requests
from playwright.sync_api import Page
WARMUP = 2
class PerformanceTest:
def __init__(self, page: Page, base_url: str, label: str):
self.page = page
self.base_url = base_url
self.label = label
self._ws_sent = 0
self._ws_recv = 0
# ------------------------------------------------------------------ #
# Internal helpers #
# ------------------------------------------------------------------ #
def _reset_ws_counters(self):
self._ws_sent = 0
self._ws_recv = 0
def _track_ws(self, ws):
ws.on("framesent", self._on_sent)
ws.on("framereceived", self._on_recv)
def _on_sent(self, payload):
# Playwright 1.50+: payload is a plain str or bytes
if isinstance(payload, (str, bytes, bytearray)):
self._ws_sent += (
len(payload)
if isinstance(payload, (bytes, bytearray))
else len(payload.encode())
)
elif hasattr(payload, "payload") and payload.payload:
self._ws_sent += len(payload.payload)
def _on_recv(self, payload):
if isinstance(payload, (str, bytes, bytearray)):
self._ws_recv += (
len(payload)
if isinstance(payload, (bytes, bytearray))
else len(payload.encode())
)
elif hasattr(payload, "payload") and payload.payload:
self._ws_recv += len(payload.payload)
def _navigate(self):
self.page.on("websocket", self._track_ws)
self.page.goto(self.base_url, wait_until="load")
# Both apps expose #ws-status that reads "connected" once WS is up
self.page.wait_for_selector("#ws-status", timeout=15_000)
self.page.wait_for_function(
"document.querySelector('#ws-status').innerText.trim().toLowerCase() === 'connected'",
timeout=15_000,
)
def _api(self, path: str, params: dict | None = None):
requests.get(f"{self.base_url}/{path}", params=params, timeout=10)
def _row_count(self) -> int:
return self.page.evaluate(
"document.querySelectorAll('#alerts-tbody tr').length"
)
def _wait_row_count(self, expected: int, timeout: int = 8_000):
self.page.wait_for_function(
f"document.querySelectorAll('#alerts-tbody tr').length === {expected}",
timeout=timeout,
)
def _wait_row_count_gt(self, current: int, timeout: int = 8_000):
self.page.wait_for_function(
f"document.querySelectorAll('#alerts-tbody tr').length > {current}",
timeout=timeout,
)
def _wait_row_count_lt(self, current: int, timeout: int = 8_000):
self.page.wait_for_function(
f"document.querySelectorAll('#alerts-tbody tr').length < {current}",
timeout=timeout,
)
def _start_observer(self, op: str, threshold: int):
"""
Inject a MutationObserver on #alerts-tbody that records
performance.now() in window.__mutationTime the instant the
row-count condition (op, threshold) is satisfied.
The observer disconnects after firing to avoid memory leaks.
"""
self.page.evaluate(
"""([op, threshold]) => {
window.__mutationTime = null;
// Django wraps the table in #alerts-container (replaced wholesale).
// Phoenix has no such wrapper; it morphs #alerts-tbody in place.
// Prefer the container when present, fall back to tbody.
const root = document.querySelector('#alerts-container')
|| document.querySelector('#alerts-tbody');
if (!root) return;
const check = () => {
const n = document.querySelectorAll('#alerts-tbody tr').length;
const met = op === '>' ? n > threshold
: op === '<' ? n < threshold
: n === threshold;
if (met) {
window.__mutationTime = performance.now();
obs.disconnect();
}
};
const obs = new MutationObserver(check);
obs.observe(root, { childList: true, subtree: true });
}""",
[op, threshold],
)
def _browser_now(self) -> float:
"""Return the current browser-side performance.now() timestamp."""
return self.page.evaluate("performance.now()")
def _read_observer(self, t0: float, timeout: int = 8_000) -> float:
"""
Wait for window.__mutationTime to be set (polling every 1 ms),
then return elapsed time in ms using the shared browser clock.
"""
self.page.wait_for_function(
"window.__mutationTime !== null",
polling=1,
timeout=timeout,
)
return self.page.evaluate("window.__mutationTime") - t0
# ------------------------------------------------------------------ #
# Individual scenario measurements (return ms) #
# ------------------------------------------------------------------ #
def _measure_add_alert(self) -> tuple[float, int, int]:
before = self._row_count()
self._start_observer(">", before)
self._reset_ws_counters()
t0 = self._browser_now()
self.page.click("#add-alert-btn")
elapsed = self._read_observer(t0)
return elapsed, self._ws_sent, self._ws_recv
def _measure_delete_alert(self) -> tuple[float, int, int]:
before = self._row_count()
btn = self.page.locator(".delete-btn").first
self._start_observer("<", before)
self._reset_ws_counters()
t0 = self._browser_now()
btn.click()
elapsed = self._read_observer(t0)
return elapsed, self._ws_sent, self._ws_recv
def _measure_search(self, query: str, from_rows: int) -> tuple[float, int, int]:
"""Type query and wait for row count to drop below from_rows."""
self._start_observer("<", from_rows)
self._reset_ws_counters()
t0 = self._browser_now()
self.page.fill("#search-input", query)
elapsed = self._read_observer(t0)
return elapsed, self._ws_sent, self._ws_recv
# ------------------------------------------------------------------ #
# Public scenario runners (return list of (ms, ws_sent, ws_recv)) #
# ------------------------------------------------------------------ #
def run_scenario(self, name: str, iterations: int, warmup: int = WARMUP) -> list[dict]:
runner = getattr(self, f"_scenario_{name}")
return runner(iterations=iterations, warmup=warmup)
# -- Common scenarios --
def _scenario_add_alert(self, iterations: int, warmup: int) -> list[dict]:
self._api("bench/clear/")
self._navigate()
results = []
for i in range(iterations + warmup):
ms, sent, recv = self._measure_add_alert()
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
return results
def _scenario_delete_alert(self, iterations: int, warmup: int) -> list[dict]:
total = iterations + warmup
self._api("bench/clear/")
self._api("bench/populate/", params={"count": total})
self._navigate()
results = []
for i in range(total):
ms, sent, recv = self._measure_delete_alert()
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
return results
def _scenario_search_filter(self, iterations: int, warmup: int) -> list[dict]:
# Use 60 alerts so "cpu" (matching "CPU usage above 90%") returns ~6 rows
self._api("bench/clear/")
self._api("bench/populate/", params={"count": 60})
self._navigate()
total_rows = self._row_count()
results = []
for i in range(iterations + warmup):
ms, sent, recv = self._measure_search("cpu", total_rows)
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
# Reset: clear search -> full list restored (not timed)
self.page.fill("#search-input", "")
self._wait_row_count(total_rows)
return results
# -- Edge case scenarios --
def _scenario_large_list_add(self, iterations: int, warmup: int) -> list[dict]:
self._api("bench/clear/")
self._api("bench/populate/", params={"count": 500})
self._navigate()
results = []
for i in range(iterations + warmup):
ms, sent, recv = self._measure_add_alert()
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
return results
def _scenario_rapid_fire(self, iterations: int, warmup: int) -> list[dict]:
"""Click add 5 times quickly, measure until all 5 rows appear."""
burst = 5
self._api("bench/clear/")
self._navigate()
results = []
for i in range(iterations + warmup):
before = self._row_count()
self._start_observer(">", before + burst - 1)
self._reset_ws_counters()
t0 = self._browser_now()
for _ in range(burst):
self.page.click("#add-alert-btn")
time.sleep(0.05)
elapsed = self._read_observer(t0, timeout=15_000)
if i >= warmup:
results.append(
{"ms": elapsed, "ws_sent": self._ws_sent, "ws_recv": self._ws_recv}
)
return results
def _scenario_empty_search(self, iterations: int, warmup: int) -> list[dict]:
"""Search for a term that matches nothing — measures empty-state render."""
self._api("bench/clear/")
self._api("bench/populate/", params={"count": 50})
self._navigate()
total_rows = self._row_count()
results = []
for i in range(iterations + warmup):
ms, sent, recv = self._measure_search("ZZZNOMATCH999", total_rows)
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
# Restore full list
self.page.fill("#search-input", "")
self._wait_row_count(total_rows)
return results