Skip to content

Commit

Permalink
Fix a race condition bug when TCPServer::Stop. See #26
Browse files Browse the repository at this point in the history
  • Loading branch information
zieckey committed Apr 10, 2017
1 parent 673dfac commit c0e02e9
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 12 deletions.
1 change: 1 addition & 0 deletions evpp/event_loop_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ void EventLoopThread::Stop(bool wait_thread_exit) {
while (!IsStopped()) {
usleep(1);
}

if (thread_->joinable()) {
try {
thread_->join();
Expand Down
4 changes: 2 additions & 2 deletions evpp/event_loop_thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ bool EventLoopThreadPool::IsStopped() const {
EventLoop* EventLoopThreadPool::GetNextLoop() {
EventLoop* loop = base_loop_;

if (!threads_.empty()) {
if (started_ && !threads_.empty()) {
// No need to lock here
int64_t next = next_.fetch_add(1);
next = next % threads_.size();
Expand All @@ -109,7 +109,7 @@ EventLoop* EventLoopThreadPool::GetNextLoop() {
EventLoop* EventLoopThreadPool::GetNextLoopWithHash(uint64_t hash) {
EventLoop* loop = base_loop_;

if (!threads_.empty()) {
if (started_ && !threads_.empty()) {
uint64_t next = hash % threads_.size();
loop = (threads_[next])->event_loop();
}
Expand Down
31 changes: 31 additions & 0 deletions evpp/server_status.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <atomic>

#include "evpp/inner_pre.h"

namespace evpp {
class ServerStatus {
public:
enum Status {
kNull = 0,
kInitialized = 1,
kRunning = 2,
kStopping = 3,
kStopped = 4,
};

std::string ToString() const {
H_CASE_STRING_BIGIN(status_);
H_CASE_STRING(kNull);
H_CASE_STRING(kInitialized);
H_CASE_STRING(kRunning);
H_CASE_STRING(kStopping);
H_CASE_STRING(kStopped);
H_CASE_STRING_END();
}

protected:
std::atomic<int> status_ = { kNull };
};
}
39 changes: 32 additions & 7 deletions evpp/tcp_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ TCPServer::~TCPServer() {
}

bool TCPServer::Init() {
assert(status_ == kNull);
listener_.reset(new Listener(loop_, listen_addr_));
listener_->Listen();
listener_->SetNewConnectionCallback(
Expand All @@ -35,16 +36,25 @@ bool TCPServer::Init() {
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
status_.store(kInitialized);
return true;
}

bool TCPServer::Start() {
assert(status_ == kInitialized);
assert(listener_.get());
return tpool_->Start(true);
bool rc = tpool_->Start(true);
if (rc) {
status_.store(kRunning);
}
return rc;
}


bool TCPServer::IsRunning() const {
if (status_ != kRunning) {
return false;
}

if (!loop_->IsRunning()) {
return false;
}
Expand All @@ -71,6 +81,8 @@ bool TCPServer::IsStopped() const {
}

void TCPServer::Stop() {
assert(status_ == kRunning);
status_.store(kStopping);
loop_->RunInLoop(std::bind(&TCPServer::StopInLoop, this));
}

Expand All @@ -79,19 +91,26 @@ void TCPServer::StopInLoop() {
listener_->Stop();
listener_.reset();

for (auto& c : connections_) {
c.second->Close();
if (connections_.empty()) {
// Stop all the working threads now.
tpool_->Stop(true);
assert(tpool_->IsStopped());
status_.store(kStopped);
} else {
for (auto& c : connections_) {
c.second->Close();
}
// The working threads will be stopped after all the connections closed.
}

tpool_->Stop(true);
assert(tpool_->IsStopped());
LOG_TRACE << "TCPServer::StopInLoop exited";
LOG_TRACE << "TCPServer::StopInLoop exited, status=" << ToString();
}

void TCPServer::HandleNewConn(int sockfd,
const std::string& remote_addr/*ip:port*/,
const struct sockaddr_in* raddr) {
assert(loop_->IsInLoopThread());
assert(IsRunning());
EventLoop* io_loop = GetNextLoop(raddr);
std::string n = name_ + "-" + remote_addr + "#" + std::to_string(next_conn_id_++); // TODO use string buffer
TCPConnPtr conn(new TCPConn(io_loop, n, sockfd, listen_addr_, remote_addr));
Expand All @@ -117,6 +136,12 @@ void TCPServer::RemoveConnection(const TCPConnPtr& conn) {
LOG_INFO << "TCPServer::RemoveConnection conn=" << conn.get() << " fd="<< conn->fd();
assert(this->loop_->IsInLoopThread());
this->connections_.erase(conn->name());
if (status_ == kStopping && this->connections_.empty()) {
// At last, we stop all the working threads
tpool_->Stop(true);
assert(tpool_->IsStopped());
status_.store(kStopped);
}
};
loop_->RunInLoop(f);
}
Expand Down
6 changes: 4 additions & 2 deletions evpp/tcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
#include "evpp/event_loop.h"
#include "evpp/event_loop_thread_pool.h"
#include "evpp/tcp_callbacks.h"

#include "evpp/thread_dispatch_policy.h"
#include "evpp/server_status.h"

#include <map>

namespace evpp {
class Listener;

class EVPP_EXPORT TCPServer : public ThreadDispatchPolicy {
class EVPP_EXPORT TCPServer : public ThreadDispatchPolicy, public ServerStatus {
public:
TCPServer(EventLoop* loop,
const std::string& listen_addr/*ip:port*/,
Expand Down Expand Up @@ -57,7 +59,7 @@ class EVPP_EXPORT TCPServer : public ThreadDispatchPolicy {
MessageCallback msg_fn_;

// always in the listening loop thread
uint64_t next_conn_id_;
uint64_t next_conn_id_ = 0;
typedef std::map<std::string/*the name of the connection*/, TCPConnPtr> ConnectionMap;
ConnectionMap connections_;
};
Expand Down
2 changes: 1 addition & 1 deletion evpp/thread_dispatch_policy.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

namespace evpp {
class EVPP_EXPORT ThreadDispatchPolicy {
class ThreadDispatchPolicy {
public:
enum Policy {
kRoundRobin,
Expand Down
5 changes: 5 additions & 0 deletions evpp/udp/udp_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,16 @@ bool Server::Start() {
LOG_ERROR << "MessageHandler DO NOT set!";
return false;
}

for (auto& rt : recv_threads_) {
if (!rt->Run()) {
return false;
}
}

while (!IsRunning()) {
usleep(1);
}
return true;
}

Expand Down
1 change: 1 addition & 0 deletions vsprojects/libevpp.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
<ClInclude Include="..\evpp\log_config.h" />
<ClInclude Include="..\evpp\memmem.h" />
<ClInclude Include="..\evpp\platform_config.h" />
<ClInclude Include="..\evpp\server_status.h" />
<ClInclude Include="..\evpp\slice.h" />
<ClInclude Include="..\evpp\sockets.h" />
<ClInclude Include="..\evpp\sys_addrinfo.h" />
Expand Down
3 changes: 3 additions & 0 deletions vsprojects/libevpp.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -246,5 +246,8 @@
<ClInclude Include="..\evpp\thread_dispatch_policy.h">
<Filter>common</Filter>
</ClInclude>
<ClInclude Include="..\evpp\server_status.h">
<Filter>common</Filter>
</ClInclude>
</ItemGroup>
</Project>

0 comments on commit c0e02e9

Please sign in to comment.