"""Generate benchmark charts for the README.""" import csv import os from pathlib import Path from collections import defaultdict import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import matplotlib.patches as mpatches import numpy as np CONCURRENCY_CSV = Path(os.environ.get("CONCURRENCY_CSV", "")) RESULTS_DIR = Path( os.environ.get("RESULTS_DIR", Path(__file__).parent.parent / "results") ) CHARTS_DIR = Path(os.environ.get("CHARTS_DIR", Path(__file__).parent.parent / "charts")) DJANGO_C = "#2ECC71" PHOENIX_C = "#9B59B6" BG = "#0F1117" GRID_C = "#1E2130" TEXT_C = "#E8E8E8" SUB_C = "#8892A4" LEGEND_BG = "#1A1D2E" LEGEND_ED = "#2E3250" SCENARIO_LABELS = { "add_alert": "Add alert", "delete_alert": "Delete alert", "search_filter": "Search / filter", "large_list_add": "Large list\n(500 items)", "rapid_fire": "Rapid fire\n(5 clicks)", "empty_search": "Empty search", } COMMON_KEYS = ["add_alert", "delete_alert", "search_filter"] EDGE_KEYS = ["large_list_add", "rapid_fire", "empty_search"] ALL_KEYS = COMMON_KEYS + EDGE_KEYS FW_D = "Django LiveView" FW_P = "Phoenix LiveView" def load_latest_csv(results_dir: Path) -> list[dict]: csvs = sorted(results_dir.glob("results_*.csv")) if not csvs: raise FileNotFoundError(f"No results CSV found in {results_dir}") latest = csvs[-1] print(f"Using: {latest.name}") with open(latest) as f: return list(csv.DictReader(f)) def aggregate(rows: list[dict]) -> dict: data: dict = defaultdict( lambda: defaultdict(lambda: {"ms": [], "sent": [], "recv": []}) ) for r in rows: data[r["framework"]][r["scenario_key"]]["ms"].append(float(r["ms"])) data[r["framework"]][r["scenario_key"]]["sent"].append(int(r["ws_sent_b"])) data[r["framework"]][r["scenario_key"]]["recv"].append(int(r["ws_recv_b"])) return data def _base_ax(fig, ax): fig.patch.set_facecolor(BG) ax.set_facecolor(BG) for spine in ax.spines.values(): spine.set_visible(False) ax.yaxis.grid(True, color=GRID_C, linewidth=1, zorder=0) ax.set_axisbelow(True) ax.tick_params(colors=SUB_C, length=0) def _legend(ax): ax.legend( loc="upper right", frameon=True, facecolor=LEGEND_BG, edgecolor=LEGEND_ED, labelcolor=TEXT_C, fontsize=10, handlelength=1.2, handleheight=0.9, ) def _bar_labels(ax, bars, vals, y_offset): for bar, val in zip(bars, vals): ax.text( bar.get_x() + bar.get_width() / 2, bar.get_height() + y_offset, f"{val:.1f}", ha="center", va="bottom", color=TEXT_C, fontsize=9, fontweight="bold", ) def _title(ax, title, subtitle): ax.set_title( title, color=TEXT_C, fontsize=14, fontweight="bold", pad=20, loc="left" ) ax.text(0, 1.02, subtitle, transform=ax.transAxes, color=SUB_C, fontsize=9) def chart_latency( data: dict, keys: list[str], title: str, subtitle: str, filename: str ): labels = [SCENARIO_LABELS[k] for k in keys] d_avg = [np.mean(data[FW_D][k]["ms"]) for k in keys] p_avg = [np.mean(data[FW_P][k]["ms"]) for k in keys] d_std = [np.std(data[FW_D][k]["ms"]) for k in keys] p_std = [np.std(data[FW_P][k]["ms"]) for k in keys] fig, ax = plt.subplots(figsize=(9, 5)) _base_ax(fig, ax) x, w = np.arange(len(keys)), 0.32 bars_d = ax.bar( x - w / 2, d_avg, w, color=DJANGO_C, label=FW_D, zorder=3, linewidth=0 ) bars_p = ax.bar( x + w / 2, p_avg, w, color=PHOENIX_C, label=FW_P, zorder=3, linewidth=0 ) ax.errorbar( x - w / 2, d_avg, yerr=d_std, fmt="none", ecolor="white", elinewidth=1.2, capsize=4, capthick=1.2, zorder=4, ) ax.errorbar( x + w / 2, p_avg, yerr=p_std, fmt="none", ecolor="white", elinewidth=1.2, capsize=4, capthick=1.2, zorder=4, ) y_offset = max(d_std + p_std) + 2 _bar_labels(ax, bars_d, d_avg, y_offset) _bar_labels(ax, bars_p, p_avg, y_offset) ax.set_xticks(x) ax.set_xticklabels([""] * len(keys)) for xi, label in zip(x, labels): ax.text( xi, -max(d_avg + p_avg) * 0.1, label, ha="center", va="top", color=SUB_C, fontsize=10, linespacing=1.6, ) ax.set_ylabel("Latency (ms)", color=SUB_C, fontsize=11, labelpad=12) ax.tick_params(axis="y", colors=SUB_C) ax.set_ylim(0, max(d_avg + p_avg) * 1.35) _legend(ax) _title(ax, title, subtitle) fig.tight_layout(rect=[0, 0.08, 1, 1]) out = CHARTS_DIR / filename fig.savefig(out, dpi=160, bbox_inches="tight", facecolor=BG) plt.close(fig) print(f"Saved: {out}") def chart_payload( data: dict, keys: list[str], title: str, subtitle: str, filename: str ): labels = [SCENARIO_LABELS[k] for k in keys] d_recv = [np.mean(data[FW_D][k]["recv"]) / 1024 for k in keys] p_recv = [np.mean(data[FW_P][k]["recv"]) / 1024 for k in keys] fig, ax = plt.subplots(figsize=(11, 5)) _base_ax(fig, ax) x, w = np.arange(len(keys)), 0.32 bars_d = ax.bar( x - w / 2, d_recv, w, color=DJANGO_C, label=FW_D, zorder=3, linewidth=0 ) bars_p = ax.bar( x + w / 2, p_recv, w, color=PHOENIX_C, label=FW_P, zorder=3, linewidth=0 ) y_offset = max(d_recv + p_recv) * 0.02 for bar, val in zip((*bars_d, *bars_p), (*d_recv, *p_recv)): label = f"{val:.1f}" if val < 10 else f"{val:.0f}" ax.text( bar.get_x() + bar.get_width() / 2, bar.get_height() + y_offset, label, ha="center", va="bottom", color=TEXT_C, fontsize=9, fontweight="bold", ) ax.set_xticks(x) ax.set_xticklabels([""] * len(keys)) for xi, label in zip(x, labels): ax.text( xi, -max(d_recv + p_recv) * 0.08, label, ha="center", va="top", color=SUB_C, fontsize=10, linespacing=1.6, ) ax.set_ylabel("Data received (KB)", color=SUB_C, fontsize=11, labelpad=12) ax.tick_params(axis="y", colors=SUB_C) ax.set_ylim(0, max(d_recv + p_recv) * 1.3) _legend(ax) _title(ax, title, subtitle) fig.tight_layout(rect=[0, 0.08, 1, 1]) out = CHARTS_DIR / filename fig.savefig(out, dpi=160, bbox_inches="tight", facecolor=BG) plt.close(fig) print(f"Saved: {out}") def chart_distribution( data: dict, keys: list[str], title: str, subtitle: str, filename: str ): labels = [SCENARIO_LABELS[k] for k in keys] colors = [DJANGO_C, PHOENIX_C] fig, axes = plt.subplots(1, len(keys), figsize=(10, 5), sharey=False) fig.patch.set_facecolor(BG) for ax, key, label in zip(axes, keys, labels): ax.set_facecolor(BG) for spine in ax.spines.values(): spine.set_visible(False) ax.yaxis.grid(True, color=GRID_C, linewidth=1, zorder=0) ax.set_axisbelow(True) d_ms = data[FW_D][key]["ms"] p_ms = data[FW_P][key]["ms"] bp = ax.boxplot( [d_ms, p_ms], patch_artist=True, widths=0.4, medianprops=dict(color="white", linewidth=2), whiskerprops=dict(color=SUB_C, linewidth=1), capprops=dict(color=SUB_C, linewidth=1), flierprops=dict( marker="o", markerfacecolor=SUB_C, markersize=4, alpha=0.6, linestyle="none", ), ) for box, color in zip(bp["boxes"], colors): box.set_facecolor(color) box.set_linewidth(0) ax.set_title(label, color=TEXT_C, fontsize=11, fontweight="bold", pad=10) ax.set_xticklabels(["Django", "Phoenix"], color=SUB_C, fontsize=9) ax.tick_params(colors=SUB_C, length=0) ax.set_ylabel("ms" if ax is axes[0] else "", color=SUB_C, fontsize=10) ax.tick_params(axis="y", colors=SUB_C) django_patch = mpatches.Patch(color=DJANGO_C, label=FW_D) phoenix_patch = mpatches.Patch(color=PHOENIX_C, label=FW_P) fig.legend( handles=[django_patch, phoenix_patch], loc="upper center", ncol=2, frameon=True, facecolor=LEGEND_BG, edgecolor=LEGEND_ED, labelcolor=TEXT_C, fontsize=10, bbox_to_anchor=(0.5, 1.04), ) fig.text( 0.5, 1.08, title, ha="center", color=TEXT_C, fontsize=14, fontweight="bold" ) fig.text(0.5, 1.02, subtitle, ha="center", color=SUB_C, fontsize=9) fig.tight_layout() out = CHARTS_DIR / filename fig.savefig(out, dpi=160, bbox_inches="tight", facecolor=BG) plt.close(fig) print(f"Saved: {out}") def load_concurrency_csv(path: Path) -> list[dict]: """Load a concurrency CSV produced by concurrency_test.py.""" if not path or not path.exists(): csvs = sorted(RESULTS_DIR.glob("concurrency_*.csv")) if not csvs: return [] path = csvs[-1] print(f"Using concurrency: {path.name}") with open(path) as f: return list(csv.DictReader(f)) def chart_concurrency(rows: list[dict]) -> None: """Two-panel line chart (add + search) showing p50 + p95 vs concurrency.""" if not rows: print("No concurrency data, skipping chart.") return scenarios = ["add_alert", "search_filter"] scenario_titles = ["Add alert", "Search / filter"] levels = [1, 5, 10, 25, 50] def extract(fw: str, scenario: str, col: str) -> list[float]: return [ float(r[col]) for r in rows if r["framework"] == fw and r["scenario"] == scenario ] fig, axes = plt.subplots(1, 2, figsize=(10, 5)) fig.patch.set_facecolor(BG) for ax, scenario, title in zip(axes, scenarios, scenario_titles): ax.set_facecolor(BG) for spine in ax.spines.values(): spine.set_visible(False) ax.yaxis.grid(True, color=GRID_C, linewidth=1, zorder=0) ax.set_axisbelow(True) ax.tick_params(colors=SUB_C, length=0) d50 = extract(FW_D, scenario, "p50_ms") d95 = extract(FW_D, scenario, "p95_ms") p50 = extract(FW_P, scenario, "p50_ms") p95 = extract(FW_P, scenario, "p95_ms") ax.plot( levels, d50, color=DJANGO_C, marker="o", linewidth=2, markersize=5, label="Django p50", zorder=3, ) ax.plot( levels, d95, color=DJANGO_C, marker="o", linewidth=1.2, markersize=4, linestyle="--", alpha=0.55, label="Django p95", zorder=3, ) ax.plot( levels, p50, color=PHOENIX_C, marker="s", linewidth=2, markersize=5, label="Phoenix p50", zorder=3, ) ax.plot( levels, p95, color=PHOENIX_C, marker="s", linewidth=1.2, markersize=4, linestyle="--", alpha=0.55, label="Phoenix p95", zorder=3, ) ax.set_title(title, color=TEXT_C, fontsize=12, fontweight="bold", pad=10) ax.set_xlabel("Concurrent clients", color=SUB_C, fontsize=10, labelpad=8) if ax is axes[0]: ax.set_ylabel("Latency (ms)", color=SUB_C, fontsize=10, labelpad=8) ax.tick_params(axis="y", colors=SUB_C) ax.tick_params(axis="x", colors=SUB_C) ax.set_xticks(levels) # Shared legend handles = [ plt.Line2D( [0], [0], color=DJANGO_C, linewidth=2, marker="o", markersize=5, label=f"{FW_D} p50", ), plt.Line2D( [0], [0], color=DJANGO_C, linewidth=1.2, marker="o", markersize=4, linestyle="--", alpha=0.55, label=f"{FW_D} p95", ), plt.Line2D( [0], [0], color=PHOENIX_C, linewidth=2, marker="s", markersize=5, label=f"{FW_P} p50", ), plt.Line2D( [0], [0], color=PHOENIX_C, linewidth=1.2, marker="s", markersize=4, linestyle="--", alpha=0.55, label=f"{FW_P} p95", ), ] fig.legend( handles=handles, loc="upper center", ncol=4, frameon=True, facecolor=LEGEND_BG, edgecolor=LEGEND_ED, labelcolor=TEXT_C, fontsize=9, bbox_to_anchor=(0.5, 1.06), ) fig.text( 0.5, 1.10, "Concurrency — latency under load", ha="center", color=TEXT_C, fontsize=14, fontweight="bold", ) fig.text( 0.5, 1.03, "ms · lower is better · solid = p50 · dashed = p95", ha="center", color=SUB_C, fontsize=9, ) fig.tight_layout() out = CHARTS_DIR / "concurrency.png" fig.savefig(out, dpi=160, bbox_inches="tight", facecolor=BG) plt.close(fig) print(f"Saved: {out}") def main(): CHARTS_DIR.mkdir(parents=True, exist_ok=True) rows = load_latest_csv(RESULTS_DIR) data = aggregate(rows) chart_latency( data, COMMON_KEYS, "Common scenarios — latency", "avg ms · lower is better · error bars = 1 std dev", "latency_common.png", ) chart_distribution( data, COMMON_KEYS, "Common scenarios — latency distribution", "per-iteration spread · lower is better", "distribution_common.png", ) chart_latency( data, EDGE_KEYS, "Edge case scenarios — latency", "avg ms · lower is better · error bars = 1 std dev", "latency_edge.png", ) chart_payload( data, ALL_KEYS, "WebSocket data received per action", "KB · lower is better · Phoenix sends diffs, Django sends full HTML", "payload.png", ) conc_rows = load_concurrency_csv(CONCURRENCY_CSV) chart_concurrency(conc_rows) print("All charts generated.") if __name__ == "__main__": main()