Skip to content

Commit

Permalink
Added support for LAppS protocol messaging (input messages validation)
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Kraynyukhov committed Apr 18, 2018
1 parent 675792a commit 928521f
Showing 1 changed file with 153 additions and 84 deletions.
237 changes: 153 additions & 84 deletions include/ApplicationContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ extern "C" {

}

#include <modules/nljson.h>

#include <WebSocket.h>
#include <abstract/Worker.h>
#include <WSWorkersPool.h>

#include <modules/nljson.h>
#include <modules/wsSend.h>

using json = nlohmann::json;

Expand All @@ -48,6 +53,8 @@ namespace LAppS
public:

private:
typedef std::shared_ptr<::abstract::Worker> WorkerSPtrType;
enum LAppSInMessageType { INVALID, CN, CN_WITH_PARAMS, REQUEST, REQUEST_WITH_PARAMS };
itc::utils::Int2Type<Tproto> mProtocol;
std::mutex mMutex;
std::string mName;
Expand All @@ -61,9 +68,10 @@ namespace LAppS
lua_pop(mLState,1);
}
}

void callAppOnMessage()
{
int ret = lua_pcall (mLState, 1, 1, 0);
int ret = lua_pcall (mLState, 3, 1, 0);
if(ret != 0)
{
switch(ret)
Expand Down Expand Up @@ -140,6 +148,7 @@ namespace LAppS
cleanLuaStack();
return true;
}

void shutdownApp()
{
lua_getglobal(mLState, mName.c_str());
Expand All @@ -163,7 +172,6 @@ namespace LAppS

void startApp()
{
itc::getLog()->trace(__FILE__,__LINE__,"-> ApplicationContext::startApp()");
lua_getglobal(mLState, mName.c_str());
lua_getfield(mLState, -1, "onStart");
int ret = lua_pcall (mLState, 0, 0, 0);
Expand All @@ -182,111 +190,159 @@ namespace LAppS
}
}
cleanLuaStack();
itc::getLog()->trace(__FILE__,__LINE__,"<- ApplicationContext::startApp()");
}

static const LAppSInMessageType getLAppSInMessageType(const json& msg)
{
auto lapps=msg.find("lapps");
// is it a proper protocol version?
if((lapps!=msg.end()) && (!lapps.value().is_null()) && (lapps.value().is_number()) && lapps.value() == 1 )
{
auto method=msg.find("method");
if(method!=msg.end())
{
if((!method.value().is_null())&&method.value().is_string())
{
const std::string& method_name=method.value();
if(method_name.size()>0)
{
if(method_name[0] != '_')
{
if(method_name.find('.') != std::string::npos)
return INVALID;


bool hasParams=false;

auto params=msg.find("params");
if(params != msg.end())
{
if((!params.value().is_null()) && params.value().is_array() && (params.value().size() > 0))
hasParams=true;
}

auto cid=msg.find("cid");
if(cid!=msg.end()) // Client's Notification
{
if(cid.value().is_number())
if(hasParams)
return CN_WITH_PARAMS;
else return CN;
else return INVALID;
}
else // REQUEST
{
if(hasParams)
return REQUEST_WITH_PARAMS;
else return REQUEST;
}
}
else return INVALID;
}
else return INVALID;
}
else return INVALID;
}
else return INVALID;
}
else return INVALID;
}

const TaggedEvent onMessage(const size_t workerID, const int32_t socketfd, const WSEvent& event, const itc::utils::Int2Type<ApplicationProtocol::RAW>& protocol_is_raw)
const bool onMessage(const size_t workerID, const int32_t socketfd, const WSEvent& event, const itc::utils::Int2Type<ApplicationProtocol::RAW>& protocol_is_raw)
{
itc::getLog()->trace(__FILE__,__LINE__,"Application %s -> ApplicationContext::onMessage(PROTO::RAW)",mName.c_str());
cleanLuaStack();

lua_getfield(mLState, LUA_GLOBALSINDEX, mName.c_str());
lua_getfield(mLState,-1,"onMessage");

lua_pushinteger(mLState,(workerID<<32)|socketfd); // handler
lua_pushinteger(mLState,event.type); // opcode
lua_pushlstring(mLState,(const char*)(event.message->data()),event.message->size());

callAppOnMessage(); // expecting only one result
callAppOnMessage();

uint32_t argc=lua_gettop(mLState);
/*
if(argc != 1)

if(argc != 2)
{
throw std::system_error(EINVAL,std::system_category(),mName+"::onMessage() returned "+std::to_string(argc)+",but only one result was expected");
throw std::system_error(
EINVAL,std::system_category(),
mName+"::onMessage() is returned "+std::to_string(argc)+
" values, but only one boolean value is expected"
);
}
*/

size_t len;
const char *str=lua_tolstring(mLState,argc,&len);
auto out=std::make_shared<MSGBufferType>(len);

WebSocketProtocol::ServerMessage(*out,event.type,str,len);
cleanLuaStack();
itc::getLog()->trace(__FILE__,__LINE__,"Application %s <- ApplicationContext::onMessage(PROTO::RAW)",mName.c_str());
return {workerID,socketfd,{event.type,out}};
if(lua_isboolean(mLState,argc))
{
return lua_toboolean(mLState,argc);
}
return false;
}
const TaggedEvent onMessage(const size_t workerID, const int32_t socketfd, const WSEvent& event, const itc::utils::Int2Type<ApplicationProtocol::LAPPS>& protocol_is_lapps)

void pushRequest(const std::shared_ptr<json>& request)
{
auto udjsptr=static_cast<UDJSPTR**>(lua_newuserdata(mLState,sizeof(UDJSPTR*)));
// allocate UDJSPTR and store its address to allocated block
(*udjsptr)=new UDJSPTR(SHARED_PTR,new JSPTR());
// make JSPTR by parsing the string and store it to *(Userdata->ptr);
*((*udjsptr)->ptr)=request;
luaL_getmetatable(mLState, "nljson");
lua_pushcfunction(mLState, destroy);
lua_setfield(mLState, -2, "__gc");
lua_setmetatable(mLState, -2);
}

const bool onMessage(const size_t workerID, const int32_t socketfd, const WSEvent& event, const itc::utils::Int2Type<ApplicationProtocol::LAPPS>& protocol_is_lapps)
{
itc::getLog()->trace(__FILE__,__LINE__,"Application %s -> ApplicationContext::onMessage(PROTO::LAPPS)",mName.c_str());
auto retval=std::make_shared<MSGBufferType>(0);
// exceptions from json and from lua stack MUST be handled in the Application class
// We must prevent possibility to kill app with inappropriate message, therefore
// content errors may not throw the exceptions.

// Protocol violation, LAppS protocol accepts only binary messages (CBOR encoded)
if(event.type != WebSocketProtocol::OpCode::BINARY)
throw std::system_error(EINVAL,std::system_category(),"Protocol violation, LAppS protocol accepts only binary messages");

try
return false;

cleanLuaStack();

std::shared_ptr<json> message=std::make_shared<json>(json::from_cbor(*event.message));

auto msg_type=getLAppSInMessageType(*message);

// "Protocol violation, LAppS protocol accepts only binary messages (CBOR encoded)"
if(msg_type == INVALID)
return false;

lua_getfield(mLState, LUA_GLOBALSINDEX, mName.c_str());
lua_getfield(mLState,-1,"onMessage");

lua_pushinteger(mLState,(workerID<<32)|socketfd); // socket handler for ws::send
lua_pushinteger(mLState,msg_type);
pushRequest(message);

callAppOnMessage(); // handler, type, userdata

uint32_t argc=lua_gettop(mLState);

if(argc != 1)
{
lua_getfield(mLState, LUA_GLOBALSINDEX, mName.c_str());
lua_getfield(mLState,-1,"onMessage");

auto udjsptr=static_cast<UDJSPTR**>(lua_newuserdata(mLState,sizeof(UDJSPTR*)));
// allocate UDJSPTR and store its address to allocated block
(*udjsptr)=new UDJSPTR(SHARED_PTR,new JSPTR());
// make JSPTR by parsing the string and store it to *(Userdata->ptr);
auto toValidate=std::make_shared<json>(json::from_cbor(event.message.get()->data()));
// TODO: Validate LAppS protocol Request or Notification
*((*udjsptr)->ptr)=toValidate;
luaL_getmetatable(mLState, "nljson");
lua_pushcfunction(mLState, destroy);
lua_setfield(mLState, -2, "__gc");
lua_setmetatable(mLState, -2);

callAppOnMessage(); // expecting userdata object (1 value only);
uint32_t argc=lua_gettop(mLState);

if(argc > 1)
{
throw std::system_error(EINVAL,std::system_category(),mName+"::onMessage() returned "+std::to_string(argc)+",but zero or one result were expected");
}
switch(argc)
{
case 0:
{
(*retval)=json::to_cbor(json("{}"));
}
break;
case 1:
{
void *ptr=luaL_checkudata(mLState,-1,"nljson");

if((*static_cast<UDJSON**>(ptr))->type == SHARED_PTR)
{
auto udptr=static_cast<UDJSPTR**>(ptr);
auto jsptr=(*udptr)->ptr; // std::shared_ptr<json>*
(*retval)=json::to_cbor(**jsptr);
}
else
{
auto udptr=static_cast<UDJSON**>(ptr);
auto jsptr=(*udptr)->ptr; // json*
(*retval)=json::to_cbor(*jsptr);
}
}
break;
}
throw std::system_error(
EINVAL,std::system_category(),
mName+"::onMessage() is returned "+std::to_string(argc)+
" values, but only one boolean value is expected"
);
}
catch( const std::exception& e)

if(lua_isboolean(mLState,-1))
{
itc::getLog()->error(__FILE__,__LINE__,"Exception on client's message CBOR parsing, socket will be closed for the protocol violation");
throw std::system_error(EINVAL,std::system_category(),"Protocol violation, LAppS accepts only valid CBOR encoded messages");
return lua_toboolean(mLState,-1);
}
itc::getLog()->trace(__FILE__,__LINE__,"Application %s <- ApplicationContext::onMessage(PROTO::LAPPS)",mName.c_str());
return {workerID,socketfd,{event.type,retval}};
return false;
}
public:
ApplicationContext(const ApplicationContext&)=delete;
ApplicationContext(ApplicationContext&)=delete;

explicit ApplicationContext(const std::string& appname)
: mMutex(), mName(appname), mLState(luaL_newstate()) /**,mParent(parent)**/
explicit ApplicationContext(const std::string& appname, abstract::Application* parent)
: mMutex(), mName(appname), mLState(luaL_newstate()),mParent(parent)
{
/**
if(mParent == nullptr)
Expand All @@ -297,21 +353,31 @@ namespace LAppS

luaopen_nljson(mLState);
lua_setfield(mLState,LUA_GLOBALSINDEX,"nljson");
cleanLuaStack();

luaopen_wssend(mLState);
lua_setfield(mLState,LUA_GLOBALSINDEX,"ws");
// itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(workersCache);

if(require(mName))
{
itc::getLog()->info(__FILE__,__LINE__,"Lua module %s is registered",mName.c_str());
if(isAppModuleValid(mName))
{
itc::getLog()->info(
__FILE__,__LINE__,
"Lua module %s is a valid application (e.a. provides required methods, though there is no warranty it works properly)",mName.c_str()
"Lua module %s is a valid application (e.a. provides required methods, though there is no warranty it works properly)",
mName.c_str()
);
try {
this->startApp();
}catch(const std::exception& e)
{
itc::getLog()->error(__FILE__,__LINE__,"Unable to start application %s, because of an exception: %s",mName.c_str(),e.what());
itc::getLog()->error(
__FILE__,__LINE__,
"Unable to start application %s, because of an exception: %s",
mName.c_str(),e.what()
);
throw;
}
}
Expand All @@ -321,6 +387,7 @@ namespace LAppS
throw std::logic_error("Invalid application "+mName);
}
}
cleanLuaStack();
}
void shutdown()
{
Expand All @@ -339,8 +406,10 @@ namespace LAppS
this->shutdown();
}
}
const TaggedEvent onMessage(const size_t workerID, const int32_t socketfd, const WSEvent& event)
const bool onMessage(const size_t workerID, const int32_t socketfd, const WSEvent& event)
{
if(workersCache.empty())
itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(workersCache);
return onMessage(workerID,socketfd,event,mProtocol);
}
};
Expand Down

0 comments on commit 928521f

Please sign in to comment.