diff --git a/evpp/event_loop_thread.cc b/evpp/event_loop_thread.cc index 16d4a9232..1959aecd6 100644 --- a/evpp/event_loop_thread.cc +++ b/evpp/event_loop_thread.cc @@ -63,6 +63,7 @@ void EventLoopThread::Stop(bool wait_thread_exit) { while (!IsStopped()) { usleep(1); } + if (thread_->joinable()) { try { thread_->join(); diff --git a/evpp/event_loop_thread_pool.cc b/evpp/event_loop_thread_pool.cc index 511fe55a6..a54db0051 100644 --- a/evpp/event_loop_thread_pool.cc +++ b/evpp/event_loop_thread_pool.cc @@ -96,7 +96,7 @@ bool EventLoopThreadPool::IsStopped() const { EventLoop* EventLoopThreadPool::GetNextLoop() { EventLoop* loop = base_loop_; - if (!threads_.empty()) { + if (started_ && !threads_.empty()) { // No need to lock here int64_t next = next_.fetch_add(1); next = next % threads_.size(); @@ -109,7 +109,7 @@ EventLoop* EventLoopThreadPool::GetNextLoop() { EventLoop* EventLoopThreadPool::GetNextLoopWithHash(uint64_t hash) { EventLoop* loop = base_loop_; - if (!threads_.empty()) { + if (started_ && !threads_.empty()) { uint64_t next = hash % threads_.size(); loop = (threads_[next])->event_loop(); } diff --git a/evpp/server_status.h b/evpp/server_status.h new file mode 100644 index 000000000..59e524901 --- /dev/null +++ b/evpp/server_status.h @@ -0,0 +1,31 @@ +#pragma once + +#include + +#include "evpp/inner_pre.h" + +namespace evpp { +class ServerStatus { +public: + enum Status { + kNull = 0, + kInitialized = 1, + kRunning = 2, + kStopping = 3, + kStopped = 4, + }; + + std::string ToString() const { + H_CASE_STRING_BIGIN(status_); + H_CASE_STRING(kNull); + H_CASE_STRING(kInitialized); + H_CASE_STRING(kRunning); + H_CASE_STRING(kStopping); + H_CASE_STRING(kStopped); + H_CASE_STRING_END(); + } + +protected: + std::atomic status_ = { kNull }; +}; +} \ No newline at end of file diff --git a/evpp/tcp_server.cc b/evpp/tcp_server.cc index 73e0a8917..a7bf1d150 100644 --- a/evpp/tcp_server.cc +++ b/evpp/tcp_server.cc @@ -27,6 +27,7 @@ TCPServer::~TCPServer() { } bool TCPServer::Init() { + assert(status_ == kNull); listener_.reset(new Listener(loop_, listen_addr_)); listener_->Listen(); listener_->SetNewConnectionCallback( @@ -35,16 +36,25 @@ bool TCPServer::Init() { std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + status_.store(kInitialized); return true; } bool TCPServer::Start() { + assert(status_ == kInitialized); assert(listener_.get()); - return tpool_->Start(true); + bool rc = tpool_->Start(true); + if (rc) { + status_.store(kRunning); + } + return rc; } - bool TCPServer::IsRunning() const { + if (status_ != kRunning) { + return false; + } + if (!loop_->IsRunning()) { return false; } @@ -71,6 +81,8 @@ bool TCPServer::IsStopped() const { } void TCPServer::Stop() { + assert(status_ == kRunning); + status_.store(kStopping); loop_->RunInLoop(std::bind(&TCPServer::StopInLoop, this)); } @@ -79,19 +91,26 @@ void TCPServer::StopInLoop() { listener_->Stop(); listener_.reset(); - for (auto& c : connections_) { - c.second->Close(); + if (connections_.empty()) { + // Stop all the working threads now. + tpool_->Stop(true); + assert(tpool_->IsStopped()); + status_.store(kStopped); + } else { + for (auto& c : connections_) { + c.second->Close(); + } + // The working threads will be stopped after all the connections closed. } - tpool_->Stop(true); - assert(tpool_->IsStopped()); - LOG_TRACE << "TCPServer::StopInLoop exited"; + LOG_TRACE << "TCPServer::StopInLoop exited, status=" << ToString(); } void TCPServer::HandleNewConn(int sockfd, const std::string& remote_addr/*ip:port*/, const struct sockaddr_in* raddr) { assert(loop_->IsInLoopThread()); + assert(IsRunning()); EventLoop* io_loop = GetNextLoop(raddr); std::string n = name_ + "-" + remote_addr + "#" + std::to_string(next_conn_id_++); // TODO use string buffer TCPConnPtr conn(new TCPConn(io_loop, n, sockfd, listen_addr_, remote_addr)); @@ -117,6 +136,12 @@ void TCPServer::RemoveConnection(const TCPConnPtr& conn) { LOG_INFO << "TCPServer::RemoveConnection conn=" << conn.get() << " fd="<< conn->fd(); assert(this->loop_->IsInLoopThread()); this->connections_.erase(conn->name()); + if (status_ == kStopping && this->connections_.empty()) { + // At last, we stop all the working threads + tpool_->Stop(true); + assert(tpool_->IsStopped()); + status_.store(kStopped); + } }; loop_->RunInLoop(f); } diff --git a/evpp/tcp_server.h b/evpp/tcp_server.h index 189560f57..ef3ffe4b1 100644 --- a/evpp/tcp_server.h +++ b/evpp/tcp_server.h @@ -4,14 +4,16 @@ #include "evpp/event_loop.h" #include "evpp/event_loop_thread_pool.h" #include "evpp/tcp_callbacks.h" + #include "evpp/thread_dispatch_policy.h" +#include "evpp/server_status.h" #include namespace evpp { class Listener; -class EVPP_EXPORT TCPServer : public ThreadDispatchPolicy { +class EVPP_EXPORT TCPServer : public ThreadDispatchPolicy, public ServerStatus { public: TCPServer(EventLoop* loop, const std::string& listen_addr/*ip:port*/, @@ -57,7 +59,7 @@ class EVPP_EXPORT TCPServer : public ThreadDispatchPolicy { MessageCallback msg_fn_; // always in the listening loop thread - uint64_t next_conn_id_; + uint64_t next_conn_id_ = 0; typedef std::map ConnectionMap; ConnectionMap connections_; }; diff --git a/evpp/thread_dispatch_policy.h b/evpp/thread_dispatch_policy.h index e5762c2a4..b0fb7b3c1 100644 --- a/evpp/thread_dispatch_policy.h +++ b/evpp/thread_dispatch_policy.h @@ -1,7 +1,7 @@ #pragma once namespace evpp { -class EVPP_EXPORT ThreadDispatchPolicy { +class ThreadDispatchPolicy { public: enum Policy { kRoundRobin, diff --git a/evpp/udp/udp_server.cc b/evpp/udp/udp_server.cc index f242e1f46..63edff394 100644 --- a/evpp/udp/udp_server.cc +++ b/evpp/udp/udp_server.cc @@ -145,11 +145,16 @@ bool Server::Start() { LOG_ERROR << "MessageHandler DO NOT set!"; return false; } + for (auto& rt : recv_threads_) { if (!rt->Run()) { return false; } } + + while (!IsRunning()) { + usleep(1); + } return true; } diff --git a/vsprojects/libevpp.vcxproj b/vsprojects/libevpp.vcxproj index 850882670..38935a549 100644 --- a/vsprojects/libevpp.vcxproj +++ b/vsprojects/libevpp.vcxproj @@ -68,6 +68,7 @@ + diff --git a/vsprojects/libevpp.vcxproj.filters b/vsprojects/libevpp.vcxproj.filters index 092b001b0..9be33350b 100644 --- a/vsprojects/libevpp.vcxproj.filters +++ b/vsprojects/libevpp.vcxproj.filters @@ -246,5 +246,8 @@ common + + common + \ No newline at end of file