diff --git a/evpp/event_loop.cc b/evpp/event_loop.cc index 017ef17ea..9e19a9988 100644 --- a/evpp/event_loop.cc +++ b/evpp/event_loop.cc @@ -66,15 +66,19 @@ void EventLoop::Init() { tid_ = std::this_thread::get_id(); // The default thread id - // Initialized task queue watcher + InitNotifyPipeWatcher(); + + status_.store(kInitialized); +} + +void EventLoop::InitNotifyPipeWatcher() { + // Initialized task queue notify pipe watcher watcher_.reset(new PipeEventWatcher(this, std::bind(&EventLoop::DoPendingFunctors, this))); int rc = watcher_->Init(); assert(rc); if (!rc) { LOG_FATAL << "PipeEventWatcher init failed."; } - - status_.store(kInitialized); } void EventLoop::Run() { @@ -149,6 +153,20 @@ void EventLoop::AfterFork() { LOG_FATAL << "event_reinit failed!"; abort(); } + + // We create EventLoopThread and initialize it in father process, + // but we use it in child process. + // If we have only one child process, everything goes well. + // + // But if we have multi child processes, something goes wrong. + // Because EventLoop::watcher_ is created and initialized in father process + // all children processes inherited father's pipe. + // + // When we use the pipe to do a notification in one child process + // the notification may be received by another child process randomly. + // + // So we need to reinitialize the watcher_ + InitNotifyPipeWatcher(); } InvokeTimerPtr EventLoop::RunAfter(double delay_ms, const Functor& f) { diff --git a/evpp/event_loop.h b/evpp/event_loop.h index a7e4fbca2..ee38f305a 100644 --- a/evpp/event_loop.h +++ b/evpp/event_loop.h @@ -52,7 +52,7 @@ class EVPP_EXPORT EventLoop : public ServerStatus { // @brief Stop the event loop void Stop(); - // @brief Reinitialize the event_base object after a fork + // @brief Reinitialize some data fields after a fork void AfterFork(); InvokeTimerPtr RunAfter(double delay_ms, const Functor& f); @@ -105,6 +105,7 @@ class EVPP_EXPORT EventLoop : public ServerStatus { } private: void Init(); + void InitNotifyPipeWatcher(); void StopInLoop(); void DoPendingFunctors(); size_t GetPendingQueueSize(); diff --git a/evpp/tcp_server.cc b/evpp/tcp_server.cc index c99c6e982..e9efabba0 100644 --- a/evpp/tcp_server.cc +++ b/evpp/tcp_server.cc @@ -39,6 +39,10 @@ bool TCPServer::Init() { return true; } +void TCPServer::AfterFork() { + // Nothing to do right now. +} + bool TCPServer::Start() { DLOG_TRACE; assert(status_ == kInitialized); diff --git a/evpp/tcp_server.h b/evpp/tcp_server.h index 794f02692..103d31fd6 100644 --- a/evpp/tcp_server.h +++ b/evpp/tcp_server.h @@ -76,6 +76,9 @@ class EVPP_EXPORT TCPServer : public ThreadDispatchPolicy, public ServerStatus { // the TCP server is totally stopped void Stop(DoneCallback cb = DoneCallback()); + // @brief Reinitialize some data fields after a fork + void AfterFork(); + public: // Set a connection event relative callback when the TCPServer // receives a new connection or an exist connection breaks down. diff --git a/evpp/udp/udp_server.cc b/evpp/udp/udp_server.cc index d864f49e8..8273fe6d9 100644 --- a/evpp/udp/udp_server.cc +++ b/evpp/udp/udp_server.cc @@ -140,6 +140,10 @@ bool Server::Init(const std::string& listen_ports/*like "53,5353,1053"*/) { return Init(v); } +void Server::AfterFork() { + // Nothing to do right now. +} + bool Server::Start() { if (!message_handler_) { LOG_ERROR << "MessageHandler DO NOT set!"; diff --git a/evpp/udp/udp_server.h b/evpp/udp/udp_server.h index 76543d34c..cbde2c1a6 100644 --- a/evpp/udp/udp_server.h +++ b/evpp/udp/udp_server.h @@ -30,6 +30,9 @@ class EVPP_EXPORT Server : public ThreadDispatchPolicy { void Pause(); void Continue(); + // @brief Reinitialize some data fields after a fork + void AfterFork(); + bool IsRunning() const; bool IsStopped() const; diff --git a/test/event_loop_test.cc b/test/event_loop_test.cc index c4c977a6a..203ff1c30 100644 --- a/test/event_loop_test.cc +++ b/test/event_loop_test.cc @@ -34,7 +34,7 @@ static void PeriodicFunc() { } } -TEST_UNIT(testEventLoop) { +TEST_UNIT(TestEventLoop1) { using namespace evloop; std::thread th(MyEventThread); usleep(delay.Microseconds()); @@ -50,15 +50,13 @@ TEST_UNIT(testEventLoop) { H_TEST_ASSERT(evpp::GetActiveEventCount() == 0); } - namespace { void OnTimer(evpp::EventLoop* loop) { } } - -TEST_UNIT(testEventLoop2) { +TEST_UNIT(TestEventLoop2) { evpp::EventLoop loop; auto timer = [&loop]() { auto close = [&loop]() { @@ -72,7 +70,7 @@ TEST_UNIT(testEventLoop2) { } // Test std::move of C++11 -TEST_UNIT(testEventLoop3) { +TEST_UNIT(TestEventLoop3) { evpp::EventLoop loop; auto timer = [&loop]() { auto close = [&loop]() { @@ -108,7 +106,7 @@ void NewEventLoop(struct event_base* base) { } // Test creating EventLoop from a exist event_base -TEST_UNIT(testEventLoop4) { +TEST_UNIT(TestEventLoop4) { struct event_base* base = event_base_new(); auto timer = std::make_shared(base, std::bind(&NewEventLoop, base), evpp::Duration(1.0)); timer->Init(); @@ -125,7 +123,7 @@ TEST_UNIT(testEventLoop4) { // Test EventLoop::QueueInLoop() before EventLoop::Run() -TEST_UNIT(testEventLoop5) { +TEST_UNIT(TestEventLoop5) { evpp::EventLoop loop; auto fn = [&loop]() { LOG_INFO << "Entering fn"; @@ -141,6 +139,16 @@ TEST_UNIT(testEventLoop5) { } +// Test EventLoop's constructor and destructor +TEST_UNIT(TestEventLoop6) { + evpp::EventLoop* loop = new evpp::EventLoop; + LOG_INFO << "loop=" << loop; + delete loop; +} + + + +