Files
andros 2fdf46f8c5 Add concurrent load benchmark for Django vs Phoenix LiveView
Implements asyncio.Barrier-synchronized N-client concurrency test across
levels [1, 5, 10, 25, 50] using the same MutationObserver timing technique
as performance_test.py. Outputs p50/p95/p99/min/max and failure counts to CSV.
2026-05-15 21:36:15 +02:00

263 lines
8.4 KiB
Python

"""Concurrent load benchmark for Django LiveView vs Phoenix LiveView."""
import argparse
import asyncio
import csv
import os
from datetime import datetime
from pathlib import Path
import requests
from playwright.async_api import async_playwright
# Where result CSVs are written. The RESULTS_DIR environment variable wins;
# otherwise fall back to a "results" directory two levels above this file.
_DEFAULT_RESULTS_DIR = Path(__file__).parent.parent / "results"
RESULTS_DIR = Path(os.getenv("RESULTS_DIR", _DEFAULT_RESULTS_DIR))

# Number of simultaneous clients fired per measurement round.
CONCURRENCY_LEVELS = [1, 5, 10, 25, 50]

# How long (ms) a measured client waits for the DOM update after its action.
ACTION_TIMEOUT_MS = 5_000

# Unmeasured single-client runs executed before each measurement round.
WARMUP_ITERATIONS = 2

# Scenario identifiers understood by the client and DB-preparation code.
SCENARIOS = ["add_alert", "delete_alert", "search_filter"]
# Same MutationObserver technique as performance_test.py.
# Observes #alerts-container when present (Django), falls back to #alerts-tbody (Phoenix).
#
# JavaScript function evaluated in the page with a single [op, threshold] argument:
#   * resets window.__mutationTime to null;
#   * on every childList/subtree mutation under the observed root, counts the
#     rows matching '#alerts-tbody tr' and compares that count to `threshold`
#     using `op` ('>' or '<'; any other value means strict equality);
#   * the first time the comparison holds, stores performance.now() in
#     window.__mutationTime and disconnects the observer.
# If neither root element exists the function returns without observing
# anything, so window.__mutationTime stays null.
# NOTE: the text inside the literal is runtime JavaScript; do not reformat it.
_OBSERVER_JS = """([op, threshold]) => {
window.__mutationTime = null;
const root = document.querySelector('#alerts-container')
|| document.querySelector('#alerts-tbody');
if (!root) return;
const check = () => {
const n = document.querySelectorAll('#alerts-tbody tr').length;
const met = op === '>' ? n > threshold
: op === '<' ? n < threshold
: n === threshold;
if (met) {
window.__mutationTime = performance.now();
obs.disconnect();
}
};
const obs = new MutationObserver(check);
obs.observe(root, { childList: true, subtree: true });
}"""
# JavaScript expression that is true once the #ws-status element's text,
# trimmed and lower-cased, reads "connected".
_WS_READY_JS = "document.querySelector('#ws-status')?.innerText.trim().toLowerCase() === 'connected'"
# ------------------------------------------------------------------ #
# HTTP helpers #
# ------------------------------------------------------------------ #
def _api(base_url: str, path: str, params: dict | None = None) -> None:
    """Issue a GET request to a benchmark helper endpoint.

    Args:
        base_url: Root URL of the application under test; a trailing slash
            is tolerated.
        path: Endpoint path relative to ``base_url`` (e.g. ``"bench/clear/"``).
        params: Optional query-string parameters.

    Raises:
        requests.RequestException: On connection errors, timeouts, or when the
            server answers with a 4xx/5xx status.
    """
    # Strip a trailing slash so "http://host/" + "/bench/..." does not become
    # a double-slash path that the server may not route.
    url = f"{base_url.rstrip('/')}/{path}"
    response = requests.get(url, params=params, timeout=10)
    # A failed clear/populate would leave the DB in an unknown state and make
    # every subsequent measurement meaningless, so fail loudly instead.
    response.raise_for_status()
def _prepare_db(base_url: str, scenario: str) -> int:
    """Clear the benchmark DB, seed it for *scenario*, and return the row count.

    ``delete_alert`` is seeded with 50 rows and ``search_filter`` with 60.
    Any other scenario (i.e. ``add_alert``) starts from an empty table and
    yields 0.
    """
    seed_rows = {"delete_alert": 50, "search_filter": 60}

    _api(base_url, "bench/clear/")
    rows = seed_rows.get(scenario, 0)
    if rows:
        _api(base_url, "bench/populate/", params={"count": rows})
    return rows
# ------------------------------------------------------------------ #
# Statistics #
# ------------------------------------------------------------------ #
def _percentile(data: list[float], p: float) -> float:
    """Return the *p*-th percentile of *data* using linear interpolation.

    The rank is ``p / 100 * (len(data) - 1)`` over the sorted values; a
    fractional rank interpolates between its two neighbours. An empty
    *data* yields ``0.0``.
    """
    if not data:
        return 0.0

    ordered = sorted(data)
    last = len(ordered) - 1
    rank = (p / 100) * last
    lower = int(rank)
    upper = lower + 1

    # The rank sits on (or beyond) the final element: nothing to interpolate.
    if upper > last:
        return ordered[-1]

    fraction = rank - lower
    return ordered[lower] + fraction * (ordered[upper] - ordered[lower])
# ------------------------------------------------------------------ #
# Single async client #
# ------------------------------------------------------------------ #
async def _run_client(
    browser,
    base_url: str,
    scenario: str,
    initial_rows: int,
    barrier: asyncio.Barrier,
    timeout_ms: int,
) -> float | None:
    """Run one simulated client and measure action-to-DOM-update latency.

    The client opens a fresh browser context, loads ``base_url``, waits for
    the WebSocket status indicator to report "connected", installs the
    mutation observer, synchronises with its peers at ``barrier``, fires the
    scenario's action and waits for the observer to record a timestamp.

    Args:
        browser: Browser object exposing ``new_context()`` (the Playwright
            browser launched by the caller).
        base_url: URL of the page under test.
        scenario: ``"add_alert"``, ``"delete_alert"``, or anything else for
            the search-filter action.
        initial_rows: Row count the table is expected to start with; the
            observer fires when the count moves above it (add) or below it
            (delete / search).
        barrier: Barrier shared by all clients of the round.
        timeout_ms: How long to wait for the DOM update after the action.

    Returns:
        Elapsed milliseconds between the action and the observed mutation,
        or ``None`` on timeout or any other error.
    """
    context = await browser.new_context()
    try:
        # ---- Setup: anything failing here marks the client as failed. ----
        ready = False
        try:
            page = await context.new_page()
            await page.goto(base_url, wait_until="load")
            await page.wait_for_selector("#ws-status", timeout=15_000)
            await page.wait_for_function(_WS_READY_JS, timeout=15_000)
            op = ">" if scenario == "add_alert" else "<"
            await page.evaluate(_OBSERVER_JS, [op, initial_rows])
            ready = True
        except Exception:
            pass

        # Always arrive at the barrier, even after a failed setup. The barrier
        # wait has no timeout, so a client that bailed out before reaching it
        # would leave every other client of the round blocked forever.
        try:
            await barrier.wait()
        except asyncio.BrokenBarrierError:
            return None
        if not ready:
            return None

        # ---- Measured section: all clients fire from (roughly) here. ----
        try:
            t0: float = await page.evaluate("performance.now()")
            if scenario == "add_alert":
                await page.click("#add-alert-btn")
            elif scenario == "delete_alert":
                await page.locator(".delete-btn").first.click()
            else:
                await page.fill("#search-input", "cpu")
            await page.wait_for_function(
                "window.__mutationTime !== null",
                polling=1,
                timeout=timeout_ms,
            )
            mutation_time: float = await page.evaluate("window.__mutationTime")
            return mutation_time - t0
        except Exception:
            return None
    finally:
        await context.close()
# ------------------------------------------------------------------ #
# Warmup #
# ------------------------------------------------------------------ #
async def _warmup(browser, base_url: str, scenario: str, initial_rows: int) -> None:
    """Run WARMUP_ITERATIONS unmeasured single-client passes of *scenario*.

    Each pass gets its own one-party barrier, so the client never waits for
    peers, and uses a 10 s action timeout. Results are discarded.
    """
    for _ in range(WARMUP_ITERATIONS):
        await _run_client(
            browser,
            base_url,
            scenario,
            initial_rows,
            asyncio.Barrier(1),
            timeout_ms=10_000,
        )
# ------------------------------------------------------------------ #
# One (framework, scenario) block across all concurrency levels #
# ------------------------------------------------------------------ #
async def _benchmark_scenario(
    browser,
    base_url: str,
    framework: str,
    scenario: str,
    writer: csv.DictWriter,
) -> None:
    """Measure one (framework, scenario) pair at every concurrency level.

    For each level the DB is prepared, warmup clients are run, the DB is
    prepared again, and ``n_clients`` clients are launched together. A
    summary line is printed and one row is written to *writer* per level.
    Latency statistics cover successful clients only; they are 0.0 when
    every client failed.
    """
    print(f" [{scenario}]")
    for n_clients in CONCURRENCY_LEVELS:
        # Warm up against a freshly seeded DB, then reseed so the warmup's
        # own inserts/deletes do not affect the measured round.
        warmup_rows = _prepare_db(base_url, scenario)
        await _warmup(browser, base_url, scenario, warmup_rows)
        initial_rows = _prepare_db(base_url, scenario)

        barrier = asyncio.Barrier(n_clients)
        outcomes = await asyncio.gather(
            *(
                _run_client(
                    browser,
                    base_url,
                    scenario,
                    initial_rows,
                    barrier,
                    ACTION_TIMEOUT_MS,
                )
                for _ in range(n_clients)
            )
        )

        # A None outcome marks a client that timed out or errored.
        times = [elapsed for elapsed in outcomes if elapsed is not None]
        n_failed = n_clients - len(times)

        p50, p95, p99 = (round(_percentile(times, q), 2) for q in (50, 95, 99))
        if times:
            mn = round(min(times), 2)
            mx = round(max(times), 2)
        else:
            mn = mx = 0.0

        print(
            f" n={n_clients:>2} "
            f"p50={p50:.1f}ms p95={p95:.1f}ms p99={p99:.1f}ms "
            f"ok={len(times)} failed={n_failed}"
        )

        row = {
            "framework": framework,
            "scenario": scenario,
            "n_clients": n_clients,
            "p50_ms": p50,
            "p95_ms": p95,
            "p99_ms": p99,
            "min_ms": mn,
            "max_ms": mx,
            "n_ok": len(times),
            "n_failed": n_failed,
        }
        writer.writerow(row)
# ------------------------------------------------------------------ #
# Entry point #
# ------------------------------------------------------------------ #
async def main(django_url: str, phoenix_url: str) -> None:
    """Benchmark both frameworks across all scenarios and save a CSV report.

    The report is written to ``RESULTS_DIR/concurrency_<timestamp>.csv``
    (the directory is created if needed). A single headless Chromium
    instance is shared by every scenario of both frameworks.
    """
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = RESULTS_DIR / f"concurrency_{stamp}.csv"

    columns = [
        "framework",
        "scenario",
        "n_clients",
        "p50_ms",
        "p95_ms",
        "p99_ms",
        "min_ms",
        "max_ms",
        "n_ok",
        "n_failed",
    ]
    # Insertion order decides run order: Django first, then Phoenix.
    targets = {
        "Django LiveView": django_url,
        "Phoenix LiveView": phoenix_url,
    }

    with open(csv_path, "w", newline="") as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(headless=True)
            for framework, base_url in targets.items():
                print(f"\n=== {framework} ===")
                for scenario in SCENARIOS:
                    await _benchmark_scenario(
                        browser, base_url, framework, scenario, writer
                    )
            await browser.close()

    print(f"\nCSV saved: {csv_path}")
if __name__ == "__main__":
    # Target URLs come from the command line, then the DJANGO_URL / PHOENIX_URL
    # environment variables, then the localhost defaults.
    default_django = os.environ.get("DJANGO_URL", "http://localhost:8001")
    default_phoenix = os.environ.get("PHOENIX_URL", "http://localhost:8002")

    cli = argparse.ArgumentParser(description="Concurrent load benchmark")
    cli.add_argument("--django", default=default_django)
    cli.add_argument("--phoenix", default=default_phoenix)
    options = cli.parse_args()

    asyncio.run(main(options.django, options.phoenix))