bdd181425b
Docker Compose project with automated Playwright benchmarks comparing django-liveview 2.2.0 against Phoenix LiveView 1.0 across 6 scenarios.
224 lines
8.9 KiB
Python
224 lines
8.9 KiB
Python
"""
|
|
Low-level benchmark logic for a single framework instance.

Each PerformanceTest navigates to the app, waits for the WebSocket to be
ready, then runs the test scenarios defined in SCENARIOS.
|
|
"""
|
|
import time
|
|
import requests
|
|
from playwright.sync_api import Page
|
|
|
|
WARMUP = 2
|
|
|
|
|
|
class PerformanceTest:
|
|
def __init__(self, page: Page, base_url: str, label: str):
|
|
self.page = page
|
|
self.base_url = base_url
|
|
self.label = label
|
|
self._ws_sent = 0
|
|
self._ws_recv = 0
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Internal helpers #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def _reset_ws_counters(self):
|
|
self._ws_sent = 0
|
|
self._ws_recv = 0
|
|
|
|
def _track_ws(self, ws):
|
|
ws.on("framesent", self._on_sent)
|
|
ws.on("framereceived", self._on_recv)
|
|
|
|
def _on_sent(self, payload):
|
|
# Playwright 1.50+: payload is a plain str or bytes
|
|
if isinstance(payload, (str, bytes, bytearray)):
|
|
self._ws_sent += len(payload) if isinstance(payload, (bytes, bytearray)) else len(payload.encode())
|
|
elif hasattr(payload, "payload") and payload.payload:
|
|
self._ws_sent += len(payload.payload)
|
|
|
|
def _on_recv(self, payload):
|
|
if isinstance(payload, (str, bytes, bytearray)):
|
|
self._ws_recv += len(payload) if isinstance(payload, (bytes, bytearray)) else len(payload.encode())
|
|
elif hasattr(payload, "payload") and payload.payload:
|
|
self._ws_recv += len(payload.payload)
|
|
|
|
def _navigate(self):
|
|
self.page.on("websocket", self._track_ws)
|
|
self.page.goto(self.base_url, wait_until="load")
|
|
# Both apps expose #ws-status that reads "connected" once WS is up
|
|
self.page.wait_for_selector("#ws-status", timeout=15_000)
|
|
self.page.wait_for_function(
|
|
"document.querySelector('#ws-status').innerText.trim().toLowerCase() === 'connected'",
|
|
timeout=15_000,
|
|
)
|
|
|
|
def _api(self, path: str, params: dict | None = None):
|
|
requests.get(f"{self.base_url}/{path}", params=params, timeout=10)
|
|
|
|
def _row_count(self) -> int:
|
|
return self.page.evaluate(
|
|
"document.querySelectorAll('#alerts-tbody tr').length"
|
|
)
|
|
|
|
def _wait_row_count(self, expected: int, timeout: int = 8_000):
|
|
self.page.wait_for_function(
|
|
f"document.querySelectorAll('#alerts-tbody tr').length === {expected}",
|
|
timeout=timeout,
|
|
)
|
|
|
|
def _wait_row_count_gt(self, current: int, timeout: int = 8_000):
|
|
self.page.wait_for_function(
|
|
f"document.querySelectorAll('#alerts-tbody tr').length > {current}",
|
|
timeout=timeout,
|
|
)
|
|
|
|
def _wait_row_count_lt(self, current: int, timeout: int = 8_000):
|
|
self.page.wait_for_function(
|
|
f"document.querySelectorAll('#alerts-tbody tr').length < {current}",
|
|
timeout=timeout,
|
|
)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Individual scenario measurements (return ms) #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def _measure_add_alert(self) -> tuple[float, int, int]:
|
|
before = self._row_count()
|
|
self._reset_ws_counters()
|
|
t0 = time.perf_counter()
|
|
self.page.click("#add-alert-btn")
|
|
self._wait_row_count_gt(before)
|
|
elapsed = (time.perf_counter() - t0) * 1000
|
|
return elapsed, self._ws_sent, self._ws_recv
|
|
|
|
def _measure_delete_alert(self) -> tuple[float, int, int]:
|
|
before = self._row_count()
|
|
btn = self.page.locator(".delete-btn").first
|
|
self._reset_ws_counters()
|
|
t0 = time.perf_counter()
|
|
btn.click()
|
|
self._wait_row_count_lt(before)
|
|
elapsed = (time.perf_counter() - t0) * 1000
|
|
return elapsed, self._ws_sent, self._ws_recv
|
|
|
|
def _measure_search(self, query: str, from_rows: int) -> tuple[float, int, int]:
|
|
"""Type query, wait for row count to change from from_rows."""
|
|
self._reset_ws_counters()
|
|
t0 = time.perf_counter()
|
|
self.page.fill("#search-input", query)
|
|
# fill() fires the native input event; both frameworks listen to it
|
|
self._wait_row_count_lt(from_rows)
|
|
elapsed = (time.perf_counter() - t0) * 1000
|
|
return elapsed, self._ws_sent, self._ws_recv
|
|
|
|
def _measure_search_clear(self, total_rows: int) -> tuple[float, int, int]:
|
|
"""Clear search, wait until full list is restored."""
|
|
self._reset_ws_counters()
|
|
t0 = time.perf_counter()
|
|
self.page.fill("#search-input", "")
|
|
self._wait_row_count(total_rows)
|
|
elapsed = (time.perf_counter() - t0) * 1000
|
|
return elapsed, self._ws_sent, self._ws_recv
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Public scenario runners (return list of (ms, ws_sent, ws_recv)) #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def run_scenario(self, name: str, iterations: int, warmup: int = WARMUP) -> list[dict]:
|
|
runner = getattr(self, f"_scenario_{name}")
|
|
return runner(iterations=iterations, warmup=warmup)
|
|
|
|
# -- Common scenarios --
|
|
|
|
def _scenario_add_alert(self, iterations: int, warmup: int) -> list[dict]:
|
|
self._api("bench/clear/")
|
|
self._navigate()
|
|
results = []
|
|
for i in range(iterations + warmup):
|
|
ms, sent, recv = self._measure_add_alert()
|
|
if i >= warmup:
|
|
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
|
|
return results
|
|
|
|
def _scenario_delete_alert(self, iterations: int, warmup: int) -> list[dict]:
|
|
total = iterations + warmup
|
|
self._api("bench/clear/")
|
|
self._api("bench/populate/", params={"count": total})
|
|
self._navigate()
|
|
results = []
|
|
for i in range(total):
|
|
ms, sent, recv = self._measure_delete_alert()
|
|
if i >= warmup:
|
|
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
|
|
return results
|
|
|
|
def _scenario_search_filter(self, iterations: int, warmup: int) -> list[dict]:
|
|
# Use 60 alerts so "cpu" (matching "CPU usage above 90%") returns ~6 rows
|
|
self._api("bench/clear/")
|
|
self._api("bench/populate/", params={"count": 60})
|
|
self._navigate()
|
|
total_rows = self._row_count()
|
|
|
|
results = []
|
|
for i in range(iterations + warmup):
|
|
# Measure: type "cpu" -> filtered results appear (< total_rows)
|
|
ms, sent, recv = self._measure_search("cpu", total_rows)
|
|
if i >= warmup:
|
|
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
|
|
# Reset: clear search -> full list restored
|
|
self.page.fill("#search-input", "")
|
|
self._wait_row_count(total_rows)
|
|
return results
|
|
|
|
# -- Edge case scenarios --
|
|
|
|
def _scenario_large_list_add(self, iterations: int, warmup: int) -> list[dict]:
|
|
self._api("bench/clear/")
|
|
self._api("bench/populate/", params={"count": 500})
|
|
self._navigate()
|
|
results = []
|
|
for i in range(iterations + warmup):
|
|
ms, sent, recv = self._measure_add_alert()
|
|
if i >= warmup:
|
|
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
|
|
return results
|
|
|
|
def _scenario_rapid_fire(self, iterations: int, warmup: int) -> list[dict]:
|
|
"""Click add 5 times quickly, measure until all 5 rows appear."""
|
|
burst = 5
|
|
self._api("bench/clear/")
|
|
self._navigate()
|
|
results = []
|
|
for i in range(iterations + warmup):
|
|
before = self._row_count()
|
|
self._reset_ws_counters()
|
|
t0 = time.perf_counter()
|
|
for _ in range(burst):
|
|
self.page.click("#add-alert-btn")
|
|
time.sleep(0.05)
|
|
self._wait_row_count_gt(before + burst - 1, timeout=15_000)
|
|
elapsed = (time.perf_counter() - t0) * 1000
|
|
if i >= warmup:
|
|
results.append(
|
|
{"ms": elapsed, "ws_sent": self._ws_sent, "ws_recv": self._ws_recv}
|
|
)
|
|
return results
|
|
|
|
def _scenario_empty_search(self, iterations: int, warmup: int) -> list[dict]:
|
|
"""Search for a term that matches nothing - measures empty-state render."""
|
|
self._api("bench/clear/")
|
|
self._api("bench/populate/", params={"count": 50})
|
|
self._navigate()
|
|
total_rows = self._row_count()
|
|
results = []
|
|
for i in range(iterations + warmup):
|
|
ms, sent, recv = self._measure_search("ZZZNOMATCH999", total_rows)
|
|
if i >= warmup:
|
|
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
|
|
# Restore full list
|
|
self.page.fill("#search-input", "")
|
|
self._wait_row_count(total_rows)
|
|
return results
|