Skip to content

Commit

Permalink
[improvement](clone) dead be will abort sched task #36795 (#36895)
Browse files Browse the repository at this point in the history
cherry pick from #36795
  • Loading branch information
yujun777 authored Jun 27, 2024
1 parent b4a19dd commit 02b9d7a
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1822,36 +1822,64 @@ private void gatherStatistics(TabletSchedCtx tabletCtx) {
* If task is timeout, remove the tablet.
*/
public void handleRunningTablets() {
Set<Long> aliveBeIds = Sets.newHashSet(Env.getCurrentSystemInfo().getAllBackendIds(true));
// 1. remove the tablet ctx if timeout
List<TabletSchedCtx> timeoutTablets = Lists.newArrayList();
List<TabletSchedCtx> cancelTablets = Lists.newArrayList();
synchronized (this) {
runningTablets.values().stream().filter(TabletSchedCtx::isTimeout).forEach(timeoutTablets::add);
for (TabletSchedCtx tabletCtx : runningTablets.values()) {
long srcBeId = tabletCtx.getSrcBackendId();
long destBeId = tabletCtx.getDestBackendId();
if (Config.disable_tablet_scheduler) {
tabletCtx.setErrMsg("tablet scheduler is disabled");
cancelTablets.add(tabletCtx);
} else if (Config.disable_balance && tabletCtx.getType() == Type.BALANCE) {
tabletCtx.setErrMsg("balance is disabled");
cancelTablets.add(tabletCtx);
} else if (tabletCtx.isTimeout()) {
tabletCtx.setErrMsg("timeout");
cancelTablets.add(tabletCtx);
stat.counterCloneTaskTimeout.incrementAndGet();
} else if (destBeId > 0 && !aliveBeIds.contains(destBeId)) {
tabletCtx.setErrMsg("dest be " + destBeId + " is dead");
cancelTablets.add(tabletCtx);
} else if (srcBeId > 0 && !aliveBeIds.contains(srcBeId)) {
tabletCtx.setErrMsg("src be " + srcBeId + " is dead");
cancelTablets.add(tabletCtx);
}
}
}

// 2. release ctx
timeoutTablets.forEach(t -> {
cancelTablets.forEach(t -> {
// Set "resetReplicaState" to true because
// the timeout task should also be considered as UNRECOVERABLE,
// so need to reset replica state.
t.setErrMsg("timeout");
finalizeTabletCtx(t, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, "timeout");
stat.counterCloneTaskTimeout.incrementAndGet();
finalizeTabletCtx(t, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, t.getErrMsg());
});
}

public List<List<String>> getPendingTabletsInfo(int limit) {
List<TabletSchedCtx> tabletCtxs = getCopiedTablets(pendingTablets, limit);
return collectTabletCtx(tabletCtxs);
return collectTabletCtx(getPendingTablets(limit));
}

public List<TabletSchedCtx> getPendingTablets(int limit) {
return getCopiedTablets(pendingTablets, limit);
}

public List<List<String>> getRunningTabletsInfo(int limit) {
List<TabletSchedCtx> tabletCtxs = getCopiedTablets(runningTablets.values(), limit);
return collectTabletCtx(tabletCtxs);
return collectTabletCtx(getRunningTablets(limit));
}

public List<TabletSchedCtx> getRunningTablets(int limit) {
return getCopiedTablets(runningTablets.values(), limit);
}

public List<List<String>> getHistoryTabletsInfo(int limit) {
List<TabletSchedCtx> tabletCtxs = getCopiedTablets(schedHistory, limit);
return collectTabletCtx(tabletCtxs);
return collectTabletCtx(getHistoryTablets(limit));
}

public List<TabletSchedCtx> getHistoryTablets(int limit) {
return getCopiedTablets(schedHistory, limit);
}

private List<List<String>> collectTabletCtx(List<TabletSchedCtx> tabletCtxs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,18 @@ public static void addDebugPoint(String name) {
addDebugPoint(name, new DebugPoint());
}

public static <E> void addDebugPointWithValue(String name, E value) {
public static void addDebugPointWithParams(String name, Map<String, String> params) {
DebugPoint debugPoint = new DebugPoint();
debugPoint.params.put("value", String.format("%s", value));
debugPoint.params = params;
addDebugPoint(name, debugPoint);
}

public static <E> void addDebugPointWithValue(String name, E value) {
Map<String, String> params = Maps.newHashMap();
params.put("value", String.format("%s", value));
addDebugPointWithParams(name, params);
}

public static void removeDebugPoint(String name) {
DebugPoint debugPoint = debugPoints.remove(name);
LOG.info("remove debug point: name={}, exists={}", name, debugPoint != null);
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.resource.Tag;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -237,6 +239,14 @@ public HeartbeatResponse call() {
result.setBackendInfo(backendInfo);
}

String debugDeadBeIds = DebugPointUtil.getDebugParamOrDefault(
"HeartbeatMgr.BackendHeartbeatHandler", "deadBeIds", "");
if (!Strings.isNullOrEmpty(debugDeadBeIds)
&& Arrays.stream(debugDeadBeIds.split(",")).anyMatch(id -> Long.parseLong(id) == backendId)) {
result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
result.getStatus().addToErrorMsgs("debug point HeartbeatMgr.deadBeIds set dead be");
}

ok = true;
if (result.getStatus().getStatusCode() == TStatusCode.OK) {
TBackendInfo tBackendInfo = result.getBackendInfo();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;

public class BeDownCancelCloneTest extends TestWithFeService {

@Override
protected int backendNum() {
return 4;
}

@Override
protected void beforeCreatingConnectContext() throws Exception {
FeConstants.runningUnitTest = true;
FeConstants.default_scheduler_interval_millisecond = 1000;
Config.enable_debug_points = true;
FeConstants.tablet_checker_interval_ms = 100;
Config.tablet_repair_delay_factor_second = 1;
Config.allow_replica_on_same_host = true;
Config.disable_balance = true;
Config.schedule_batch_size = 1000;
Config.schedule_slot_num_per_hdd_path = 1000;
FeConstants.heartbeat_interval_second = 5;
Config.max_backend_heartbeat_failure_tolerance_count = 1;
Config.min_clone_task_timeout_sec = 20 * 60 * 1000;
}

@Test
public void test() throws Exception {
connectContext = createDefaultCtx();

createDatabase("db1");
System.out.println(Env.getCurrentInternalCatalog().getDbNames());

// 3. create table tbl1
createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 1;");
RebalancerTestUtil.updateReplicaPathHash();

Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
Assertions.assertNotNull(tbl);
Tablet tablet = tbl.getPartitions().iterator().next()
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
.getTablets().iterator().next();

Assertions.assertEquals(3, tablet.getReplicas().size());
long destBeId = Env.getCurrentSystemInfo().getAllBackendIds(true).stream()
.filter(beId -> tablet.getReplicaByBackendId(beId) == null)
.findFirst()
.orElse(-1L);
Assertions.assertTrue(destBeId != -1L);
Backend destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
Assertions.assertNotNull(destBe);
Assertions.assertTrue(destBe.isAlive());

// add debug point, make clone wait
DebugPointUtil.addDebugPoint("MockedBackendFactory.handleCloneTablet.block");

// move replica[0] to destBeId
Replica srcReplica = tablet.getReplicas().get(0);
String moveTabletSql = "ADMIN SET REPLICA STATUS PROPERTIES(\"tablet_id\" = \"" + tablet.getId() + "\", "
+ "\"backend_id\" = \"" + srcReplica.getBackendId() + "\", \"status\" = \"drop\")";
Assertions.assertNotNull(getSqlStmtExecutor(moveTabletSql));
Assertions.assertFalse(srcReplica.isScheduleAvailable());

Thread.sleep(3000);

Assertions.assertEquals(0, Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).size());
Assertions.assertEquals(4, tablet.getReplicas().size());
Replica destReplica = tablet.getReplicaByBackendId(destBeId);
Assertions.assertNotNull(destReplica);
Assertions.assertEquals(Replica.ReplicaState.CLONE, destReplica.getState());

// clone a replica on destBe
List<TabletSchedCtx> runningTablets = Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
Assertions.assertEquals(1, runningTablets.size());
Assertions.assertEquals(destBeId, runningTablets.get(0).getDestBackendId());

Map<String, String> params2 = Maps.newHashMap();
params2.put("deadBeIds", String.valueOf(destBeId));
DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler", params2);

Thread.sleep((FeConstants.heartbeat_interval_second
* Config.max_backend_heartbeat_failure_tolerance_count + 4) * 1000L);

destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
Assertions.assertNotNull(destBe);
Assertions.assertFalse(destBe.isAlive());

// delete clone dest task
Assertions.assertFalse(Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).isEmpty());

// first drop dest replica (its backend is dead) and src replica (it's mark as drop)
// then re clone a replica to src be, and waiting for cloning.
runningTablets = Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
Assertions.assertEquals(1, runningTablets.size());
Assertions.assertEquals(srcReplica.getBackendId(), runningTablets.get(0).getDestBackendId());

DebugPointUtil.removeDebugPoint("MockedBackendFactory.handleCloneTablet.block");
Thread.sleep(2000);

// destBe is dead, cancel clone task
runningTablets = Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
Assertions.assertEquals(0, runningTablets.size());

Assertions.assertEquals(3, tablet.getReplicas().size());
for (Replica replica : tablet.getReplicas()) {
Assertions.assertTrue(replica.getBackendId() != destBeId);
Assertions.assertTrue(replica.isScheduleAvailable());
Assertions.assertEquals(Replica.ReplicaState.NORMAL, replica.getState());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DiskInfo.DiskState;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.PBackendServiceGrpc;
Expand Down Expand Up @@ -228,6 +229,13 @@ private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest fini
}

private void handleCloneTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) {
while (DebugPointUtil.isEnable("MockedBackendFactory.handleCloneTablet.block")) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ignore
}
}
TCloneReq req = request.getCloneReq();
long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id));
long pathHash = req.dest_path_hash;
Expand Down

0 comments on commit 02b9d7a

Please sign in to comment.