Skip to content

Commit

Permalink
Fix a thread.join exception coredump because
Browse files Browse the repository at this point in the history
of lamba function use = to capture parameters
which will cause the shared_ptrstd::thread copied
and then we cannot determine when to destruct the thread.

See #29
  • Loading branch information
zieckey committed Apr 11, 2017
1 parent e15e57e commit 0172f08
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 21 deletions.
2 changes: 2 additions & 0 deletions evpp/event_loop_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ void EventLoopThread::Run(const Functor& pre, const Functor& post) {
assert(event_loop_->IsStopped());
LOG_INFO << "this=" << this << " EventLoopThread stopped";
status_ = kStopped;
usleep(10 * 1000 * 1000);
LOG_INFO << "this=" << this << " EventLoopThread stopped. The end";
}

void EventLoopThread::Stop(bool wait_thread_exit) {
Expand Down
46 changes: 28 additions & 18 deletions evpp/http/http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ bool Server::Init(int listen_port) {
lt.thread = std::make_shared<EventLoopThread>();
lt.thread->SetName(std::string("StandaloneHTTPServer-Main-") + std::to_string(listen_port));

lt.hserver = std::make_shared<Service>(lt.thread->loop());
if (!lt.hserver->Listen(listen_port)) {
lt.hservice = std::make_shared<Service>(lt.thread->loop());
if (!lt.hservice->Listen(listen_port)) {
int serrno = errno;
LOG_ERROR << "this=" << this << " http server listen at port " << listen_port << " failed. errno=" << serrno << " " << strerror(serrno);
lt.hserver->Stop();
lt.hservice->Stop();
return false;
}
listen_threads_.push_back(lt);
Expand Down Expand Up @@ -71,34 +71,44 @@ bool Server::AfterFork() {
bool Server::Start() {
std::shared_ptr<std::atomic<int>> exited_listen_thread_count(new std::atomic<int>(0));
bool rc = tpool_->Start(true);
if (!rc) {
LOG_ERROR << "this=" << this << " start thread pool failed.";
return false;
}
for (auto& lt : listen_threads_) {
auto http_close_fn = [=]() {
lt.hserver->Stop();
LOG_INFO << "this=" << this << " http service at 0.0.0.0:" << lt.hserver->port() << " has stopped.";
OnListenThreadExited(exited_listen_thread_count->fetch_add(1) + 1);
auto& hservice = lt.hservice;
auto& lthread = lt.thread;
auto http_close_fn = [hservice, this, exited_listen_thread_count]() {
hservice->Stop();
LOG_INFO << "this=" << this << " http service at 0.0.0.0:" << hservice->port() << " has stopped.";
this->OnListeningThreadExited(exited_listen_thread_count->fetch_add(1) + 1);
};
rc = rc && lt.thread->Start(true,
rc = lthread->Start(true,
EventLoopThread::Functor(),
http_close_fn);
assert(lt.thread->IsRunning());
if (!rc) {
LOG_ERROR << "this=" << this << " start listening thread failed.";
return false;
}

assert(lthread->IsRunning());
for (auto& c : callbacks_) {
auto cb = std::bind(&Server::Dispatch, this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
c.second);
lt.hserver->RegisterHandler(c.first, cb);
hservice->RegisterHandler(c.first, cb);
}
HTTPRequestCallback cb = std::bind(&Server::Dispatch, this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
default_callback_);
lt.hserver->RegisterDefaultHandler(cb);
if (!rc) {
return false;
}
hservice->RegisterDefaultHandler(cb);
}

assert(rc);
while (!IsRunning()) {
usleep(1);
}
Expand Down Expand Up @@ -157,7 +167,7 @@ void Server::Pause() {
for (auto& lt : listen_threads_) {
EventLoop* loop = lt.thread->loop();
auto f = [&lt]() {
lt.hserver->Pause();
lt.hservice->Pause();
};
loop->RunInLoop(f);
}
Expand All @@ -168,7 +178,7 @@ void Server::Continue() {
for (auto& lt : listen_threads_) {
EventLoop* loop = lt.thread->loop();
auto f = [&lt]() {
lt.hserver->Continue();
lt.hservice->Continue();
};
loop->RunInLoop(f);
}
Expand Down Expand Up @@ -271,7 +281,7 @@ EventLoop* Server::GetNextLoop(EventLoop* default_loop, const ContextPtr& ctx) {
#endif
}

void Server::OnListenThreadExited(int exited_listen_thread_count) {
void Server::OnListeningThreadExited(int exited_listen_thread_count) {
LOG_INFO << "this=" << this << " OnListenThreadExited exited_listen_thread_count=" << exited_listen_thread_count << " listen_threads_.size=" << listen_threads_.size();
if (exited_listen_thread_count == int(listen_threads_.size())) {
LOG_INFO << "this=" << this << " stop the working thread pool.";
Expand All @@ -281,7 +291,7 @@ void Server::OnListenThreadExited(int exited_listen_thread_count) {

Service* Server::service(int index) const {
if (index < int(listen_threads_.size())) {
return listen_threads_[index].hserver.get();
return listen_threads_[index].hservice.get();
}

return nullptr;
Expand Down
4 changes: 2 additions & 2 deletions evpp/http/http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ class EVPP_EXPORT Server : public ThreadDispatchPolicy {
const HTTPRequestCallback& user_callback);

EventLoop* GetNextLoop(EventLoop* default_loop, const ContextPtr& ctx);
void OnListenThreadExited(int exited_listen_thread_count);
void OnListeningThreadExited(int exited_listen_thread_count);
private:
struct ListenThread {
// The listening main thread
std::shared_ptr<EventLoopThread> thread;

// Every listening main thread runs a HTTP Service to listen, receive, dispatch, send response the HTTP request.
std::shared_ptr<Service> hserver;
std::shared_ptr<Service> hservice;
};

std::vector<ListenThread> listen_threads_;
Expand Down
4 changes: 3 additions & 1 deletion test/http_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,17 @@ static void TestAll() {
}


TEST_UNIT(testHTTPServer1) {
TEST_UNIT(testHTTPServer) {
for (int i = 0; i < 5; ++i) {
LOG_INFO << "Running testHTTPServer i=" << i;
evpp::http::Server ph(i);
ph.RegisterDefaultHandler(&DefaultRequestHandler);
ph.RegisterHandler("/push/boot", &RequestHandler);
bool r = ph.Init(g_listening_port) && ph.Start();
H_TEST_ASSERT(r);
TestAll();
ph.Stop(true);
usleep(1000 * 1000);
}
}

Expand Down

0 comments on commit 0172f08

Please sign in to comment.