From 9f5f1e39ec7d4168b0a4b8751a1687528dc7c65a Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 22 Dec 2023 22:51:30 +0800 Subject: [PATCH] [improve](move-memtable) avoid using heavy work pool during append data (#28745) --- be/src/runtime/load_stream.cpp | 112 ++++++++++++++++----------------- be/src/runtime/load_stream.h | 1 + 2 files changed, 54 insertions(+), 59 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 0c7991bde97ed92..307cd4ef30b1615 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -45,7 +45,11 @@ namespace doris { TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) - : _id(id), _next_segid(0), _load_id(load_id), _txn_id(txn_id) { + : _id(id), + _next_segid(0), + _load_id(load_id), + _txn_id(txn_id), + _load_stream_mgr(load_stream_mgr) { load_stream_mgr->create_tokens(_flush_tokens); _failed_st = std::make_shared(); _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true); @@ -125,6 +129,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data DCHECK(new_segid != std::numeric_limits::max()); butil::IOBuf buf = data->movable(); auto flush_func = [this, new_segid, eos, buf, header]() { + signal::set_signal_task_id(_load_id); auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf); if (eos && st.ok()) { st = _load_stream_writer->close_segment(new_segid); @@ -166,6 +171,7 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data DCHECK(new_segid != std::numeric_limits::max()); auto add_segment_func = [this, new_segid, stat, flush_schema]() { + signal::set_signal_task_id(_load_id); auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema); if (!st.ok() && _failed_st->ok()) { _failed_st = std::make_shared(st); @@ -181,13 +187,44 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data Status TabletStream::close() { SCOPED_TIMER(_close_wait_timer); - for (auto& token : _flush_tokens) { - token->wait(); + bthread::Mutex mu; + std::unique_lock lock(mu); + bthread::ConditionVariable cv; + auto wait_func = [this, &mu, &cv] { + signal::set_signal_task_id(_load_id); + for (auto& token : _flush_tokens) { + token->wait(); + } + std::lock_guard lock(mu); + cv.notify_one(); + }; + bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func); + if (ret) { + cv.wait(lock); + } else { + return Status::Error( + "there is not enough thread resource for close load"); } + if (!_failed_st->ok()) { return *_failed_st; } - return _load_stream_writer->close(); + + Status st = Status::OK(); + auto close_func = [this, &mu, &cv, &st]() { + signal::set_signal_task_id(_load_id); + st = _load_stream_writer->close(); + std::lock_guard lock(mu); + cv.notify_one(); + }; + ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func); + if (ret) { + cv.wait(lock); + } else { + return Status::Error( + "there is not enough thread resource for close load"); + } + return st; } IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, @@ -244,13 +281,13 @@ Status IndexStream::close(const std::vector& tablets_to_commit, } } - for (auto& it : _tablet_streams_map) { - auto st = it.second->close(); + for (auto& [_, tablet_stream] : _tablet_streams_map) { + auto st = tablet_stream->close(); if (st.ok()) { - success_tablet_ids->push_back(it.second->id()); + success_tablet_ids->push_back(tablet_stream->id()); } else { - LOG(INFO) << "close tablet stream " << *it.second << ", status=" << st; - failed_tablet_ids->push_back(it.second->id()); + LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st; + failed_tablet_ids->push_back(tablet_stream->id()); } } return Status::OK(); @@ -308,37 +345,13 @@ Status LoadStream::close(int64_t src_id, const std::vector& tablets_t return Status::OK(); } - Status st = Status::OK(); - { - bthread::Mutex mutex; - std::unique_lock lock(mutex); - bthread::ConditionVariable cond; - bool ret = _load_stream_mgr->heavy_work_pool()->try_offer( - [this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, &st]() { - signal::set_signal_task_id(_load_id); - for (auto& it : _index_streams_map) { - st = it.second->close(_tablets_to_commit, success_tablet_ids, - failed_tablet_ids); - if (!st.ok()) { - std::unique_lock lock(mutex); - cond.notify_one(); - return; - } - } - LOG(INFO) << "close load " << *this - << ", success_tablet_num=" << success_tablet_ids->size() - << ", failed_tablet_num=" << failed_tablet_ids->size(); - std::unique_lock lock(mutex); - cond.notify_one(); - }); - if (ret) { - cond.wait(lock); - } else { - return Status::Error( - "there is not enough thread resource for close load"); - } + for (auto& [_, index_stream] : _index_streams_map) { + RETURN_IF_ERROR( + index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablet_ids)); } - return st; + LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size() + << ", failed_tablet_num=" << failed_tablet_ids->size(); + return Status::OK(); } void LoadStream::_report_result(StreamId stream, const Status& st, @@ -424,26 +437,7 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data) index_stream = it->second; } - Status st = Status::OK(); - { - bthread::Mutex mutex; - std::unique_lock lock(mutex); - bthread::ConditionVariable cond; - bool ret = _load_stream_mgr->heavy_work_pool()->try_offer( - [this, &index_stream, &header, &data, &mutex, &cond, &st] { - signal::set_signal_task_id(_load_id); - st = index_stream->append_data(header, data); - std::unique_lock lock(mutex); - cond.notify_one(); - }); - if (ret) { - cond.wait(lock); - } else { - return Status::Error( - "there is not enough thread resource for append data"); - } - } - return st; + return index_stream->append_data(header, data); } int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) { diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index a4d359dc0eac0bf..7e16fe417ca32b0 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -69,6 +69,7 @@ class TabletStream { RuntimeProfile::Counter* _append_data_timer = nullptr; RuntimeProfile::Counter* _add_segment_timer = nullptr; RuntimeProfile::Counter* _close_wait_timer = nullptr; + LoadStreamMgr* _load_stream_mgr = nullptr; }; using TabletStreamSharedPtr = std::shared_ptr;