Skip to content

Commit

Permalink
Enable arbitrary commands to be run on cluster mode
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecosta90 committed Mar 14, 2022
1 parent 18d2646 commit e3cac2f
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 50 deletions.
36 changes: 16 additions & 20 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,62 +12,56 @@ jobs:
- uses: actions/checkout@v2
- name: Install dependencies
run: |
sudo apt-get -qq update
sudo apt-get -qq update
sudo apt-get install autoconf automake pkg-config libevent-dev libpcre3-dev libssl-dev
- name: Build
run: autoreconf -ivf && ./configure && make
run: autoreconf -ivf && ./configure && make -j
- name: Setup Python
uses: actions/setup-python@v1
uses: actions/setup-python@v2
with:
python-version: '3.6'
architecture: x64

- name: Cache pip
uses: actions/cache@v1
with:
path: ~/.cache/pip # This path is specific to Ubuntu
# Look to see if there is a cache hit for the corresponding requirements file
key: ${{ runner.os }}-pip-${{ hashFiles('tests/test_requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
${{ runner.os }}-
- name: Install Python dependencies
run: pip install -r tests/test_requirements.txt
run: pip install -r ./tests/test_requirements.txt

- name: Cache Redis
id: cache-redis
uses: actions/cache@v1
with:
path: /home/runner/work/memtier_benchmark/memtier_benchmark/redis
path: /home/runner/work/redis
key: ${{ runner.os }}-redis

- name: Install Redis Server test dependencies
if: steps.cache-redis.outputs.cache-hit != 'true'
run: |
git clone git://github.com/antirez/redis.git --branch unstable
git clone git://github.com/antirez/redis.git --branch 6.2.2
cd redis
make BUILD_TLS=yes
make BUILD_TLS=yes -j
./utils/gen-test-certs.sh
./src/redis-server --version
cd ..
- name: Test OSS TCP
timeout-minutes: 10
run: |
cd tests
MEMTIER_BINARY=./../memtier_benchmark \
python3 -m RLTest \
RLTest \
--env oss -v --clear-logs \
--oss-redis-path ../redis/src/redis-server
cd ..
- name: Test OSS TCP TLS
if: matrix.platform == 'ubuntu-latest'
timeout-minutes: 10
run: |
cd tests
TLS_CERT=../redis/tests/tls/redis.crt \
TLS_KEY=../redis/tests/tls/redis.key \
TLS_CACERT=../redis/tests/tls/ca.crt \
MEMTIER_BINARY=../memtier_benchmark \
python3 -m RLTest \
RLTest \
--env oss -v --clear-logs \
--oss-redis-path ../redis/src/redis-server \
--tls-cert-file ../redis/tests/tls/redis.crt \
Expand All @@ -77,23 +71,25 @@ jobs:
cd ..
- name: Test OSS-CLUSTER TCP
timeout-minutes: 10
run: |
cd tests
MEMTIER_BINARY=./../memtier_benchmark \
python3 -m RLTest \
RLTest \
--env oss-cluster -v --clear-logs --shards-count 3 \
--oss-redis-path ../redis/src/redis-server
cd ..
- name: Test OSS-CLUSTER TCP TLS
timeout-minutes: 10
if: matrix.platform == 'ubuntu-latest'
run: |
cd tests
TLS_CERT=../redis/tests/tls/redis.crt \
TLS_KEY=../redis/tests/tls/redis.key \
TLS_CACERT=../redis/tests/tls/ca.crt \
MEMTIER_BINARY=../memtier_benchmark \
python3 -m RLTest \
RLTest \
--env oss-cluster --shards-count 3 -v --clear-logs \
--oss-redis-path ../redis/src/redis-server \
--tls-cert-file ../redis/tests/tls/redis.crt \
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ stamp-h1
# memtier outputs
*.hgrm
*.txt
!/tests/test_requirements.txt
__pycache__
101 changes: 85 additions & 16 deletions cluster_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,54 @@ bool cluster_client::get_key_for_conn(unsigned int conn_id, int iter, unsigned l
}
}


void cluster_client::create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id) {
int cmd_size = 0;

benchmark_debug_log("%s [%s]:\n", cmd->command_name.c_str(), cmd->command.c_str());

for (unsigned int i = 0; i < cmd->command_args.size(); i++) {
const command_arg* arg = &cmd->command_args[i];

if (arg->type == const_type) {
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg);
} else if (arg->type == key_type) {
unsigned long long key_index;

// get key
if (!get_key_for_conn(conn_id, get_arbitrary_obj_iter_type(cmd, m_executed_command_index), &key_index)) {
return;
}

assert(key_index >= 0);
assert(m_key_len > 0);

cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_key_buffer, m_key_len);
} else if (arg->type == data_type) {
unsigned int value_len;
const char *value = m_obj_gen->get_value(0, &value_len);

assert(value != NULL);
assert(value_len > 0);

cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, value, value_len);
}
}

m_connections[conn_id]->send_arbitrary_command_end(m_executed_command_index, &timestamp, cmd_size);
}

// This function could use some urgent TLC -- but we need to do it without altering the behavior
void cluster_client::create_request(struct timeval timestamp, unsigned int conn_id)
{
// are we using arbitrary command?
if (m_config->arbitrary_commands->is_defined()) {
const arbitrary_command* executed_command = m_config->arbitrary_commands->get_next_executed_command(m_arbitrary_command_ratio_count,
m_executed_command_index);
create_arbitrary_request(executed_command, timestamp, conn_id);
return;
}

// If the Set:Wait ratio is not 0, start off with WAITs
if (m_config->wait_ratio.b &&
(m_tot_wait_ops == 0 ||
Expand Down Expand Up @@ -416,16 +461,28 @@ void cluster_client::create_request(struct timeval timestamp, unsigned int conn_
void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp,
request *request, protocol_response *response) {
// update stats
if (request->m_type == rt_get) {
m_stats.update_moved_get_op(&timestamp,
switch (request->m_type) {
case rt_get:
m_stats.update_moved_get_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
} else if (request->m_type == rt_set) {
m_stats.update_moved_set_op(&timestamp,
break;
case rt_set:
m_stats.update_moved_set_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
} else {
assert(0);
break;
case rt_arbitrary: {
arbitrary_request *ar = static_cast<arbitrary_request *>(request);
m_stats.update_moved_arbitrary_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp),
ar->index);
break;
}
default:
assert(0);
break;
}

// connection already issued 'cluster slots' command, wait for slots mapping to be updated
Expand All @@ -444,16 +501,28 @@ void cluster_client::handle_moved(unsigned int conn_id, struct timeval timestamp
void cluster_client::handle_ask(unsigned int conn_id, struct timeval timestamp,
request *request, protocol_response *response) {
// update stats
if (request->m_type == rt_get) {
m_stats.update_ask_get_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
} else if (request->m_type == rt_set) {
m_stats.update_ask_set_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
} else {
assert(0);
switch (request->m_type) {
case rt_get:
m_stats.update_ask_get_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
break;
case rt_set:
m_stats.update_ask_set_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
break;
case rt_arbitrary: {
arbitrary_request *ar = static_cast<arbitrary_request *>(request);
m_stats.update_ask_arbitrary_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp),
ar->index);
break;
}
default:
assert(0);
break;
}
}

Expand Down
1 change: 1 addition & 0 deletions cluster_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class cluster_client : public client {

// client manager api's
virtual void handle_cluster_slots(protocol_response *r);
virtual void create_arbitrary_request(const arbitrary_command* cmd, struct timeval& timestamp, unsigned int conn_id);
virtual void create_request(struct timeval timestamp, unsigned int conn_id);
virtual bool hold_pipeline(unsigned int conn_id);
virtual void handle_response(unsigned int conn_id, struct timeval timestamp,
Expand Down
3 changes: 0 additions & 3 deletions memtier_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,6 @@ static bool verify_cluster_option(struct benchmark_config *cfg) {
} else if (cfg->unix_socket) {
fprintf(stderr, "error: cluster mode dose not support unix-socket option.\n");
return false;
} else if (cfg->arbitrary_commands->is_defined()) {
fprintf(stderr, "error: cluster mode dose not support arbitrary command option.\n");
return false;
}

return true;
Expand Down
47 changes: 41 additions & 6 deletions run_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ void run_stats::update_moved_set_op(struct timeval* ts, unsigned int bytes, unsi
hdr_record_value(m_set_latency_histogram,latency);
}

void run_stats::update_moved_arbitrary_op(struct timeval *ts, unsigned int bytes,
unsigned int latency, size_t request_index) {
roll_cur_stats(ts);

m_cur_stats.m_ar_commands.at(request_index).update_moved_op(bytes, latency);
m_totals.update_op(bytes, latency);

struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
hdr_record_value(hist,latency);
}

void run_stats::update_ask_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency)
{
roll_cur_stats(ts);
Expand All @@ -214,6 +225,17 @@ void run_stats::update_ask_set_op(struct timeval* ts, unsigned int bytes, unsign
hdr_record_value(m_set_latency_histogram,latency);
}

void run_stats::update_ask_arbitrary_op(struct timeval *ts, unsigned int bytes,
unsigned int latency, size_t request_index) {
roll_cur_stats(ts);

m_cur_stats.m_ar_commands.at(request_index).update_ask_op(bytes, latency);
m_totals.update_op(bytes, latency);

struct hdr_histogram* hist = m_ar_commands_latency_histograms.at(request_index);
hdr_record_value(hist,latency);
}

void run_stats::update_wait_op(struct timeval *ts, unsigned int latency)
{
roll_cur_stats(ts);
Expand Down Expand Up @@ -975,11 +997,18 @@ void run_stats::print_moved_sec_column(output_table &table) {

column.elements.push_back(*el.init_str("%12s ", "MOVED/sec"));
column.elements.push_back(*el.init_str("%s", "-------------"));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_moved_sec));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_moved_sec));
column.elements.push_back(*el.init_str("%12s ", "---"));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_moved_sec));

if (print_arbitrary_commands_results()) {
for (unsigned int i=0; i<m_totals.m_ar_commands.size(); i++) {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ar_commands[i].m_moved_sec));
}
} else {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_moved_sec));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_moved_sec));
column.elements.push_back(*el.init_str("%12s ", "---"));

}
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_moved_sec));
table.add_column(column);
}

Expand All @@ -989,11 +1018,17 @@ void run_stats::print_ask_sec_column(output_table &table) {

column.elements.push_back(*el.init_str("%12s ", "ASK/sec"));
column.elements.push_back(*el.init_str("%s", "-------------"));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_ask_sec));
if (print_arbitrary_commands_results()) {
for (unsigned int i=0; i<m_totals.m_ar_commands.size(); i++) {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ar_commands[i].m_ask_sec));
}
} else {
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_set_cmd.m_ask_sec));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_get_cmd.m_ask_sec));
column.elements.push_back(*el.init_str("%12s ", "---"));
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ask_sec));

}
column.elements.push_back(*el.init_double("%12.2f ", m_totals.m_ask_sec));
table.add_column(column);
}

Expand Down
4 changes: 4 additions & 0 deletions run_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ class run_stats {

void update_moved_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
void update_moved_set_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
void update_moved_arbitrary_op(struct timeval *ts, unsigned int bytes,
unsigned int latency, size_t arbitrary_index);

void update_ask_get_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
void update_ask_set_op(struct timeval* ts, unsigned int bytes, unsigned int latency);
void update_ask_arbitrary_op(struct timeval *ts, unsigned int bytes,
unsigned int latency, size_t arbitrary_index);

void update_wait_op(struct timeval* ts, unsigned int latency);
void update_arbitrary_op(struct timeval *ts, unsigned int bytes,
Expand Down
9 changes: 5 additions & 4 deletions tests/include.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ def addTLSArgs(benchmark_specs, env):
benchmark_specs['args'].append('--cacert={}'.format(TLS_CACERT))


def get_default_memtier_config():
def get_default_memtier_config(threads=10, clients=5, requests=1000):
config = {
"memtier_benchmark": {
"binary": MEMTIER_BINARY,
"threads": 10,
"clients": 5,
"requests": 1000
"threads": threads,
"clients": clients,
"requests": requests
},
}
return config
Expand All @@ -106,3 +106,4 @@ def ensure_clean_benchmark_folder(dirname):
if os.path.exists(dirname):
os.removedirs(dirname)
os.makedirs(dirname)

Loading

0 comments on commit e3cac2f

Please sign in to comment.