
Commit

improve analysis script
Kiibou-chan committed Nov 14, 2024
1 parent 9f8ef41 commit 47831a2
Showing 17 changed files with 154 additions and 47 deletions.
147 changes: 115 additions & 32 deletions Modules/Examples/Protocol Benchmarks/analysis/Protocol Benchmarks.ipynb
@@ -16,6 +16,7 @@
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"import os\n",
"from functools import *\n",
@@ -35,29 +36,45 @@
"try:\n",
" files = list_files(os.environ['BENCH_RESULTS_DIR'] + '/*/*.csv')\n",
"except:\n",
" files = list_files(\"../bench-results/*/*.csv\")\n",
" files = list_files(\"../bench-results/*/*/*.csv\")\n",
"dfs = []\n",
"for file in files:\n",
" global parent\n",
" parent = Path(file).parent.name\n",
" df = pd.read_csv(file, delimiter=\";\")\n",
" df[\"runId\"] = parent\n",
" runId = Path(file).parent.name\n",
" system = Path(file).parent.parent.name\n",
" df = pd.read_csv(file, delimiter=\";\", dtype={'latency': 'float64', 'send-time': 'float64', 'receive-time': 'float64'})\n",
"\n",
" params = runId.split('-')\n",
" \n",
" df[\"run_config\"] = f\"{params[0]} {params[1]} {params[2]}\"\n",
" df[\"run_id\"] = params[3]\n",
" df[\"run\"] = runId\n",
" df[\"system\"] = system\n",
" df[\"index\"] = pd.Series(range(0, len(df)))\n",
" df['receive-time'] = df['receive-time'] - df['send-time'].min()\n",
" df['send-time'] = df['send-time'] - df['send-time'].min()\n",
" dfs.append(df)\n",
"\n",
"df = pd.concat(dfs)\n",
"df[\"latency\"] = df[\"latency\"] / 1000\n",
"df[\"latency\"] = (df[\"latency\"] / 1000)\n",
"df['send-time'] = df['send-time'] / 1000\n",
"df['receive-time'] = df['receive-time'] / 1000\n",
"df[\"unit\"] = \"ms\" # fix unit\n",
"\n",
"\n",
"for runId in df[\"runId\"].unique():\n",
" start_time = df[(df[\"runId\"] == runId)][\"send-time\"].min()\n",
" \n",
" df.loc[df[\"runId\"] == runId, \"send-time\"] = (df.loc[df[\"runId\"] == runId, \"send-time\"] - start_time) / 1000\n",
" df.loc[df[\"runId\"] == runId, \"receive-time\"] = (df.loc[df[\"runId\"] == runId, \"receive-time\"] - start_time) / 1000\n",
"\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3f937b89-a8a8-41b5-8df6-2be81920589e",
"metadata": {},
"outputs": [],
"source": [
"latency_df = df[['system', 'run_config', 'latency', 'index', 'name']] \\\n",
" .groupby(['system', 'run_config', 'index', 'name']) \\\n",
" .mean().reset_index().set_index('index')"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -67,16 +84,43 @@
"source": [
"fig, ax = plt.subplots(1, 1, figsize=(7,5))\n",
"\n",
"for runId in df[\"runId\"].unique():\n",
" for name in df[(df[\"runId\"] == runId)][\"name\"].unique():\n",
" grouped = df[(df[\"runId\"] == runId) & (df[\"name\"] == name)].groupby({x: x // 10000 for x in range(len(df))})[\"latency\"].mean().plot(ax=ax, label=f\"{runId} {name}\")\n",
"def plot_latency(ax, label: str, df: pd.DataFrame, group_size):\n",
" grouped_df = df.groupby({x: (x // group_size) * group_size for x in range(len(df))})\n",
" mean_latency = grouped_df.mean()\n",
" \n",
" mean_latency.plot(y='latency', ax=ax, label=label)\n",
" \n",
"ax.set_xlabel(\"number of queries in groups of 10000\")\n",
"\n",
"\n",
"for system in latency_df['system'].unique():\n",
" print(system)\n",
" system_df = latency_df[latency_df['system'] == system]\n",
" \n",
" for run_config in latency_df['run_config'].unique():\n",
" print(run_config)\n",
" run_df = system_df[system_df['run_config'] == run_config]\n",
" \n",
" plot_latency(ax, f\"{system} {run_config.split(' ')[1]}\", run_df[['latency']], 10000)\n",
"\n",
"ax.set_xlabel(\"number of queries\")\n",
"ax.set_ylabel(\"latency [ms]\")\n",
"\n",
"# ax.set_ylim([0,None])\n",
"\n",
"ax.legend()\n",
"\n",
"plt.savefig(fname=\"latency.png\", format=\"png\")"
"plt.savefig(fname=\"latency.png\", format=\"png\")\n",
"plt.savefig(fname=\"latency.pdf\", format=\"pdf\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5e99ad3f-78e7-4703-a37a-86a2676ad6a8",
"metadata": {},
"outputs": [],
"source": [
"df[\"seconds\"] = (df[\"receive-time\"] / 1000).apply(round)"
]
},
{
@@ -86,27 +130,56 @@
"metadata": {},
"outputs": [],
"source": [
"df[\"seconds\"] = (df[\"receive-time\"] / 1000).apply(round)\n",
"\n",
"fig, ax = plt.subplots(1, 1, figsize=(7,5))\n",
"\n",
"for runId in df[\"runId\"].unique():\n",
" filtered_df = df[(df[\"runId\"] == runId)]\n",
" \n",
" throughput = filtered_df.groupby(\"seconds\")[\"name\"].count()\n",
" throughput.plot(ax=ax, label=runId)\n",
" mean_throughput = throughput.quantile(0.5)\n",
" # print(f\"Mean Throughput for {runId}: {mean_throughput}\")\n",
"res_data = []\n",
"\n",
" # also_mean_throughput = filtered_df[\"name\"].count() / filtered_df[\"seconds\"].max()\n",
" # print(f\"Diff {mean_throughput - also_mean_throughput}\")\n",
"for system in df['system'].unique():\n",
" sys_df = df[df['system'] == system]\n",
" \n",
" ax.axhline(y=mean_throughput, color=ax.get_lines()[-1].get_color())\n",
" # ax.axhline(y=also_mean_throughput, color=ax.get_lines()[-1].get_color())\n",
" for run_id in sys_df[\"run_config\"].unique():\n",
" filtered_df = sys_df[(sys_df[\"run_config\"] == run_id)]\n",
" \n",
" throughput = filtered_df.groupby(\"seconds\")[\"name\"].count() / len(sys_df[sys_df['run_config'] == run_id]['run_id'].unique())\n",
" throughput.plot(ax=ax, label=f\"{system} {run_id.split(' ')[1]}\")\n",
" mean_throughput = throughput.quantile(0.5)\n",
" res_data.append([system, run_id.split(\" \")[1], mean_throughput])\n",
" print(f\"Mean Throughput for {system} {run_id}: {mean_throughput}\")\n",
" \n",
" # also_mean_throughput = filtered_df[\"name\"].count() / filtered_df[\"seconds\"].max()\n",
" # print(f\"Diff {mean_throughput - also_mean_throughput}\")\n",
" \n",
" ax.axhline(y=mean_throughput, color=ax.get_lines()[-1].get_color())\n",
" # ax.axhline(y=also_mean_throughput, color=ax.get_lines()[-1].get_color())\n",
"\n",
"ax.legend()\n",
"\n",
"plt.savefig(fname=\"throughput.png\", format=\"png\")"
"ax.set_xlabel('time [s]')\n",
"ax.set_ylabel('throughput [ops/s]')\n",
"\n",
"plt.savefig(fname=\"throughput.pdf\", format=\"pdf\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dd920bb9-b5b8-47f1-8ce0-fc8caa33425c",
"metadata": {},
"outputs": [],
"source": [
"fig, ax = plt.subplots(1, 1, figsize=(7,5))\n",
"\n",
"res_df = pd.DataFrame(data=res_data, columns=['system', 'type', 'throughput'])\n",
"\n",
"etcd_df = res_df[res_df.system == 'etcd'][['type', 'throughput']]\n",
"pb_df = res_df[res_df.system == 'pb'][['type', 'throughput']]\n",
"\n",
"pd.DataFrame(data={'type': etcd_df.type, 'etcd': list(etcd_df.throughput), 'pb': list(pb_df.throughput)}).set_index('type').plot.bar(ax=ax)\n",
"\n",
"ax.set_ylabel('throughput [ops/s]')\n",
"ax.set_xlabel('')\n",
"\n",
"fig.savefig(fname=\"throughput_comp.pdf\", format=\"pdf\")"
]
},
{
@@ -115,6 +188,16 @@
"id": "c0dc468c-0f71-4426-99f2-cd3272a395e8",
"metadata": {},
"outputs": [],
"source": [
"df.groupby(['system', 'run_config', 'run_id'])['send-time'].max() / 1000"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9526da1b-a2f8-4930-a072-7e9297b0decc",
"metadata": {},
"outputs": [],
"source": []
}
],
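For reference, the bucketed-mean latency plot that the updated notebook draws via plot_latency boils down to the idea below. This is a minimal standalone sketch with hypothetical demo data, not the notebook cell itself; it assumes a DataFrame with a 0..N-1 index and a 'latency' column in milliseconds, as in the diff above.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Hypothetical demo data standing in for one (system, run_config) slice of latency_df.
rng = np.random.default_rng(0)
demo = pd.DataFrame({"latency": rng.exponential(scale=2.0, size=50_000)})

group_size = 10_000
# Map each row index to the start of its bucket, then average latency per bucket,
# mirroring the groupby dict used in plot_latency above.
buckets = {i: (i // group_size) * group_size for i in range(len(demo))}
mean_latency = demo.groupby(buckets).mean()

fig, ax = plt.subplots(figsize=(7, 5))
mean_latency.plot(y="latency", ax=ax, label="demo run")
ax.set_xlabel("number of queries")
ax.set_ylabel("latency [ms]")
ax.legend()
plt.show()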
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,5 +1,5 @@
multiput key%n value%n 10000
benchmark
multiget key%n 10000
mixed 1 10000 1_000_000
save-benchmark
exit
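The mixed 1 10000 1_000_000 command added to this args file maps to the new Client.mixed(min, max, times) further down in this commit: each of the 1,000,000 operations picks a roughly uniform random key in [1, 10000] and flips a coin between a read and a write. A rough Python rendering of that loop (an illustrative sketch with hypothetical read/write callbacks, not the Scala implementation):

import random
from typing import Callable

def mixed(min_key: int, max_key: int, times: int,
          read: Callable[[str], None], write: Callable[[str, str], None]) -> None:
    # 50/50 read/write mix over uniformly random keys, mirroring Client.mixed below.
    for _ in range(times):
        num = random.randint(min_key, max_key)
        if random.random() > 0.5:
            read(f"key{num}")
        else:
            write(f"key{num}", f"value{num}")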
2 changes: 1 addition & 1 deletion Modules/Examples/Protocol Benchmarks/args/bench-put-100k
@@ -1,4 +1,4 @@
multiput warmup warmup%n 1000
multiput warmup warmup%n 10000
benchmark
multiput key%n value%n 100000
save-benchmark
2 changes: 1 addition & 1 deletion Modules/Examples/Protocol Benchmarks/args/bench-put-1m
@@ -1,4 +1,4 @@
multiput warmup warmup%n 1000
multiput warmup warmup%n 10000
benchmark
multiput key%n value%n 1000000
save-benchmark
2 changes: 1 addition & 1 deletion Modules/Examples/Protocol Benchmarks/args/bench-put-300k
@@ -1,4 +1,4 @@
multiput warmup warmup%n 1000
multiput warmup warmup%n 10000
benchmark
multiput key%n value%n 300000
save-benchmark
5 changes: 5 additions & 0 deletions Modules/Examples/Protocol Benchmarks/args/bench-put-500k
@@ -0,0 +1,5 @@
multiput warmup warmup%n 10000
benchmark
multiput key%n value%n 500000
save-benchmark
exit
@@ -0,0 +1 @@
Client1;localhost:8010;bench-mixed-1m
@@ -0,0 +1 @@
Client1;localhost:8010;bench-put-500k
@@ -85,7 +85,6 @@ class Node(val name: Uid, val initialClusterIds: Set[Uid]) {

val delta = newState.upkeep()
val upkept: ClusterState = newState.merge(delta)
val end = System.nanoTime()
timeStep("upkeep + merge")

if !(upkept <= newState) || upkept.counter > newState.counter then {
@@ -8,7 +8,7 @@ import probench.clients.{ClientCLI, EtcdClient, ProBenchClient}
import probench.data.{ClientNodeState, KVOperation, Request}
import rdts.base.Uid
import rdts.datatypes.experiments.protocols.Membership
import rdts.datatypes.experiments.protocols.simplified.Paxos
import rdts.datatypes.experiments.protocols.simplified.{GeneralizedPaxos, Paxos}

import java.net.{DatagramSocket, InetSocketAddress}
import java.util.Timer
@@ -49,7 +49,10 @@ object cli {

given JsonValueCodec[ClientNodeState] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))

given JsonValueCodec[Membership[Request, Paxos, Paxos]] =
given paxosMembership: JsonValueCodec[Membership[Request, Paxos, Paxos]] =
JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))

given genPaxosMembership: JsonValueCodec[Membership[Request, GeneralizedPaxos, GeneralizedPaxos]] =
JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))

def socketPath(host: String, port: Int) = {
@@ -26,6 +26,18 @@ trait Client(name: Uid) {
println(s"Did $times put queries in ${(System.nanoTime() - start) / 1_000_000}ms")
}

def mixed(min: Int, max: Int, times: Int = 1): Unit = {
val start = System.nanoTime()
for i <- 1 to times do {
val num = Math.round(Math.random() * (max - min) + min).toInt
if Math.random() > 0.5 then
read(f"key$num")
else
write(f"key$num",f"value$num")
}
println(s"Did $times mixed queries in ${(System.nanoTime() - start) / 1_000_000}ms")
}

def handleOp(op: KVOperation[String, String]): Unit = {
val start = if doBenchmark then System.nanoTime() else 0

@@ -14,27 +14,30 @@ class ClientCLI(name: Uid, client: Client) {
private val put: Regex = """put ([\w%]+) ([\w%]+)""".r
private val multiget: Regex = """multiget ([\w%]+) ([\d_]+)""".r
private val multiput: Regex = """multiput ([\w%]+) ([\w%]+) ([\d_]+)""".r
private val mixed: Regex = """mixed ([\d_]+) ([\d_]+) ([\d_]+)""".r
private val mp: Regex = """mp ([\d_]+)""".r
private val benchmark: Regex = """benchmark""".r
private val saveBenchmark: Regex = """save-benchmark""".r

private def parseInt(str: String): Int = str.replace("_", "").toInt

def startCLI(): Unit = {
var running = true
while running do {
print("client> ")
val line = Option(readLine()).map(_.strip())
line match {
case Some(commented()) => // ignore
case Some(get(key)) => client.read(key)
case Some(put(key, value)) => client.write(key, value)
case Some(multiget(key, times)) => client.multiget(key, times.replace("_", "").toInt)
case Some(multiput(key, value, times)) => client.multiput(key, value, times.replace("_", "").toInt)
case Some(mp(times)) => client.multiput("key%n", "value%n", times.replace("_", "").toInt)
case Some(commented()) => // ignore
case Some(get(key)) => client.read(key)
case Some(put(key, value)) => client.write(key, value)
case Some(multiget(key, times)) => client.multiget(key, parseInt(times))
case Some(multiput(key, value, times)) => client.multiput(key, value, parseInt(times))
case Some(mp(times)) => client.multiput("key%n", "value%n", parseInt(times))
case Some(mixed(min, max, times)) => client.mixed(parseInt(min), parseInt(max), parseInt(times))
case Some(benchmark()) =>
client.doBenchmark = true
case Some(saveBenchmark()) =>
val env = System.getenv()

val env = System.getenv()
val runId = env.getOrDefault("RUN_ID", Uid.gen().delegate)
val benchmarkPath = Path.of(env.getOrDefault("BENCH_RESULTS_DIR", "bench-results")).resolve(runId)
val writer = new CSVWriter(";", benchmarkPath, s"${name.delegate}-$runId", BenchmarkData.header)
