Skip to content

Commit

Permalink
(cold_on_s3) Isolate local and remote queries using different scanner…
Browse files Browse the repository at this point in the history
… thread pools (apache#276)
  • Loading branch information
luwei16 committed Jul 19, 2022
1 parent d5fa66d commit a7b0b8e
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 2 deletions.
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,11 @@ CONF_Int32(s3_transfer_executor_pool_size, "2");

CONF_Bool(enable_time_lut, "true");

// number of threads in the s3 scanner thread pool
CONF_Int32(doris_s3_scanner_thread_pool_thread_num, "48");
// maximum number of queued tasks in the s3 scanner thread pool
CONF_Int32(doris_s3_scanner_thread_pool_queue_size, "102400");

} // namespace config

} // namespace doris
12 changes: 11 additions & 1 deletion be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
* 4. Regularly increase the priority of the remaining tasks in the queue to avoid starvation for large queries
*********************************/
PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
PriorityThreadPool* s3_thread_pool = state->exec_env()->s3_scan_thread_pool();
_total_assign_num = 0;
_nice = 18 + std::max(0, 2 - (int)_olap_scanners.size() / 5);
std::list<OlapScanner*> olap_scanners;
Expand Down Expand Up @@ -1582,8 +1583,17 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
task.priority = _nice;
task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
(*iter)->start_wait_worker_timer();

TabletStorageType type = (*iter)->get_storage_type();
bool ret = false;
COUNTER_UPDATE(_scanner_sched_counter, 1);
if (thread_pool->offer(task)) {
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
ret = thread_pool->offer(task);
} else {
ret = s3_thread_pool->offer(task);
}

if (ret) {
olap_scanners.erase(iter++);
} else {
LOG(FATAL) << "Failed to assign scanner task to thread pool!";
Expand Down
17 changes: 17 additions & 0 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ Status OlapScanner::prepare(
return Status::OK();
}

// Classifies where the rowsets read by this scanner are stored.
//
// A rowset reader is counted as local when the resource_id of its rowset
// meta is empty; every other reader is counted as remote (S3).
//
// Returns:
//   STORAGE_TYPE_LOCAL        - no remote reader exists (this includes the
//                               case where there are no readers at all)
//   STORAGE_TYPE_S3           - at least one reader exists and all are remote
//   STORAGE_TYPE_S3_AND_LOCAL - both local and remote readers are present
TabletStorageType OlapScanner::get_storage_type() {
    bool has_local = false;
    bool has_remote = false;
    for (const auto& rs_reader : _tablet_reader_params.rs_readers) {
        const bool is_local = rs_reader->rowset()->rowset_meta()->resource_id().empty();
        has_local = has_local || is_local;
        has_remote = has_remote || !is_local;
    }

    if (!has_remote) {
        return TabletStorageType::STORAGE_TYPE_LOCAL;
    }
    if (!has_local) {
        return TabletStorageType::STORAGE_TYPE_S3;
    }
    return TabletStorageType::STORAGE_TYPE_S3_AND_LOCAL;
}

Status OlapScanner::open() {
auto span = _runtime_state->get_tracer()->StartSpan("OlapScanner::open");
auto scope = opentelemetry::trace::Scope {span};
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class OlapScanner {

const std::shared_ptr<MemTracker>& mem_tracker() const { return _mem_tracker; }

TabletStorageType get_storage_type();

protected:
Status _init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ struct RowsetWriterContext;

using TabletSharedPtr = std::shared_ptr<Tablet>;

// Describes where the rowsets touched by a scan are stored.
// The enumerator values are spelled out explicitly; they are the same values
// the compiler assigns implicitly.
enum TabletStorageType {
    // Every rowset is stored locally.
    STORAGE_TYPE_LOCAL = 0,
    // Every rowset is stored remotely (S3).
    STORAGE_TYPE_S3 = 1,
    // Some rowsets are local and some are remote.
    STORAGE_TYPE_S3_AND_LOCAL = 2
};

class Tablet : public BaseTablet {
public:
static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class ExecEnv {
}
ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
// Returns the priority thread pool that the scan nodes offer scanner tasks to
// when a scanner's storage type is not STORAGE_TYPE_LOCAL. The pointer is a
// plain member; it is nullptr until the pool has been created.
PriorityThreadPool* s3_scan_thread_pool() { return _s3_scan_thread_pool; }
ThreadPool* limited_scan_thread_pool() { return _limited_scan_thread_pool.get(); }
PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
Expand Down Expand Up @@ -200,6 +201,7 @@ class ExecEnv {

// TODO(cmy): find a better way to unify these 2 pools.
PriorityThreadPool* _scan_thread_pool = nullptr;
PriorityThreadPool* _s3_scan_thread_pool = nullptr;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;

std::unique_ptr<ThreadPool> _send_batch_thread_pool;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
LOG(INFO) << "scan thread pool use PriorityThreadPool";
}

_s3_scan_thread_pool = new PriorityThreadPool(config::doris_s3_scanner_thread_pool_thread_num,
config::doris_s3_scanner_thread_pool_queue_size);

ThreadPoolBuilder("LimitedScanThreadPool")
.set_min_threads(1)
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
Expand Down Expand Up @@ -336,6 +339,7 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_cgroups_mgr);
SAFE_DELETE(_etl_thread_pool);
SAFE_DELETE(_scan_thread_pool);
SAFE_DELETE(_s3_scan_thread_pool);
SAFE_DELETE(_thread_mgr);
SAFE_DELETE(_broker_client_cache);
SAFE_DELETE(_frontend_client_cache);
Expand Down
12 changes: 11 additions & 1 deletion be/src/vec/exec/volap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1867,6 +1867,7 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
// post volap scanners to thread-pool
PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
PriorityThreadPool* s3_thread_pool = state->exec_env()->s3_scan_thread_pool();
auto iter = olap_scanners.begin();
while (iter != olap_scanners.end()) {
PriorityThreadPool::Task task;
Expand All @@ -1877,8 +1878,17 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
task.priority = _nice;
task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
(*iter)->start_wait_worker_timer();

TabletStorageType type = (*iter)->get_storage_type();
bool ret = false;
COUNTER_UPDATE(_scanner_sched_counter, 1);
if (thread_pool->offer(task)) {
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
ret = thread_pool->offer(task);
} else {
ret = s3_thread_pool->offer(task);
}

if (ret) {
olap_scanners.erase(iter++);
} else {
LOG(FATAL) << "Failed to assign scanner task to thread pool!";
Expand Down
17 changes: 17 additions & 0 deletions be/src/vec/exec/volap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,23 @@ Status VOlapScanner::open() {
return Status::OK();
}

// Reports the storage location of the rowsets this vectorized scanner reads.
//
// Each rowset reader whose rowset meta carries an empty resource_id is
// treated as local; a non-empty resource_id marks the reader as remote (S3).
//
// Returns:
//   STORAGE_TYPE_LOCAL        - there is no remote reader (also returned when
//                               the reader list is empty)
//   STORAGE_TYPE_S3           - there is at least one reader and none is local
//   STORAGE_TYPE_S3_AND_LOCAL - local and remote readers are mixed
TabletStorageType VOlapScanner::get_storage_type() {
    bool saw_local = false;
    bool saw_remote = false;
    for (const auto& rs_reader : _tablet_reader_params.rs_readers) {
        if (rs_reader->rowset()->rowset_meta()->resource_id().empty()) {
            saw_local = true;
        } else {
            saw_remote = true;
        }
    }

    if (!saw_remote) {
        return TabletStorageType::STORAGE_TYPE_LOCAL;
    }
    return saw_local ? TabletStorageType::STORAGE_TYPE_S3_AND_LOCAL
                     : TabletStorageType::STORAGE_TYPE_S3;
}

// it will be called under tablet read lock because capture rs readers need
Status VOlapScanner::_init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/volap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class VOlapScanner {

const std::shared_ptr<MemTracker>& mem_tracker() const { return _mem_tracker; }

TabletStorageType get_storage_type();

private:
Status _init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
Expand Down

0 comments on commit a7b0b8e

Please sign in to comment.