Skip to content

Commit

Permalink
[improve](move-memtable) avoid using heavy work pool during append da…
Browse files Browse the repository at this point in the history
…ta (apache#28745)
  • Loading branch information
kaijchen authored and HappenLee committed Jan 12, 2024
1 parent e97aef7 commit 9f5f1e3
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 59 deletions.
112 changes: 53 additions & 59 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ namespace doris {

TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
: _id(id), _next_segid(0), _load_id(load_id), _txn_id(txn_id) {
: _id(id),
_next_segid(0),
_load_id(load_id),
_txn_id(txn_id),
_load_stream_mgr(load_stream_mgr) {
load_stream_mgr->create_tokens(_flush_tokens);
_failed_st = std::make_shared<Status>();
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
Expand Down Expand Up @@ -125,6 +129,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
auto flush_func = [this, new_segid, eos, buf, header]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf);
if (eos && st.ok()) {
st = _load_stream_writer->close_segment(new_segid);
Expand Down Expand Up @@ -166,6 +171,7 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());

auto add_segment_func = [this, new_segid, stat, flush_schema]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema);
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
Expand All @@ -181,13 +187,44 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data

Status TabletStream::close() {
SCOPED_TIMER(_close_wait_timer);
for (auto& token : _flush_tokens) {
token->wait();
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto wait_func = [this, &mu, &cv] {
signal::set_signal_task_id(_load_id);
for (auto& token : _flush_tokens) {
token->wait();
}
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
if (ret) {
cv.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}

if (!_failed_st->ok()) {
return *_failed_st;
}
return _load_stream_writer->close();

Status st = Status::OK();
auto close_func = [this, &mu, &cv, &st]() {
signal::set_signal_task_id(_load_id);
st = _load_stream_writer->close();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
if (ret) {
cv.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
return st;
}

IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
Expand Down Expand Up @@ -244,13 +281,13 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
}
}

for (auto& it : _tablet_streams_map) {
auto st = it.second->close();
for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
success_tablet_ids->push_back(it.second->id());
success_tablet_ids->push_back(tablet_stream->id());
} else {
LOG(INFO) << "close tablet stream " << *it.second << ", status=" << st;
failed_tablet_ids->push_back(it.second->id());
LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
failed_tablet_ids->push_back(tablet_stream->id());
}
}
return Status::OK();
Expand Down Expand Up @@ -308,37 +345,13 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
return Status::OK();
}

Status st = Status::OK();
{
bthread::Mutex mutex;
std::unique_lock<bthread::Mutex> lock(mutex);
bthread::ConditionVariable cond;
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
[this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, &st]() {
signal::set_signal_task_id(_load_id);
for (auto& it : _index_streams_map) {
st = it.second->close(_tablets_to_commit, success_tablet_ids,
failed_tablet_ids);
if (!st.ok()) {
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
return;
}
}
LOG(INFO) << "close load " << *this
<< ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablet_ids->size();
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
});
if (ret) {
cond.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
}
for (auto& [_, index_stream] : _index_streams_map) {
RETURN_IF_ERROR(
index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablet_ids));
}
return st;
LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size()
<< ", failed_tablet_num=" << failed_tablet_ids->size();
return Status::OK();
}

void LoadStream::_report_result(StreamId stream, const Status& st,
Expand Down Expand Up @@ -424,26 +437,7 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data)
index_stream = it->second;
}

Status st = Status::OK();
{
bthread::Mutex mutex;
std::unique_lock<bthread::Mutex> lock(mutex);
bthread::ConditionVariable cond;
bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
[this, &index_stream, &header, &data, &mutex, &cond, &st] {
signal::set_signal_task_id(_load_id);
st = index_stream->append_data(header, data);
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
});
if (ret) {
cond.wait(lock);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for append data");
}
}
return st;
return index_stream->append_data(header, data);
}

int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) {
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TabletStream {
RuntimeProfile::Counter* _append_data_timer = nullptr;
RuntimeProfile::Counter* _add_segment_timer = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
};

using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;
Expand Down

0 comments on commit 9f5f1e3

Please sign in to comment.