Skip to content

Commit

Permalink
WIP: ongoing work on performance optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Kraynyukhov committed May 19, 2018
1 parent fdc50fe commit cd183ad
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 52 deletions.
1 change: 1 addition & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* 19.05.2018 - 0.6.3 branched-out and marked stable. 0.7.0 branch is in upstream - experimental work on performance improvements.
* 09.05.2018 - 0.6.2; New features: service configuration option max_inbound_message_size to limit size of inbound messages
* 09.05.2018 - 0.6.2; New features: workers configuration option: max_connections to limit the maximum number of connections per worker (10 000 by default)
* 09.05.2018 - 0.6.2; New features: workers configuration option: auto_fragment to automatically fragment outbound messages (default value: false)
Expand Down
58 changes: 58 additions & 0 deletions README.md

Large diffs are not rendered by default.

48 changes: 29 additions & 19 deletions benchmark/echo_client_tls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
using websocketpp::lib::bind;

std::string blob(1024,'c');
size_t rounds_per_run=40000;

size_t om_counter=0;

uint64_t start=0;
Expand All @@ -77,17 +80,31 @@ typedef websocketpp::config::asio_client::message_type::ptr message_ptr;
// prints the message and then sends a copy of the message back to the server.
void on_message(client* c, websocketpp::connection_hdl hdl, message_ptr msg)
{

//std::cout << "Message received: " << msg->get_payload() << std::endl;
//
om_counter++;

slice=time_now();
if((slice - start) >= 1000)
if(om_counter == rounds_per_run)
{
std::cout << "Messages " << om_counter << " per " << slice - start << "ms" << std::endl;
start=time_now();
slice=time_now();
size_t ms = slice - start;
std::cout << om_counter << " messages of size " << blob.size() << " octets have had their roundtrips within " << ms << " ms " << std::endl;
std::cout << "average rountrip time: " << float(om_counter)/float(ms) << " per ms, or " << (float(om_counter)/float(ms))*1000.0f << std::endl;
std::cout << "tabdata: " << blob.size() << " " << (float(om_counter)/float(ms))*1000.0f << std::endl;
// 512kb max, no reason for larger blobs so far.
if(blob.size() < 262144)
{
blob=std::move(std::string(blob.size()*2,'x'));
}
else{
std::cout << "finished" << std::endl;
exit(0);
}
// make less iterations for larger blobs, or it never ends
if(rounds_per_run > 100)
{
rounds_per_run/=2;
}
om_counter=0;
start=time_now();
}


Expand All @@ -97,28 +114,21 @@ void on_message(client* c, websocketpp::connection_hdl hdl, message_ptr msg)
if (ec) {
std::cout << "Echo failed because: " << ec.message() << std::endl;
}
//std::cout << "Message have been sent" << std::endl;
}

void on_open(client* c, websocketpp::connection_hdl hdl)
{
websocketpp::lib::error_code ec;
c->send(hdl,"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooo\
ooooooooooooooooooooooooooooooooooooooooooooooooooooooooo\
ooooooooooooooooooooooooooooooooooooooooooooooooooooooooo\
ooooooooooooooooooooooooooooooooooooooo000000000000000000",
websocketpp::frame::opcode::text,ec);
start=time_now();
c->send(hdl,blob,websocketpp::frame::opcode::text,ec);
if(ec)
{
std::cout << "Can't send a message: " << ec.message() << std::endl;
}
// else c->get_alog().write(websocketpp::log::alevel::app, "Sent Message: test");

c->send(hdl,"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooo\
ooooooooooooooooooooooooooooooooooooooooooooooooooooooooo\
ooooooooooooooooooooooooooooooooooooooooooooooooooooooooo\
ooooooooooooooooooooooooooooooooooooooo000000000000000000",
websocketpp::frame::opcode::text,ec);
// speedup
c->send(hdl,"", websocketpp::frame::opcode::binary,ec);
if(ec)
{
std::cout << "Can't send a message: " << ec.message() << std::endl;
Expand All @@ -138,7 +148,7 @@ int main(int argc, char* argv[]) {
c.set_tls_init_handler(f);


std::string uri = "wss://localhost:5080/echo";
std::string uri = "wss://localhost:5083/echo";

if (argc == 2) {
uri = argv[1];
Expand Down
33 changes: 33 additions & 0 deletions benchmark/runBenchmark.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/bin/bash
#
# Runs the TLS echo benchmark with several parallel clients and prints an
# aggregated table built from the "tabdata" lines found in the client logs.
#
# Usage: ./runBenchmark.sh [clients]
#   clients - number of parallel ./benchmark processes to start (default: 80)
#
# Every client writes its stdout to log.<index> in the current directory.

# Build the client only when its source is present and no binary exists yet.
if [ -f ./echo_client_tls.cpp ] && [ ! -f ./benchmark ]
then
  # Stop on a failed build: without the binary there is nothing to run.
  g++ -pthread -O3 -std=c++14 echo_client_tls.cpp -o benchmark -lboost_system -lcrypto -lssl || exit 1
fi

# A missing or empty first argument selects the default of 80 clients.
export clients="${1:-80}"

# -f: on a first run there are no old logs, which is not an error.
rm -f log.*

j=0
while [ "$j" -lt "$clients" ]
do
  ./benchmark > "log.$j" &
  j=$((j+1))
done

# Count only this script's own background jobs that are still running, so
# unrelated processes with "benchmark" in their command line are not counted.
clients_started=$(( $(jobs -pr | wc -l) ))

# Block until every client started above has exited (replaces polling ps).
wait

echo "All results below are for ${clients_started} clients"
echo

# The first awk keeps fields 2 and 3 of every matching log line. The second
# awk groups by the first kept field, maintains a running mean of the second
# one, and prints per group: mean*clients, the group key, the mean, and
# key*mean*clients/1024/1024*8 labelled "Mbit/s". The result is sorted
# numerically, largest first.
# NOTE(review): presumably field 2 is the blob size in octets and field 3 the
# per-client message rate - confirm against the client's "tabdata" output.
grep tabdata log.* | awk '{print $2, $3}' | awk -v clients="$clients_started" '{count[$1]++; avg[$1]=($2+(count[$1]-1)*avg[$1])/count[$1]}END{for(i in avg) print avg[i]*clients, i, avg[i], i*avg[i]*clients/1024/1024*8 " Mbit/s" }' | sort -nr
10 changes: 5 additions & 5 deletions include/Balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,30 @@ namespace LAppS

if(mWorkersCache.size()>0)
{
size_t choosen=0;
size_t chosen=0;
auto stats=mWorkersCache[0]->getStats();
for(size_t i=1;i<mWorkersCache.size();++i)
{
auto stats2=mWorkersCache[i]->getStats();
if(stats.mConnections>stats2.mConnections) // candidate i
{
choosen=i;
chosen=i;
if(stats.mEventQSize > stats2.mEventQSize)
{
stats=stats2;
choosen=i;
chosen=i;
}
}
else
{
if(stats.mEventQSize > stats2.mConnections*mConnectionWeight)
{
stats=stats2;
choosen=i;
chosen=i;
}
}
}
mWorkersCache[choosen]->update(inbound_connection);
mWorkersCache[chosen]->update(inbound_connection);
}else
{
mMayRun.store(false);
Expand Down
4 changes: 3 additions & 1 deletion include/IOWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ namespace LAppS
{
SyncLock sync(mInboundMutex);
mInboundConnections.push(std::move(socketsptr));
mStats.mConnections=mConnections.size()+mInboundConnections.size();
haveNewConnections.store(true);
}

Expand All @@ -100,6 +101,7 @@ namespace LAppS
{
mInboundConnections.push(std::move(socketsptr[i]));
}
mStats.mConnections=mConnections.size()+mInboundConnections.size();
haveNewConnections.store(true);
}

Expand Down Expand Up @@ -176,7 +178,7 @@ namespace LAppS

void updateStats()
{
mStats.mConnections=mConnections.size();
mStats.mConnections=mConnections.size()+mInboundConnections.size();
}

void submitResponse(const int fd, std::queue<MSGBufferTypeSPtr>& messages)
Expand Down
39 changes: 20 additions & 19 deletions include/WSServerMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ namespace WebSocketProtocol

FragmentedServerMessage(msgQType& out,const WebSocketProtocol::OpCode oc,const char* src, const size_t len)
{
int fragments = len/1392;
const size_t fragment_pl_size=1392;
int fragments = len/fragment_pl_size;
size_t bytes_left=len;
size_t offset=0;

Expand All @@ -76,11 +77,11 @@ namespace WebSocketProtocol
// first fragment
outmsg->clear();
outmsg->push_back(oc);
WS::putLength(1392,*outmsg);
WS::putLength(fragment_pl_size,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+1392);
memcpy(outmsg->data()+offset,src,1392);
bytes_left=bytes_left-1392;
outmsg->resize(offset+fragment_pl_size);
memcpy(outmsg->data()+offset,src,fragment_pl_size);
bytes_left=bytes_left-fragment_pl_size;
out.push(outmsg);
--fragments;

Expand All @@ -90,11 +91,11 @@ namespace WebSocketProtocol
outmsg=std::make_shared<MSGBufferType>();
outmsg->clear();
outmsg->push_back(0);
WS::putLength(1392,*outmsg);
WS::putLength(fragment_pl_size,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+1392);
memcpy(outmsg->data()+offset,src+(len-bytes_left),1392);
bytes_left=bytes_left-1392;
outmsg->resize(offset+fragment_pl_size);
memcpy(outmsg->data()+offset,src+(len-bytes_left),fragment_pl_size);
bytes_left=bytes_left-fragment_pl_size;
out.push(outmsg);
--fragments;
}
Expand All @@ -121,8 +122,8 @@ namespace WebSocketProtocol
FragmentedServerMessage(msgQType& out,const WebSocketProtocol::OpCode oc,const std::vector<uint8_t>& src)
{
const size_t len=src.size();

int fragments = len/1392;
const size_t fragment_pl_size=1392;
int fragments = len/fragment_pl_size;
size_t bytes_left=len;
size_t offset=0;

Expand All @@ -133,11 +134,11 @@ namespace WebSocketProtocol
// first fragment
outmsg->clear();
outmsg->push_back(oc);
WS::putLength(1392,*outmsg);
WS::putLength(fragment_pl_size,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+1392);
memcpy(outmsg->data()+offset,src.data(),1392);
bytes_left=bytes_left-1392;
outmsg->resize(offset+fragment_pl_size);
memcpy(outmsg->data()+offset,src.data(),fragment_pl_size);
bytes_left=bytes_left-fragment_pl_size;
out.push(outmsg);
--fragments;

Expand All @@ -147,11 +148,11 @@ namespace WebSocketProtocol
outmsg=std::make_shared<MSGBufferType>();
outmsg->clear();
outmsg->push_back(0);
WS::putLength(1392,*outmsg);
WS::putLength(fragment_pl_size,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+1392);
memcpy(outmsg->data()+offset,src.data()+(len-bytes_left),1392);
bytes_left=bytes_left-1392;
outmsg->resize(offset+fragment_pl_size);
memcpy(outmsg->data()+offset,src.data()+(len-bytes_left),fragment_pl_size);
bytes_left=bytes_left-fragment_pl_size;
out.push(outmsg);
--fragments;
}
Expand Down
1 change: 0 additions & 1 deletion include/WebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ template <bool TLSEnable=false, bool StatsEnable=false> class WebSocket
return false;
case io::Status::POLLIN:
case io::Status::POLLOUT:
return true;
default:
return true;
}
Expand Down
7 changes: 2 additions & 5 deletions include/WorkerStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@

#ifndef __WORKERSTATS_H__
# define __WORKERSTATS_H__
#include <mutex>
#include <sys/synclock.h>


/**
* @brief statistics of every worker. No locking/syncing. Worker
* writes to the stat-fields. ConsoleManager, gathers data from workers
* "AS IS" with no guaranty of an accurate synchronized reading.
* writes to the stat-fields. No accuracy guarantee.
*
**/
struct WorkerStats
Expand All @@ -48,4 +46,3 @@ struct WorkerStats
};

#endif /* __WORKERSTATS_H__ */

4 changes: 2 additions & 2 deletions nbproject/Makefile-Debug.mk
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ ${CND_DISTDIR}/${CND_CONF}/${CND_PLATFORM}/lapps: ${OBJECTFILES}
${OBJECTDIR}/src/getLog.o: src/getLog.cpp nbproject/Makefile-${CND_CONF}.mk
${MKDIR} -p ${OBJECTDIR}/src
${RM} "$@.d"
$(COMPILE.cc) -g -DAPP_NAME=\"LAppS\" -DLOG_FILE=\"lapps.log\" -DLOG_INFO -DMAX_BUFF_SIZE=512 -DSTATS_ENABLE -DTSAFE_LOG=1 -I../libressl/include -I../ITCFramework/include -I../ITCLib/include -I../utils/include -Iinclude -Iinclude/modules -I/usr/local/include -I/usr/local/include/luajit-2.0 -std=c++14 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/getLog.o src/getLog.cpp
$(COMPILE.cc) -g -DAPP_NAME=\"LAppS\" -DLOG_FILE=\"lapps.log\" -DLOG_INFO -DMAX_BUFF_SIZE=512 -DSTATS_ENABLE -DTSAFE_LOG=1 -DLAPPS_TLS_ENABLE -I../libressl/include -I../ITCFramework/include -I../ITCLib/include -I../utils/include -Iinclude -Iinclude/modules -I/usr/local/include -I/usr/local/include/luajit-2.0 -std=c++14 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/getLog.o src/getLog.cpp

${OBJECTDIR}/src/main.o: src/main.cpp nbproject/Makefile-${CND_CONF}.mk
${MKDIR} -p ${OBJECTDIR}/src
${RM} "$@.d"
$(COMPILE.cc) -g -DAPP_NAME=\"LAppS\" -DLOG_FILE=\"lapps.log\" -DLOG_INFO -DMAX_BUFF_SIZE=512 -DSTATS_ENABLE -DTSAFE_LOG=1 -I../libressl/include -I../ITCFramework/include -I../ITCLib/include -I../utils/include -Iinclude -Iinclude/modules -I/usr/local/include -I/usr/local/include/luajit-2.0 -std=c++14 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/main.o src/main.cpp
$(COMPILE.cc) -g -DAPP_NAME=\"LAppS\" -DLOG_FILE=\"lapps.log\" -DLOG_INFO -DMAX_BUFF_SIZE=512 -DSTATS_ENABLE -DTSAFE_LOG=1 -DLAPPS_TLS_ENABLE -I../libressl/include -I../ITCFramework/include -I../ITCLib/include -I../utils/include -Iinclude -Iinclude/modules -I/usr/local/include -I/usr/local/include/luajit-2.0 -std=c++14 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/main.o src/main.cpp

# Subprojects
.build-subprojects:
Expand Down
1 change: 1 addition & 0 deletions nbproject/configurations.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
<commandLine>-pipe -Wall -pthread -O2 -fPIC -march=native -mtune=native -fstack-check -fstack-protector-strong -mfpmath=sse -ftree-vectorize -funroll-loops</commandLine>
<preprocessorList>
<Elem>APP_NAME="LAppS"</Elem>
<Elem>LAPPS_TLS_ENABLE</Elem>
<Elem>LOG_FILE="lapps.log"</Elem>
<Elem>LOG_INFO</Elem>
<Elem>MAX_BUFF_SIZE=512</Elem>
Expand Down

0 comments on commit cd183ad

Please sign in to comment.