Skip to content

Commit

Permalink
[improve](move-memtable) remove heavy work pool from load stream
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Dec 22, 2023
1 parent c1457f9 commit d627659
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 75 deletions.
84 changes: 30 additions & 54 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <runtime/exec_env.h>

#include <memory>
#include <ranges>

#include "common/signal_handler.h"
#include "exec/tablet_info.h"
Expand Down Expand Up @@ -184,13 +185,30 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data

Status TabletStream::close() {
SCOPED_TIMER(_close_wait_timer);
for (auto& token : _flush_tokens) {
token->wait();
}
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lo(mu);
bthread::ConditionVariable cv;
auto wait_func = [this, &mu, &cv] {
for (auto& token : _flush_tokens | std::ranges::views::drop(1)) {
token->wait();
}
std::lock_guard<bthread::Mutex> lo(mu);
cv.notify_one();
};
RETURN_IF_ERROR(_flush_tokens[0]->submit_func(wait_func));
cv.wait(lo);
if (!_failed_st->ok()) {
return *_failed_st;
}
return _load_stream_writer->close();
Status st = Status::OK();
auto close_func = [this, &mu, &cv, &st]() {
st = _load_stream_writer->close();
std::lock_guard<bthread::Mutex> lo(mu);
cv.notify_one();
};
RETURN_IF_ERROR(_flush_tokens[0]->submit_func(close_func));
cv.wait(lo);
return st;
}

IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
Expand Down Expand Up @@ -311,37 +329,13 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
return Status::OK();
}

Status st = Status::OK();
{
bthread::Mutex mutex;
std::unique_lock<bthread::Mutex> lock(mutex);
bthread::ConditionVariable cond;
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
[this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, &st]() {
signal::set_signal_task_id(_load_id);
for (auto& it : _index_streams_map) {
st = it.second->close(_tablets_to_commit, success_tablet_ids,
failed_tablet_ids);
if (!st.ok()) {
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
return;
}
}
LOG(INFO) << "close load " << *this
<< ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablet_ids->size();
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
});
if (ret) {
cond.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
signal::set_signal_task_id(_load_id);
for (auto& [_, index] : _index_streams_map) {
RETURN_IF_ERROR(index->close(_tablets_to_commit, success_tablet_ids, failed_tablet_ids));
}
return st;
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablet_ids->size();
return Status::OK();
}

void LoadStream::_report_result(StreamId stream, const Status& st,
Expand Down Expand Up @@ -427,26 +421,8 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data)
index_stream = it->second;
}

Status st = Status::OK();
{
bthread::Mutex mutex;
std::unique_lock<bthread::Mutex> lock(mutex);
bthread::ConditionVariable cond;
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
[this, &index_stream, &header, &data, &mutex, &cond, &st] {
signal::set_signal_task_id(_load_id);
st = index_stream->append_data(header, data);
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
});
if (ret) {
cond.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for append data");
}
}
return st;
signal::set_signal_task_id(_load_id);
return index_stream->append_data(header, data);
}

int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/load_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@

namespace doris {

LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num,
FifoThreadPool* heavy_work_pool, FifoThreadPool* light_work_pool)
: _heavy_work_pool(heavy_work_pool), _light_work_pool(light_work_pool) {
LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num) {
static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
.set_min_threads(segment_file_writer_thread_num)
.set_max_threads(segment_file_writer_thread_num)
Expand Down
9 changes: 1 addition & 8 deletions be/src/runtime/load_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class POpenStreamSinkRequest;

class LoadStreamMgr {
public:
LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool* heavy_work_pool,
FifoThreadPool* light_work_pool);
LoadStreamMgr(uint32_t segment_file_writer_thread_num);
~LoadStreamMgr();

Status open_load_stream(const POpenLoadStreamRequest* request,
Expand All @@ -48,16 +47,10 @@ class LoadStreamMgr {
// only used by ut
size_t get_load_stream_num() { return _load_streams_map.size(); }

FifoThreadPool* heavy_work_pool() { return _heavy_work_pool; }
FifoThreadPool* light_work_pool() { return _light_work_pool; }

private:
std::mutex _lock;
std::unordered_map<UniqueId, LoadStreamSharedPtr> _load_streams_map;
std::unique_ptr<ThreadPool> _file_writer_thread_pool;

FifoThreadPool* _heavy_work_pool = nullptr;
FifoThreadPool* _light_work_pool = nullptr;
};

} // namespace doris
5 changes: 2 additions & 3 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,8 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
? config::brpc_light_work_pool_max_queue_size
: std::max(10240, CpuInfo::num_cores() * 320),
"brpc_light"),
_load_stream_mgr(new LoadStreamMgr(
exec_env->store_paths().size() * config::flush_thread_num_per_store,
&_heavy_work_pool, &_light_work_pool)) {
_load_stream_mgr(new LoadStreamMgr(exec_env->store_paths().size() *
config::flush_thread_num_per_store)) {
REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
[this]() { return _heavy_work_pool.get_queue_size(); });
REGISTER_HOOK_METRIC(light_work_pool_queue_size,
Expand Down
9 changes: 2 additions & 7 deletions be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,7 @@ class LoadStreamMgrTest : public testing::Test {
Handler _handler;
};

LoadStreamMgrTest()
: _heavy_work_pool(4, 32, "load_stream_test_heavy"),
_light_work_pool(4, 32, "load_stream_test_light") {}
LoadStreamMgrTest() = default;

void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) {
butil::IOBuf append_buf;
Expand Down Expand Up @@ -602,7 +600,7 @@ class LoadStreamMgrTest : public testing::Test {

static_cast<void>(k_engine->start_bg_threads());

_load_stream_mgr = std::make_unique<LoadStreamMgr>(4, &_heavy_work_pool, &_light_work_pool);
_load_stream_mgr = std::make_unique<LoadStreamMgr>(4);
_stream_service = new StreamService(_load_stream_mgr.get());
CHECK_EQ(0, _server->AddService(_stream_service, brpc::SERVER_OWNS_SERVICE));
brpc::ServerOptions server_options;
Expand Down Expand Up @@ -658,9 +656,6 @@ class LoadStreamMgrTest : public testing::Test {
brpc::Server* _server;
StreamService* _stream_service;

FifoThreadPool _heavy_work_pool;
FifoThreadPool _light_work_pool;

std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
};

Expand Down

0 comments on commit d627659

Please sign in to comment.