Skip to content

1 建立Tars RPC Server 异步模型框架

Alex edited this page Jan 7, 2019 · 1 revision

1 RPC server

1.1 建立简单的epoll模型,完成与client端的收发测试

获取源码

编译源码:直接执行./run.sh,会生成tar-demo与client

执行代码:(1)执行./tar-demo (2)另外一个窗口执行./client, 输入测试数据 (3)查看两个窗口的结果

运行tar-demo:

tar-demo

运行client:

client

  • tc_epoller.h 里的TC_Epoller是对epoll各个原生函数的封装

  • tc_epoll_server.h里的NetThread负责TC_Epoller的调度。

epoll模型示意图

与平常使用习惯不同的地方,Tars里用epoll_event ev中的ev.data.u64作为ET_LISTEN、ET_CLOSE、ET_NOTIFY、ET_NET类型标识

1.2 重构代码,封装原生socket

获取源码

做完1.1,就已经嗅到了代码中的“坏味道”,NetThread中socket部分的代码是可以复用的,所以可以单独拿出来。

tc_socket.h中的TC_Socket是对原生socket的封装,注意同步修改tc_epoll_server中的相应代码

1.3 增加锁,条件变量和队列的封装,为多线程做准备

获取源码

tc_thread_mutex.h 是对pthread_mutex_t的封装

tc_thread_cond.h 是对pthread_cond_t的封装

tc_monitor.h 与 tc_lock.h 提供了对tc_thread_cond.h与tc_thread_mutex.h的封装

简单梳理下使用逻辑:

一般实际使用的是TC_ThreadLock::Lock,即:

typedef TC_LockT<TC_Monitor<T, P> > Lock;

这里面有两个元素,一是TC_LockT模板类(在tc_lock.h中定义),一是TC_Monitor模板类

TC_Monitor的定义如下:

typedef TC_Monitor<TC_ThreadMutex, TC_ThreadCond> TC_ThreadLock;

举例说明使用逻辑:

TC_LockT的构造函数调用了TC_Monitor的lock()方法:

TC_LockT(const T& mutex) : _mutex(mutex)  

{  
    //_mutex为TC_Monitor  
    _mutex.lock();
    _acquired = true;  
}

TC_Monitor的lock()方法调用了TC_ThreadMutex的lock()方法:

TC_LockT(const T& mutex) : _mutex(mutex)
{
    //_mutex为TC_ThreadMutex
    _mutex.lock();
    _acquired = true;
}

TC_ThreadMutex的lock()方法调用了原生的pthread_mutex_lock

void TC_ThreadMutex::lock() const
{
    int rc = pthread_mutex_lock(&_mutex);
    if(rc != 0)
    {
        if(rc == EDEADLK)
        {
            cout<<"[TC_ThreadMutex::lock] pthread_mutex_lock dead lock error"<<rc<<endl;
        }
        else
        {
            cout<<"[TC_ThreadMutex::lock] pthread_mutex_lock error"<<rc<<endl;
        }
    }
}

1.4 建立多线程模型,主线程NetThread负责请求接受和结果发送,Handle线程负责业务逻辑处理

获取源码

1.3的模型只有一个线程,请求的接收、加工处理、结果的发送都在同一个线程中完成,并不能充分利用io和cpu。所以我们做第一步改进,将请求的接收与结果的发送放在NetThread线程中,将请求的逻辑处理放在Handle线程中,两个线程沟通的桥梁是两个队列。如下图所示:

多线程处理逻辑图

  • 主线程NetThread将客户端请求放入接收队列_rbuffer,Handle线程从_rbuffer中读取请求进行处理

  • Handle线程将处理完的结果放入发送队列_sbuffer,主线程NetThread从_sbuffer中读取结果返回给客户端

在这一节中,我们增加了TC_Thread类作为线程的封装,以便于扩展多线程;将NetThread和Handle两个类放到TC_EpollServer类中,以便于两个线程间的交互。

1.5 继续重构,增加BindAdapter保管ip和port,同时引入标准库智能指针

获取源码

NetThread类在可预见的未来方法会越来越多,所以我们慢慢把一些职责从NetThread中分离出来,作为NetThread的依赖存在。

新增的BindAdapter类接管了原NetThread中的int bind(string& ip, int& port);方法,把对IP和PORT的处理放到了TC_Endpoint中。

insertRecvQueue和waitForRecvQueue出现在了BindAdapter中,但这节我们还没有使用到,后面会用他们替换NetThread中的insertRecvQueue和waitForRecvQueue

1.6 增加Handle数目,完善1接收,n处理模型

获取源码

  • 执行:./tar-demo

  • 打开另一个窗口执行:./client 连续快速输入多个test(回车)

  • 可以在tar-demo窗口观察到类似Handle thread id is 139941899609856这样的信息,并且Handle thread id会有变化

1.4节中我们解耦了线程的收发与业务逻辑处理,一个NetThread线程负责收发,一个Handle线程负责业务逻辑处理。现在又有新需求了,我们认为业务处理逻辑比较复杂,希望有多个Handle线程处理来自NetThread的请求。因为有之前的基础,仅仅需要修改main.cpp中的几行代码就能满足上面的需求。模型示意图如下:

1收发N处理模型

1.7 增加Connection,第一步改造,接管accept文件操作符

获取源码

在NetThread类中增加Connection类,接管NetThread类中对accept文件操作符的管理。NetThread中所有跟accept文件操作符有关的函数(例如TC_EpollServer::NetThread::accept(int fd))都需要进行修改

1.8 利用Connection,第二步改造,接管NetThread的发送与接收职责

获取源码

我们用Connection和BindAdapter中的insertRecvQueue与waitForRecvQueue替换掉了NetThread中的对应方法

Connection的recv与send方法接管了NetThread中的原生read和send方法

梳理下主要逻辑:

  1. TC_EpollServer::NetThread::accept(int fd)方法中,Connection保存了accept文件操作符

  2. TC_EpollServer::NetThread::addTcpConnection(TC_EpollServer::NetThread::Connection *cPtr)方法中,建立起唯一编码(uid)与Connection之间的对应关系(_uid_connection[uid] = cPtr)

  3. uid通过_epoller.add传递到TC_EpollServer::NetThread::processNet(const epoll_event &ev)

  4. 在TC_EpollServer::NetThread::processNet(const epoll_event &ev)中uid住在Connection中,存入vRecvData(最后放入了_rbuffer队列)

  5. TC_EpollServer::Handle::handleImp()从_rbuffer拿到recv的uid信息,通过sendResponse放回到NetThread中的_sbuffer中

  6. TC_EpollServer::NetThread::processPipe()从_sbuffer拿取uid信息,通过uid获取Connection信息,通过Connection返回结果给客户端

1.9 NetThread转为线程,N(NetThread)收发,N(Handle)处理模型建立

获取源码

前面小节里,建立了1收发(NetThread),多(N)处理(Handle)的模型,这里再进一步,收发也改为多线程,Connection按照一定规则(getNetThreadOfFd)分到不同NetThread上完成接收,同时send也按照一定规则(getNetThreadOfFd)分配到不同NetThread上完成发送。这样就由原来的1:N变成了N:N模型,如下图所示:

N:N收发处理模型

上图箭头编号表示处理顺序:

  1. NetThread线程启动了4个,NetThread0作为唯一的监听线程存在, 监听request请求

  2. 当有请求进来时候,NetThread0线程将accept操作符封装为Connection

  3. NetThread0线程按照(getNetThreadOfFd)方法将Connection分配给NetThread0、NetThread1、NetThread2、NetThread3

  4. 不同线程读取完请求后放入到同一个接收队列_rbuffer

  5. 多个Handle线程在监听_rbuffer,有请求进来后会有一个Handle拿到请求并处理

  6. Handle处理完后同样按照(getNetThreadOfFd)方法将结果分配给NetThread0、NetThread1、NetThread2、NetThread3

1.10 将Handle中的业务处理逻辑具体化为HelloImp

获取源码

当前模型的业务处理在Handle类中,处理逻辑也是极为简单,将请求字符串复制到结果字符串,实现的是类似"echo"的功能。

本着“对修改关闭,对扩展开放”的原则,我们应该考虑将具体的处理逻辑抽象出来,即Handle中处理代码应该是不变的,具体逻辑代码单独实现,并可以添加到Handle中进行处理。

HelloImp使用机制说明

  • ServantHelperManager作为单例存在,可全局使用

  • 外界编写新的业务逻辑,只需要像HelloImp一样继承Servant,实现doRequest

  • 注册新的业务逻辑,只需要ServantHelperManager的setAdapterServant和addServant两步

  • 实例化业务逻辑是在ServantHelperManager的create中完成的,是个通用的模板,故而放在了ServantHandle的initialize函数中