From a0cae321a0a139fd70c1fbd2a36c9d2d40e31238 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 23 Jul 2019 10:52:06 +0800 Subject: [PATCH 1/5] Add more logs to trace the broker load process. The operator wants to know when the job is scheduled as PENDING and LOADING, and how long it takes to finish these sub-states. Also add 2 metrics on BE to monitor the memtable's flush time: `memtable_flush_total` and `memtable_flush_duration_us`. --- be/src/olap/memtable.cpp | 19 ++++++++++++------- be/src/olap/txn_manager.cpp | 2 +- be/src/runtime/tablet_writer_mgr.cpp | 9 ++++++--- be/src/util/doris_metrics.cpp | 6 ++++++ be/src/util/doris_metrics.h | 3 +++ .../load/loadv2/BrokerLoadPendingTask.java | 4 ++++ .../doris/load/loadv2/LoadLoadingTask.java | 3 +-- .../java/org/apache/doris/qe/QeProcessor.java | 3 ++- .../org/apache/doris/qe/QeProcessorImpl.java | 8 +++++--- .../doris/service/FrontendServiceImpl.java | 2 +- 10 files changed, 41 insertions(+), 18 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 4a1e495332a63f..c56d7c55c603cc 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -151,14 +151,19 @@ void MemTable::insert(Tuple* tuple) { } OLAPStatus MemTable::flush(RowsetWriterSharedPtr rowset_writer) { - Table::Iterator it(_skip_list); - for (it.SeekToFirst(); it.Valid(); it.Next()) { - const char* row = it.key(); - _schema->finalize(row); - RETURN_NOT_OK(rowset_writer->add_row(row, _schema)); + int64_t duration_ns = 0; + { + SCOPED_RAW_TIMER(&duration_ns); + Table::Iterator it(_skip_list); + for (it.SeekToFirst(); it.Valid(); it.Next()) { + const char* row = it.key(); + _schema->finalize(row); + RETURN_NOT_OK(rowset_writer->add_row(row, _schema)); + } + RETURN_NOT_OK(rowset_writer->flush()); } - - RETURN_NOT_OK(rowset_writer->flush()); + DorisMetrics::memtable_flush_total.increment(1); + DorisMetrics::memtable_flush_duration_us.increment(duration_ns / 1000); return OLAP_SUCCESS; } diff --git 
a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index fef2e98e5e6ed3..80dc877749c995 100755 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -166,7 +166,7 @@ OLAPStatus TxnManager::commit_txn( && load_info.rowset != nullptr && load_info.rowset->rowset_id() != rowset_ptr->rowset_id()) { // find a rowset with different rowset id, then it should not happen, just return errors - LOG(WARNING) << "find transaction exists when add to engine." + LOG(WARNING) << "find transaction exists when add to engine. but rowset ids are not same." << "partition_id: " << key.first << ", transaction_id: " << key.second << ", tablet: " << tablet_info.to_string() diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index 0e397cc6db9282..8c384f8dd5c4a3 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -27,6 +27,7 @@ #include "runtime/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" +#include "service/backend_options.h" #include "util/bitmap.h" #include "util/stopwatch.hpp" #include "olap/delta_writer.h" @@ -152,9 +153,11 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) { } auto st = it->second->write(row_batch.get_row(i)->get_tuple(0)); if (st != OLAP_SUCCESS) { - LOG(WARNING) << "tablet writer writer failed, tablet_id=" << it->first - << ", transaction_id=" << _txn_id; - return Status::InternalError("tablet writer write failed"); + std::stringstream ss; + ss << "tablet writer write failed, tablet_id=" << it->first + << ", transaction_id=" << _txn_id << ", status=" << st; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str() + ", be: " + BackendOptions::get_localhost()); } } _next_seqs[params.sender_id()]++; diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 57e9c35ce76621..7efdad8deedb69 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -94,6 
+94,9 @@ IntCounter DorisMetrics::txn_exec_plan_total; IntCounter DorisMetrics::stream_receive_bytes_total; IntCounter DorisMetrics::stream_load_rows_total; +IntCounter DorisMetrics::memtable_flush_total; +IntCounter DorisMetrics::memtable_flush_duration_us; + // gauges IntGauge DorisMetrics::memory_pool_bytes_total; IntGauge DorisMetrics::process_thread_num; @@ -138,6 +141,9 @@ void DorisMetrics::initialize( REGISTER_DORIS_METRIC(query_scan_rows); REGISTER_DORIS_METRIC(ranges_processed_total); + REGISTER_DORIS_METRIC(memtable_flush_total); + REGISTER_DORIS_METRIC(memtable_flush_duration_us); + // push request _metrics->register_metric( "push_requests_total", MetricLabels().add("status", "SUCCESS"), diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 71808667b7c89f..c9ba75b2c09726 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -114,6 +114,9 @@ class DorisMetrics { static IntCounter stream_receive_bytes_total; static IntCounter stream_load_rows_total; + static IntCounter memtable_flush_total; + static IntCounter memtable_flush_duration_us; + // Gauges static IntGauge memory_pool_bytes_total; static IntGauge process_thread_num; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index 6db49560015dc8..9f01687d28fac0 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -53,11 +53,13 @@ public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback, @Override void executeTask() throws UserException { + LOG.info("begin to execute broker pending task. 
job: {}", callback.getCallbackId()); getAllFileStatus(); } private void getAllFileStatus() throws UserException { + long start = System.currentTimeMillis(); for (Map.Entry> entry : tableToBrokerFileList.entrySet()) { long tableId = entry.getKey(); @@ -79,6 +81,8 @@ private void getAllFileStatus() } ((BrokerPendingTaskAttachment) attachment).addFileStatus(tableId, fileStatusList); + LOG.info("get {} files to be loaded. cost: {} ms, job: {}", + fileStatusList.size(), (System.currentTimeMillis() - start), callback.getCallbackId()); } } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 7c3211fa2e723e..e9450b87db3df6 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -55,8 +55,6 @@ public class LoadLoadingTask extends LoadTask { private LoadingTaskPlanner planner; - private String errMsg; - public LoadLoadingTask(Database db, OlapTable table, BrokerDesc brokerDesc, List fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, @@ -81,6 +79,7 @@ public void init(List> fileStatusList, int fileNum) thro @Override protected void executeTask() throws Exception{ + LOG.info("begin to execute loading task. job: {}. 
left retry: {}", callback.getCallbackId(), retryTime); retryTime--; executeOnce(); } diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessor.java b/fe/src/main/java/org/apache/doris/qe/QeProcessor.java index a94d4fd43821c2..704ccdfd8f8e67 100644 --- a/fe/src/main/java/org/apache/doris/qe/QeProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/QeProcessor.java @@ -18,6 +18,7 @@ package org.apache.doris.qe; import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TUniqueId; @@ -26,7 +27,7 @@ public interface QeProcessor { - TReportExecStatusResult reportExecStatus(TReportExecStatusParams params); + TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr); void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException; diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 5a08cb97d43127..bd916263d38d78 100644 --- a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -20,6 +20,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TStatus; @@ -94,9 +95,10 @@ public Map getQueryStatistics() { } @Override - public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) { - LOG.info("ReportExecStatus(): fragment_instance_id=" + DebugUtil.printId(params.fragment_instance_id) - + ", query id=" + DebugUtil.printId(params.query_id)); + public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress 
beAddr) { + LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}", + DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id), + params.backend_num, beAddr); LOG.debug("params: {}", params); final TReportExecStatusResult result = new TReportExecStatusResult(); final QueryInfo info = coordinatorMap.get(params.query_id); diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index d6f5e7447d9e2d..450d83ef58071e 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -317,7 +317,7 @@ public TShowVariableResult showVariables(TShowVariableRequest params) throws TEx @Override public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException { - return QeProcessorImpl.INSTANCE.reportExecStatus(params); + return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr()); } @Override From 825ca0028ef779e9ea558d677c485f925dba257f Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 23 Jul 2019 14:42:09 +0800 Subject: [PATCH 2/5] add add batch timer --- be/src/exec/tablet_sink.cpp | 17 +++++++++++-- be/src/exec/tablet_sink.h | 6 +++++ be/src/service/internal_service.cpp | 19 ++++++++------ .../apache/doris/planner/BrokerScanNode.java | 25 ++++++++++++++----- gensrc/proto/internal_service.proto | 1 + 5 files changed, 53 insertions(+), 15 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 9cbdf76959e02f..fda9bedd364a8c 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -206,6 +206,10 @@ Status NodeChannel::_wait_in_flight_packet() { << ", error_text=" << _add_batch_closure->cntl.ErrorText(); return Status::InternalError("failed to send batch"); } + + if (_add_batch_closure->result.has_execution_time_us()) { + 
_parent->increase_node_add_batch_time_us(_node_id, _add_batch_closure->result.execution_time_us()); + } return {_add_batch_closure->result.status()}; } @@ -599,7 +603,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { for (auto channel : _channels) { status = channel->close(state); if (!status.ok()) { - LOG(WARNING) << "close channel failed, load_id=" << _load_id + LOG(WARNING) << "close channel failed, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id; } } @@ -612,8 +616,17 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_validate_data_timer, _validate_data_ns); COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns); COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); - state->update_num_rows_load_filtered(_number_filtered_rows); + + // print log of add batch time of all node, for tracing load performance easily + std::stringstream ss; + ss << "finished to close olap table sink. load_id=" << print_id(_load_id) + << ", txn_id=" << _txn_id << ", node add batch time(ns): "; + for (auto const& pair: _node_add_batch_time_map) { + ss << "{" << pair.first << "=" << pair.second << "}"; + } + LOG(INFO) << ss.str(); + } else { for (auto channel : _channels) { channel->cancel(); diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index dc7dc370412f61..95e85efc11e365 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -172,6 +172,9 @@ class OlapTableSink : public DataSink { // at a time can modify them. int64_t* mutable_wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; } int64_t* mutable_serialize_batch_ns() { return &_serialize_batch_ns; } + void increase_node_add_batch_time_us(int64_t be_id, int64_t time_ns) { + _node_add_batch_time_map[be_id] += time_ns; + } private: // convert input batch to output batch which will be loaded into OLAP table. 
@@ -257,6 +260,9 @@ class OlapTableSink : public DataSink { RuntimeProfile::Counter* _close_timer = nullptr; RuntimeProfile::Counter* _wait_in_flight_packet_timer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; + + // BE id -> execution time of add batch in us + std::unordered_map _node_add_batch_time_map; }; } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 687d7935f29bb8..afe7461b13355a 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -102,14 +102,19 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr _tablet_worker_pool.offer( [request, response, done, this] () { brpc::ClosureGuard closure_guard(done); - auto st = _exec_env->tablet_writer_mgr()->add_batch(*request, response->mutable_tablet_vec()); - if (!st.ok()) { - LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg() - << ", id=" << request->id() - << ", index_id=" << request->index_id() - << ", sender_id=" << request->sender_id(); + int64_t execution_time_ns = 0; + { + SCOPED_RAW_TIMER(&execution_time_ns); + auto st = _exec_env->tablet_writer_mgr()->add_batch(*request, response->mutable_tablet_vec()); + if (!st.ok()) { + LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg() + << ", id=" << request->id() + << ", index_id=" << request->index_id() + << ", sender_id=" << request->sender_id(); + } + st.to_protobuf(response->mutable_status()); } - st.to_protobuf(response->mutable_status()); + response->set_execution_time_us(execution_time_ns / 1000); }); } diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 5dbf775ee6ac5e..4a774275ad451b 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -17,10 +17,6 @@ package org.apache.doris.planner; 
-import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.ArithmeticExpr; import org.apache.doris.analysis.BinaryPredicate; @@ -66,6 +62,12 @@ import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -106,6 +108,8 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) { private long bytesPerInstance; // Parameters need to process + private long loadJobId = -1; // -1 means this scan node is not for a load job + private long txnId = -1; private Table targetTable; private BrokerDesc brokerDesc; private List fileGroups; @@ -187,10 +191,14 @@ public void setLoadInfo(Table targetTable, this.fileGroups = fileGroups; } - public void setLoadInfo(Table targetTable, + public void setLoadInfo(long loadJobId, + long txnId, + Table targetTable, BrokerDesc brokerDesc, List fileGroups, boolean strictMode) { + this.loadJobId = loadJobId; + this.txnId = txnId; this.targetTable = targetTable; this.brokerDesc = brokerDesc; this.fileGroups = fileGroups; @@ -466,7 +474,6 @@ private TScanRangeLocations newLocations(TBrokerScanRangeParams params, String b // Generate on broker scan range TBrokerScanRange brokerScanRange = new TBrokerScanRange(); brokerScanRange.setParams(params); - // TODO(zc): int numBroker = Math.min(3, numBe); for (int i = 0; i < numBroker; ++i) { FsBroker broker = null; @@ -674,6 +681,12 @@ public void finalize(Analyzer analyzer) throws UserException { LOG.debug("Scan range is {}", locations); } } + + if (loadJobId != -1) { + 
LOG.info("broker load job {} with txn {} has {} scan range: {}", + loadJobId, txnId, locationsList.size(), + locationsList.stream().map(loc -> loc.locations.get(0).backend_id).toArray()); + } } @Override diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index be36bc82189142..877d3b28212a7d 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -95,6 +95,7 @@ message PTabletWriterAddBatchRequest { message PTabletWriterAddBatchResult { required PStatus status = 1; repeated PTabletInfo tablet_vec = 2; + optional int64 execution_time_us = 3; }; // tablet writer cancel From d9d563ec06b403988b333163e6322047aa51a1fe Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 23 Jul 2019 14:45:44 +0800 Subject: [PATCH 3/5] fix bug --- .../org/apache/doris/load/loadv2/LoadingTaskPlanner.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 18ff25831b6bdf..2de5afcdb269aa 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -60,6 +60,7 @@ public class LoadingTaskPlanner { private static final Logger LOG = LogManager.getLogger(LoadingTaskPlanner.class); // Input params + private final long loadJobId; private final long txnId; private final long dbId; private final OlapTable table; @@ -77,9 +78,10 @@ public class LoadingTaskPlanner { private int nextNodeId = 0; - public LoadingTaskPlanner(long txnId, long dbId, OlapTable table, + public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, boolean strictMode) { + this.loadJobId = loadJobId; this.txnId = txnId; this.dbId = dbId; this.table = table; @@ -108,7 +110,7 @@ public void plan(List> fileStatusesList, int 
filesAdded) // 1. Broker scan node BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode", fileStatusesList, filesAdded); - scanNode.setLoadInfo(table, brokerDesc, fileGroups, strictMode); + scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode); scanNode.init(analyzer); scanNode.finalize(analyzer); scanNodes.add(scanNode); From d063e4b20c868a748f3dbf0baac4d2679a07bc3b Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 23 Jul 2019 14:51:16 +0800 Subject: [PATCH 4/5] fix bug2 --- .../main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java | 2 +- fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index e9450b87db3df6..b0af066e160255 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -73,7 +73,7 @@ public LoadLoadingTask(Database db, OlapTable table, } public void init(List> fileStatusList, int fileNum) throws UserException { - planner = new LoadingTaskPlanner(txnId, db.getId(), table, brokerDesc, fileGroups, strictMode); + planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode); planner.plan(fileStatusList, fileNum); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index b153337855dce6..35176a38bab86d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -36,7 +36,7 @@ public abstract class LoadTask extends MasterTask { protected FailMsg failMsg = new FailMsg(); protected int retryTime = 1; - public LoadTask(LoadTaskCallback callback){ + 
public LoadTask(LoadTaskCallback callback) { this.signature = Catalog.getCurrentCatalog().getNextId(); this.callback = callback; } From 772077892c299d9f1fce1e4eaec63449af3ae739 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 23 Jul 2019 15:44:10 +0800 Subject: [PATCH 5/5] add config 'async_load_task_pool_size' --- .../org/apache/doris/catalog/Catalog.java | 2 +- .../java/org/apache/doris/common/Config.java | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index d7bd3754f5701a..5886036404c26a 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -481,7 +481,7 @@ private Catalog() { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); - this.loadTaskScheduler = new MasterTaskExecutor(10); + this.loadTaskScheduler = new MasterTaskExecutor(Config.async_load_task_pool_size); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 53eee42b6e1720..bafc64306974ab 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -259,13 +259,6 @@ public class Config extends ConfigBase { * minimal intervals between two publish version action */ @ConfField public static int publish_version_interval_ms = 100; - - /* - * maximun concurrent running txn num including prepare, commit txns under a single db - * txn manager will reject coming txns - */ - @ConfField(mutable = true, masterOnly = true) - public static int max_running_txn_num_per_db = 
100; /* * Maximal wait seconds for straggler node in load @@ -393,6 +386,20 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int desired_max_waiting_jobs = 100; + /* + * maximum concurrent running txn num including prepare, commit txns under a single db + * txn manager will reject incoming txns + */ + @ConfField(mutable = true, masterOnly = true) + public static int max_running_txn_num_per_db = 100; + + /* + * The load task executor pool size. This pool size limits the max running load tasks. + * Currently, it only limits the load task of broker load, pending and loading phases. + * It should be less than 'max_running_txn_num_per_db' + */ + public static int async_load_task_pool_size = 10; + /* * Same meaning as *tablet_create_timeout_second*, but used when delete a tablet. */