Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement](group commit)Optimize be select for group commit #35558

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
request.__set_db_id(db_id);
request.__set_table_id(table_id);
request.__set_txnId(txn_id);
request.__set_groupCommit(true);
request.__set_receiveBytes(state->num_bytes_load_total());
if (_exec_env->master_info()->__isset.backend_id) {
request.__set_backendId(_exec_env->master_info()->backend_id);
} else {
LOG(WARNING) << "_exec_env->master_info not set backend_id";
}
if (state) {
request.__set_commitInfos(state->tablet_commit_infos());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,18 @@
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class CloudGroupCommitPlanner extends GroupCommitPlanner {
private static final Logger LOG = LogManager.getLogger(CloudGroupCommitPlanner.class);
Expand All @@ -50,36 +44,11 @@ public CloudGroupCommitPlanner(Database db, OlapTable table, List<String> target

@Override
protected void selectBackends(ConnectContext ctx) throws DdlException {
backend = ctx.getInsertGroupCommit(this.table.getId());
if (backend != null && backend.isAlive() && !backend.isDecommissioned()
&& backend.getCloudClusterName().equals(ctx.getCloudCluster())) {
return;
try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(this.table.getId(), ctx, true);
} catch (LoadException e) {
throw new DdlException("No suitable backend");
}

String cluster = ctx.getCloudCluster();
if (Strings.isNullOrEmpty(cluster)) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR);
}

// select be
List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster)
.values().stream().collect(Collectors.toList());
Collections.shuffle(backends);
for (Backend backend : backends) {
if (backend.isActive() && !backend.isDecommissioned()) {
this.backend = backend;
ctx.setInsertGroupCommit(this.table.getId(), backend);
LOG.debug("choose new be {}", backend.getId());
return;
}
}

List<String> backendsInfo = backends.stream()
.map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive()
+ ", decommission=" + be.isDecommissioned() + " }")
.collect(Collectors.toList());
throw new DdlException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo);
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import java.util.concurrent.atomic.AtomicLongArray;

public class SlidingWindowCounter {
private final int windowSizeInSeconds;
private final int numberOfBuckets;
private final AtomicLongArray buckets;
private final AtomicLongArray bucketTimestamps;

public SlidingWindowCounter(int windowSizeInSeconds) {
this.windowSizeInSeconds = windowSizeInSeconds;
this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second
this.buckets = new AtomicLongArray(numberOfBuckets);
this.bucketTimestamps = new AtomicLongArray(numberOfBuckets);
}

private int getCurrentBucketIndex() {
long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
return (int) (currentTime % numberOfBuckets);
}

private void updateCurrentBucket() {
long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
int currentBucketIndex = getCurrentBucketIndex();
long bucketTimestamp = bucketTimestamps.get(currentBucketIndex);

if (currentTime - bucketTimestamp >= 1) {
buckets.set(currentBucketIndex, 0);
bucketTimestamps.set(currentBucketIndex, currentTime);
}
}

public void add(long value) {
updateCurrentBucket();
int bucketIndex = getCurrentBucketIndex();
buckets.addAndGet(bucketIndex, value);
}

public long get() {
updateCurrentBucket();
long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
long count = 0;

for (int i = 0; i < numberOfBuckets; i++) {
if (currentTime - bucketTimestamps.get(i) < windowSizeInSeconds) {
count += buckets.get(i);
}
}
return count;
}

public String toString() {
return String.valueOf(get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
Expand Down Expand Up @@ -143,11 +144,16 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
String sql = request.getHeader("sql");
LOG.info("streaming load sql={}", sql);
boolean groupCommit = false;
long tableId = -1;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE;
return new RestBaseResult(msg);
Expand All @@ -165,7 +171,7 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
}

String label = request.getHeader(LABEL_KEY);
TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit);
TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit, tableId);

LOG.info("redirect load action to destination={}, label: {}",
redirectAddr.toString(), label);
Expand Down Expand Up @@ -287,7 +293,9 @@ private Object executeWithoutPassword(HttpServletRequest request,
return new RestBaseResult(e.getMessage());
}
} else {
redirectAddr = selectRedirectBackend(request, groupCommit);
long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName)
.get()).getTable(tableName).get()).getId();
redirectAddr = selectRedirectBackend(request, groupCommit, tableId);
}

LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}",
Expand Down Expand Up @@ -320,7 +328,7 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) {
return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected.");
}

TNetworkAddress redirectAddr = selectRedirectBackend(request, false);
TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);
LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}",
redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation);

Expand Down Expand Up @@ -352,7 +360,7 @@ private String getCloudClusterName(HttpServletRequest request) {
return "";
}

private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit)
private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId)
throws LoadException {
long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L);
if (debugBackendId != -1L) {
Expand All @@ -366,11 +374,12 @@ private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolea
}
return selectCloudRedirectBackend(cloudClusterName, request, groupCommit);
} else {
return selectLocalRedirectBackend(groupCommit);
return selectLocalRedirectBackend(groupCommit, request, tableId);
}
}

private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws LoadException {
private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId)
throws LoadException {
Backend backend = null;
BeSelectionPolicy policy = null;
String qualifiedUser = ConnectContext.get().getQualifiedUser();
Expand All @@ -390,12 +399,17 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws L
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
if (groupCommit) {
for (Long backendId : backendIds) {
Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId);
if (!candidateBe.isDecommissioned()) {
backend = candidateBe;
break;
}
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(request.getRemoteAddr());
ctx.setThreadLocalInfo();

try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx, false);
} catch (DdlException e) {
throw new RuntimeException(e);
}
} else {
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
Expand Down Expand Up @@ -573,10 +587,10 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
return new RestBaseResult("No label selected.");
}

TNetworkAddress redirectAddr = selectRedirectBackend(request, false);
TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);

LOG.info("Redirect load action with auth token to destination={},"
+ "stream: {}, db: {}, tbl: {}, label: {}",
+ "stream: {}, db: {}, tbl: {}, label: {}",
redirectAddr.toString(), isStreamLoad, dbName, tableName, label);

URI urlObj = null;
Expand Down
Loading
Loading