diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index b52a2d5a1dcb..0b7b9b8a6fdb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -45,6 +45,7 @@ import org.apache.iotdb.confignode.manager.load.service.StatisticsService; import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.tsfile.utils.Pair; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; @@ -413,4 +414,14 @@ public void forceUpdateRegionPriority( public void removeRegionRouteCache(TConsensusGroupId regionGroupId) { loadCache.removeRegionRouteCache(regionGroupId); } + + /** Force balance the region leader and broadcast RouteChangeEvent if necessary. */ + public void forceBalanceRegionLeader() { + Map> differentRegionLeaderMap = + routeBalancer.balanceRegionLeader(); + Map> differentRegionPriorityMap = + routeBalancer.balanceRegionPriority(); + statisticsService.broadcastRouteChangeEventIfNecessary( + differentRegionLeaderMap, differentRegionPriorityMap); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index f8f1c92484f9..463a5df0dc73 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; @@ -74,12 +75,16 @@ public class RouteBalancer { (CONF.isEnableAutoLeaderBalanceForRatisConsensus() && ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) || (CONF.isEnableAutoLeaderBalanceForIoTConsensus() - && ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)); + && ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) + // The simple consensus protocol will always automatically designate itself as the leader + || ConsensusFactory.SIMPLE_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS); private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION = (CONF.isEnableAutoLeaderBalanceForRatisConsensus() && ConsensusFactory.RATIS_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS)) || (CONF.isEnableAutoLeaderBalanceForIoTConsensus() - && ConsensusFactory.IOT_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS)); + && ConsensusFactory.IOT_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS)) + // The simple consensus protocol will always automatically designate itself as the leader + || ConsensusFactory.SIMPLE_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS); private final IManager configManager; @@ -114,11 +119,12 @@ public RouteBalancer(IManager configManager) { } /** - * Balance cluster RegionGroup leader distribution through configured algorithm + * Balance cluster RegionGroup leader distribution through configured algorithm TODO: @YongzaoDan, + * increase scheduling delay * * @return Map> */ - public Map> balanceRegionLeader() { + public synchronized Map> balanceRegionLeader() { Map> differentRegionLeaderMap = new ConcurrentHashMap<>(); if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) { @@ -184,6 +190,21 @@ private Map> balanceRegionLeader( if (requestId.get() > 0) { // Don't retry ChangeLeader request AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNode(clientHandler); + for (int i = 0; i < requestId.get(); i++) { + if (clientHandler.getResponseMap().get(i).getCode() + == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + getLoadManager() + .forceUpdateRegionLeader( + clientHandler.getRequest(i).getRegionId(), + clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId()); + } else { + differentRegionLeaderMap.remove(clientHandler.getRequest(i).getRegionId()); + LOGGER.error( + "[LeaderBalancer] Failed to change the leader of Region: {} to DataNode: {}", + clientHandler.getRequest(i).getRegionId(), + clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId()); + } + } } return differentRegionLeaderMap; } @@ -196,7 +217,8 @@ private void changeRegionLeader( TDataNodeLocation newLeader) { switch (consensusProtocolClass) { case ConsensusFactory.IOT_CONSENSUS: - // For IoTConsensus protocol, change RegionRouteMap is enough. + case ConsensusFactory.SIMPLE_CONSENSUS: + // For IoTConsensus or SimpleConsensus protocol, change RegionRouteMap is enough. // And the result will be broadcast by Cluster-LoadStatistics-Service soon. getLoadManager().forceUpdateRegionLeader(regionGroupId, newLeader.getDataNodeId()); break; @@ -216,11 +238,12 @@ private void changeRegionLeader( } /** - * Balance cluster RegionGroup route priority through configured algorithm + * Balance cluster RegionGroup route priority through configured algorithm TODO: @YongzaoDan, + * increase scheduling delay * * @return Map> */ - public Map> + public synchronized Map> balanceRegionPriority() { Map currentPriorityMap = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 3ecc795ebf08..310c08b88b3a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -27,8 +27,6 @@ import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.commons.cluster.RegionStatus; -import org.apache.iotdb.confignode.conf.ConfigNodeConfig; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache; @@ -64,8 +62,7 @@ public class LoadCache { private static final Logger LOGGER = LoggerFactory.getLogger(LoadCache.class); - private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs(); + private static final long WAIT_LEADER_INTERVAL = 50; private static final long LEADER_ELECTION_WAITING_TIMEOUT = Math.max( ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2), @@ -557,7 +554,7 @@ public void waitForLeaderElection(List regionGroupIds) { return; } try { - TimeUnit.MILLISECONDS.sleep(HEARTBEAT_INTERVAL); + TimeUnit.MILLISECONDS.sleep(WAIT_LEADER_INTERVAL); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.warn("Interrupt when wait for leader election", e); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java index 02d82a34338c..95b749f58878 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java @@ -26,10 +26,14 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class RegionRouteCache { + private static final Logger LOGGER = LoggerFactory.getLogger(RegionRouteCache.class); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS = @@ -116,6 +120,7 @@ public boolean periodicUpdate() { * @param leaderId Leader DataNodeId */ public void forceUpdateRegionLeader(int leaderId) { + cacheLeaderSample(new Pair<>(System.currentTimeMillis(), leaderId)); this.leaderId.set(leaderId); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java index 0b4346e50cf6..4d98a66106b3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java @@ -146,11 +146,7 @@ private void updateLoadStatistics() { // Map> Map> differentRegionPriorityMap = routeBalancer.balanceRegionPriority(); - - if (containsChangeEvent(differentRegionLeaderMap) - || containsChangeEvent(differentRegionPriorityMap)) { - eventBus.post(new RouteChangeEvent(differentRegionLeaderMap, differentRegionPriorityMap)); - } + broadcastRouteChangeEventIfNecessary(differentRegionLeaderMap, differentRegionPriorityMap); } if (isNeedBroadcast) { @@ -160,6 +156,16 @@ private void updateLoadStatistics() { } } + public void broadcastRouteChangeEventIfNecessary( + Map> differentRegionLeaderMap, + Map> + differentRegionPriorityMap) { + if (containsChangeEvent(differentRegionLeaderMap) + || containsChangeEvent(differentRegionPriorityMap)) { + eventBus.post(new RouteChangeEvent(differentRegionLeaderMap, differentRegionPriorityMap)); + } + } + private static boolean containsChangeEvent(Map> map) { return !map.isEmpty() && map.values().stream().anyMatch(pair -> !Objects.equals(pair.getLeft(), pair.getRight())); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index e919f127d7f3..148677fea142 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -575,6 +575,8 @@ public void activateRegionGroup( heartbeatSampleMap.put( dataNodeId, new RegionHeartbeatSample(currentTime, currentTime, regionStatus))); getLoadManager().forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap); + // force balance region leader to skip waiting for leader election + getLoadManager().forceBalanceRegionLeader(); // Wait for leader election getLoadManager().waitForLeaderElection(Collections.singletonList(regionGroupId)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionLoader.java index feda7e54e3ca..5d47acfda5c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionLoader.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.schema.SchemaConstant; +import org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader; import org.reflections.Reflections; import org.reflections.util.ConfigurationBuilder; @@ -105,6 +106,19 @@ public void init(String schemaEngineMode) { currentMode = schemaEngineMode; currentConstructor = constructor; } + if (currentMode.equals(SchemaConstant.DEFAULT_SCHEMA_ENGINE_MODE)) { + MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory(); + logger.info( + "[SchemaRegionLoader], schemaEngineMode:{}, currentMode:{}", + schemaEngineMode, + currentMode); + } else { + MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory(); + logger.info( + "[SchemaRegionLoader], schemaEngineMode:{}, currentMode:{}", + schemaEngineMode, + currentMode); + } } public ISchemaRegion createSchemaRegion(ISchemaRegionParams schemaRegionParams)