From c75aa2e047660e29c95a5b8e7f28bd1aeddce0ba Mon Sep 17 00:00:00 2001 From: qiye Date: Mon, 13 Nov 2023 14:14:25 +0800 Subject: [PATCH] [fix](broker load) pass loadToSingleTablet to olapTableSink (#26680) --- .../java/org/apache/doris/load/loadv2/LoadLoadingTask.java | 2 +- .../org/apache/doris/load/loadv2/LoadingTaskPlanner.java | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 9a2fdb647f4d02..3bdbe52209b15b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -111,7 +111,7 @@ public void init(TUniqueId loadId, List> fileStatusList, this.loadId = loadId; planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism, - this.useNewLoadScanNode, userInfo); + this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink); planner.plan(loadId, fileStatusList, fileNum); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index a11ea7634f20a8..304dd2478c7f24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -72,6 +72,7 @@ public class LoadingTaskPlanner { private final int loadParallelism; private final int sendBatchParallelism; private final boolean useNewLoadScanNode; + private final boolean singleTabletLoadPerSink; private UserIdentity userInfo; // Something useful // ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase() @@ -87,7 +88,8 @@ public class LoadingTaskPlanner { public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism, - int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo) { + int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo, + boolean singleTabletLoadPerSink) { this.loadJobId = loadJobId; this.txnId = txnId; this.dbId = dbId; @@ -101,6 +103,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table this.loadParallelism = loadParallelism; this.sendBatchParallelism = sendBatchParallelism; this.useNewLoadScanNode = useNewLoadScanNode; + this.singleTabletLoadPerSink = singleTabletLoadPerSink; this.userInfo = userInfo; if (Env.getCurrentEnv().getAccessManager() .checkDbPriv(userInfo, Env.getCurrentInternalCatalog().getDbNullable(dbId).getFullName(), @@ -211,7 +214,7 @@ public void plan(TUniqueId loadId, List> fileStatusesLis List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds, Config.enable_single_replica_load); - olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, false, strictMode); + olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); olapTableSink.complete(analyzer);