Skip to content

Commit

Permalink
[improve](move-memtable) limit task num in load stream flush token (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored and stephen committed Dec 28, 2023
1 parent ad9a2a1 commit feda61c
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,8 @@ DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
DEFINE_Int32(load_stream_messages_in_batch, "128");
// brpc streaming StreamWait seconds on EAGAIN
DEFINE_Int32(load_stream_eagain_wait_seconds, "60");
// max tasks per flush token in load stream
DEFINE_Int32(load_stream_flush_token_max_tasks, "2");

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,8 @@ DECLARE_Int64(load_stream_max_buf_size);
DECLARE_Int32(load_stream_messages_in_batch);
// brpc streaming StreamWait seconds on EAGAIN
DECLARE_Int32(load_stream_eagain_wait_seconds);
// max tasks per flush token in load stream
DECLARE_Int32(load_stream_flush_token_max_tasks);

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
Expand Down
13 changes: 11 additions & 2 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "runtime/load_stream.h"

#include <brpc/stream.h>
#include <bthread/bthread.h>
#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
#include <olap/rowset/rowset_factory.h>
Expand Down Expand Up @@ -136,7 +137,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
LOG(INFO) << "write data failed " << *this;
}
};
return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(flush_func);
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) {
bthread_usleep(10 * 1000); // 10ms
}
return flush_token->submit_func(flush_func);
}

Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
Expand Down Expand Up @@ -170,7 +175,11 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
LOG(INFO) << "add segment failed " << *this;
}
};
return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(add_segment_func);
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) {
bthread_usleep(10 * 1000); // 10ms
}
return flush_token->submit_func(add_segment_func);
}

Status TabletStream::close() {
Expand Down

0 comments on commit feda61c

Please sign in to comment.