From fa4440a4ce2acd6e7d3bb0e1c24ae5f8cda8c13c Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 22 Dec 2023 18:01:44 +0800 Subject: [PATCH] [improve](move-memtable) tweak load stream flush token num and max tasks --- be/src/common/config.cpp | 2 +- be/src/runtime/load_stream.cpp | 5 +---- be/src/runtime/load_stream_mgr.cpp | 4 +++- be/src/runtime/load_stream_mgr.h | 9 +++++++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0ce0f63ba4e602..b2da3a19597337 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -777,7 +777,7 @@ DEFINE_Int32(load_stream_messages_in_batch, "128"); // brpc streaming StreamWait seconds on EAGAIN DEFINE_Int32(load_stream_eagain_wait_seconds, "60"); // max tasks per flush token in load stream -DEFINE_Int32(load_stream_flush_token_max_tasks, "2"); +DEFINE_Int32(load_stream_flush_token_max_tasks, "5"); // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 313728091c1739..0c7991bde97ed9 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -46,10 +46,7 @@ namespace doris { TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) : _id(id), _next_segid(0), _load_id(load_id), _txn_id(txn_id) { - for (int i = 0; i < 10; i++) { - _flush_tokens.emplace_back(load_stream_mgr->new_token()); - } - + load_stream_mgr->create_tokens(_flush_tokens); _failed_st = std::make_shared(); _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true); _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp index b3553046aec9f8..8d9d37c5d3aa77 100644 --- a/be/src/runtime/load_stream_mgr.cpp +++ b/be/src/runtime/load_stream_mgr.cpp @@ -34,7 +34,9 @@ namespace doris { LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool* heavy_work_pool, FifoThreadPool* light_work_pool) - : _heavy_work_pool(heavy_work_pool), _light_work_pool(light_work_pool) { + : _num_threads(segment_file_writer_thread_num), + _heavy_work_pool(heavy_work_pool), + _light_work_pool(light_work_pool) { static_cast(ThreadPoolBuilder("SegmentFileWriterThreadPool") .set_min_threads(segment_file_writer_thread_num) .set_max_threads(segment_file_writer_thread_num) diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h index 466a23c8c5cfd3..ff742012774b1a 100644 --- a/be/src/runtime/load_stream_mgr.h +++ b/be/src/runtime/load_stream_mgr.h @@ -41,8 +41,11 @@ class LoadStreamMgr { Status open_load_stream(const POpenLoadStreamRequest* request, LoadStreamSharedPtr& load_stream); void clear_load(UniqueId loadid); - std::unique_ptr new_token() { - return _file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL); + void create_tokens(std::vector>& tokens) { + for (int i = 0; i < _num_threads * 2; i++) { + tokens.push_back( + _file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL)); + } } // only used by ut @@ -56,6 +59,8 @@ class LoadStreamMgr { std::unordered_map _load_streams_map; std::unique_ptr _file_writer_thread_pool; + uint32_t _num_threads = 0; + FifoThreadPool* _heavy_work_pool = nullptr; FifoThreadPool* _light_work_pool = nullptr; };