""" Low-level benchmark logic for a single framework instance. Each PerformanceTest navigates to the app, waits for the WebSocket to be ready, then runs the test scenarios defined in SCENARIOS. """ import time import requests from playwright.sync_api import Page WARMUP = 2 class PerformanceTest: def __init__(self, page: Page, base_url: str, label: str): self.page = page self.base_url = base_url self.label = label self._ws_sent = 0 self._ws_recv = 0 # ------------------------------------------------------------------ # # Internal helpers # # ------------------------------------------------------------------ # def _reset_ws_counters(self): self._ws_sent = 0 self._ws_recv = 0 def _track_ws(self, ws): ws.on("framesent", self._on_sent) ws.on("framereceived", self._on_recv) def _on_sent(self, payload): # Playwright 1.50+: payload is a plain str or bytes if isinstance(payload, (str, bytes, bytearray)): self._ws_sent += ( len(payload) if isinstance(payload, (bytes, bytearray)) else len(payload.encode()) ) elif hasattr(payload, "payload") and payload.payload: self._ws_sent += len(payload.payload) def _on_recv(self, payload): if isinstance(payload, (str, bytes, bytearray)): self._ws_recv += ( len(payload) if isinstance(payload, (bytes, bytearray)) else len(payload.encode()) ) elif hasattr(payload, "payload") and payload.payload: self._ws_recv += len(payload.payload) def _navigate(self): self.page.on("websocket", self._track_ws) self.page.goto(self.base_url, wait_until="load") # Both apps expose #ws-status that reads "connected" once WS is up self.page.wait_for_selector("#ws-status", timeout=15_000) self.page.wait_for_function( "document.querySelector('#ws-status').innerText.trim().toLowerCase() === 'connected'", timeout=15_000, ) def _api(self, path: str, params: dict | None = None): requests.get(f"{self.base_url}/{path}", params=params, timeout=10) def _row_count(self) -> int: return self.page.evaluate( "document.querySelectorAll('#alerts-tbody tr').length" ) def _wait_row_count(self, expected: int, timeout: int = 8_000): self.page.wait_for_function( f"document.querySelectorAll('#alerts-tbody tr').length === {expected}", timeout=timeout, ) def _wait_row_count_gt(self, current: int, timeout: int = 8_000): self.page.wait_for_function( f"document.querySelectorAll('#alerts-tbody tr').length > {current}", timeout=timeout, ) def _wait_row_count_lt(self, current: int, timeout: int = 8_000): self.page.wait_for_function( f"document.querySelectorAll('#alerts-tbody tr').length < {current}", timeout=timeout, ) def _start_observer(self, op: str, threshold: int): """ Inject a MutationObserver on #alerts-tbody that records performance.now() in window.__mutationTime the instant the row-count condition (op, threshold) is satisfied. The observer disconnects after firing to avoid memory leaks. """ self.page.evaluate( """([op, threshold]) => { window.__mutationTime = null; // Django wraps the table in #alerts-container (replaced wholesale). // Phoenix has no such wrapper; it morphs #alerts-tbody in place. // Prefer the container when present, fall back to tbody. const root = document.querySelector('#alerts-container') || document.querySelector('#alerts-tbody'); if (!root) return; const check = () => { const n = document.querySelectorAll('#alerts-tbody tr').length; const met = op === '>' ? n > threshold : op === '<' ? n < threshold : n === threshold; if (met) { window.__mutationTime = performance.now(); obs.disconnect(); } }; const obs = new MutationObserver(check); obs.observe(root, { childList: true, subtree: true }); }""", [op, threshold], ) def _browser_now(self) -> float: """Return the current browser-side performance.now() timestamp.""" return self.page.evaluate("performance.now()") def _read_observer(self, t0: float, timeout: int = 8_000) -> float: """ Wait for window.__mutationTime to be set (polling every 1 ms), then return elapsed time in ms using the shared browser clock. """ self.page.wait_for_function( "window.__mutationTime !== null", polling=1, timeout=timeout, ) return self.page.evaluate("window.__mutationTime") - t0 # ------------------------------------------------------------------ # # Individual scenario measurements (return ms) # # ------------------------------------------------------------------ # def _measure_add_alert(self) -> tuple[float, int, int]: before = self._row_count() self._start_observer(">", before) self._reset_ws_counters() t0 = self._browser_now() self.page.click("#add-alert-btn") elapsed = self._read_observer(t0) return elapsed, self._ws_sent, self._ws_recv def _measure_delete_alert(self) -> tuple[float, int, int]: before = self._row_count() btn = self.page.locator(".delete-btn").first self._start_observer("<", before) self._reset_ws_counters() t0 = self._browser_now() btn.click() elapsed = self._read_observer(t0) return elapsed, self._ws_sent, self._ws_recv def _measure_search(self, query: str, from_rows: int) -> tuple[float, int, int]: """Type query and wait for row count to drop below from_rows.""" self._start_observer("<", from_rows) self._reset_ws_counters() t0 = self._browser_now() self.page.fill("#search-input", query) elapsed = self._read_observer(t0) return elapsed, self._ws_sent, self._ws_recv # ------------------------------------------------------------------ # # Public scenario runners (return list of (ms, ws_sent, ws_recv)) # # ------------------------------------------------------------------ # def run_scenario(self, name: str, iterations: int, warmup: int = WARMUP) -> list[dict]: runner = getattr(self, f"_scenario_{name}") return runner(iterations=iterations, warmup=warmup) # -- Common scenarios -- def _scenario_add_alert(self, iterations: int, warmup: int) -> list[dict]: self._api("bench/clear/") self._navigate() results = [] for i in range(iterations + warmup): ms, sent, recv = self._measure_add_alert() if i >= warmup: results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv}) return results def _scenario_delete_alert(self, iterations: int, warmup: int) -> list[dict]: total = iterations + warmup self._api("bench/clear/") self._api("bench/populate/", params={"count": total}) self._navigate() results = [] for i in range(total): ms, sent, recv = self._measure_delete_alert() if i >= warmup: results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv}) return results def _scenario_search_filter(self, iterations: int, warmup: int) -> list[dict]: # Use 60 alerts so "cpu" (matching "CPU usage above 90%") returns ~6 rows self._api("bench/clear/") self._api("bench/populate/", params={"count": 60}) self._navigate() total_rows = self._row_count() results = [] for i in range(iterations + warmup): ms, sent, recv = self._measure_search("cpu", total_rows) if i >= warmup: results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv}) # Reset: clear search -> full list restored (not timed) self.page.fill("#search-input", "") self._wait_row_count(total_rows) return results # -- Edge case scenarios -- def _scenario_large_list_add(self, iterations: int, warmup: int) -> list[dict]: self._api("bench/clear/") self._api("bench/populate/", params={"count": 500}) self._navigate() results = [] for i in range(iterations + warmup): ms, sent, recv = self._measure_add_alert() if i >= warmup: results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv}) return results def _scenario_rapid_fire(self, iterations: int, warmup: int) -> list[dict]: """Click add 5 times quickly, measure until all 5 rows appear.""" burst = 5 self._api("bench/clear/") self._navigate() results = [] for i in range(iterations + warmup): before = self._row_count() self._start_observer(">", before + burst - 1) self._reset_ws_counters() t0 = self._browser_now() for _ in range(burst): self.page.click("#add-alert-btn") time.sleep(0.05) elapsed = self._read_observer(t0, timeout=15_000) if i >= warmup: results.append( {"ms": elapsed, "ws_sent": self._ws_sent, "ws_recv": self._ws_recv} ) return results def _scenario_empty_search(self, iterations: int, warmup: int) -> list[dict]: """Search for a term that matches nothing — measures empty-state render.""" self._api("bench/clear/") self._api("bench/populate/", params={"count": 50}) self._navigate() total_rows = self._row_count() results = [] for i in range(iterations + warmup): ms, sent, recv = self._measure_search("ZZZNOMATCH999", total_rows) if i >= warmup: results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv}) # Restore full list self.page.fill("#search-input", "") self._wait_row_count(total_rows) return results