Skip to content

Commit

Permalink
WIP: Integrate state transition logic with server startup
Browse files Browse the repository at this point in the history
This changes integrates replica state transition logic with server
regular startup. For new added replica, state transition should perform
some bootstrap work. For existing replicas, their regular startup won't
be affected.
  • Loading branch information
jsjtzyy committed Nov 30, 2019
1 parent f35f3ea commit 07de3a1
Show file tree
Hide file tree
Showing 23 changed files with 435 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.github.ambry.clustermap;

import com.github.ambry.server.AmbryHealthReport;
import com.github.ambry.server.StateModelListenerType;
import java.io.IOException;
import java.util.List;

Expand Down Expand Up @@ -61,9 +62,11 @@ public interface ClusterParticipant extends AutoCloseable {

/**
* Register a listener for leadership changes in partitions of this node.
* @param listenerType the type of listener, which is defined in {@link StateModelListenerType}
* @param partitionStateChangeListener listener to register.
*/
void registerPartitionStateChangeListener(PartitionStateChangeListener partitionStateChangeListener);
void registerPartitionStateChangeListener(StateModelListenerType listenerType,
PartitionStateChangeListener partitionStateChangeListener);

/**
* Terminate the participant.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,27 @@
*/
public interface PartitionStateChangeListener {

/**
*
* @param partitionName
*/
void onPartitionBecomeBootstrapFromOffline(String partitionName);

/**
*
* @param partitionName
*/
void onPartitionBecomeStandbyFromBootstrap(String partitionName);

/**
* Action to take on becoming leader of a partition.
* @param partitionName of the partition.
*/
void onPartitionStateChangeToLeaderFromStandby(String partitionName);
void onPartitionBecomeLeaderFromStandby(String partitionName);

/**
* Action to take on being removed as leader of a partition.
* @param partitionName of the partition.
*/
void onPartitionStateChangeToStandbyFromLeader(String partitionName);
void onPartitionBecomeStandbyFromLeader(String partitionName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ public class ClusterMapConfig {
@Default(DEFAULT_STATE_MODEL_DEF)
public final String clustermapStateModelDefinition;

@Config("clustermap.enable.state.model.listener")
@Default("false")
public final boolean clustermapEnableStateModelListener;

public ClusterMapConfig(VerifiableProperties verifiableProperties) {
clusterMapFixedTimeoutDatanodeErrorThreshold =
verifiableProperties.getIntInRange("clustermap.fixedtimeout.datanode.error.threshold", 3, 1, 100);
Expand Down Expand Up @@ -235,5 +239,7 @@ public ClusterMapConfig(VerifiableProperties verifiableProperties) {
OLD_STATE_MODEL_DEF)) {
throw new IllegalArgumentException("Unsupported state model definition: " + clustermapStateModelDefinition);
}
clustermapEnableStateModelListener =
verifiableProperties.getBoolean("clustermap.enable.state.model.listener", false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.server;

/**
* The type of partition state model listener.
* The state model listeners implement {@link com.github.ambry.clustermap.PartitionStateChangeListener} in different
* components (i.e. StorageManager, ReplicationManager etc) and take actions when state transition occurs.
*/
public enum StateModelListenerType {
StorageManagerListener, ReplicationManagerListener, StatsManagerListener, CloudToStoreReplicationManagerListener
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.server;

public class StateTransitionException extends RuntimeException {
private static final long serialVersionUID = 1L;
private final TransitionErrorCode error;

public StateTransitionException(String s, TransitionErrorCode error) {
super(s);
this.error = error;
}

public TransitionErrorCode getErrorCode() {
return error;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.server;

public enum TransitionErrorCode {
ReplicaNotFound, StoreOperationFailure, StoreNotStarted
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.clustermap;

import com.github.ambry.config.ClusterMapConfig;
import java.util.Objects;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
Expand All @@ -30,41 +31,48 @@ public class AmbryPartitionStateModel extends StateModel {
private final String resourceName;
private final String partitionName;
private final PartitionStateChangeListener partitionStateChangeListener;
private final ClusterMapConfig clusterMapConfig;

AmbryPartitionStateModel(String resourceName, String partitionName,
PartitionStateChangeListener partitionStateChangeListener) {
PartitionStateChangeListener partitionStateChangeListener, ClusterMapConfig clusterMapConfig) {
this.resourceName = resourceName;
this.partitionName = partitionName;
this.partitionStateChangeListener = Objects.requireNonNull(partitionStateChangeListener);
this.clusterMapConfig = Objects.requireNonNull(clusterMapConfig);
StateModelParser parser = new StateModelParser();
_currentState = parser.getInitialState(AmbryPartitionStateModel.class);
}

@Transition(to = "BOOTSTRAP", from = "OFFLINE")
public void onBecomeBootstrapFromOffline(Message message, NotificationContext context) {
// TODO to distinguish between regular start and dynamic replica addition, check 1. store dir, 2. if there is bootstrap log
logger.info("Partition {} in resource {} is becoming BOOTSTRAP from OFFLINE", message.getPartitionName(),
message.getResourceName());
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeBootstrapFromOffline(message.getPartitionName());
}
}

@Transition(to = "STANDBY", from = "BOOTSTRAP")
public void onBecomeStandbyFromBootstrap(Message message, NotificationContext context) {
logger.info("Partition {} in resource {} is becoming STANDBY from BOOTSTRAP", message.getPartitionName(),
message.getResourceName());
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeStandbyFromBootstrap(message.getPartitionName());
}
}

@Transition(to = "LEADER", from = "STANDBY")
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
logger.info("Partition {} in resource {} is becoming LEADER from STANDBY", message.getPartitionName(),
message.getResourceName());
partitionStateChangeListener.onPartitionStateChangeToLeaderFromStandby(message.getPartitionName());
partitionStateChangeListener.onPartitionBecomeLeaderFromStandby(message.getPartitionName());
}

@Transition(to = "STANDBY", from = "LEADER")
public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
logger.info("Partition {} in resource {} is becoming STANDBY from LEADER", message.getPartitionName(),
message.getResourceName());
partitionStateChangeListener.onPartitionStateChangeToStandbyFromLeader(message.getPartitionName());
partitionStateChangeListener.onPartitionBecomeStandbyFromLeader(message.getPartitionName());
}

@Transition(to = "INACTIVE", from = "STANDBY")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.clustermap;

import com.github.ambry.config.ClusterMapConfig;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
Expand All @@ -22,11 +23,11 @@
* A factory for creating {@link StateModel}.
*/
class AmbryStateModelFactory extends StateModelFactory<StateModel> {
private final String ambryStateModelDef;
private final ClusterMapConfig clustermapConfig;
private final PartitionStateChangeListener partitionStateChangeListener;

AmbryStateModelFactory(String stateModelDef, PartitionStateChangeListener partitionStateChangeListener) {
ambryStateModelDef = stateModelDef;
AmbryStateModelFactory(ClusterMapConfig clusterMapConfig, PartitionStateChangeListener partitionStateChangeListener) {
this.clustermapConfig = clusterMapConfig;
this.partitionStateChangeListener = partitionStateChangeListener;
}

Expand All @@ -39,15 +40,19 @@ class AmbryStateModelFactory extends StateModelFactory<StateModel> {
@Override
public StateModel createNewStateModel(String resourceName, String partitionName) {
StateModel stateModelToReturn;
switch (ambryStateModelDef) {
switch (clustermapConfig.clustermapStateModelDefinition) {
case AmbryStateModelDefinition.AMBRY_LEADER_STANDBY_MODEL:
stateModelToReturn = new AmbryPartitionStateModel(resourceName, partitionName, partitionStateChangeListener);
stateModelToReturn =
new AmbryPartitionStateModel(resourceName, partitionName, partitionStateChangeListener, clustermapConfig);
break;
case LeaderStandbySMD.name:
stateModelToReturn = new DefaultLeaderStandbyStateModel();
break;
default:
throw new IllegalArgumentException("Unsupported state model definition: " + ambryStateModelDef);
// Code won't get here because state model def is already validated in ClusterMapConfig. We keep exception here
// in case the validation logic is changed in ClusterMapConfig.
throw new IllegalArgumentException(
"Unsupported state model definition: " + clustermapConfig.clustermapStateModelDefinition);
}
return stateModelToReturn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@

import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.server.AmbryHealthReport;
import com.github.ambry.server.StateModelListenerType;
import com.github.ambry.utils.Utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -44,16 +44,17 @@
* An implementation of {@link ClusterParticipant} that registers as a participant to a Helix cluster.
*/
class HelixParticipant implements ClusterParticipant, PartitionStateChangeListener {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final String clusterName;
private final String zkConnectStr;
private final HelixFactory helixFactory;
private final Object helixAdministrationLock = new Object();
private final ClusterMapConfig clusterMapConfig;
private final Map<StateModelListenerType, PartitionStateChangeListener> partitionStateChangeListeners;
private HelixManager manager;
private String instanceName;
private HelixAdmin helixAdmin;
private List<PartitionStateChangeListener> partitionStateChangeListeners;

private static final Logger logger = LoggerFactory.getLogger(HelixParticipant.class);

/**
* Instantiate a HelixParticipant.
Expand Down Expand Up @@ -81,7 +82,7 @@ class HelixParticipant implements ClusterParticipant, PartitionStateChangeListen
throw new IOException("Received JSON exception while parsing ZKInfo json string", e);
}
manager = helixFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkConnectStr);
partitionStateChangeListeners = new LinkedList<>();
partitionStateChangeListeners = new HashMap<>();
}

/**
Expand All @@ -96,7 +97,7 @@ public void participate(List<AmbryHealthReport> ambryHealthReports) throws IOExc
clusterMapConfig.clustermapStateModelDefinition);
StateMachineEngine stateMachineEngine = manager.getStateMachineEngine();
stateMachineEngine.registerStateModelFactory(clusterMapConfig.clustermapStateModelDefinition,
new AmbryStateModelFactory(clusterMapConfig.clustermapStateModelDefinition, this));
new AmbryStateModelFactory(clusterMapConfig, this));
registerHealthReportTasks(stateMachineEngine, ambryHealthReports);
try {
synchronized (helixAdministrationLock) {
Expand All @@ -117,8 +118,9 @@ public void participate(List<AmbryHealthReport> ambryHealthReports) throws IOExc
}

@Override
public void registerPartitionStateChangeListener(PartitionStateChangeListener partitionStateChangeListener) {
partitionStateChangeListeners.add(partitionStateChangeListener);
public void registerPartitionStateChangeListener(StateModelListenerType listenerType,
PartitionStateChangeListener partitionStateChangeListener) {
partitionStateChangeListeners.put(listenerType, partitionStateChangeListener);
}

@Override
Expand Down Expand Up @@ -275,16 +277,38 @@ boolean setStoppedReplicas(List<String> stoppedReplicas) {
}

@Override
public void onPartitionStateChangeToLeaderFromStandby(String partitionName) {
for (PartitionStateChangeListener partitionStateChangeListener : partitionStateChangeListeners) {
partitionStateChangeListener.onPartitionStateChangeToLeaderFromStandby(partitionName);
public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
PartitionStateChangeListener storageManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener);
if (storageManagerListener != null) {
storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName);
}
}

@Override
public void onPartitionBecomeStandbyFromBootstrap(String partitionName) {
PartitionStateChangeListener storageManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener);
if (storageManagerListener != null) {
storageManagerListener.onPartitionBecomeStandbyFromBootstrap(partitionName);
}
}

@Override
public void onPartitionBecomeLeaderFromStandby(String partitionName) {
PartitionStateChangeListener cloudToStoreReplicationListener =
partitionStateChangeListeners.get(StateModelListenerType.CloudToStoreReplicationManagerListener);
if (cloudToStoreReplicationListener != null) {
cloudToStoreReplicationListener.onPartitionBecomeLeaderFromStandby(partitionName);
}
}

@Override
public void onPartitionStateChangeToStandbyFromLeader(String partitionName) {
for (PartitionStateChangeListener partitionStateChangeListener : partitionStateChangeListeners) {
partitionStateChangeListener.onPartitionStateChangeToStandbyFromLeader(partitionName);
public void onPartitionBecomeStandbyFromLeader(String partitionName) {
PartitionStateChangeListener cloudToStoreReplicationListener =
partitionStateChangeListeners.get(StateModelListenerType.CloudToStoreReplicationManagerListener);
if (cloudToStoreReplicationListener != null) {
cloudToStoreReplicationListener.onPartitionBecomeStandbyFromLeader(partitionName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.codahale.metrics.MetricRegistry;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.server.AmbryHealthReport;
import com.github.ambry.server.StateModelListenerType;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
Expand Down Expand Up @@ -82,7 +83,7 @@ public ClusterParticipant getClusterParticipant() throws IOException {
public void participate(List<AmbryHealthReport> ambryHealthReports) {
for (PartitionStateChangeListener listener : listeners) {
for (String partitionName : partitionLayout.getAllPartitionNames()) {
listener.onPartitionStateChangeToLeaderFromStandby(partitionName);
listener.onPartitionBecomeLeaderFromStandby(partitionName);
}
}
}
Expand Down Expand Up @@ -113,7 +114,8 @@ public List<String> getStoppedReplicas() {
}

@Override
public void registerPartitionStateChangeListener(PartitionStateChangeListener partitionStateChangeListener) {
public void registerPartitionStateChangeListener(StateModelListenerType listenerType,
PartitionStateChangeListener partitionStateChangeListener) {
listeners.add(partitionStateChangeListener);
}
};
Expand Down
Loading

0 comments on commit 07de3a1

Please sign in to comment.