Files

532 lines
14 KiB
Python

"""Generate benchmark charts for the README."""
import csv
import os
from pathlib import Path
from collections import defaultdict
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
# --- Locations (each one can be overridden through the environment) --------
# Two directory levels above this file; the default results/ and charts/
# directories hang off it.
_PROJECT_ROOT = Path(__file__).parent.parent

# Explicit concurrency CSV to chart. When the variable is unset this becomes
# Path(""), which pathlib treats as the current directory.
CONCURRENCY_CSV = Path(os.environ.get("CONCURRENCY_CSV", ""))
RESULTS_DIR = Path(os.environ.get("RESULTS_DIR", _PROJECT_ROOT / "results"))
CHARTS_DIR = Path(os.environ.get("CHARTS_DIR", _PROJECT_ROOT / "charts"))

# --- Palette ---------------------------------------------------------------
DJANGO_C = "#2ECC71"   # series colour for Django LiveView
PHOENIX_C = "#9B59B6"  # series colour for Phoenix LiveView
BG = "#0F1117"         # figure / axes background
GRID_C = "#1E2130"     # horizontal grid lines
TEXT_C = "#E8E8E8"     # titles, value labels, legend text
SUB_C = "#8892A4"      # subtitles, tick labels, axis labels
LEGEND_BG = "#1A1D2E"  # legend face
LEGEND_ED = "#2E3250"  # legend border

# --- Scenarios -------------------------------------------------------------
# Maps the scenario_key column of the results CSV to its on-chart label.
SCENARIO_LABELS = {
    "add_alert": "Add alert",
    "delete_alert": "Delete alert",
    "search_filter": "Search / filter",
    "large_list_add": "Large list\n(500 items)",
    "rapid_fire": "Rapid fire\n(5 clicks)",
    "empty_search": "Empty search",
}
COMMON_KEYS = ["add_alert", "delete_alert", "search_filter"]
EDGE_KEYS = ["large_list_add", "rapid_fire", "empty_search"]
ALL_KEYS = [*COMMON_KEYS, *EDGE_KEYS]

# --- Framework names as they appear in the CSV "framework" column ----------
FW_D = "Django LiveView"
FW_P = "Phoenix LiveView"
def load_latest_csv(results_dir: Path) -> list[dict]:
    """Read every row of the newest ``results_*.csv`` in *results_dir*.

    "Newest" means the file whose path sorts last, so the file names are
    expected to sort chronologically (presumably a timestamp suffix).

    Args:
        results_dir: Directory searched (non-recursively) for result files.

    Returns:
        One dict per CSV data row, keyed by the header row; all values are
        strings.

    Raises:
        FileNotFoundError: If no ``results_*.csv`` exists in *results_dir*.
    """
    csvs = sorted(results_dir.glob("results_*.csv"))
    if not csvs:
        raise FileNotFoundError(f"No results CSV found in {results_dir}")
    latest = csvs[-1]
    print(f"Using: {latest.name}")
    # newline="" hands line endings to the csv module untouched; without it,
    # universal-newline translation rewrites "\r\n" inside quoted fields.
    with open(latest, newline="") as f:
        return list(csv.DictReader(f))
def aggregate(rows: list[dict]) -> dict:
    """Bucket raw CSV rows by framework, then by scenario key.

    Args:
        rows: Dicts with at least the keys ``framework``, ``scenario_key``,
            ``ms``, ``ws_sent_b`` and ``ws_recv_b``.

    Returns:
        A nested ``defaultdict``: ``result[framework][scenario_key]`` is a
        dict with three parallel lists, ``"ms"`` (floats), ``"sent"`` and
        ``"recv"`` (ints). Looking up an unseen framework or scenario
        creates an empty entry instead of raising.
    """

    def _empty_series() -> dict:
        return {"ms": [], "sent": [], "recv": []}

    grouped: dict = defaultdict(lambda: defaultdict(_empty_series))
    for row in rows:
        series = grouped[row["framework"]][row["scenario_key"]]
        series["ms"].append(float(row["ms"]))
        series["sent"].append(int(row["ws_sent_b"]))
        series["recv"].append(int(row["ws_recv_b"]))
    return grouped
def _base_ax(fig, ax):
    """Apply the shared dark theme to a figure and one of its axes.

    Paints both backgrounds with ``BG``, hides every spine, draws horizontal
    grid lines behind the data and colours the ticks while giving them zero
    length.
    """
    for surface in (fig.patch, ax):
        surface.set_facecolor(BG)
    for edge in ax.spines.values():
        edge.set_visible(False)
    grid_style = {"color": GRID_C, "linewidth": 1, "zorder": 0}
    ax.yaxis.grid(True, **grid_style)
    ax.set_axisbelow(True)
    ax.tick_params(colors=SUB_C, length=0)
def _legend(ax):
    """Add the themed legend to the upper-right corner of *ax*."""
    frame = {
        "frameon": True,
        "facecolor": LEGEND_BG,
        "edgecolor": LEGEND_ED,
    }
    text_and_handles = {
        "labelcolor": TEXT_C,
        "fontsize": 10,
        "handlelength": 1.2,
        "handleheight": 0.9,
    }
    ax.legend(loc="upper right", **frame, **text_and_handles)
def _bar_labels(ax, bars, vals, y_offset):
    """Write each value, to one decimal place, centred above its bar.

    Args:
        ax: Axes that owns the bars.
        bars: Bar artists, paired positionally with *vals*.
        vals: Numbers to print.
        y_offset: Gap, in data units, between the bar top and the text.
    """
    text_style = {
        "ha": "center",
        "va": "bottom",
        "color": TEXT_C,
        "fontsize": 9,
        "fontweight": "bold",
    }
    for rect, number in zip(bars, vals):
        centre_x = rect.get_x() + rect.get_width() / 2
        baseline_y = rect.get_height() + y_offset
        ax.text(centre_x, baseline_y, f"{number:.1f}", **text_style)
def _title(ax, title, subtitle):
    """Set a bold left-aligned title with a smaller subtitle beneath it.

    The subtitle is positioned in axes coordinates, just above the top-left
    corner of the plot area.
    """
    heading_style = {
        "color": TEXT_C,
        "fontsize": 14,
        "fontweight": "bold",
        "pad": 20,
        "loc": "left",
    }
    ax.set_title(title, **heading_style)
    ax.text(
        0,
        1.02,
        subtitle,
        transform=ax.transAxes,
        color=SUB_C,
        fontsize=9,
    )
def chart_latency(
    data: dict, keys: list[str], title: str, subtitle: str, filename: str
):
    """Save a grouped bar chart of mean latency per scenario.

    One pair of bars (Django, Phoenix) is drawn per entry in *keys*, each
    with an error bar of one standard deviation (``np.std`` defaults) and
    its mean printed above it.

    Args:
        data: Nested mapping ``data[framework][scenario_key]["ms"]`` holding
            the latency samples.
        keys: Scenario keys to plot, left to right.
        title: Chart heading.
        subtitle: Smaller line shown under the heading.
        filename: File name of the PNG, created inside ``CHARTS_DIR``.
    """
    tick_text = [SCENARIO_LABELS[k] for k in keys]

    def _per_key(stat, framework):
        return [stat(data[framework][k]["ms"]) for k in keys]

    mean_d = _per_key(np.mean, FW_D)
    mean_p = _per_key(np.mean, FW_P)
    std_d = _per_key(np.std, FW_D)
    std_p = _per_key(np.std, FW_P)

    fig, ax = plt.subplots(figsize=(9, 5))
    _base_ax(fig, ax)

    centres = np.arange(len(keys))
    bar_w = 0.32
    left, right = centres - bar_w / 2, centres + bar_w / 2

    bar_style = {"zorder": 3, "linewidth": 0}
    bars_d = ax.bar(left, mean_d, bar_w, color=DJANGO_C, label=FW_D, **bar_style)
    bars_p = ax.bar(right, mean_p, bar_w, color=PHOENIX_C, label=FW_P, **bar_style)

    whisker_style = {
        "fmt": "none",
        "ecolor": "white",
        "elinewidth": 1.2,
        "capsize": 4,
        "capthick": 1.2,
        "zorder": 4,
    }
    ax.errorbar(left, mean_d, yerr=std_d, **whisker_style)
    ax.errorbar(right, mean_p, yerr=std_p, **whisker_style)

    # Lift every value label clear of the tallest error bar.
    label_gap = max(std_d + std_p) + 2
    _bar_labels(ax, bars_d, mean_d, label_gap)
    _bar_labels(ax, bars_p, mean_p, label_gap)

    # The built-in tick labels are blanked and redrawn as free text below
    # the axis, at a depth proportional to the tallest bar.
    ax.set_xticks(centres)
    ax.set_xticklabels([""] * len(keys))
    tallest = max(mean_d + mean_p)
    for pos, text in zip(centres, tick_text):
        ax.text(
            pos,
            -tallest * 0.1,
            text,
            ha="center",
            va="top",
            color=SUB_C,
            fontsize=10,
            linespacing=1.6,
        )

    ax.set_ylabel("Latency (ms)", color=SUB_C, fontsize=11, labelpad=12)
    ax.tick_params(axis="y", colors=SUB_C)
    ax.set_ylim(0, tallest * 1.35)
    _legend(ax)
    _title(ax, title, subtitle)

    fig.tight_layout(rect=[0, 0.08, 1, 1])
    out = CHARTS_DIR / filename
    fig.savefig(out, dpi=160, bbox_inches="tight", facecolor=BG)
    plt.close(fig)
    print(f"Saved: {out}")
def chart_payload(
    data: dict, keys: list[str], title: str, subtitle: str, filename: str
):
    """Save a grouped bar chart of mean WebSocket bytes received, in KB.

    Args:
        data: Nested mapping ``data[framework][scenario_key]["recv"]``
            holding received-byte samples; means are divided by 1024.
        keys: Scenario keys to plot, left to right.
        title: Chart heading.
        subtitle: Smaller line shown under the heading.
        filename: File name of the PNG, created inside ``CHARTS_DIR``.
    """
    tick_text = [SCENARIO_LABELS[k] for k in keys]

    def _mean_kb(framework):
        return [np.mean(data[framework][k]["recv"]) / 1024 for k in keys]

    kb_d = _mean_kb(FW_D)
    kb_p = _mean_kb(FW_P)

    fig, ax = plt.subplots(figsize=(11, 5))
    _base_ax(fig, ax)

    centres = np.arange(len(keys))
    bar_w = 0.32
    bar_style = {"zorder": 3, "linewidth": 0}
    bars_d = ax.bar(
        centres - bar_w / 2, kb_d, bar_w, color=DJANGO_C, label=FW_D, **bar_style
    )
    bars_p = ax.bar(
        centres + bar_w / 2, kb_p, bar_w, color=PHOENIX_C, label=FW_P, **bar_style
    )

    label_gap = max(kb_d + kb_p) * 0.02
    for bars, values in ((bars_d, kb_d), (bars_p, kb_p)):
        for rect, kb in zip(bars, values):
            # One decimal for small payloads, whole numbers from 10 KB up.
            shown = f"{kb:.1f}" if kb < 10 else f"{kb:.0f}"
            ax.text(
                rect.get_x() + rect.get_width() / 2,
                rect.get_height() + label_gap,
                shown,
                ha="center",
                va="bottom",
                color=TEXT_C,
                fontsize=9,
                fontweight="bold",
            )

    # The built-in tick labels are blanked and redrawn as free text below
    # the axis, at a depth proportional to the tallest bar.
    ax.set_xticks(centres)
    ax.set_xticklabels([""] * len(keys))
    tallest = max(kb_d + kb_p)
    for pos, text in zip(centres, tick_text):
        ax.text(
            pos,
            -tallest * 0.08,
            text,
            ha="center",
            va="top",
            color=SUB_C,
            fontsize=10,
            linespacing=1.6,
        )

    ax.set_ylabel("Data received (KB)", color=SUB_C, fontsize=11, labelpad=12)
    ax.tick_params(axis="y", colors=SUB_C)
    ax.set_ylim(0, tallest * 1.3)
    _legend(ax)
    _title(ax, title, subtitle)

    fig.tight_layout(rect=[0, 0.08, 1, 1])
    out = CHARTS_DIR / filename
    fig.savefig(out, dpi=160, bbox_inches="tight", facecolor=BG)
    plt.close(fig)
    print(f"Saved: {out}")
def chart_distribution(
    data: dict, keys: list[str], title: str, subtitle: str, filename: str
):
    """Save a row of box plots comparing per-iteration latency spread.

    One panel is drawn per entry in *keys*; each panel holds two boxes
    (Django, Phoenix) built from the raw ``"ms"`` samples. Panels do not
    share a y-axis.

    Args:
        data: Nested mapping ``data[framework][scenario_key]["ms"]`` holding
            the latency samples.
        keys: Scenario keys, one panel each, left to right.
        title: Figure heading, centred above the legend.
        subtitle: Smaller centred line under the heading.
        filename: File name of the PNG, created inside ``CHARTS_DIR``.
    """
    labels = [SCENARIO_LABELS[k] for k in keys]
    colors = [DJANGO_C, PHOENIX_C]
    # squeeze=False makes subplots() return a 2-D array of Axes even for a
    # single panel; with the default, one key yields a bare Axes object and
    # both the zip() and the axes[0] lookup below would fail.
    fig, grid = plt.subplots(
        1, len(keys), figsize=(10, 5), sharey=False, squeeze=False
    )
    axes = grid[0]
    fig.patch.set_facecolor(BG)
    for ax, key, label in zip(axes, keys, labels):
        ax.set_facecolor(BG)
        for spine in ax.spines.values():
            spine.set_visible(False)
        ax.yaxis.grid(True, color=GRID_C, linewidth=1, zorder=0)
        ax.set_axisbelow(True)
        d_ms = data[FW_D][key]["ms"]
        p_ms = data[FW_P][key]["ms"]
        bp = ax.boxplot(
            [d_ms, p_ms],
            patch_artist=True,  # boxes become patches so they can be filled
            widths=0.4,
            medianprops=dict(color="white", linewidth=2),
            whiskerprops=dict(color=SUB_C, linewidth=1),
            capprops=dict(color=SUB_C, linewidth=1),
            flierprops=dict(
                marker="o",
                markerfacecolor=SUB_C,
                markersize=4,
                alpha=0.6,
                linestyle="none",
            ),
        )
        # Boxes come back in input order: Django first, Phoenix second.
        for box, color in zip(bp["boxes"], colors):
            box.set_facecolor(color)
            box.set_linewidth(0)
        ax.set_title(label, color=TEXT_C, fontsize=11, fontweight="bold", pad=10)
        ax.set_xticklabels(["Django", "Phoenix"], color=SUB_C, fontsize=9)
        ax.tick_params(colors=SUB_C, length=0)
        # Only the leftmost panel carries the unit label.
        ax.set_ylabel("ms" if ax is axes[0] else "", color=SUB_C, fontsize=10)
        ax.tick_params(axis="y", colors=SUB_C)
    django_patch = mpatches.Patch(color=DJANGO_C, label=FW_D)
    phoenix_patch = mpatches.Patch(color=PHOENIX_C, label=FW_P)
    fig.legend(
        handles=[django_patch, phoenix_patch],
        loc="upper center",
        ncol=2,
        frameon=True,
        facecolor=LEGEND_BG,
        edgecolor=LEGEND_ED,
        labelcolor=TEXT_C,
        fontsize=10,
        bbox_to_anchor=(0.5, 1.04),
    )
    # Heading and subtitle sit above the figure area (y > 1); saving with
    # bbox_inches="tight" expands the output to include them.
    fig.text(
        0.5, 1.08, title, ha="center", color=TEXT_C, fontsize=14, fontweight="bold"
    )
    fig.text(0.5, 1.02, subtitle, ha="center", color=SUB_C, fontsize=9)
    fig.tight_layout()
    out = CHARTS_DIR / filename
    fig.savefig(out, dpi=160, bbox_inches="tight", facecolor=BG)
    plt.close(fig)
    print(f"Saved: {out}")
def load_concurrency_csv(path: Path) -> list[dict]:
    """Load a concurrency CSV produced by concurrency_test.py.

    Args:
        path: CSV file to read. When it is falsy or does not point at a
            regular file, the ``concurrency_*.csv`` in ``RESULTS_DIR`` whose
            name sorts last is used instead.

    Returns:
        One dict per CSV data row (string values), or an empty list when
        neither *path* nor a fallback file is available.
    """
    # is_file() rather than exists(): an unset CONCURRENCY_CSV arrives here
    # as Path(""), i.e. the current directory, which exists but cannot be
    # opened as a CSV. Any directory must trigger the fallback.
    if not path or not path.is_file():
        csvs = sorted(RESULTS_DIR.glob("concurrency_*.csv"))
        if not csvs:
            return []
        path = csvs[-1]
    print(f"Using concurrency: {path.name}")
    # newline="" leaves line-ending handling to the csv module.
    with open(path, newline="") as f:
        return list(csv.DictReader(f))
def chart_concurrency(rows: list[dict]) -> None:
    """Two-panel line chart (add + search) showing p50 + p95 vs concurrency.

    Args:
        rows: Concurrency CSV rows; the keys read are ``framework``,
            ``scenario``, ``p50_ms`` and ``p95_ms``. An empty list prints a
            notice and skips the chart.

    The figure is written to ``CHARTS_DIR / "concurrency.png"``.
    """
    if not rows:
        print("No concurrency data, skipping chart.")
        return

    panels = [("add_alert", "Add alert"), ("search_filter", "Search / filter")]
    # NOTE(review): the x positions are fixed; each framework/scenario pair
    # is assumed to contribute exactly one row per level, in this order --
    # confirm against concurrency_test.py.
    levels = [1, 5, 10, 25, 50]

    solid = {"linewidth": 2, "markersize": 5}
    dashed = {"linewidth": 1.2, "markersize": 4, "linestyle": "--", "alpha": 0.55}
    # (framework, CSV column, per-axes label, shared-legend label, line style)
    series = [
        (FW_D, "p50_ms", "Django p50", f"{FW_D} p50",
         {"color": DJANGO_C, "marker": "o", **solid}),
        (FW_D, "p95_ms", "Django p95", f"{FW_D} p95",
         {"color": DJANGO_C, "marker": "o", **dashed}),
        (FW_P, "p50_ms", "Phoenix p50", f"{FW_P} p50",
         {"color": PHOENIX_C, "marker": "s", **solid}),
        (FW_P, "p95_ms", "Phoenix p95", f"{FW_P} p95",
         {"color": PHOENIX_C, "marker": "s", **dashed}),
    ]

    def column(fw: str, scenario: str, col: str) -> list[float]:
        picked = []
        for r in rows:
            if r["framework"] == fw and r["scenario"] == scenario:
                picked.append(float(r[col]))
        return picked

    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    fig.patch.set_facecolor(BG)
    for ax, (scenario, panel_title) in zip(axes, panels):
        ax.set_facecolor(BG)
        for edge in ax.spines.values():
            edge.set_visible(False)
        ax.yaxis.grid(True, color=GRID_C, linewidth=1, zorder=0)
        ax.set_axisbelow(True)
        ax.tick_params(colors=SUB_C, length=0)

        # Pull all four series before drawing any of them.
        curves = [column(fw, scenario, col) for fw, col, _, _, _ in series]
        for (_, _, axes_label, _, look), ys in zip(series, curves):
            ax.plot(levels, ys, label=axes_label, zorder=3, **look)

        ax.set_title(panel_title, color=TEXT_C, fontsize=12, fontweight="bold", pad=10)
        ax.set_xlabel("Concurrent clients", color=SUB_C, fontsize=10, labelpad=8)
        if ax is axes[0]:
            ax.set_ylabel("Latency (ms)", color=SUB_C, fontsize=10, labelpad=8)
        ax.tick_params(axis="y", colors=SUB_C)
        ax.tick_params(axis="x", colors=SUB_C)
        ax.set_xticks(levels)

    # Shared legend
    handles = [
        plt.Line2D([0], [0], label=legend_label, **look)
        for _, _, _, legend_label, look in series
    ]
    fig.legend(
        handles=handles,
        loc="upper center",
        ncol=4,
        frameon=True,
        facecolor=LEGEND_BG,
        edgecolor=LEGEND_ED,
        labelcolor=TEXT_C,
        fontsize=9,
        bbox_to_anchor=(0.5, 1.06),
    )
    fig.text(
        0.5,
        1.10,
        "Concurrency — latency under load",
        ha="center",
        color=TEXT_C,
        fontsize=14,
        fontweight="bold",
    )
    fig.text(
        0.5,
        1.03,
        "ms · lower is better · solid = p50 · dashed = p95",
        ha="center",
        color=SUB_C,
        fontsize=9,
    )
    fig.tight_layout()
    out = CHARTS_DIR / "concurrency.png"
    fig.savefig(out, dpi=160, bbox_inches="tight", facecolor=BG)
    plt.close(fig)
    print(f"Saved: {out}")
def main():
    """Render every README chart from the newest result files.

    Creates ``CHARTS_DIR`` if needed, loads and aggregates the latest
    ``results_*.csv``, draws the latency, distribution and payload charts,
    then draws the concurrency chart from the concurrency CSV.
    """
    CHARTS_DIR.mkdir(parents=True, exist_ok=True)
    data = aggregate(load_latest_csv(RESULTS_DIR))

    latency_subtitle = "avg ms · lower is better · error bars = 1 std dev"
    # (renderer, scenario keys, title, subtitle, output file) in draw order
    jobs = (
        (
            chart_latency,
            COMMON_KEYS,
            "Common scenarios — latency",
            latency_subtitle,
            "latency_common.png",
        ),
        (
            chart_distribution,
            COMMON_KEYS,
            "Common scenarios — latency distribution",
            "per-iteration spread · lower is better",
            "distribution_common.png",
        ),
        (
            chart_latency,
            EDGE_KEYS,
            "Edge case scenarios — latency",
            latency_subtitle,
            "latency_edge.png",
        ),
        (
            chart_payload,
            ALL_KEYS,
            "WebSocket data received per action",
            "KB · lower is better · Phoenix sends diffs, Django sends full HTML",
            "payload.png",
        ),
    )
    for render, keys, title, subtitle, filename in jobs:
        render(data, keys, title, subtitle, filename)

    chart_concurrency(load_concurrency_csv(CONCURRENCY_CSV))
    print("All charts generated.")


if __name__ == "__main__":
    main()