Skip to content

Commit

Permalink
Update: Net module surrported create a coroutine for handling
Browse files Browse the repository at this point in the history
  • Loading branch information
i0gan committed Mar 31, 2024
1 parent 06269da commit b3f7477
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 25 deletions.
23 changes: 14 additions & 9 deletions src/node/proxy/logic/logic_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ bool LogicModule::Start() { return true; }

bool LogicModule::Destory() { return true; }

bool LogicModule::Update() { return true; }
bool LogicModule::Update() {

return true;
}

bool LogicModule::AfterStart() {
m_class_ = pm_->FindModule<IClassModule>();
Expand All @@ -23,6 +26,7 @@ bool LogicModule::AfterStart() {
m_net_->AddReceiveCallBack(rpc::ProxyRPC::REQ_HEARTBEAT, this, &LogicModule::OnHeartbeat);
m_net_->AddReceiveCallBack(rpc::ProxyRPC::REQ_CONNECT_PROXY, this, &LogicModule::OnReqConnectWithTcp);
m_net_->AddReceiveCallBack(rpc::TestRPC::REQ_TEST_PROXY, this, &LogicModule::OnReqTestProxy);
m_net_client_->AddReceiveCallBack(ServerType::ST_MASTER, this, &LogicModule::OnNnAckMinWorkloadNodeInfo);
m_ws_->AddReceiveCallBack(rpc::ProxyRPC::REQ_CONNECT_PROXY, this, &LogicModule::OnReqConnectWithWS);
m_ws_->AddReceiveCallBack(this, &LogicModule::OnOtherMessage);

Expand Down Expand Up @@ -51,13 +55,13 @@ void LogicModule::OnWebSocketClientEvent(socket_t sock, const SQUICK_NET_EVENT e
}
}

void LogicModule::OnWS(const socket_t sock, const int msg_id, const char* msg, const uint32_t len) {
dout << "On Websocket: " << std::string(msg, len) << endl;
dout << "Websocket recived size: " << len << endl;
rpc::AckConnectProxy ack;
ack.set_code(0);
m_ws_->SendMsgPB(rpc::ProxyRPC::ACK_CONNECT_PROXY, ack, sock);
return;
// request per 5 sec
void LogicModule::NnReqMinWorkloadNodeInfo() {

}

void LogicModule::OnNnAckMinWorkloadNodeInfo(const socket_t sock, const int msg_id, const char* msg, const uint32_t len) {

}

void LogicModule::OnClientDisconnected(const socket_t sock) {
Expand Down Expand Up @@ -315,8 +319,9 @@ bool LogicModule::TryEnter(string guid) {
return true;
}

void LogicModule::OnReqConnectWithTcp(const socket_t sock, const int msg_id, const char* msg, const uint32_t len) {
Coroutine<bool> LogicModule::OnReqConnectWithTcp(const socket_t sock, const int msg_id, const char* msg, const uint32_t len) {
OnReqConnect(ProtocolType::Tcp, sock, msg_id, msg, len);
co_return;
}

void LogicModule::OnReqConnectWithWS(const socket_t sock, const int msg_id, const char* msg, const uint32_t len) {
Expand Down
12 changes: 7 additions & 5 deletions src/node/proxy/logic/logic_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,23 @@ class LogicModule : public ILogicModule {
virtual bool Start();
virtual bool Destory();
virtual bool Update();

virtual bool AfterStart();

protected:
void OnOtherMessage(const socket_t sock, const int msg_id, const char *msg, const uint32_t len);

void OnHeartbeat(const socket_t sock, const int msg_id, const char *msg, const uint32_t len);
void OnReqTestProxy(const socket_t sock, const int msg_id, const char* msg, const uint32_t len);

void OnReqConnectWithTcp(const socket_t sock, const int msg_id, const char *msg, const uint32_t len);
Coroutine<bool> OnReqConnectWithTcp(const socket_t sock, const int msg_id, const char *msg, const uint32_t len);
void OnReqConnectWithWS(const socket_t sock, const int msg_id, const char* msg, const uint32_t len);
void OnReqConnect(ProtocolType type, const socket_t sock, const int msg_id, const char* msg, const uint32_t len);

virtual void OnAckConnectVerify(const int msg_id, const char *msg, const uint32_t len) override;
void OnReqEnter(const socket_t sock, const int msg_id, const char *msg, const uint32_t len);

void NnReqMinWorkloadNodeInfo();
void OnNnAckMinWorkloadNodeInfo(const socket_t sock, const int msg_id, const char* msg, const uint32_t len);

bool TryEnter(string guid);
int EnterSuccessEvent(const string account_id, const string player_id);
bool SendToPlayer(string player_id, const int msg_id, const string& data);
Expand All @@ -59,8 +61,6 @@ class LogicModule : public ILogicModule {
PlayerHeatbeatTimeout,
};



struct KeepAlive {
ProtocolType protocol_type = ProtocolType::Tcp;
string account;
Expand Down Expand Up @@ -89,6 +89,8 @@ class LogicModule : public ILogicModule {

map<INT64, Session> sessions_;

map<int, int> min_workload_nodes_; // min workload nodes, key: node type, value: node id

protected:
ILogModule *m_log_;
IClassModule *m_class_;
Expand Down
4 changes: 2 additions & 2 deletions src/proto/micro.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ package rpc;
// GameLobbyRPC RPC 15000 ~ 16000
enum MicroRPC {
MICRO_RPC_NONE = 0;
REQ_CHAT = 1;
ACK_CHAT = 2;
CN_REQ_CHAT = 1;
NC_ACK_CHAT = 2;
}
5 changes: 4 additions & 1 deletion src/squick/plugin/net/i_net.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include <memory>
#include <squick/core/guid.h>
#include <vector>

#include "coroutine.h"
#if PLATFORM == PLATFORM_WIN
#include <WinSock2.h>
#elif PLATFORM == PLATFORM_APPLE || PLATFORM == PLATFORM_LINUX || PLATFORM == PLATFORM_ANDROID
Expand Down Expand Up @@ -201,6 +201,9 @@ typedef std::shared_ptr<NET_EVENT_FUNCTOR> NET_EVENT_FUNCTOR_PTR;
typedef std::function<void(int severity, const char *msg)> NET_EVENT_LOG_FUNCTOR;
typedef std::shared_ptr<NET_EVENT_LOG_FUNCTOR> NET_EVENT_LOG_FUNCTOR_PTR;

typedef std::function<Coroutine<bool>(const socket_t sock, const int msg_id, const char* msg, const uint32_t len)> NET_CORO_RECEIVE_FUNCTOR;
typedef std::shared_ptr<NET_CORO_RECEIVE_FUNCTOR> NET_CORO_RECEIVE_FUNCTOR_PTR;

class NetObject {
public:
NetObject(INet *pNet, socket_t sock, sockaddr_in &addr, void *pBev) {
Expand Down
13 changes: 13 additions & 0 deletions src/squick/plugin/net/i_net_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include <squick/core/vector4.h>
#include <squick/plugin/log/i_log_module.h>
#include <struct/struct.h>
#include "coroutine.h"

#define NET_COROTINE_MAX_SURVIVAL_TIME 10

////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -136,6 +139,15 @@ class INetModule : public IModule {
return AddReceiveCallBack(functorPtr);
}

template <typename BaseType>
bool AddReceiveCallBack(const int msg_id, BaseType* pBase, Coroutine<bool> (BaseType::* handleReceiver)(const socket_t, const int, const char*, const uint32_t)) {
NET_CORO_RECEIVE_FUNCTOR functor =
std::bind(handleReceiver, pBase, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
NET_CORO_RECEIVE_FUNCTOR_PTR functorPtr(new NET_CORO_RECEIVE_FUNCTOR(functor));

return AddReceiveCallBack(msg_id, functorPtr);
}

template <typename BaseType> bool AddEventCallBack(BaseType *pBase, void (BaseType::*handler)(const socket_t, const SQUICK_NET_EVENT, INet *)) {
NET_EVENT_FUNCTOR functor = std::bind(handler, pBase, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
NET_EVENT_FUNCTOR_PTR functorPtr(new NET_EVENT_FUNCTOR(functor));
Expand Down Expand Up @@ -223,6 +235,7 @@ class INetModule : public IModule {
virtual void RemoveReceiveCallBack(const int msg_id) = 0;

virtual bool AddReceiveCallBack(const int msg_id, const NET_RECEIVE_FUNCTOR_PTR &cb) = 0;
virtual bool AddReceiveCallBack(const int msg_id, const NET_CORO_RECEIVE_FUNCTOR_PTR& cb) = 0;

virtual bool AddReceiveCallBack(const NET_RECEIVE_FUNCTOR_PTR &cb) = 0;

Expand Down
3 changes: 1 addition & 2 deletions src/squick/plugin/net/net_client_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
#include <squick/core/consistent_hash.h>
#include <squick/core/i_plugin_manager.h>
#include <squick/core/queue.h>
#include <squick/plugin/log/i_log_module.h>
#include <squick/plugin/log/export.h>

#include "i_net_client_module.h"
#include "i_net_module.h"

class NetClientModule : public INetClientModule {
public:
Expand Down
68 changes: 68 additions & 0 deletions src/squick/plugin/net/net_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ bool NetModule::AddReceiveCallBack(const int msg_id, const NET_RECEIVE_FUNCTOR_P
return true;
}

bool NetModule::AddReceiveCallBack(const int msg_id, const NET_CORO_RECEIVE_FUNCTOR_PTR& cb) {
if (coro_funcs_.find(msg_id) == coro_funcs_.end()) {
std::list<NET_CORO_RECEIVE_FUNCTOR_PTR> xList;
xList.push_back(cb);
coro_funcs_.insert(std::map<int, std::list<NET_CORO_RECEIVE_FUNCTOR_PTR>>::value_type(msg_id, xList));
return true;
}

std::map<int, std::list<NET_CORO_RECEIVE_FUNCTOR_PTR>>::iterator it = coro_funcs_.find(msg_id);
it->second.push_back(cb);

return true;
}

bool NetModule::AddReceiveCallBack(const NET_RECEIVE_FUNCTOR_PTR &cb) {
mxCallBackList.push_back(cb);

Expand All @@ -90,6 +104,12 @@ bool NetModule::Update() {

m_pNet->Update();

time_t now_time = time(nullptr);
if (now_time - last_check_coroutines_time_ > 0) {
FixCoroutines(now_time);
last_check_coroutines_time_ = now_time;
}

return true;
}

Expand Down Expand Up @@ -291,6 +311,37 @@ bool NetModule::SendMsgPB(const uint16_t msg_id, const std::string &strData, con

INet *NetModule::GetNet() { return m_pNet; }

int NetModule::FixCoroutines(time_t now_time) {
// Check coroutine states and free coroutines objects
int num = 0;
for (auto iter = coroutines_.begin(); iter != coroutines_.end();) {
auto now_iter = iter;
++iter;
auto co = *now_iter;
if (co.GetHandle().done()) {
co.GetHandle().destroy();
#ifdef SQUICK_DEV
dout << "Destoy coroutine: " << co.GetHandle().address() << endl;
#endif
coroutines_.erase(now_iter);
num++;
continue;
}

if (now_time - co.GetStartTime() > NET_COROTINE_MAX_SURVIVAL_TIME) {
#ifdef SQUICK_DEV
dout << " This corotine has time out: " << co.GetHandle().address() << std::endl;
#endif
// do not destroy
if (!co.GetHandle().done()) {
co.GetHandle().resume();
}
}
}

return num;
}

void NetModule::OnReceiveNetPack(const socket_t sock, const int msg_id, const char *msg, const uint32_t len) {
//m_log_->LogInfo(pm_->GetAppName() + std::to_string(pm_->GetAppID()) + " NetModule::OnReceiveNetPack " +
//std::to_string(msg_id), __FILE__, __LINE__);
Expand All @@ -301,6 +352,23 @@ void NetModule::OnReceiveNetPack(const socket_t sock, const int msg_id, const ch
SQUICK_CRASH_TRY
#endif

// corotine first handle

auto co_it = coro_funcs_.find(msg_id);
if (coro_funcs_.end() != co_it) {
std::list<NET_CORO_RECEIVE_FUNCTOR_PTR>& funcs = co_it->second;
for (auto func_iter = funcs.begin(); func_iter != funcs.end(); ++func_iter) {
NET_CORO_RECEIVE_FUNCTOR_PTR& pFunPtr = *func_iter;
NET_CORO_RECEIVE_FUNCTOR* pFunc = pFunPtr.get();
auto co = pFunc->operator()(sock, msg_id, msg, len);
coroutines_.push_back(co);
#ifdef SQUICK_DEV
dout << "Net Module create a new coroutine: " << co.GetHandle().address() << endl;
#endif
co.GetHandle().resume();
}
}

std::map<int, std::list<NET_RECEIVE_FUNCTOR_PTR>>::iterator it = mxReceiveCallBack.find(msg_id);
if (mxReceiveCallBack.end() != it) {
std::list<NET_RECEIVE_FUNCTOR_PTR> &xFunList = it->second;
Expand Down
13 changes: 7 additions & 6 deletions src/squick/plugin/net/net_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

#include "i_net_module.h"
#include <squick/core/exception.h>
#include <squick/core/i_plugin_manager.h>
#include <squick/plugin/log/i_log_module.h>
#include <squick/core/base.h>
#include <squick/plugin/log/export.h>

class NetModule : public INetModule {
public:
Expand All @@ -32,6 +32,7 @@ class NetModule : public INetModule {

virtual void RemoveReceiveCallBack(const int msg_id);
virtual bool AddReceiveCallBack(const int msg_id, const NET_RECEIVE_FUNCTOR_PTR &cb);
virtual bool AddReceiveCallBack(const int msg_id, const NET_CORO_RECEIVE_FUNCTOR_PTR& cb);
virtual bool AddReceiveCallBack(const NET_RECEIVE_FUNCTOR_PTR &cb);
virtual bool AddEventCallBack(const NET_EVENT_FUNCTOR_PTR &cb);
virtual bool SendMsgWithOutHead(const int msg_id, const std::string &msg, const socket_t sock);
Expand All @@ -50,10 +51,8 @@ class NetModule : public INetModule {

protected:
void OnReceiveNetPack(const socket_t sock, const int msg_id, const char *msg, const uint32_t len);

void OnSocketNetEvent(const socket_t sock, const SQUICK_NET_EVENT eEvent, INet *pNet);

void KeepAlive();
int FixCoroutines(time_t now_time);

private:
unsigned int mnBufferSize;
Expand All @@ -62,6 +61,8 @@ class NetModule : public INetModule {
std::map<int, std::list<NET_RECEIVE_FUNCTOR_PTR>> mxReceiveCallBack;
std::list<NET_EVENT_FUNCTOR_PTR> mxEventCallBackList;
std::list<NET_RECEIVE_FUNCTOR_PTR> mxCallBackList;

std::map<int, std::list< NET_CORO_RECEIVE_FUNCTOR_PTR>> coro_funcs_;
list<Coroutine<bool>> coroutines_;
ILogModule *m_log_;
time_t last_check_coroutines_time_ = 0;
};

0 comments on commit b3f7477

Please sign in to comment.