Skip to content

Commit

Permalink
[IOTDB-6301] Optimize insert first (apache#12080)
Browse files Browse the repository at this point in the history
Co-authored-by: Xiangpeng Hu <65238551+BUAAserein@users.noreply.github.com>
  • Loading branch information
CRZbulabula and HxpSerein authored Mar 4, 2024
1 parent 82165f7 commit 7acc5cd
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.iotdb.confignode.manager.load.service.StatisticsService;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.tsfile.utils.Pair;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
Expand Down Expand Up @@ -413,4 +414,14 @@ public void forceUpdateRegionPriority(
public void removeRegionRouteCache(TConsensusGroupId regionGroupId) {
loadCache.removeRegionRouteCache(regionGroupId);
}

/** Force balance the region leader and broadcast RouteChangeEvent if necessary. */
public void forceBalanceRegionLeader() {
Map<TConsensusGroupId, Pair<Integer, Integer>> differentRegionLeaderMap =
routeBalancer.balanceRegionLeader();
Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> differentRegionPriorityMap =
routeBalancer.balanceRegionPriority();
statisticsService.broadcastRouteChangeEventIfNecessary(
differentRegionLeaderMap, differentRegionPriorityMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;

import org.slf4j.Logger;
Expand Down Expand Up @@ -74,12 +75,16 @@ public class RouteBalancer {
(CONF.isEnableAutoLeaderBalanceForRatisConsensus()
&& ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS));
&& ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
// The simple consensus protocol will always automatically designate itself as the leader
|| ConsensusFactory.SIMPLE_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS);
private static final boolean IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION =
(CONF.isEnableAutoLeaderBalanceForRatisConsensus()
&& ConsensusFactory.RATIS_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.IOT_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS));
&& ConsensusFactory.IOT_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS))
// The simple consensus protocol will always automatically designate itself as the leader
|| ConsensusFactory.SIMPLE_CONSENSUS.equals(SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS);

private final IManager configManager;

Expand Down Expand Up @@ -114,11 +119,12 @@ public RouteBalancer(IManager configManager) {
}

/**
* Balance cluster RegionGroup leader distribution through configured algorithm
* Balance cluster RegionGroup leader distribution through configured algorithm TODO: @YongzaoDan,
* increase scheduling delay
*
* @return Map<RegionGroupId, Pair<old leader index, new leader index>>
*/
public Map<TConsensusGroupId, Pair<Integer, Integer>> balanceRegionLeader() {
public synchronized Map<TConsensusGroupId, Pair<Integer, Integer>> balanceRegionLeader() {
Map<TConsensusGroupId, Pair<Integer, Integer>> differentRegionLeaderMap =
new ConcurrentHashMap<>();
if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) {
Expand Down Expand Up @@ -184,6 +190,21 @@ private Map<TConsensusGroupId, Pair<Integer, Integer>> balanceRegionLeader(
if (requestId.get() > 0) {
// Don't retry ChangeLeader request
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNode(clientHandler);
for (int i = 0; i < requestId.get(); i++) {
if (clientHandler.getResponseMap().get(i).getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
getLoadManager()
.forceUpdateRegionLeader(
clientHandler.getRequest(i).getRegionId(),
clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId());
} else {
differentRegionLeaderMap.remove(clientHandler.getRequest(i).getRegionId());
LOGGER.error(
"[LeaderBalancer] Failed to change the leader of Region: {} to DataNode: {}",
clientHandler.getRequest(i).getRegionId(),
clientHandler.getRequest(i).getNewLeaderNode().getDataNodeId());
}
}
}
return differentRegionLeaderMap;
}
Expand All @@ -196,7 +217,8 @@ private void changeRegionLeader(
TDataNodeLocation newLeader) {
switch (consensusProtocolClass) {
case ConsensusFactory.IOT_CONSENSUS:
// For IoTConsensus protocol, change RegionRouteMap is enough.
case ConsensusFactory.SIMPLE_CONSENSUS:
// For IoTConsensus or SimpleConsensus protocol, change RegionRouteMap is enough.
// And the result will be broadcast by Cluster-LoadStatistics-Service soon.
getLoadManager().forceUpdateRegionLeader(regionGroupId, newLeader.getDataNodeId());
break;
Expand All @@ -216,11 +238,12 @@ private void changeRegionLeader(
}

/**
* Balance cluster RegionGroup route priority through configured algorithm
* Balance cluster RegionGroup route priority through configured algorithm TODO: @YongzaoDan,
* increase scheduling delay
*
* @return Map<RegionGroupId, Pair<old route priority, new route priority>>
*/
public Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>>
public synchronized Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>>
balanceRegionPriority() {

Map<TConsensusGroupId, TRegionReplicaSet> currentPriorityMap =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache;
Expand Down Expand Up @@ -64,8 +62,7 @@ public class LoadCache {

private static final Logger LOGGER = LoggerFactory.getLogger(LoadCache.class);

private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
private static final long WAIT_LEADER_INTERVAL = 50;
private static final long LEADER_ELECTION_WAITING_TIMEOUT =
Math.max(
ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2),
Expand Down Expand Up @@ -557,7 +554,7 @@ public void waitForLeaderElection(List<TConsensusGroupId> regionGroupIds) {
return;
}
try {
TimeUnit.MILLISECONDS.sleep(HEARTBEAT_INTERVAL);
TimeUnit.MILLISECONDS.sleep(WAIT_LEADER_INTERVAL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupt when wait for leader election", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.tsfile.utils.Pair;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class RegionRouteCache {
private static final Logger LOGGER = LoggerFactory.getLogger(RegionRouteCache.class);

private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS =
Expand Down Expand Up @@ -116,6 +120,7 @@ public boolean periodicUpdate() {
* @param leaderId Leader DataNodeId
*/
public void forceUpdateRegionLeader(int leaderId) {
cacheLeaderSample(new Pair<>(System.currentTimeMillis(), leaderId));
this.leaderId.set(leaderId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,7 @@ private void updateLoadStatistics() {
// Map<RegionGroupId, Pair<old priority, new priority>>
Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>>
differentRegionPriorityMap = routeBalancer.balanceRegionPriority();

if (containsChangeEvent(differentRegionLeaderMap)
|| containsChangeEvent(differentRegionPriorityMap)) {
eventBus.post(new RouteChangeEvent(differentRegionLeaderMap, differentRegionPriorityMap));
}
broadcastRouteChangeEventIfNecessary(differentRegionLeaderMap, differentRegionPriorityMap);
}

if (isNeedBroadcast) {
Expand All @@ -160,6 +156,16 @@ private void updateLoadStatistics() {
}
}

public void broadcastRouteChangeEventIfNecessary(
Map<TConsensusGroupId, Pair<Integer, Integer>> differentRegionLeaderMap,
Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>>
differentRegionPriorityMap) {
if (containsChangeEvent(differentRegionLeaderMap)
|| containsChangeEvent(differentRegionPriorityMap)) {
eventBus.post(new RouteChangeEvent(differentRegionLeaderMap, differentRegionPriorityMap));
}
}

private static <T> boolean containsChangeEvent(Map<TConsensusGroupId, Pair<T, T>> map) {
return !map.isEmpty()
&& map.values().stream().anyMatch(pair -> !Objects.equals(pair.getLeft(), pair.getRight()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,8 @@ public void activateRegionGroup(
heartbeatSampleMap.put(
dataNodeId, new RegionHeartbeatSample(currentTime, currentTime, regionStatus)));
getLoadManager().forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
// force balance region leader to skip waiting for leader election
getLoadManager().forceBalanceRegionLeader();
// Wait for leader election
getLoadManager().waitForLeaderElection(Collections.singletonList(regionGroupId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;

import org.reflections.Reflections;
import org.reflections.util.ConfigurationBuilder;
Expand Down Expand Up @@ -105,6 +106,19 @@ public void init(String schemaEngineMode) {
currentMode = schemaEngineMode;
currentConstructor = constructor;
}
if (currentMode.equals(SchemaConstant.DEFAULT_SCHEMA_ENGINE_MODE)) {
MNodeFactoryLoader.getInstance().getMemMNodeIMNodeFactory();
logger.info(
"[SchemaRegionLoader], schemaEngineMode:{}, currentMode:{}",
schemaEngineMode,
currentMode);
} else {
MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
logger.info(
"[SchemaRegionLoader], schemaEngineMode:{}, currentMode:{}",
schemaEngineMode,
currentMode);
}
}

public ISchemaRegion createSchemaRegion(ISchemaRegionParams schemaRegionParams)
Expand Down

0 comments on commit 7acc5cd

Please sign in to comment.