diff --git a/dbms/src/Common/LooseBoundedMPMCQueue.h b/dbms/src/Common/LooseBoundedMPMCQueue.h
index 6611d9bf37e..bead1e5e1c1 100644
--- a/dbms/src/Common/LooseBoundedMPMCQueue.h
+++ b/dbms/src/Common/LooseBoundedMPMCQueue.h
@@ -50,10 +50,20 @@ class LooseBoundedMPMCQueue
         , push_callback(std::move(push_callback_))
     {}
 
+    String getAddr() const
+    {
+        std::stringstream ss;
+        ss << this;
+        return ss.str();
+    }
+
     void registerPipeReadTask(TaskPtr && task)
     {
         {
+            const auto pipe_task_cnt = pipe_reader_cv.getTaskCnt();
             std::lock_guard lock(mu);
+            LOG_DEBUG(Logger::get(), "gjt debug registerPipeReadTask queue size: {}, status: {}, pipe_task_cnt: {}, workqueue: {}",
+                queue.size(), status == MPMCQueueStatus::NORMAL, pipe_task_cnt, getAddr());
             if (queue.empty() && status == MPMCQueueStatus::NORMAL)
             {
                 pipe_reader_cv.registerTask(std::move(task));
@@ -66,7 +76,10 @@ class LooseBoundedMPMCQueue
     void registerPipeWriteTask(TaskPtr && task)
     {
         {
+            const auto pipe_task_cnt = pipe_writer_cv.getTaskCnt();
             std::lock_guard lock(mu);
+            LOG_DEBUG(Logger::get(), "gjt debug registerPipeWriteTask isFullWithoutLock: {}, status: {}, pipe_task_cnt: {}, workqueue: {}",
+                isFullWithoutLock(), status == MPMCQueueStatus::NORMAL, pipe_task_cnt, getAddr());
             if (isFullWithoutLock() && status == MPMCQueueStatus::NORMAL)
             {
                 pipe_writer_cv.registerTask(std::move(task));
@@ -259,18 +272,24 @@ class LooseBoundedMPMCQueue
 private:
     void notifyOneReader()
     {
+        LOG_DEBUG(Logger::get(), "gjt debug notifyOneReader: {}",
+            getAddr());
         reader_cv.notify_one();
         pipe_reader_cv.notifyOne();
     }
 
     void notifyOneWriter()
     {
+        LOG_DEBUG(Logger::get(), "gjt debug notifyOneWriter: {}",
+            getAddr());
         writer_cv.notify_one();
         pipe_writer_cv.notifyOne();
     }
 
     void notifyAll()
     {
+        LOG_DEBUG(Logger::get(), "gjt debug notifyAll: {}",
+            getAddr());
         reader_cv.notify_all();
         pipe_reader_cv.notifyAll();
diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp
index 072fcc2bee3..6450f491559 100644
--- a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp
+++ b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp
@@ -432,8 +432,8 @@ void MPPTunnelSetWriterBase::fineGrainedShuffleWrite(
         compression_method,
         original_size);
 
-    if unlikely (tracked_packet->getPacket().chunks_size() <= 0)
-        return;
+    // if unlikely (tracked_packet->getPacket().chunks_size() <= 0)
+    //     return;
 
     auto packet_bytes = tracked_packet->getPacket().ByteSizeLong();
     checkPacketSize(packet_bytes);
@@ -457,8 +457,8 @@ void MPPTunnelSetWriterBase::fineGrainedShuffleWrite(
         num_columns,
         result_field_types);
 
-    if unlikely (tracked_packet->getPacket().chunks_size() <= 0)
-        return;
+    // if unlikely (tracked_packet->getPacket().chunks_size() <= 0)
+    //     return;
 
     auto packet_bytes = tracked_packet->getPacket().ByteSizeLong();
     checkPacketSize(packet_bytes);
diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp
index 515155aee6d..82159d66cef 100644
--- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp
+++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.cpp
@@ -115,29 +115,34 @@ bool ResourceControlQueue::take(TaskPtr & task)
             continue;
 
         UInt64 wait_dura = LocalAdmissionController::DEFAULT_FETCH_GAC_INTERVAL_MS;
+        LOG_DEBUG(logger, "gjt debug RCQ::take() wait_dura: {}, resource_group_infos: {}",
+            wait_dura, resource_group_infos.size());
         if (!resource_group_infos.empty())
         {
            const ResourceGroupInfo & group_info = resource_group_infos.top();
            const bool ru_exhausted = LocalAdmissionController::isRUExhausted(group_info.priority);
-            LOG_TRACE(
-                logger,
-                "trying to schedule task of resource group {}, priority: {}, ru exhausted: {}, is_finished: {}, "
-                "task_queue.empty(): {}",
-                group_info.name,
-                group_info.priority,
-                ru_exhausted,
-                is_finished,
-                group_info.task_queue->empty());
-
             // When highest priority of resource group is less than zero, means RU of all resource groups are exhausted.
             // Should not take any task from nested task queue for this situation.
             if (!ru_exhausted)
             {
+                LOG_DEBUG(logger, "gjt debug RCQ::take() ru enough, returns ok");
                 mustTakeTask(group_info.task_queue, task);
                 return true;
             }
 
             wait_dura = LocalAdmissionController::global_instance->estWaitDuraMS(group_info.name);
+
+            LOG_DEBUG(
+                logger,
+                "trying to schedule task of resource group {}, priority: {}, ru exhausted: {}, is_finished: {}, "
+                "task_queue.empty(): {}, wait_dura: {}",
+                group_info.name,
+                group_info.priority,
+                ru_exhausted,
+                is_finished,
+                group_info.task_queue->empty(),
+                wait_dura);
+
         }
 
         assert(!task);
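
For readers tracing the new wait_dura logging above: the take() loop being instrumented has roughly the shape below. This is a minimal compilable sketch under stated assumptions, not the real ResourceControlQueue; TaskPtr, GroupInfo, isRUExhausted, estWaitDuraMS, and DEFAULT_FETCH_GAC_INTERVAL_MS are simplified stand-ins mirroring the names in the diff.

#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <queue>
#include <string>

using TaskPtr = int; // placeholder task handle
struct GroupInfo
{
    std::string name;
    long priority = 0;
    std::deque<TaskPtr> * task_queue = nullptr;
    // Max-heap by priority, like resource_group_infos.top().
    bool operator<(const GroupInfo & rhs) const { return priority < rhs.priority; }
};
static bool isRUExhausted(long priority) { return priority < 0; }       // stand-in
static unsigned long estWaitDuraMS(const std::string &) { return 100; } // stand-in

struct MiniResourceControlQueue
{
    static constexpr unsigned long DEFAULT_FETCH_GAC_INTERVAL_MS = 5000;

    bool take(TaskPtr & task)
    {
        std::unique_lock lock(mu);
        while (!is_finished)
        {
            // Fallback: wake up when the next GAC fetch should have landed.
            unsigned long wait_dura = DEFAULT_FETCH_GAC_INTERVAL_MS;
            if (!groups.empty())
            {
                const GroupInfo & top = groups.top();
                if (!isRUExhausted(top.priority))
                {
                    // RU available for the highest-priority group: take a task.
                    // (Sketch assumes a scheduled group always has tasks.)
                    task = top.task_queue->front();
                    top.task_queue->pop_front();
                    return true;
                }
                // Every group is throttled; wait roughly until RU refill.
                wait_dura = estWaitDuraMS(top.name);
            }
            cv.wait_for(lock, std::chrono::milliseconds(wait_dura));
        }
        return false;
    }

    std::mutex mu;
    std::condition_variable cv;
    std::priority_queue<GroupInfo> groups;
    bool is_finished = false;
};

Moving the group log after estWaitDuraMS() lets it report the computed wait_dura alongside the RU state, at the cost of only logging on the throttled path.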
diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
index 196d325462d..f9e384ed025 100644
--- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
+++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
@@ -80,8 +80,13 @@ class PipeConditionVariable
         TaskScheduler::instance->submit(std::move(task));
     }
 
+    size_t getTaskCnt() const
+    {
+        std::lock_guard lock(mu);
+        return tasks.size();
+    }
 private:
-    std::mutex mu;
+    mutable std::mutex mu;
     std::deque<TaskPtr> tasks;
 };
 } // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
index e482a541024..9219dc2f9d6 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
@@ -156,6 +156,7 @@ bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr &
 {
     if (pool->getFreeBlockSlots() <= 0)
     {
+        // LOG_DEBUG(Logger::get(), "gjt debug needScheduleToRead failed: {}", pool->getFreeBlockSlotsInfo());
         GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment();
         return false;
     }
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h
index 048bbd38a6a..599e94a1456 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include <sstream>
 
 namespace DB::DM
 {
@@ -71,10 +72,20 @@ class WorkQueue
         , pop_empty_times(0)
     {}
 
+    String getAddr() const
+    {
+        std::stringstream ss;
+        ss << this;
+        return ss.str();
+    }
+
     void registerPipeTask(TaskPtr && task)
     {
         {
+            const auto pipe_task_cnt = pipe_cv.getTaskCnt();
             std::lock_guard lock(mu);
+            LOG_DEBUG(Logger::get(), "gjt debug registerPipeTask queue size: {}, done: {}, pipe_task_cnt: {}, workqueue: {}",
+                queue.size(), done, pipe_task_cnt, getAddr());
             if (queue.empty() && !done)
             {
                 pipe_cv.registerTask(std::move(task));
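
Both getAddr() helpers, here and in LooseBoundedMPMCQueue.h, only stamp log lines with a per-queue identity so register/notify events can be correlated across threads. Note that pipe_task_cnt is sampled before mu is acquired, so the logged count may be slightly stale relative to the state logged under the lock; it also means the queue's mutex and PipeConditionVariable's mutex are never held at once. A standalone equivalent of the helper follows; the fmt::ptr variant is a suggested alternative, not part of the diff.

#include <fmt/format.h>
#include <sstream>
#include <string>

struct Queue
{
    // As in the diff: stream `this` through a stringstream.
    std::string getAddr() const
    {
        std::stringstream ss;
        ss << this;
        return ss.str();
    }

    // Equivalent one-liner with fmt, which the codebase already uses.
    std::string getAddrFmt() const { return fmt::format("{}", fmt::ptr(this)); }
};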
diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
index c53990034d5..0e51a016db5 100644
--- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
@@ -202,6 +202,11 @@ class SegmentReadTaskPool
 
     const LoggerPtr & getLogger() const { return log; }
 
+    String getFreeBlockSlotsInfo() const
+    {
+        return fmt::format("block_slot_limit: {}, pending: {}, workqueue: {}",
+            block_slot_limit, blk_stat.pendingCount(), q.getAddr());
+    }
 private:
     Int64 getFreeActiveSegmentsUnlock() const;
     bool exceptionHappened() const;
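
Taken together, the instrumentation traces the register/notify handshake between pipeline tasks and these queues: a task parks itself only when the queue is truly empty, and each push wakes at most one parked task, so the logging appears aimed at catching a lost wakeup, that is, a registered task with no matching notify. A minimal sketch of that handshake; Task and submit() are hypothetical stand-ins for TiFlash's TaskPtr and TaskScheduler, and the real classes add status flags and metrics.

#include <deque>
#include <mutex>

struct Task {};
static void submit(Task &&) {} // stand-in: hand the task back to the scheduler

struct MiniWorkQueue
{
    // A pipeline task parks here only if there is truly nothing to read;
    // otherwise it is resubmitted immediately so it can pop right away.
    void registerPipeTask(Task && task)
    {
        {
            std::lock_guard lock(mu);
            if (queue.empty() && !done)
            {
                waiters.push_back(std::move(task));
                return;
            }
        }
        submit(std::move(task)); // data ready (or queue finished): run now
    }

    // Producers call this after push(): wake one parked task, if any.
    void notifyOne()
    {
        Task task;
        {
            std::lock_guard lock(mu);
            if (waiters.empty())
                return;
            task = std::move(waiters.front());
            waiters.pop_front();
        }
        submit(std::move(task));
    }

    std::mutex mu;
    bool done = false;
    std::deque<int> queue;  // payload queue (element type is a placeholder)
    std::deque<Task> waiters;
};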