Skip to content

Commit

Permalink
LAppS-0.6.2. New features: option for inbound message limit per servi…
Browse files Browse the repository at this point in the history
…ce; optioanl outbound messages auto-fragmentation; option for workers to limit amount of connections
  • Loading branch information
Pavel Kraynyukhov committed May 9, 2018
1 parent f339cc7 commit e35dbd6
Show file tree
Hide file tree
Showing 20 changed files with 371 additions and 43 deletions.
32 changes: 32 additions & 0 deletions examples/broadcast_blob/broadcast_blob.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
--[[
-- auto fragmentation test
--]]
--
broadcast_blob={}
broadcast_blob.__index=broadcast_blob

broadcast_blob["init"]=function()
bcast:create(2000);
broadcast_blob["nap"]=nap:new();
end

broadcast_blob["run"]=function()
print("running")
local oon=nljson.decode([[{
"cid" : 6,
"message" : [ 1, 2, 3 ,4 ,5,
"ooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooo______oooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\noooooo___oooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\noooooo___oooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\noooooo___oooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\noooooo___oooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\noooooo___oooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\noooooo___ooooooooo OOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\noooooo_____________OOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOooooooo oooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOooooo oo ooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooo oooo oooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoo oooooo oooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOo oooooooo ooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOo oooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOO oooooooooooo ooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOO oooooooooo ooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOooo oooooooOO OOOOooooooooooo\nooooooooooooooooooOOOOOOOOOooo ooooooooo ooooooOO OOOOOOOO OOOooooooooooo\nooooooooooooooooooOOOOOOOOOooo oooooooooo oooooOO OOOOOOOOO OOooooooooooo\nooooooooooooooooooOOOOOOOOOooo oooooooooo oooooOO OOOOOOOOO OOooooooooooo\nooooooooooooooooooOOOOOOOOOooo ooooooooo ooooooOO OOOOOOOO OOOooooooooooo\nooooooooooooooooooOOOOOOOOOooo oooooooOO OOOOooooooooooo\nooooooooooooooooooOOOOOOOOOooo ooooooooooooooooOO OOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOooo ooooooooooooooooOO OOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOooo ooooooooooooooooOO OOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOooo ooooooooooooooooOO OOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOO oooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOO OOOOOOOooo ooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOO OOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOO OOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOO OOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOO OOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOO OOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOO oooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOO OOOOOOOOOo ooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOO OOOOOOOOOo ooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOO oooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo\nooooooooooooooooooOOOOOOOOOoooooooooooooooooooooOOOOOOOOOOOOOOOOooooooooooo"
]
}]]);
while true
do
broadcast_blob.nap:usleep(5000000);
bcast:send(2000,oon);
if(must_stop())
then
break;
end
end
end

return broadcast_blob;
2 changes: 1 addition & 1 deletion examples/echo_lapps/echo_lapps.lua
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ echo_lapps["onMessage"]=function(handler,msg_type, message)
end,
[2] = function() -- a CN with params, does not require any response.
local method=methods._cn_w_params_method[message.method] or echo.method_not_found;
method(handler,message.params);
method(handler,message.params,message.cid);
end,
[3] = function() -- requests without params are unsupported by this app
local method=echo.method_not_found;
Expand Down
14 changes: 11 additions & 3 deletions examples/echo_lapps/lapps_echo_methods.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,29 @@ lapps_echo_methods["_cn_w_params_method"]={
ws:close(handler,1000); -- normal close code
end
end,
["subscribe"]=function(handler,params)
["subscribe"]=function(handler,params, cid)
local not_authenticated=nljson.decode([[{
"status" : 0,
"error" : {
"code" : -32000,
"message" : "Not authenticated. Permission deneid"
},
"cid" : 0
"cid" : 3
}]]);

local authkey=nljson.find(params[1],"authkey");
local current_session_authkey=nljson.find(maps.keys,handler);
if(authkey ~= nil) and (current_session_authkey ~=nil) and (authkey == current_session_authkey)
then
bcast:subscribe(1000,handler);
if(cid == 5)
then
bcast:subscribe(1000,handler);
end

if(cid == 6)
then
bcast:subscribe(2000,handler);
end
else
ws:send(handler,not_authenticated);
ws:close(handler,1000); -- WebSocket close code here. 1000 - "normal close"
Expand Down
6 changes: 6 additions & 0 deletions examples/echo_lapps_client/client.html
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@
cid: 5
};
websocket.send(CBOR.encode(subscribe));
subscribe.cid=6; // subscribe to blob broadcast
websocket.send(CBOR.encode(subscribe));
window.subscribed=true;
}
}
Expand All @@ -150,6 +152,10 @@
$$("barChart").remove($$("barChart").getFirstId());
}
}
else if(message.cid === 6) // blob was broadcasted by server
{
console.log("OON. Received a blob of size: "+message.message[5]);
}
else // other OONs are just printed to console
{
console.log("OON: "+JSON.stringify(message));
Expand Down
11 changes: 9 additions & 2 deletions include/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ namespace LAppS
std::mutex mMutex;
std::string mName;
std::string mTarget;
size_t max_inbound_message_size;
ApplicationContext<TLSEnable,StatsEnable,Tproto> mAppContext;
itc::tsbqueue<TaggedEvent> mEvents;
std::vector<WorkerSPtrType> mWorkers;
Expand Down Expand Up @@ -94,12 +95,18 @@ namespace LAppS
mMayRun.store(false);
}

explicit Application(const std::string& appName,const std::string& target)
: mMayRun(true),mMutex(),mName(appName), mTarget(target),mAppContext(appName,this)
explicit Application(const std::string& appName,const std::string& target,const size_t mims)
: mMayRun(true),mMutex(),mName(appName), mTarget(target),
max_inbound_message_size(mims),mAppContext(appName,this)
{
itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(mWorkers);
}

const size_t getMaxMSGSize() const
{
return max_inbound_message_size;
}

auto getWorker(const size_t wid)
{
if( wid<mWorkers.size() )
Expand Down
15 changes: 10 additions & 5 deletions include/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ namespace LAppS
{"connection_weight", 0.7},
{"ip","0.0.0.0"},
{"port",5083},
{"workers",{ {"workers",4}, {"max_connections", 1000 }}},
{"workers",{ {"workers",4}, {"max_connections", 10000 },{"auto_fragment",true}}},
#ifdef LAPPS_TLS_ENABLE
{"tls",true},
#else
{"tls",false},
#endif
{"tls_certificates",{ {"ca","/opt/lapps/etc/ssl/cert.pem"},{"cert", "/opt/lapps/conf/ssl/cert.pem"}, {"key","/opt/lapps/conf/ssl/key.pem" } }},
{"auto_fragment",true}, // Not yet implemented
{"max_inbound_message_size",300000} // 300 000 bytes. Not yet implemented.No message limit so far.
{"tls_certificates",{ {"ca","/opt/lapps/etc/ssl/cert.pem"},{"cert", "/opt/lapps/conf/ssl/cert.pem"}, {"key","/opt/lapps/conf/ssl/key.pem" } }}
}),
lapps_config({
{
Expand All @@ -75,17 +73,24 @@ namespace LAppS
{"internal", false},
{"request_target", "/echo"},
{"protocol", "raw"},
{"max_inbound_message_size",16*1024*1024}, // autobahn-testsuite uses up to 16MB messages in fuzzingclinet.
{"instances", 4}
}}},
{{"echo_lapps", {
{"request_target", "/echo_lapps"},
{"protocol", "LAppS"},
{"max_inbound_message_size",1024},
{"instances", 3}
}}},
{{"time_broadcast", {
{"internal", true},
{"instances", 1}
}}}/**,
}}},
{{"broadcast_blob", {
{"internal", true},
{"instances", 1}
}}}
/**,
{{"data_source", {
{"internal", true},
{"request_queue", "http_request"}, // optional request queue for apps ipc. if declared, may not be null
Expand Down
4 changes: 2 additions & 2 deletions include/HTTPRequestParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ class HTTPRequestParser
}
cursor+=HTTPVersion.size()+2;

cursor+=collectHeaders((const char*)(&ref.data()[cursor]),bufflen-cursor);
assert(cursor <= bufflen);
collectHeaders((const char*)(&ref.data()[cursor]),bufflen-cursor);

} else throwMalformedHTTPRequest("No space after MODE[GET]");
} else throwMalformedHTTPRequest("Only GET requests are accepted");
}
Expand Down
14 changes: 11 additions & 3 deletions include/IOWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ namespace LAppS
typedef WebSocket<TLSEnable,StatsEnable> WSType;
typedef std::shared_ptr<WSType> WSSPtr;

explicit IOWorker(const size_t id, const size_t maxConnections)
: Worker(id,maxConnections), mMayRun(true),mCanStop(false),
explicit IOWorker(const size_t id, const size_t maxConnections,const bool af)
: Worker(id,maxConnections,af), mMayRun(true),mCanStop(false),
mConnectionsMutex(), mInboundMutex(), mOutMutex(),
mShakespeer(),
mEPollThr(std::make_shared<LAppS::ePollController>(1000)),
Expand Down Expand Up @@ -316,7 +316,15 @@ namespace LAppS
switch(current->getState())
{
case WSType::HANDSHAKE:
mShakespeer.handshake(current);
if(mConnections.size()>mMaxConnections)
{
mShakespeer.sendForbidden(current);
}
else
{
mShakespeer.handshake(current);
}

if(current->getState()==WSType::CLOSED)
{
deleteConnection(fd);
Expand Down
18 changes: 17 additions & 1 deletion include/Shakespeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#include <Config.h>
#include <ApplicationRegistry.h>

static const std::string forbidden("HTTP/1.1 403 Forbidden");

namespace LAppS
{
template <bool TLSEnable=false, bool StatsEnable=false> class Shakespeer
Expand All @@ -55,6 +57,21 @@ namespace LAppS
Shakespeer(const Shakespeer&)=delete;
Shakespeer(Shakespeer&)=delete;

void sendForbidden(const WSSPtr& wssocket)
{
std::string peer_ip;
wssocket->getPeerIP(peer_ip);

itc::getLog()->info(
__FILE__,__LINE__,
"Connection denied for fd %d, peer %s",
wssocket->getFileDescriptor(),
peer_ip.c_str()
);

wssocket->send(forbidden);
wssocket->setState(WSType::CLOSED);
}
void handshake(const WSSPtr& wssocket)
{
if(wssocket->getState() == WSType::HANDSHAKE)
Expand Down Expand Up @@ -111,7 +128,6 @@ namespace LAppS
"Shakespeer::handshake() was unsuccessful for peer %s. Closing this WebSocket.",
peer_addr.c_str()
);
static const std::string forbidden("HTTP/1.1 403 Forbidden");
wssocket->send(forbidden);
wssocket->setState(WSType::CLOSED);
return;
Expand Down
129 changes: 129 additions & 0 deletions include/WSServerMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <vector>
#include <string>
#include <WSProtocol.h>
#include <queue>

/**
* Fast and bad implementation of server side messages.
Expand All @@ -47,6 +48,134 @@ namespace WebSocketProtocol
MakeMessageHeader(MakeMessageHeader&)=delete;
MakeMessageHeader()=delete;
};

struct FragmentedServerMessage
{
// Ethernet frame Upper Layer protocol payload typically 1500 bytes
// TCP frame 20 bytes without options up to 60 bytes with options
// IP header 24 bytes max
// 24+60 == 84 bytes
// 1500 - 84 = 1416 max WebSockets Frame size (inclusive headers)
// max WebSocket header size 24 bytes
// 1416 - 24 = 1392 max fame payload size
//

typedef std::vector<uint8_t> msgtype;
typedef std::queue<MSGBufferTypeSPtr> msgQType;

FragmentedServerMessage(msgQType& out,const WebSocketProtocol::OpCode oc,const char* src, const size_t len)
{
int fragments = len/1392;
size_t bytes_left=len;
size_t offset=0;

auto outmsg=std::make_shared<MSGBufferType>();

if(fragments>1)
{
// first fragment
outmsg->clear();
outmsg->push_back(oc);
WS::putLength(1392,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+1392);
memcpy(outmsg->data()+offset,src,1392);
bytes_left=bytes_left-1392;
out.push(outmsg);
--fragments;

// continuation fragments
while((fragments-1)>0)
{
outmsg=std::make_shared<MSGBufferType>();
outmsg->clear();
outmsg->push_back(0);
WS::putLength(1392,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+1392);
memcpy(outmsg->data()+offset,src+(len-bytes_left),1392);
bytes_left=bytes_left-1392;
out.push(outmsg);
--fragments;
}
// fin fragment;
outmsg=std::make_shared<MSGBufferType>();
outmsg->clear();
outmsg->push_back(128);
WS::putLength(bytes_left,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+bytes_left);
memcpy(outmsg->data()+offset,src+(len-bytes_left),bytes_left);
out.push(outmsg);
}else{
outmsg->clear();
outmsg->push_back(128|oc);
WS::putLength(len,*outmsg);
size_t offset=outmsg->size();
outmsg->resize(offset+len);
memcpy(outmsg->data()+offset,src,len);
out.push(outmsg);
}
}

FragmentedServerMessage(msgQType& out,const WebSocketProtocol::OpCode oc,const std::vector<uint8_t>& src)
{
const size_t len=src.size();

int fragments = len/1392;
size_t bytes_left=len;
size_t offset=0;

auto outmsg=std::make_shared<MSGBufferType>();

if(fragments>1)
{
// first fragment
outmsg->clear();
outmsg->push_back(oc);
WS::putLength(1392,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+1392);
memcpy(outmsg->data()+offset,src.data(),1392);
bytes_left=bytes_left-1392;
out.push(outmsg);
--fragments;

// continuation fragments
while((fragments-1)>0)
{
outmsg=std::make_shared<MSGBufferType>();
outmsg->clear();
outmsg->push_back(0);
WS::putLength(1392,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+1392);
memcpy(outmsg->data()+offset,src.data()+(len-bytes_left),1392);
bytes_left=bytes_left-1392;
out.push(outmsg);
--fragments;
}
// fin fragment;
outmsg=std::make_shared<MSGBufferType>();
outmsg->clear();
outmsg->push_back(128);
WS::putLength(bytes_left,*outmsg);
offset=outmsg->size();
outmsg->resize(offset+bytes_left);
memcpy(outmsg->data()+offset,src.data()+(len-bytes_left),bytes_left);
out.push(outmsg);
}else{
outmsg->clear();
outmsg->push_back(128|oc);
WS::putLength(len,*outmsg);
size_t offset=outmsg->size();
outmsg->resize(offset+len);
memcpy(outmsg->data()+offset,src.data(),len);
out.push(outmsg);
}
}
};

struct ServerMessage
{
ServerMessage(
Expand Down
Loading

0 comments on commit e35dbd6

Please sign in to comment.