Skip to content

Commit

Permalink
[ISSUE #137] split TcpRemotingClient::m_ioService into m_dispatchServ…
Browse files Browse the repository at this point in the history
…ice and m_handleService
  • Loading branch information
jonnxu authored Jul 8, 2019
2 parents 94d97a0 + df4d218 commit a14580f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 23 deletions.
53 changes: 35 additions & 18 deletions src/transport/TcpRemotingClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,29 @@ namespace rocketmq {

//<!************************************************************************
TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
: m_pullThreadNum(pullThreadNum),
: m_dispatchThreadNum(1),
m_pullThreadNum(pullThreadNum),
m_tcpConnectTimeout(tcpConnectTimeout),
m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout),
m_namesrvIndex(0),
m_ioServiceWork(m_ioService) {
m_dispatchServiceWork(m_dispatchService),
m_handleServiceWork(m_handleService) {
#if !defined(WIN32) && !defined(__APPLE__)
string taskName = UtilAll::getProcessName();
prctl(PR_SET_NAME, "DispatchTP", 0, 0, 0);
#endif
for (int i = 0; i != m_dispatchThreadNum; ++i) {
m_dispatchThreadPool.create_thread(boost::bind(&boost::asio::io_service::run, &m_dispatchService));
}
#if !defined(WIN32) && !defined(__APPLE__)
prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
#endif

#if !defined(WIN32) && !defined(__APPLE__)
prctl(PR_SET_NAME, "NetworkTP", 0, 0, 0);
#endif
for (int i = 0; i != m_pullThreadNum; ++i) {
m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
m_handleThreadPool.create_thread(boost::bind(&boost::asio::io_service::run, &m_handleService));
}
#if !defined(WIN32) && !defined(__APPLE__)
prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
Expand All @@ -48,7 +60,7 @@ TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeo
LOG_INFO("m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, m_pullThreadNum:%d", m_tcpConnectTimeout,
m_tcpTransportTryLockTimeout, m_pullThreadNum);

m_async_service_thread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
m_timerServiceThread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
}

void TcpRemotingClient::boost_asio_work() {
Expand All @@ -59,9 +71,9 @@ void TcpRemotingClient::boost_asio_work() {
#endif

// avoid async io service stops after first timer timeout callback
boost::asio::io_service::work work(m_async_ioService);
boost::asio::io_service::work work(m_timerService);

m_async_ioService.run();
m_timerService.run();
}

TcpRemotingClient::~TcpRemotingClient() {
Expand All @@ -75,20 +87,24 @@ TcpRemotingClient::~TcpRemotingClient() {
void TcpRemotingClient::stopAllTcpTransportThread() {
LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");

m_async_ioService.stop();
m_async_service_thread->interrupt();
m_async_service_thread->join();
m_timerService.stop();
m_timerServiceThread->interrupt();
m_timerServiceThread->join();
removeAllTimerCallback();

{
std::lock_guard<std::timed_mutex> lock(m_tcpTableLock);
for (const auto& trans : m_tcpTable) {
trans.second->disconnect(trans.first);
}
m_tcpTable.clear();
}

m_ioService.stop();
m_threadpool.join_all();
m_handleService.stop();
m_handleThreadPool.join_all();

m_dispatchService.stop();
m_dispatchThreadPool.join_all();

{
std::lock_guard<std::mutex> lock(m_futureTableLock);
Expand All @@ -98,7 +114,7 @@ void TcpRemotingClient::stopAllTcpTransportThread() {
}
}

LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
LOG_ERROR("TcpRemotingClient::stopAllTcpTransportThread End, m_tcpTable:%lu", m_tcpTable.size());
}

void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
Expand Down Expand Up @@ -226,13 +242,13 @@ bool TcpRemotingClient::invokeAsync(const string& addr,

if (callback) {
boost::asio::deadline_timer* t =
new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMillis));
new boost::asio::deadline_timer(m_timerService, boost::posix_time::milliseconds(timeoutMillis));
addTimerCallback(t, opaque);
t->async_wait(
boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque));
}

// Even if send failed, asyncTimerThread will trigger next pull request or report send msg failed
// even if send failed, asyncTimerThread will trigger next pull request or report send msg failed
if (SendCommand(pTcp, request)) {
LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque);
responseFuture->setSendRequestOK(true);
Expand Down Expand Up @@ -453,7 +469,7 @@ void TcpRemotingClient::static_messageReceived(void* context, const MemoryBlock&
}

void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& addr) {
m_ioService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr));
m_dispatchService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr));
}

void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) {
Expand Down Expand Up @@ -482,7 +498,7 @@ void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr)
LOG_DEBUG("find_response opaque:%d", opaque);
processResponseCommand(pRespondCmd, pFuture);
} else {
processRequestCommand(pRespondCmd, addr);
m_handleService.post(boost::bind(&TcpRemotingClient::processRequestCommand, this, pRespondCmd, addr));
}
}

Expand All @@ -503,7 +519,8 @@ void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, std::share

if (pFuture->getAsyncFlag()) {
cancelTimerCallback(opaque);
pFuture->invokeCompleteCallback();

m_handleService.post(boost::bind(&ResponseFuture::invokeCompleteCallback, pFuture));
}
}

Expand All @@ -520,7 +537,7 @@ void TcpRemotingClient::handleAsyncRequestTimeout(const boost::system::error_cod
LOG_ERROR("no response got for opaque:%d", opaque);
eraseTimerCallback(opaque);
if (pFuture->getAsyncCallbackWrap()) {
pFuture->invokeExceptionCallback();
m_handleService.post(boost::bind(&ResponseFuture::invokeExceptionCallback, pFuture));
}
}
}
Expand Down
15 changes: 10 additions & 5 deletions src/transport/TcpRemotingClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class TcpRemotingClient {
AsyncTimerMap m_asyncTimerTable;
std::mutex m_asyncTimerTableLock;

int m_dispatchThreadNum;
int m_pullThreadNum;
uint64_t m_tcpConnectTimeout; // ms
uint64_t m_tcpTransportTryLockTimeout; // s
Expand All @@ -119,12 +120,16 @@ class TcpRemotingClient {
string m_namesrvAddrChoosed;
unsigned int m_namesrvIndex;

boost::asio::io_service m_ioService;
boost::asio::io_service::work m_ioServiceWork;
boost::thread_group m_threadpool;
boost::asio::io_service m_dispatchService;
boost::asio::io_service::work m_dispatchServiceWork;
boost::thread_group m_dispatchThreadPool;

boost::asio::io_service m_async_ioService;
unique_ptr<boost::thread> m_async_service_thread;
boost::asio::io_service m_handleService;
boost::asio::io_service::work m_handleServiceWork;
boost::thread_group m_handleThreadPool;

boost::asio::io_service m_timerService;
unique_ptr<boost::thread> m_timerServiceThread;
};

//<!************************************************************************
Expand Down

0 comments on commit a14580f

Please sign in to comment.