Skip to content

Commit

Permalink
There is a change that a TCPServer may have accepted a new connection
Browse files Browse the repository at this point in the history
before the working thread pool started. Suppose that:

Time 1 : TCPServer::listener_::Listen
Time 2 : TCPServer::tpool_ is starting
Time 3 : A new connection is comming
Time 4 : TCPServer::HandleNewConn is invoked and we assert that
         tpool_ must be running that would crash

So we changed the implementation of Listener::Listen() to two method :
    Listen() and Accept()

And we invoke Listener::Listen() in TCPServer::Init() and then invoke
TCPServer::Start() where we start thread pool firstly, and after the
thread pool has started, we invoke Listener::Accept().

#30
  • Loading branch information
zieckey committed Apr 12, 2017
1 parent c9c0509 commit b8bd2a9
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 38 deletions.
48 changes: 32 additions & 16 deletions evpp/event_loop_thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ namespace evpp {

EventLoopThreadPool::EventLoopThreadPool(EventLoop* base_loop, uint32_t thread_number)
: base_loop_(base_loop),
started_(false),
thread_num_(thread_number),
next_(0) {}
thread_num_(thread_number) {}

EventLoopThreadPool::~EventLoopThreadPool() {
assert(thread_num_ == threads_.size());
Expand All @@ -21,28 +19,38 @@ EventLoopThreadPool::~EventLoopThreadPool() {
}

bool EventLoopThreadPool::Start(bool wait_until_thread_started) {
assert(!started_);
assert(!started_.load());
if (started_.load()) {
return true;
}

if (started_) {
if (thread_num_ == 0) {
started_.store(true);
return true;
}

std::shared_ptr<std::atomic<uint32_t>> count(new std::atomic<uint32_t>(0));
for (uint32_t i = 0; i < thread_num_; ++i) {
std::stringstream ss;
ss << "EventLoopThreadPool-thread-" << i << "th";
auto fn = [this, count]() {
LOG_INFO << "this=" << this << " a working thread started tid=" << std::this_thread::get_id();
this->OnThreadStarted(count->fetch_add(1) + 1);
};
EventLoopThreadPtr t(new EventLoopThread());

if (!t->Start(wait_until_thread_started)) {
if (!t->Start(wait_until_thread_started, fn)) {
//FIXME error process
LOG_ERROR << "start thread failed!";
return false;
}

std::stringstream ss;
ss << "EventLoopThreadPool-thread-" << i << "th";
t->SetName(ss.str());
threads_.push_back(t);
}

started_ = true;
// when all the working thread have started,
// started_ will be stored with true in method OnThreadStarted

return true;
}

Expand All @@ -58,11 +66,11 @@ void EventLoopThreadPool::Stop(bool wait_thread_exit) {
}
}

started_ = false;
started_.store(false);
}

bool EventLoopThreadPool::IsRunning() const {
if (!started_) {
if (!started_.load()) {
return false;
}

Expand All @@ -74,12 +82,12 @@ bool EventLoopThreadPool::IsRunning() const {
}
}

return started_;
return started_.load();
}

bool EventLoopThreadPool::IsStopped() const {
if (thread_num_ == 0) {
return !started_;
return !started_.load();
}

for (uint32_t i = 0; i < thread_num_; ++i) {
Expand All @@ -96,7 +104,7 @@ bool EventLoopThreadPool::IsStopped() const {
EventLoop* EventLoopThreadPool::GetNextLoop() {
EventLoop* loop = base_loop_;

if (started_ && !threads_.empty()) {
if (started_.load() && !threads_.empty()) {
// No need to lock here
int64_t next = next_.fetch_add(1);
next = next % threads_.size();
Expand All @@ -109,7 +117,7 @@ EventLoop* EventLoopThreadPool::GetNextLoop() {
EventLoop* EventLoopThreadPool::GetNextLoopWithHash(uint64_t hash) {
EventLoop* loop = base_loop_;

if (started_ && !threads_.empty()) {
if (started_.load() && !threads_.empty()) {
uint64_t next = hash % threads_.size();
loop = (threads_[next])->loop();
}
Expand All @@ -121,4 +129,12 @@ uint32_t EventLoopThreadPool::thread_num() const {
return thread_num_;
}

void EventLoopThreadPool::OnThreadStarted(uint32_t count) {
LOG_INFO << "this=" << this << " tid=" << std::this_thread::get_id() << " count=" << count << " started.";
if (count == thread_num_) {
LOG_INFO << "this=" << this << " thread pool totally started.";
started_.store(true);
}
}

}
9 changes: 6 additions & 3 deletions evpp/event_loop_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ class EVPP_EXPORT EventLoopThreadPool {
uint32_t thread_num() const;

private:
void OnThreadStarted(uint32_t count);
private:

EventLoop* base_loop_;
bool started_;
uint32_t thread_num_;
std::atomic<int64_t> next_;
std::atomic<bool> started_ = { false };
uint32_t thread_num_ = 0;
std::atomic<int64_t> next_ = { 0 };

typedef std::shared_ptr<EventLoopThread> EventLoopThreadPtr;
std::vector<EventLoopThreadPtr> threads_;
Expand Down
13 changes: 8 additions & 5 deletions evpp/listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace evpp {
Listener::Listener(EventLoop* l, const std::string& addr)
: fd_(-1), loop_(l), listening_(false), addr_(addr) {}
: loop_(l), addr_(addr) {}

Listener::~Listener() {
LOG_TRACE << "Listener::~Listener fd=" << chan_->fd();
Expand All @@ -20,31 +20,35 @@ Listener::~Listener() {
void Listener::Listen() {
fd_ = sock::CreateNonblockingSocket();
if (fd_ < 0) {
int serrno = errno;
LOG_FATAL << "Create a nonblocking socket failed " << strerror(serrno);
return;
}

struct sockaddr_in addr = sock::ParseFromIPPort(addr_.data());
int ret = ::bind(fd_, sock::sockaddr_cast(&addr), static_cast<socklen_t>(sizeof addr));
int serrno = errno;
if (ret < 0) {
int serrno = errno;
LOG_FATAL << "bind error :" << strerror(serrno);
}

ret = ::listen(fd_, SOMAXCONN);
if (ret < 0) {
serrno = errno;
int serrno = errno;
LOG_FATAL << "Listen failed " << strerror(serrno);
}
}

void Listener::Accept() {
chan_.reset(new FdChannel(loop_, fd_, true, false));
chan_->SetReadCallback(std::bind(&Listener::HandleAccept, this));
loop_->RunInLoop(std::bind(&FdChannel::AttachToLoop, chan_.get()));
listening_ = true;
LOG_INFO << "TCPServer is running at " << addr_;
}

void Listener::HandleAccept() {
LOG_INFO << __FUNCTION__ << " New connection";
assert(loop_->IsInLoopThread());
struct sockaddr_storage ss;
socklen_t addrlen = sizeof(ss);
int nfd = -1;
Expand Down Expand Up @@ -83,6 +87,5 @@ void Listener::Stop() {
assert(loop_->IsInLoopThread());
chan_->DisableAllEvent();
chan_->Close();
listening_ = false;
}
}
11 changes: 6 additions & 5 deletions evpp/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,24 @@ class EVPP_EXPORT Listener {
Listener(EventLoop* loop, const std::string& addr/*local listening address : ip:port*/);
~Listener();

// socket listen
void Listen();

// nonblocking accept
void Accept();

void Stop();

void SetNewConnectionCallback(NewConnectionCallback cb) {
new_conn_fn_ = cb;
}

bool listening() const {
return listening_;
}
private:
void HandleAccept();

private:
int fd_;// The listening socket fd
int fd_ = -1;// The listening socket fd
EventLoop* loop_;
bool listening_;
std::string addr_;
std::unique_ptr<FdChannel> chan_;
NewConnectionCallback new_conn_fn_;
Expand Down
7 changes: 4 additions & 3 deletions evpp/server_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ class ServerStatus {
enum Status {
kNull = 0,
kInitialized = 1,
kRunning = 2,
kStopping = 3,
kStopped = 4,
kStarting = 2,
kRunning = 3,
kStopping = 4,
kStopped = 5,
};

std::string ToString() const {
Expand Down
23 changes: 17 additions & 6 deletions evpp/tcp_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "evpp/tcp_server.h"
#include "evpp/listener.h"
#include "evpp/tcp_conn.h"
#include "evpp/libevent_headers.h"

namespace evpp {
TCPServer::TCPServer(EventLoop* loop,
Expand Down Expand Up @@ -30,21 +31,24 @@ bool TCPServer::Init() {
assert(status_ == kNull);
listener_.reset(new Listener(loop_, listen_addr_));
listener_->Listen();
listener_->SetNewConnectionCallback(
std::bind(&TCPServer::HandleNewConn,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
status_.store(kInitialized);
return true;
}

bool TCPServer::Start() {
assert(status_ == kInitialized);
status_.store(kStarting);
assert(listener_.get());
bool rc = tpool_->Start(true);
if (rc) {
assert(tpool_->IsRunning());
listener_->SetNewConnectionCallback(
std::bind(&TCPServer::HandleNewConn,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
listener_->Accept();
status_.store(kRunning);
}
return rc;
Expand Down Expand Up @@ -100,6 +104,7 @@ void TCPServer::StopInLoop() {
for (auto& c : connections_) {
c.second->Close();
}

// The working threads will be stopped after all the connections closed.
}

Expand All @@ -110,6 +115,12 @@ void TCPServer::HandleNewConn(int sockfd,
const std::string& remote_addr/*ip:port*/,
const struct sockaddr_in* raddr) {
assert(loop_->IsInLoopThread());
if (status_.load() == kStopping) {
LOG_WARN << "The server is at stopping status. Discard this socket fd=" << sockfd << " remote_addr=" << remote_addr;
EVUTIL_CLOSESOCKET(sockfd);
return;
}

assert(IsRunning());
EventLoop* io_loop = GetNextLoop(raddr);
std::string n = name_ + "-" + remote_addr + "#" + std::to_string(next_conn_id_++); // TODO use string buffer
Expand Down
1 change: 1 addition & 0 deletions evpp/tcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <map>

namespace evpp {

class Listener;

class EVPP_EXPORT TCPServer : public ThreadDispatchPolicy, public ServerStatus {
Expand Down
24 changes: 24 additions & 0 deletions test/event_loop_thread_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,27 @@ TEST_UNIT(testEventLoopThreadPool) {
H_TEST_ASSERT(evpp::GetActiveEventCount() == 0);
}


TEST_UNIT(testEventLoopThreadPool2) {
std::unique_ptr<evpp::EventLoopThread> loop(new evpp::EventLoopThread);
loop->Start(true);
assert(loop->IsRunning());

int thread_num = 24;
for (int i = 0; i < thread_num; i++) {
std::unique_ptr<evpp::EventLoopThreadPool> pool(new evpp::EventLoopThreadPool(loop->loop(), thread_num));
auto rc = pool->Start(true);
H_TEST_ASSERT(rc);
H_TEST_ASSERT(pool->IsRunning());
pool->Stop(true);
H_TEST_ASSERT(pool->IsStopped());
pool.reset();
}

assert(loop->IsRunning());
loop->Stop(true);
assert(loop->IsStopped());
loop.reset();
H_TEST_ASSERT(evpp::GetActiveEventCount() == 0);
}

0 comments on commit b8bd2a9

Please sign in to comment.