"""Concurrent load benchmark for Django LiveView vs Phoenix LiveView.""" import argparse import asyncio import csv import os from datetime import datetime from pathlib import Path import requests from playwright.async_api import async_playwright RESULTS_DIR = Path( os.environ.get("RESULTS_DIR", Path(__file__).parent.parent / "results") ) CONCURRENCY_LEVELS = [1, 5, 10, 25, 50] ACTION_TIMEOUT_MS = 5_000 WARMUP_ITERATIONS = 2 SCENARIOS = ["add_alert", "delete_alert", "search_filter"] # Same MutationObserver technique as performance_test.py. # Observes #alerts-container when present (Django), falls back to #alerts-tbody (Phoenix). _OBSERVER_JS = """([op, threshold]) => { window.__mutationTime = null; const root = document.querySelector('#alerts-container') || document.querySelector('#alerts-tbody'); if (!root) return; const check = () => { const n = document.querySelectorAll('#alerts-tbody tr').length; const met = op === '>' ? n > threshold : op === '<' ? n < threshold : n === threshold; if (met) { window.__mutationTime = performance.now(); obs.disconnect(); } }; const obs = new MutationObserver(check); obs.observe(root, { childList: true, subtree: true }); }""" _WS_READY_JS = ( "document.querySelector('#ws-status')" "?.innerText.trim().toLowerCase() === 'connected'" ) # ------------------------------------------------------------------ # # HTTP helpers # # ------------------------------------------------------------------ # def _api(base_url: str, path: str, params: dict | None = None) -> None: requests.get(f"{base_url}/{path}", params=params, timeout=10) def _prepare_db(base_url: str, scenario: str) -> int: """Reset DB for the scenario and return the expected initial row count.""" _api(base_url, "bench/clear/") if scenario == "delete_alert": _api(base_url, "bench/populate/", params={"count": 50}) return 50 if scenario == "search_filter": _api(base_url, "bench/populate/", params={"count": 60}) return 60 return 0 # add_alert starts from empty # ------------------------------------------------------------------ # # Statistics # # ------------------------------------------------------------------ # def _percentile(data: list[float], p: float) -> float: if not data: return 0.0 s = sorted(data) idx = (p / 100) * (len(s) - 1) lo = int(idx) hi = lo + 1 if hi >= len(s): return s[-1] return s[lo] + (idx - lo) * (s[hi] - s[lo]) # ------------------------------------------------------------------ # # Single async client # # ------------------------------------------------------------------ # async def _run_client( browser, base_url: str, scenario: str, initial_rows: int, barrier: asyncio.Barrier, timeout_ms: int, ) -> float | None: """ Open a fresh browser context, wait for WS, synchronise at the barrier, fire the action, and return elapsed ms. Returns None on timeout or error. """ context = await browser.new_context() page = await context.new_page() try: await page.goto(base_url, wait_until="load") await page.wait_for_selector("#ws-status", timeout=15_000) await page.wait_for_function(_WS_READY_JS, timeout=15_000) op = ">" if scenario == "add_alert" else "<" await page.evaluate(_OBSERVER_JS, [op, initial_rows]) # All clients reach this point before any fires the action. await barrier.wait() t0: float = await page.evaluate("performance.now()") if scenario == "add_alert": await page.click("#add-alert-btn") elif scenario == "delete_alert": await page.locator(".delete-btn").first.click() else: await page.fill("#search-input", "cpu") await page.wait_for_function( "window.__mutationTime !== null", polling=1, timeout=timeout_ms, ) mutation_time: float = await page.evaluate("window.__mutationTime") return mutation_time - t0 except Exception: return None finally: await context.close() # ------------------------------------------------------------------ # # Warmup # # ------------------------------------------------------------------ # async def _warmup(browser, base_url: str, scenario: str, initial_rows: int) -> None: for _ in range(WARMUP_ITERATIONS): barrier = asyncio.Barrier(1) await _run_client( browser, base_url, scenario, initial_rows, barrier, timeout_ms=10_000 ) # ------------------------------------------------------------------ # # One (framework, scenario) block across all concurrency levels # # ------------------------------------------------------------------ # async def _benchmark_scenario( browser, base_url: str, framework: str, scenario: str, writer: csv.DictWriter, ) -> None: print(f" [{scenario}]") for n_clients in CONCURRENCY_LEVELS: # Prepare DB and warmup, then reset again for the real run. initial_rows = _prepare_db(base_url, scenario) await _warmup(browser, base_url, scenario, initial_rows) initial_rows = _prepare_db(base_url, scenario) barrier = asyncio.Barrier(n_clients) tasks = [ _run_client( browser, base_url, scenario, initial_rows, barrier, ACTION_TIMEOUT_MS ) for _ in range(n_clients) ] results = await asyncio.gather(*tasks) times = [r for r in results if r is not None] n_failed = n_clients - len(times) p50 = round(_percentile(times, 50), 2) p95 = round(_percentile(times, 95), 2) p99 = round(_percentile(times, 99), 2) mn = round(min(times), 2) if times else 0.0 mx = round(max(times), 2) if times else 0.0 print( f" n={n_clients:>2} " f"p50={p50:.1f}ms p95={p95:.1f}ms p99={p99:.1f}ms " f"ok={len(times)} failed={n_failed}" ) writer.writerow( { "framework": framework, "scenario": scenario, "n_clients": n_clients, "p50_ms": p50, "p95_ms": p95, "p99_ms": p99, "min_ms": mn, "max_ms": mx, "n_ok": len(times), "n_failed": n_failed, } ) # ------------------------------------------------------------------ # # Entry point # # ------------------------------------------------------------------ # async def main(django_url: str, phoenix_url: str) -> None: RESULTS_DIR.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_path = RESULTS_DIR / f"concurrency_{timestamp}.csv" fieldnames = [ "framework", "scenario", "n_clients", "p50_ms", "p95_ms", "p99_ms", "min_ms", "max_ms", "n_ok", "n_failed", ] frameworks = [ ("Django LiveView", django_url), ("Phoenix LiveView", phoenix_url), ] with open(csv_path, "w", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() async with async_playwright() as pw: browser = await pw.chromium.launch(headless=True) for framework, base_url in frameworks: print(f"\n=== {framework} ===") for scenario in SCENARIOS: await _benchmark_scenario( browser, base_url, framework, scenario, writer ) await browser.close() print(f"\nCSV saved: {csv_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Concurrent load benchmark") parser.add_argument( "--django", default=os.environ.get("DJANGO_URL", "http://localhost:8001") ) parser.add_argument( "--phoenix", default=os.environ.get("PHOENIX_URL", "http://localhost:8002") ) args = parser.parse_args() asyncio.run(main(args.django, args.phoenix))