Skip to content

Commit

Permalink
update logs and add read_bench
Browse files Browse the repository at this point in the history
  • Loading branch information
baotiao committed Nov 30, 2017
1 parent a6ab8f4 commit 1d695be
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 22 deletions.
4 changes: 3 additions & 1 deletion floyd/example/simple/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ CXXFLAGS+=-I../../.. \
-I$(PINK_INCLUDE_DIR) \
-I$(ROCKSDB_INCLUDE_DIR)

OBJECT = t t1 t2 t3 t4 t5 t6 t7 t8 test_lock test_lock1 test_lock2 add_server add_server1
OBJECT = t t1 t2 t3 t4 t5 t6 t7 t8 test_lock test_lock1 test_lock2 add_server add_server1 read_bench
SRC_DIR = ./
THIRD_PATH = ../../third
OUTPUT = ./output
Expand Down Expand Up @@ -108,6 +108,8 @@ add_server: add_server.cc
$(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS)
add_server1: add_server1.cc
$(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS)
read_bench: read_bench.cc
$(CXX) $(CXXFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS)
$(OBJS): %.o : %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH)

Expand Down
2 changes: 2 additions & 0 deletions floyd/example/simple/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ test_lock1 is the case that two thread preempt to a same lock
add_server test the base membership change proto, start 3 nodes and writing some data, then start the 4th node and join the group, at last start another node and join the group.

add_server1 is the case that join the group parallel with writing data

read_bench is an benchmark tool to get multi thread reading performance
129 changes: 129 additions & 0 deletions floyd/example/simple/read_bench.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include <unistd.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>
#include <signal.h>

#include <iostream>
#include <string>

#include "floyd/include/floyd.h"
#include "slash/include/testutil.h"

using namespace floyd;
uint64_t NowMicros() {
struct timeval tv;
gettimeofday(&tv, NULL);
return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}

Floyd *f1, *f2, *f3, *f4, *f5;
std::string keystr[1001000];
std::string valstr[1001000];
int val_size = 10;
int thread_num = 32;
int item_num = 100000;

void *fun(void *arg) {
int i = 1;
Floyd *p;
if (f1->IsLeader()) {
p = f1;
} else if (f2->IsLeader()) {
p = f2;
} else if (f3->IsLeader()) {
p = f3;
} else if (f4->IsLeader()) {
p = f4;
} else {
p = f5;
}
std::string val;
while (i--) {
for (int j = 0; j < item_num; j++) {
p->Read(keystr[j], &val);
}
}
}

int main(int argc, char * argv[])
{
if (argc > 1) {
thread_num = atoi(argv[1]);
}
if (argc > 2) {
val_size = atoi(argv[2]);
}
if (argc > 3) {
item_num = atoi(argv[3]);
}

printf("multi threads test to get performance thread num %d key size %d item number %d\n", thread_num, val_size, item_num);

Options op1("127.0.0.1:8901,127.0.0.1:8902,127.0.0.1:8903,127.0.0.1:8904,127.0.0.1:8905", "127.0.0.1", 8901, "./data1/");
slash::Status s = Floyd::Open(op1, &f1);
printf("%s\n", s.ToString().c_str());

Options op2("127.0.0.1:8901,127.0.0.1:8902,127.0.0.1:8903,127.0.0.1:8904,127.0.0.1:8905", "127.0.0.1", 8902, "./data2/");
s = Floyd::Open(op2, &f2);
printf("%s\n", s.ToString().c_str());

Options op3("127.0.0.1:8901,127.0.0.1:8902,127.0.0.1:8903,127.0.0.1:8904,127.0.0.1:8905", "127.0.0.1", 8903, "./data3/");
s = Floyd::Open(op3, &f3);
printf("%s\n", s.ToString().c_str());

Options op4("127.0.0.1:8901,127.0.0.1:8902,127.0.0.1:8903,127.0.0.1:8904,127.0.0.1:8905", "127.0.0.1", 8904, "./data4/");
s = Floyd::Open(op4, &f4);
printf("%s\n", s.ToString().c_str());

Options op5("127.0.0.1:8901,127.0.0.1:8902,127.0.0.1:8903,127.0.0.1:8904,127.0.0.1:8905", "127.0.0.1", 8905, "./data5/");
s = Floyd::Open(op5, &f5);
printf("%s\n", s.ToString().c_str());

std::string msg;
int i = 10;
uint64_t st = NowMicros(), ed;
for (int i = 0; i < item_num; i++) {
keystr[i] = slash::RandomString(32);
}
for (int i = 0; i < item_num; i++) {
valstr[i] = slash::RandomString(val_size);
}
while (1) {
if (f1->HasLeader()) {
f1->GetServerStatus(&msg);
printf("%s\n", msg.c_str());
break;
}
printf("electing leader... sleep 2s\n");
sleep(2);
}
for (int i = 0; i < item_num; i++) {
f1->Write(keystr[i], valstr[i]);
}

pthread_t pid[24];
st = NowMicros();
for (int i = 0; i < thread_num; i++) {
pthread_create(&pid[i], NULL, fun, NULL);
}
for (int i = 0; i < thread_num; i++) {
pthread_join(pid[i], NULL);
}
ed = NowMicros();
printf("read_bench reading %d datas cost time microsecond(us) %ld, qps %llu\n",
item_num * thread_num, ed - st, item_num * thread_num * 1000000LL / (ed - st));

getchar();
delete f2;
delete f3;
delete f4;
delete f5;
delete f1;
return 0;
}
12 changes: 4 additions & 8 deletions floyd/example/simple/t1.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,13 @@ void *fun(void *arg) {
Floyd *p;
if (f1->IsLeader()) {
p = f1;
}
if (f2->IsLeader()) {
} else if (f2->IsLeader()) {
p = f2;
}
if (f3->IsLeader()) {
} else if (f3->IsLeader()) {
p = f3;
}
if (f4->IsLeader()) {
} else if (f4->IsLeader()) {
p = f4;
}
if (f5->IsLeader()) {
} else {
p = f5;
}
while (i--) {
Expand Down
2 changes: 1 addition & 1 deletion floyd/src/floyd_client_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ClientPool {
Status UpHoldCli(Client* client);

private:
Logger* info_log_;
Logger* const info_log_;
int timeout_ms_;
int retry_;
slash::Mutex mu_;
Expand Down
2 changes: 1 addition & 1 deletion floyd/src/floyd_peer_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ void Peer::AppendEntriesRPC() {
// response
next_index_ = adjust_index;
LOGV(INFO_LEVEL, info_log_, "Peer::AppEntriesRPC: peer_addr %s Adjust peer next_index_, Now next_index_ is %lu",
peer_addr_.c_str(), next_index_);
peer_addr_.c_str(), next_index_.load());
AddAppendEntriesTask();
}
}
Expand Down
2 changes: 1 addition & 1 deletion floyd/src/floyd_peer_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class Peer {
ClientPool* const pool_;
FloydApply* const apply_;
Options options_;
Logger* info_log_;
Logger* const info_log_;


std::atomic<uint64_t> next_index_;
Expand Down
14 changes: 5 additions & 9 deletions floyd/src/raft_log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,19 @@ uint64_t RaftLog::Append(const std::vector<const Entry *> &entries) {
slash::MutexLock l(&lli_mutex_);
rocksdb::WriteBatch wb;
LOGV(DEBUG_LEVEL, info_log_, "RaftLog::Append: entries.size %lld", entries.size());
// try to commit entries in one batch
for (size_t i = 0; i < entries.size(); i++) {
std::string buf;
entries[i]->SerializeToString(&buf);
last_log_index_++;
wb.Put(UintToBitStr(last_log_index_), buf);
// s = db_->Put(rocksdb::WriteOptions(), UintToBitStr(last_log_index_), buf);
// if (!s.ok()) {
// LOGV(ERROR_LEVEL, info_log_, "RaftLog::Append %lu string %s false\n", last_log_index_, UintToBitStr(last_log_index_).c_str());
// return --last_log_index_;
// }
}
rocksdb::Status s;
s = db_->Write(rocksdb::WriteOptions(), &wb);
if (!s.ok()) {
LOGV(ERROR_LEVEL, info_log_, "RaftLog::Append %lu false\n", last_log_index_);
last_log_index_ -= entries.size();
return last_log_index_;
LOGV(ERROR_LEVEL, info_log_, "RaftLog::Append append entries failed, entries size %u, last_log_index_ is %lu",
entries.size(), last_log_index_);
}
return last_log_index_;
}
Expand All @@ -89,7 +85,7 @@ int RaftLog::GetEntry(const uint64_t index, Entry *entry) {
std::string res;
rocksdb::Status s = db_->Get(rocksdb::ReadOptions(), buf, &res);
if (s.IsNotFound()) {
LOGV(ERROR_LEVEL, info_log_, "RaftLog::GetEntry: GetEntry not found %lld \n", index);
LOGV(ERROR_LEVEL, info_log_, "RaftLog::GetEntry: GetEntry not found, index is %lld", index);
entry = NULL;
return 1;
}
Expand Down Expand Up @@ -128,7 +124,7 @@ int RaftLog::TruncateSuffix(uint64_t index) {
rocksdb::Status s = db_->Delete(rocksdb::WriteOptions(), UintToBitStr(last_log_index_));
if (!s.ok()) {
LOGV(ERROR_LEVEL, info_log_, "RaftLog::TruncateSuffix Error last_log_index %lu "
"truncate from %lu\n", last_log_index_, index);
"truncate from %lu", last_log_index_, index);
return -1;
}
}
Expand Down
2 changes: 1 addition & 1 deletion floyd/src/raft_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RaftLog {

private:
rocksdb::DB* const db_;
Logger* info_log_;
Logger* const info_log_;
/*
* mutex for last_log_index_
*/
Expand Down

0 comments on commit 1d695be

Please sign in to comment.