Files
andros bdd181425b Initial commit: Django LiveView vs Phoenix LiveView benchmark
Docker Compose project with automated Playwright benchmarks comparing
django-liveview 2.2.0 against Phoenix LiveView 1.0 across 6 scenarios.
2026-05-15 15:46:50 +02:00

224 lines
8.9 KiB
Python

"""
Low-level benchmark logic for a single framework instance.
Each PerformanceTest navigates to the app, waits for the WebSocket to be
ready, then runs the test scenarios defined in SCENARIOS.
"""
import time
import requests
from playwright.sync_api import Page
# Default number of leading iterations each scenario executes but leaves out
# of its returned results (used as the default for run_scenario's `warmup`).
WARMUP = 2
class PerformanceTest:
def __init__(self, page: Page, base_url: str, label: str):
self.page = page
self.base_url = base_url
self.label = label
self._ws_sent = 0
self._ws_recv = 0
# ------------------------------------------------------------------ #
# Internal helpers #
# ------------------------------------------------------------------ #
def _reset_ws_counters(self):
self._ws_sent = 0
self._ws_recv = 0
def _track_ws(self, ws):
ws.on("framesent", self._on_sent)
ws.on("framereceived", self._on_recv)
def _on_sent(self, payload):
# Playwright 1.50+: payload is a plain str or bytes
if isinstance(payload, (str, bytes, bytearray)):
self._ws_sent += len(payload) if isinstance(payload, (bytes, bytearray)) else len(payload.encode())
elif hasattr(payload, "payload") and payload.payload:
self._ws_sent += len(payload.payload)
def _on_recv(self, payload):
if isinstance(payload, (str, bytes, bytearray)):
self._ws_recv += len(payload) if isinstance(payload, (bytes, bytearray)) else len(payload.encode())
elif hasattr(payload, "payload") and payload.payload:
self._ws_recv += len(payload.payload)
def _navigate(self):
self.page.on("websocket", self._track_ws)
self.page.goto(self.base_url, wait_until="load")
# Both apps expose #ws-status that reads "connected" once WS is up
self.page.wait_for_selector("#ws-status", timeout=15_000)
self.page.wait_for_function(
"document.querySelector('#ws-status').innerText.trim().toLowerCase() === 'connected'",
timeout=15_000,
)
def _api(self, path: str, params: dict | None = None):
requests.get(f"{self.base_url}/{path}", params=params, timeout=10)
def _row_count(self) -> int:
return self.page.evaluate(
"document.querySelectorAll('#alerts-tbody tr').length"
)
def _wait_row_count(self, expected: int, timeout: int = 8_000):
self.page.wait_for_function(
f"document.querySelectorAll('#alerts-tbody tr').length === {expected}",
timeout=timeout,
)
def _wait_row_count_gt(self, current: int, timeout: int = 8_000):
self.page.wait_for_function(
f"document.querySelectorAll('#alerts-tbody tr').length > {current}",
timeout=timeout,
)
def _wait_row_count_lt(self, current: int, timeout: int = 8_000):
self.page.wait_for_function(
f"document.querySelectorAll('#alerts-tbody tr').length < {current}",
timeout=timeout,
)
# ------------------------------------------------------------------ #
# Individual scenario measurements (return ms) #
# ------------------------------------------------------------------ #
def _measure_add_alert(self) -> tuple[float, int, int]:
before = self._row_count()
self._reset_ws_counters()
t0 = time.perf_counter()
self.page.click("#add-alert-btn")
self._wait_row_count_gt(before)
elapsed = (time.perf_counter() - t0) * 1000
return elapsed, self._ws_sent, self._ws_recv
def _measure_delete_alert(self) -> tuple[float, int, int]:
before = self._row_count()
btn = self.page.locator(".delete-btn").first
self._reset_ws_counters()
t0 = time.perf_counter()
btn.click()
self._wait_row_count_lt(before)
elapsed = (time.perf_counter() - t0) * 1000
return elapsed, self._ws_sent, self._ws_recv
def _measure_search(self, query: str, from_rows: int) -> tuple[float, int, int]:
"""Type query, wait for row count to change from from_rows."""
self._reset_ws_counters()
t0 = time.perf_counter()
self.page.fill("#search-input", query)
# fill() fires the native input event; both frameworks listen to it
self._wait_row_count_lt(from_rows)
elapsed = (time.perf_counter() - t0) * 1000
return elapsed, self._ws_sent, self._ws_recv
def _measure_search_clear(self, total_rows: int) -> tuple[float, int, int]:
"""Clear search, wait until full list is restored."""
self._reset_ws_counters()
t0 = time.perf_counter()
self.page.fill("#search-input", "")
self._wait_row_count(total_rows)
elapsed = (time.perf_counter() - t0) * 1000
return elapsed, self._ws_sent, self._ws_recv
# ------------------------------------------------------------------ #
# Public scenario runners (return list of (ms, ws_sent, ws_recv)) #
# ------------------------------------------------------------------ #
def run_scenario(self, name: str, iterations: int, warmup: int = WARMUP) -> list[dict]:
runner = getattr(self, f"_scenario_{name}")
return runner(iterations=iterations, warmup=warmup)
# -- Common scenarios --
def _scenario_add_alert(self, iterations: int, warmup: int) -> list[dict]:
self._api("bench/clear/")
self._navigate()
results = []
for i in range(iterations + warmup):
ms, sent, recv = self._measure_add_alert()
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
return results
def _scenario_delete_alert(self, iterations: int, warmup: int) -> list[dict]:
total = iterations + warmup
self._api("bench/clear/")
self._api("bench/populate/", params={"count": total})
self._navigate()
results = []
for i in range(total):
ms, sent, recv = self._measure_delete_alert()
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
return results
def _scenario_search_filter(self, iterations: int, warmup: int) -> list[dict]:
# Use 60 alerts so "cpu" (matching "CPU usage above 90%") returns ~6 rows
self._api("bench/clear/")
self._api("bench/populate/", params={"count": 60})
self._navigate()
total_rows = self._row_count()
results = []
for i in range(iterations + warmup):
# Measure: type "cpu" -> filtered results appear (< total_rows)
ms, sent, recv = self._measure_search("cpu", total_rows)
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
# Reset: clear search -> full list restored
self.page.fill("#search-input", "")
self._wait_row_count(total_rows)
return results
# -- Edge case scenarios --
def _scenario_large_list_add(self, iterations: int, warmup: int) -> list[dict]:
self._api("bench/clear/")
self._api("bench/populate/", params={"count": 500})
self._navigate()
results = []
for i in range(iterations + warmup):
ms, sent, recv = self._measure_add_alert()
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
return results
def _scenario_rapid_fire(self, iterations: int, warmup: int) -> list[dict]:
"""Click add 5 times quickly, measure until all 5 rows appear."""
burst = 5
self._api("bench/clear/")
self._navigate()
results = []
for i in range(iterations + warmup):
before = self._row_count()
self._reset_ws_counters()
t0 = time.perf_counter()
for _ in range(burst):
self.page.click("#add-alert-btn")
time.sleep(0.05)
self._wait_row_count_gt(before + burst - 1, timeout=15_000)
elapsed = (time.perf_counter() - t0) * 1000
if i >= warmup:
results.append(
{"ms": elapsed, "ws_sent": self._ws_sent, "ws_recv": self._ws_recv}
)
return results
def _scenario_empty_search(self, iterations: int, warmup: int) -> list[dict]:
"""Search for a term that matches nothing - measures empty-state render."""
self._api("bench/clear/")
self._api("bench/populate/", params={"count": 50})
self._navigate()
total_rows = self._row_count()
results = []
for i in range(iterations + warmup):
ms, sent, recv = self._measure_search("ZZZNOMATCH999", total_rows)
if i >= warmup:
results.append({"ms": ms, "ws_sent": sent, "ws_recv": recv})
# Restore full list
self.page.fill("#search-input", "")
self._wait_row_count(total_rows)
return results