diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index e91fa4cd935..61b11a8d716 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -3,8 +3,8 @@ // TODO: remove this include after move MPPTask to a single file #include #include -#include #include +#include #include namespace DB @@ -15,8 +15,7 @@ namespace FailPoints extern const char exception_during_mpp_close_tunnel[]; } // namespace FailPoints -MPPTunnel::MPPTunnel( - const mpp::TaskMeta & receiver_meta_, +MPPTunnel::MPPTunnel(const mpp::TaskMeta & receiver_meta_, const mpp::TaskMeta & sender_meta_, const std::chrono::seconds timeout_, const std::shared_ptr & current_task_) @@ -26,8 +25,7 @@ MPPTunnel::MPPTunnel( current_task(current_task_), tunnel_id(fmt::format("tunnel{}+{}", sender_meta_.task_id(), receiver_meta_.task_id())), log(&Logger::get(tunnel_id)) -{ -} +{} MPPTunnel::~MPPTunnel() { @@ -112,6 +110,8 @@ void MPPTunnel::connect(::grpc::ServerWriter<::mpp::MPPDataPacket> * writer_) { throw Exception("has connected"); } + if (finished) + throw Exception("has finished"); LOG_DEBUG(log, "ready to connect"); connected = true; writer = writer_; @@ -149,4 +149,3 @@ void MPPTunnel::finishWithLock() } } // namespace DB -