From 4a81de146790d54a406bb2ad631800725ac8c07e Mon Sep 17 00:00:00 2001 From: "weizili.build9" Date: Fri, 24 Mar 2017 13:24:51 +0800 Subject: [PATCH] Fix issue 18 : The delay closing feature of an incoming TCPConn will make memory leak. See https://github.com/Qihoo360/evpp/issues/18 --- evpp/tcp_client.cc | 10 +++++++++- evpp/tcp_client.h | 4 +--- evpp/tcp_conn.cc | 29 ++++++++++++++++++++++------- evpp/tcp_conn.h | 16 ++++++++++++++++ evpp/tcp_server.cc | 1 + test/tcp_server_test.cc | 1 + 6 files changed, 50 insertions(+), 11 deletions(-) diff --git a/evpp/tcp_client.cc b/evpp/tcp_client.cc index c3696a03f..0de7cf80d 100644 --- a/evpp/tcp_client.cc +++ b/evpp/tcp_client.cc @@ -43,6 +43,14 @@ void TCPClient::Disconnect() { loop_->RunInLoop(std::bind(&TCPClient::DisconnectInLoop, this)); } +void TCPClient::SetConnectionCallback(const ConnectionCallback& cb) { + conn_fn_ = cb; + auto c = conn(); + if (c) { + c->SetConnectionCallback(cb); + } +} + void TCPClient::DisconnectInLoop() { LOG_WARN << "TCPClient::DisconnectInLoop this=" << this << " remote_addr=" << remote_addr_; assert(loop_->IsInLoopThread()); @@ -101,7 +109,7 @@ void TCPClient::OnConnection(int sockfd, const std::string& laddr) { void TCPClient::OnRemoveConnection(const TCPConnPtr& c) { assert(c.get() == conn_.get()); assert(loop_->IsInLoopThread()); - + conn_.reset(); if (auto_reconnect_.load()) { Reconnect(); } diff --git a/evpp/tcp_client.h b/evpp/tcp_client.h index 0045eb1ab..9ec6ae7db 100644 --- a/evpp/tcp_client.h +++ b/evpp/tcp_client.h @@ -25,9 +25,7 @@ class EVPP_EXPORT TCPClient { // 1. Successfully establish a connection : TCPConn::IsConnected() == true // 2. An exist connection broken down : TCPConn::IsDisconnecting() == true // 3. Failed to establish a connection : TCPConn::IsDisconnected() == true and TCPConn::fd() == -1 - void SetConnectionCallback(const ConnectionCallback& cb) { - conn_fn_ = cb; - } + void SetConnectionCallback(const ConnectionCallback& cb); void SetMessageCallback(const MessageCallback& cb) { msg_fn_ = cb; diff --git a/evpp/tcp_conn.cc b/evpp/tcp_conn.cc index 9c1727020..753110791 100644 --- a/evpp/tcp_conn.cc +++ b/evpp/tcp_conn.cc @@ -6,6 +6,7 @@ #include "evpp/fd_channel.h" #include "evpp/event_loop.h" #include "evpp/sockets.h" +#include "evpp/invoke_timer.h" namespace evpp { TCPConn::TCPConn(EventLoop* l, @@ -28,11 +29,11 @@ TCPConn::TCPConn(EventLoop* l, chan_->SetWriteCallback(std::bind(&TCPConn::HandleWrite, this)); } - LOG_DEBUG << "TCPConn::[" << name_ << "] this=" << this << " channel=" << chan_.get() << " fd=" << sockfd; + LOG_DEBUG << "TCPConn::[" << name_ << "] this=" << this << " channel=" << chan_.get() << " fd=" << sockfd << " addr=" << Addr(); } TCPConn::~TCPConn() { - LOG_TRACE << "TCPConn::~TCPConn() name=" << name() << " this=" << this << " channel=" << chan_.get() << " fd=" << fd_ << " type=" << int(type()) << " status=" << StatusToString(); + LOG_TRACE << "TCPConn::~TCPConn() name=" << name() << " this=" << this << " channel=" << chan_.get() << " fd=" << fd_ << " type=" << int(type()) << " status=" << StatusToString() << " addr=" << Addr();; assert(status_ == kDisconnected); if (fd_ >= 0) { @@ -42,10 +43,12 @@ TCPConn::~TCPConn() { EVUTIL_CLOSESOCKET(fd_); fd_ = INVALID_SOCKET; } + + assert(!delay_close_timer_.get()); } void TCPConn::Close() { - LOG_INFO << "TCPConn::Close this=" << this << " fd=" << fd_ << " status=" << StatusToString() << " remote_addr=" << remote_addr_; + LOG_INFO << "TCPConn::Close this=" << this << " fd=" << fd_ << " status=" << StatusToString() << " addr=" << Addr(); auto c = shared_from_this(); auto f = [c]() { assert(c->loop_->IsInLoopThread()); @@ -177,13 +180,13 @@ void TCPConn::HandleRead() { LOG_DEBUG << "TCPConn::HandleRead this=" << this << " fd=" << fd_ << ". We read 0 bytes and close the socket."; HandleClose(); } else { - // Fix the half-closing problem£ºhttps://github.com/chenshuo/muduo/pull/117 + // Fix the half-closing problem : https://github.com/chenshuo/muduo/pull/117 // This is an incoming connection, we need to preserve the connection for a while so that we can reply to it. // And we set a timer to close the connection eventually. chan_->DisableReadEvent(); - LOG_DEBUG << "TCPConn::HandleRead this=" << this << " channel (fd=" << chan_->fd() << ") DisableReadEvent"; - loop_->RunAfter(close_delay_, std::bind(&TCPConn::HandleClose, shared_from_this())); // TODO leave it to user layer close. + LOG_DEBUG << "TCPConn::HandleRead this=" << this << " channel (fd=" << chan_->fd() << ") DisableReadEvent. And set a timer to delay close this TCPConn"; + delay_close_timer_ = loop_->RunAfter(close_delay_, std::bind(&TCPConn::DelayClose, shared_from_this())); // TODO leave it to user layer close. } } else { if (EVUTIL_ERR_RW_RETRIABLE(serrno)) { @@ -221,8 +224,13 @@ void TCPConn::HandleWrite() { } } +void TCPConn::DelayClose() { + delay_close_timer_.reset(); + HandleClose(); +} + void TCPConn::HandleClose() { - LOG_INFO << "TCPConn::HandleClose this=" << this << " remote_addr=" << remote_addr_ << " fd=" << fd_ << " status_=" << StatusToString(); + LOG_INFO << "TCPConn::HandleClose this=" << this << " addr=" << Addr() << " fd=" << fd_ << " status_=" << StatusToString(); // Avoid multi calling if (status_ == kDisconnected) { @@ -237,6 +245,12 @@ void TCPConn::HandleClose() { TCPConnPtr conn(shared_from_this()); + if (delay_close_timer_) { + LOG_INFO << "Cancel the delay closing timer."; + delay_close_timer_->Cancel(); + delay_close_timer_.reset(); + } + if (conn_fn_) { // This callback must be invoked at status kDisconnecting // e.g. when the TCPClient disconnects with remote server, @@ -246,6 +260,7 @@ void TCPConn::HandleClose() { } close_fn_(conn); + LOG_INFO << "TCPConn::HandleClose exit, this=" << this << " addr=" << Addr() << " fd=" << fd_ << " status_=" << StatusToString() << " use_count=" << conn.use_count(); status_ = kDisconnected; } diff --git a/evpp/tcp_conn.h b/evpp/tcp_conn.h index 7f06e962b..d8b405ae3 100644 --- a/evpp/tcp_conn.h +++ b/evpp/tcp_conn.h @@ -12,6 +12,7 @@ namespace evpp { class EventLoop; class FdChannel; class TCPClient; +class InvokeTimer; class EVPP_EXPORT TCPConn : public std::enable_shared_from_this { public: @@ -46,6 +47,9 @@ class EVPP_EXPORT TCPConn : public std::enable_shared_from_this { EventLoop* loop() const { return loop_; } + int fd() const { + return fd_; + } void set_context(const Any& c) { context_[0] = c; } @@ -82,6 +86,9 @@ class EVPP_EXPORT TCPConn : public std::enable_shared_from_this { Type type() const { return type_; } + bool IsIncommingConn() const { + return type_ == kIncoming; + } Status status() const { return status_; } @@ -122,10 +129,18 @@ class EVPP_EXPORT TCPConn : public std::enable_shared_from_this { void HandleRead(); void HandleWrite(); void HandleClose(); + void DelayClose(); void SendInLoop(const Slice& message); void SendInLoop(const void* data, size_t len); void SendStringInLoop(const std::string& message); std::string StatusToString() const; + std::string Addr() const { + if (IsIncommingConn()) { + return "(" + remote_addr_ + "->" + local_addr_ + "(local))"; + } else { + return "(" + local_addr_ + "(local)->" + remote_addr_ + ")"; + } + } private: EventLoop* loop_; int fd_; @@ -145,6 +160,7 @@ class EVPP_EXPORT TCPConn : public std::enable_shared_from_this { // The delay time to close a incoming connection which has been shutdown by peer normally. // Default is 3 second. Duration close_delay_; + std::shared_ptr delay_close_timer_; // The timer to delay close this TCPConn ConnectionCallback conn_fn_; // This will be called to the user application layer MessageCallback msg_fn_; // This will be called to the user application layer diff --git a/evpp/tcp_server.cc b/evpp/tcp_server.cc index a8b259650..185dfe0f8 100644 --- a/evpp/tcp_server.cc +++ b/evpp/tcp_server.cc @@ -113,6 +113,7 @@ EventLoop* TCPServer::GetNextLoop(const struct sockaddr_in* raddr) { void TCPServer::RemoveConnection(const TCPConnPtr& conn) { auto f = [ = ]() { // Remove the connection in the listening EventLoop + LOG_INFO << "TCPServer::RemoveConnection conn=" << conn.get() << " fd="<< conn->fd(); assert(this->loop_->IsInLoopThread()); this->connections_.erase(conn->name()); }; diff --git a/test/tcp_server_test.cc b/test/tcp_server_test.cc index 4c08d551b..e3516d422 100644 --- a/test/tcp_server_test.cc +++ b/test/tcp_server_test.cc @@ -66,6 +66,7 @@ TEST_UNIT(testTCPServer1) { tcp_client_thread.reset(); loop.reset(); tsrv.reset(); + client.reset(); H_TEST_ASSERT(evpp::GetActiveEventCount() == 0); }