diff --git a/dbms/src/Common/LooseBoundedMPMCQueue.h b/dbms/src/Common/LooseBoundedMPMCQueue.h index 6611d9bf37e..bead1e5e1c1 100644 --- a/dbms/src/Common/LooseBoundedMPMCQueue.h +++ b/dbms/src/Common/LooseBoundedMPMCQueue.h @@ -50,10 +50,20 @@ class LooseBoundedMPMCQueue , push_callback(std::move(push_callback_)) {} + String getAddr() const + { + std::stringstream ss; + ss << this; + return ss.str(); + } + void registerPipeReadTask(TaskPtr && task) { { + const auto pipe_task_cnt = pipe_reader_cv.getTaskCnt(); std::lock_guard lock(mu); + LOG_DEBUG(Logger::get(), "gjt debug registerPipeReadTask queue size: {}, status: {}, pipe_task_cnt: {}, workqueue: {}", + queue.size(), status == MPMCQueueStatus::NORMAL, pipe_task_cnt, getAddr()); if (queue.empty() && status == MPMCQueueStatus::NORMAL) { pipe_reader_cv.registerTask(std::move(task)); @@ -66,7 +76,10 @@ class LooseBoundedMPMCQueue void registerPipeWriteTask(TaskPtr && task) { { + const auto pipe_task_cnt = pipe_writer_cv.getTaskCnt(); std::lock_guard lock(mu); + LOG_DEBUG(Logger::get(), "gjt debug registerPipeWriteTask isFullWithoutLock: {}, status: {}, pipe_task_cnt: {}, workqueue: {}", + isFullWithoutLock(), status == MPMCQueueStatus::NORMAL, pipe_task_cnt, getAddr()); if (isFullWithoutLock() && status == MPMCQueueStatus::NORMAL) { pipe_writer_cv.registerTask(std::move(task)); @@ -259,18 +272,24 @@ class LooseBoundedMPMCQueue private: void notifyOneReader() { + LOG_DEBUG(Logger::get(), "gjt debug notifyOneReader: {}", + getAddr()); reader_cv.notify_one(); pipe_reader_cv.notifyOne(); } void notifyOneWriter() { + LOG_DEBUG(Logger::get(), "gjt debug notifyOneWriter: {}", + getAddr()); writer_cv.notify_one(); pipe_writer_cv.notifyOne(); } void notifyAll() { + LOG_DEBUG(Logger::get(), "gjt debug notifyAll: {}", + getAddr()); reader_cv.notify_all(); pipe_reader_cv.notifyAll();