From 717e7a3932461395df75d1ba3e9a63703e1ed866 Mon Sep 17 00:00:00 2001 From: bestwoody <89765764+bestwoody@users.noreply.github.com> Date: Tue, 15 Mar 2022 11:43:51 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #4241 Signed-off-by: ti-chi-bot --- dbms/src/Flash/Mpp/MPPTunnel.cpp | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index e91fa4cd935..71b781cb34a 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -110,7 +110,33 @@ void MPPTunnel::connect(::grpc::ServerWriter<::mpp::MPPDataPacket> * writer_) std::lock_guard lk(mu); if (connected) { +<<<<<<< HEAD throw Exception("has connected"); +======= + std::unique_lock lk(mu); + if (connected) + throw Exception("MPPTunnel has connected"); + if (finished) + throw Exception("MPPTunnel has finished"); + + LOG_TRACE(log, "ready to connect"); + if (is_local) + assert(writer_ == nullptr); + else + { + writer = writer_; + if (!is_async) + { + // communicate send_thread through `consumer_state` + // NOTE: if the thread creation failed, `connected` will still be `false`. + thread_manager->schedule(true, "MPPTunnel", [this] { + sendJob(); + }); + } + } + connected = true; + cv_for_connected_or_finished.notify_all(); +>>>>>>> b3a2cd587b (Fix a bug that MPP tasks may leak threads forever (#4241)) } LOG_DEBUG(log, "ready to connect"); connected = true; From 7a0e5bb30598d206146073a43ee74d359d7c5a05 Mon Sep 17 00:00:00 2001 From: bestwoody <89765764+bestwoody@users.noreply.github.com> Date: Wed, 13 Apr 2022 19:49:54 +0800 Subject: [PATCH 2/3] Update MPPTunnel.cpp --- dbms/src/Flash/Mpp/MPPTunnel.cpp | 28 ++-------------------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 71b781cb34a..a57a74d3333 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -110,34 +110,10 @@ void MPPTunnel::connect(::grpc::ServerWriter<::mpp::MPPDataPacket> * writer_) std::lock_guard lk(mu); if (connected) { -<<<<<<< HEAD throw Exception("has connected"); -======= - std::unique_lock lk(mu); - if (connected) - throw Exception("MPPTunnel has connected"); - if (finished) - throw Exception("MPPTunnel has finished"); - - LOG_TRACE(log, "ready to connect"); - if (is_local) - assert(writer_ == nullptr); - else - { - writer = writer_; - if (!is_async) - { - // communicate send_thread through `consumer_state` - // NOTE: if the thread creation failed, `connected` will still be `false`. - thread_manager->schedule(true, "MPPTunnel", [this] { - sendJob(); - }); - } - } - connected = true; - cv_for_connected_or_finished.notify_all(); ->>>>>>> b3a2cd587b (Fix a bug that MPP tasks may leak threads forever (#4241)) } + if (finished) + throw Exception("has finished"); LOG_DEBUG(log, "ready to connect"); connected = true; writer = writer_; From 1ddbbc40ccd12f703a5b7fc99f84406ad3b5e413 Mon Sep 17 00:00:00 2001 From: bestwoody Date: Thu, 14 Apr 2022 18:41:10 +0800 Subject: [PATCH 3/3] format --- dbms/src/Flash/Mpp/MPPTunnel.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index a57a74d3333..61b11a8d716 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -3,8 +3,8 @@ // TODO: remove this include after move MPPTask to a single file #include #include -#include #include +#include #include namespace DB @@ -15,8 +15,7 @@ namespace FailPoints extern const char exception_during_mpp_close_tunnel[]; } // namespace FailPoints -MPPTunnel::MPPTunnel( - const mpp::TaskMeta & receiver_meta_, +MPPTunnel::MPPTunnel(const mpp::TaskMeta & receiver_meta_, const mpp::TaskMeta & sender_meta_, const std::chrono::seconds timeout_, const std::shared_ptr & current_task_) @@ -26,8 +25,7 @@ MPPTunnel::MPPTunnel( current_task(current_task_), tunnel_id(fmt::format("tunnel{}+{}", sender_meta_.task_id(), receiver_meta_.task_id())), log(&Logger::get(tunnel_id)) -{ -} +{} MPPTunnel::~MPPTunnel() { @@ -151,4 +149,3 @@ void MPPTunnel::finishWithLock() } } // namespace DB -