Skip to content

Commit

Permalink
+1
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Jul 19, 2024
1 parent a95e1f5 commit de33794
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 18 deletions.
2 changes: 1 addition & 1 deletion be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client(
aws_config.maxConnections = config::doris_scanner_thread_pool_thread_num;
#else
aws_config.maxConnections =
ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_size();
ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_thread_num();
#endif
}

Expand Down
29 changes: 12 additions & 17 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
return;
}

// Submit scanners to thread pool
// TODO(cmy): How to handle this "nice"?
int nice = 1;
if (ctx->thread_token != nullptr) {
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
Expand Down Expand Up @@ -174,21 +171,19 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
scan_sched =
is_local ? _local_scan_thread_pool.get() : _remote_scan_thread_pool.get();
}
if (scan_sched) {
auto work_func = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
}();
auto work_func = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
}();

if (!status.ok()) {
scanner_ref->set_status(status);
ctx->append_block_to_queue(scanner_ref);
}
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
return scan_sched->submit_scan_task(simple_scan_task);
}
if (!status.ok()) {
scanner_ref->set_status(status);
ctx->append_block_to_queue(scanner_ref);
}
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
return scan_sched->submit_scan_task(simple_scan_task);
};

if (auto ret = sumbit_task(); !ret) {
Expand Down

0 comments on commit de33794

Please sign in to comment.