diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 3341f5bb305962..715739310bdb58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1822,36 +1822,64 @@ private void gatherStatistics(TabletSchedCtx tabletCtx) { * If task is timeout, remove the tablet. */ public void handleRunningTablets() { + Set aliveBeIds = Sets.newHashSet(Env.getCurrentSystemInfo().getAllBackendIds(true)); // 1. remove the tablet ctx if timeout - List timeoutTablets = Lists.newArrayList(); + List cancelTablets = Lists.newArrayList(); synchronized (this) { - runningTablets.values().stream().filter(TabletSchedCtx::isTimeout).forEach(timeoutTablets::add); + for (TabletSchedCtx tabletCtx : runningTablets.values()) { + long srcBeId = tabletCtx.getSrcBackendId(); + long destBeId = tabletCtx.getDestBackendId(); + if (Config.disable_tablet_scheduler) { + tabletCtx.setErrMsg("tablet scheduler is disabled"); + cancelTablets.add(tabletCtx); + } else if (Config.disable_balance && tabletCtx.getType() == Type.BALANCE) { + tabletCtx.setErrMsg("balance is disabled"); + cancelTablets.add(tabletCtx); + } else if (tabletCtx.isTimeout()) { + tabletCtx.setErrMsg("timeout"); + cancelTablets.add(tabletCtx); + stat.counterCloneTaskTimeout.incrementAndGet(); + } else if (destBeId > 0 && !aliveBeIds.contains(destBeId)) { + tabletCtx.setErrMsg("dest be " + destBeId + " is dead"); + cancelTablets.add(tabletCtx); + } else if (srcBeId > 0 && !aliveBeIds.contains(srcBeId)) { + tabletCtx.setErrMsg("src be " + srcBeId + " is dead"); + cancelTablets.add(tabletCtx); + } + } } // 2. release ctx - timeoutTablets.forEach(t -> { + cancelTablets.forEach(t -> { // Set "resetReplicaState" to true because // the timeout task should also be considered as UNRECOVERABLE, // so need to reset replica state. - t.setErrMsg("timeout"); - finalizeTabletCtx(t, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, "timeout"); - stat.counterCloneTaskTimeout.incrementAndGet(); + finalizeTabletCtx(t, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, t.getErrMsg()); }); } public List> getPendingTabletsInfo(int limit) { - List tabletCtxs = getCopiedTablets(pendingTablets, limit); - return collectTabletCtx(tabletCtxs); + return collectTabletCtx(getPendingTablets(limit)); + } + + public List getPendingTablets(int limit) { + return getCopiedTablets(pendingTablets, limit); } public List> getRunningTabletsInfo(int limit) { - List tabletCtxs = getCopiedTablets(runningTablets.values(), limit); - return collectTabletCtx(tabletCtxs); + return collectTabletCtx(getRunningTablets(limit)); + } + + public List getRunningTablets(int limit) { + return getCopiedTablets(runningTablets.values(), limit); } public List> getHistoryTabletsInfo(int limit) { - List tabletCtxs = getCopiedTablets(schedHistory, limit); - return collectTabletCtx(tabletCtxs); + return collectTabletCtx(getHistoryTablets(limit)); + } + + public List getHistoryTablets(int limit) { + return getCopiedTablets(schedHistory, limit); } private List> collectTabletCtx(List tabletCtxs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java index da06232f0c0f5e..420cee77631a4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java @@ -134,12 +134,18 @@ public static void addDebugPoint(String name) { addDebugPoint(name, new DebugPoint()); } - public static void addDebugPointWithValue(String name, E value) { + public static void addDebugPointWithParams(String name, Map params) { DebugPoint debugPoint = new DebugPoint(); - debugPoint.params.put("value", String.format("%s", value)); + debugPoint.params = params; addDebugPoint(name, debugPoint); } + public static void addDebugPointWithValue(String name, E value) { + Map params = Maps.newHashMap(); + params.put("value", String.format("%s", value)); + addDebugPointWithParams(name, params); + } + public static void removeDebugPoint(String name) { DebugPoint debugPoint = debugPoints.remove(name); LOG.info("remove debug point: name={}, exists={}", name, debugPoint != null); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index dc4c28217b9933..7c081c12cd07b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -24,6 +24,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.Version; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.persist.HbPackage; import org.apache.doris.resource.Tag; @@ -53,6 +54,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -237,6 +239,14 @@ public HeartbeatResponse call() { result.setBackendInfo(backendInfo); } + String debugDeadBeIds = DebugPointUtil.getDebugParamOrDefault( + "HeartbeatMgr.BackendHeartbeatHandler", "deadBeIds", ""); + if (!Strings.isNullOrEmpty(debugDeadBeIds) + && Arrays.stream(debugDeadBeIds.split(",")).anyMatch(id -> Long.parseLong(id) == backendId)) { + result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR); + result.getStatus().addToErrorMsgs("debug point HeartbeatMgr.deadBeIds set dead be"); + } + ok = true; if (result.getStatus().getStatusCode() == TStatusCode.OK) { TBackendInfo tBackendInfo = result.getBackendInfo(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java new file mode 100644 index 00000000000000..4a413495e98f2e --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.system.Backend; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +public class BeDownCancelCloneTest extends TestWithFeService { + + @Override + protected int backendNum() { + return 4; + } + + @Override + protected void beforeCreatingConnectContext() throws Exception { + FeConstants.runningUnitTest = true; + FeConstants.default_scheduler_interval_millisecond = 1000; + Config.enable_debug_points = true; + FeConstants.tablet_checker_interval_ms = 100; + Config.tablet_repair_delay_factor_second = 1; + Config.allow_replica_on_same_host = true; + Config.disable_balance = true; + Config.schedule_batch_size = 1000; + Config.schedule_slot_num_per_hdd_path = 1000; + FeConstants.heartbeat_interval_second = 5; + Config.max_backend_heartbeat_failure_tolerance_count = 1; + Config.min_clone_task_timeout_sec = 20 * 60 * 1000; + } + + @Test + public void test() throws Exception { + connectContext = createDefaultCtx(); + + createDatabase("db1"); + System.out.println(Env.getCurrentInternalCatalog().getDbNames()); + + // 3. create table tbl1 + createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 1;"); + RebalancerTestUtil.updateReplicaPathHash(); + + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1"); + OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1"); + Assertions.assertNotNull(tbl); + Tablet tablet = tbl.getPartitions().iterator().next() + .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next() + .getTablets().iterator().next(); + + Assertions.assertEquals(3, tablet.getReplicas().size()); + long destBeId = Env.getCurrentSystemInfo().getAllBackendIds(true).stream() + .filter(beId -> tablet.getReplicaByBackendId(beId) == null) + .findFirst() + .orElse(-1L); + Assertions.assertTrue(destBeId != -1L); + Backend destBe = Env.getCurrentSystemInfo().getBackend(destBeId); + Assertions.assertNotNull(destBe); + Assertions.assertTrue(destBe.isAlive()); + + // add debug point, make clone wait + DebugPointUtil.addDebugPoint("MockedBackendFactory.handleCloneTablet.block"); + + // move replica[0] to destBeId + Replica srcReplica = tablet.getReplicas().get(0); + String moveTabletSql = "ADMIN SET REPLICA STATUS PROPERTIES(\"tablet_id\" = \"" + tablet.getId() + "\", " + + "\"backend_id\" = \"" + srcReplica.getBackendId() + "\", \"status\" = \"drop\")"; + Assertions.assertNotNull(getSqlStmtExecutor(moveTabletSql)); + Assertions.assertFalse(srcReplica.isScheduleAvailable()); + + Thread.sleep(3000); + + Assertions.assertEquals(0, Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).size()); + Assertions.assertEquals(4, tablet.getReplicas().size()); + Replica destReplica = tablet.getReplicaByBackendId(destBeId); + Assertions.assertNotNull(destReplica); + Assertions.assertEquals(Replica.ReplicaState.CLONE, destReplica.getState()); + + // clone a replica on destBe + List runningTablets = Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100); + Assertions.assertEquals(1, runningTablets.size()); + Assertions.assertEquals(destBeId, runningTablets.get(0).getDestBackendId()); + + Map params2 = Maps.newHashMap(); + params2.put("deadBeIds", String.valueOf(destBeId)); + DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler", params2); + + Thread.sleep((FeConstants.heartbeat_interval_second + * Config.max_backend_heartbeat_failure_tolerance_count + 4) * 1000L); + + destBe = Env.getCurrentSystemInfo().getBackend(destBeId); + Assertions.assertNotNull(destBe); + Assertions.assertFalse(destBe.isAlive()); + + // delete clone dest task + Assertions.assertFalse(Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).isEmpty()); + + // first drop dest replica (its backend is dead) and src replica (it's mark as drop) + // then re clone a replica to src be, and waiting for cloning. + runningTablets = Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100); + Assertions.assertEquals(1, runningTablets.size()); + Assertions.assertEquals(srcReplica.getBackendId(), runningTablets.get(0).getDestBackendId()); + + DebugPointUtil.removeDebugPoint("MockedBackendFactory.handleCloneTablet.block"); + Thread.sleep(2000); + + // destBe is dead, cancel clone task + runningTablets = Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100); + Assertions.assertEquals(0, runningTablets.size()); + + Assertions.assertEquals(3, tablet.getReplicas().size()); + for (Replica replica : tablet.getReplicas()) { + Assertions.assertTrue(replica.getBackendId() != destBeId); + Assertions.assertTrue(replica.isScheduleAvailable()); + Assertions.assertEquals(Replica.ReplicaState.NORMAL, replica.getState()); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index cb228043fddecd..665dc8163aeef0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.DiskInfo.DiskState; import org.apache.doris.common.ClientPool; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.PBackendServiceGrpc; @@ -228,6 +229,13 @@ private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest fini } private void handleCloneTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { + while (DebugPointUtil.isEnable("MockedBackendFactory.handleCloneTablet.block")) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // ignore + } + } TCloneReq req = request.getCloneReq(); long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id)); long pathHash = req.dest_path_hash;