Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Offline-To-Bootstrap state transition in StorageManager #1326

Merged
merged 4 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ public interface ClusterParticipant extends AutoCloseable {

/**
* Register a listener for leadership changes in partitions of this node.
* @param listenerType the type of listener, which is defined in {@link StateModelListenerType}
* @param partitionStateChangeListener listener to register.
*/
void registerPartitionStateChangeListener(PartitionStateChangeListener partitionStateChangeListener);
void registerPartitionStateChangeListener(StateModelListenerType listenerType,
PartitionStateChangeListener partitionStateChangeListener);

/**
* Terminate the participant.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,27 @@
*/
public interface PartitionStateChangeListener {

/**
* Action to take when partition becomes bootstrap from offline.
* @param partitionName of the partition.
*/
void onPartitionBecomeBootstrapFromOffline(String partitionName);

/**
* Action to take when partition becomes bootstrap from offline.
* @param partitionName of the partition.
*/
void onPartitionBecomeStandbyFromBootstrap(String partitionName);

/**
* Action to take on becoming leader of a partition.
* @param partitionName of the partition.
*/
void onPartitionStateChangeToLeaderFromStandby(String partitionName);
void onPartitionBecomeLeaderFromStandby(String partitionName);

/**
* Action to take on being removed as leader of a partition.
* @param partitionName of the partition.
*/
void onPartitionStateChangeToStandbyFromLeader(String partitionName);
void onPartitionBecomeStandbyFromLeader(String partitionName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.clustermap;

/**
* The type of partition state model listener.
* The state model listeners implement {@link com.github.ambry.clustermap.PartitionStateChangeListener} in different
* components (i.e. StorageManager, ReplicationManager etc) and take actions when state transition occurs.
*/
public enum StateModelListenerType {
/**
* The partition state change listener owned by storage manager. It invokes some store operations when partition state
* transition occurs. For example, when new replica transits from OFFLINE to BOOTSTRAP, storage manager instantiates
* blob store associated with this replica and adds it into disk manager and compaction manager.
*/
StorageManagerListener,
/**
* The partition state change listener owned by replication manager. It performs some replica operations in response to
* partition state transition. For example, when new replica transits from BOOTSTRAP to STANDBY, replication manager
* keeps checking replication lag of this replica and ensures it catches up with its peer replicas.
*/
ReplicationManagerListener,
/**
* The partition state change listener owned by stats manager. It takes actions when new replica is added (OFFLINE ->
* BOOTSTRAP) or old replica is removed (INACTIVE -> OFFLINE)
*/
StatsManagerListener,
/**
* The partition state change listener owned by cloud-to-store replication manager. It takes actions when replica
* leadership hand-off occurs. For example, if any replica becomes LEADER from STANDBY, it is supposed to replicate
* data from VCR nodes. This is part of two-way replication between Ambry and cloud.
*/
CloudToStoreReplicationManagerListener
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.clustermap;

public class StateTransitionException extends RuntimeException {
private static final long serialVersionUID = 1L;
private final TransitionErrorCode error;

public StateTransitionException(String s, TransitionErrorCode error) {
super(s);
this.error = error;
}

public TransitionErrorCode getErrorCode() {
return error;
}

public enum TransitionErrorCode {
/**
* If replica is not present in Helix and not found on current node.
*/
ReplicaNotFound,
/**
* If failure occurs during store operation (i.e. store addition/removal in StoreManager).
*/
StoreOperationFailure,
/**
* If store is not started and unavailable for specific operations.
*/
StoreNotStarted
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ public class ClusterMapConfig {
@Default(DEFAULT_STATE_MODEL_DEF)
public final String clustermapStateModelDefinition;

/**
* Whether to enable state model listeners to take actions when state transition occurs.
*/
@Config("clustermap.enable.state.model.listener")
@Default("false")
public final boolean clustermapEnableStateModelListener;

public ClusterMapConfig(VerifiableProperties verifiableProperties) {
clusterMapFixedTimeoutDatanodeErrorThreshold =
verifiableProperties.getIntInRange("clustermap.fixedtimeout.datanode.error.threshold", 3, 1, 100);
Expand Down Expand Up @@ -235,5 +242,7 @@ public ClusterMapConfig(VerifiableProperties verifiableProperties) {
OLD_STATE_MODEL_DEF)) {
throw new IllegalArgumentException("Unsupported state model definition: " + clustermapStateModelDefinition);
}
clustermapEnableStateModelListener =
verifiableProperties.getBoolean("clustermap.enable.state.model.listener", false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.clustermap;

import com.github.ambry.config.ClusterMapConfig;
import java.util.Objects;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
Expand All @@ -30,41 +31,48 @@ public class AmbryPartitionStateModel extends StateModel {
private final String resourceName;
private final String partitionName;
private final PartitionStateChangeListener partitionStateChangeListener;
private final ClusterMapConfig clusterMapConfig;

AmbryPartitionStateModel(String resourceName, String partitionName,
PartitionStateChangeListener partitionStateChangeListener) {
PartitionStateChangeListener partitionStateChangeListener, ClusterMapConfig clusterMapConfig) {
this.resourceName = resourceName;
this.partitionName = partitionName;
this.partitionStateChangeListener = Objects.requireNonNull(partitionStateChangeListener);
this.clusterMapConfig = Objects.requireNonNull(clusterMapConfig);
StateModelParser parser = new StateModelParser();
_currentState = parser.getInitialState(AmbryPartitionStateModel.class);
}

@Transition(to = "BOOTSTRAP", from = "OFFLINE")
public void onBecomeBootstrapFromOffline(Message message, NotificationContext context) {
// TODO to distinguish between regular start and dynamic replica addition, check 1. store dir, 2. if there is bootstrap log
logger.info("Partition {} in resource {} is becoming BOOTSTRAP from OFFLINE", message.getPartitionName(),
message.getResourceName());
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeBootstrapFromOffline(message.getPartitionName());
}
}

@Transition(to = "STANDBY", from = "BOOTSTRAP")
public void onBecomeStandbyFromBootstrap(Message message, NotificationContext context) {
logger.info("Partition {} in resource {} is becoming STANDBY from BOOTSTRAP", message.getPartitionName(),
message.getResourceName());
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeStandbyFromBootstrap(message.getPartitionName());
}
}

@Transition(to = "LEADER", from = "STANDBY")
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
logger.info("Partition {} in resource {} is becoming LEADER from STANDBY", message.getPartitionName(),
message.getResourceName());
partitionStateChangeListener.onPartitionStateChangeToLeaderFromStandby(message.getPartitionName());
partitionStateChangeListener.onPartitionBecomeLeaderFromStandby(message.getPartitionName());
}

@Transition(to = "STANDBY", from = "LEADER")
public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
logger.info("Partition {} in resource {} is becoming STANDBY from LEADER", message.getPartitionName(),
message.getResourceName());
partitionStateChangeListener.onPartitionStateChangeToStandbyFromLeader(message.getPartitionName());
partitionStateChangeListener.onPartitionBecomeStandbyFromLeader(message.getPartitionName());
}

@Transition(to = "INACTIVE", from = "STANDBY")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.clustermap;

import com.github.ambry.config.ClusterMapConfig;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
Expand All @@ -22,11 +23,11 @@
* A factory for creating {@link StateModel}.
*/
class AmbryStateModelFactory extends StateModelFactory<StateModel> {
private final String ambryStateModelDef;
private final ClusterMapConfig clustermapConfig;
private final PartitionStateChangeListener partitionStateChangeListener;

AmbryStateModelFactory(String stateModelDef, PartitionStateChangeListener partitionStateChangeListener) {
ambryStateModelDef = stateModelDef;
AmbryStateModelFactory(ClusterMapConfig clusterMapConfig, PartitionStateChangeListener partitionStateChangeListener) {
this.clustermapConfig = clusterMapConfig;
this.partitionStateChangeListener = partitionStateChangeListener;
}

Expand All @@ -39,15 +40,19 @@ class AmbryStateModelFactory extends StateModelFactory<StateModel> {
@Override
public StateModel createNewStateModel(String resourceName, String partitionName) {
StateModel stateModelToReturn;
switch (ambryStateModelDef) {
switch (clustermapConfig.clustermapStateModelDefinition) {
case AmbryStateModelDefinition.AMBRY_LEADER_STANDBY_MODEL:
stateModelToReturn = new AmbryPartitionStateModel(resourceName, partitionName, partitionStateChangeListener);
stateModelToReturn =
new AmbryPartitionStateModel(resourceName, partitionName, partitionStateChangeListener, clustermapConfig);
break;
case LeaderStandbySMD.name:
stateModelToReturn = new DefaultLeaderStandbyStateModel();
break;
default:
throw new IllegalArgumentException("Unsupported state model definition: " + ambryStateModelDef);
// Code won't get here because state model def is already validated in ClusterMapConfig. We keep exception here
// in case the validation logic is changed in ClusterMapConfig.
throw new IllegalArgumentException(
"Unsupported state model definition: " + clustermapConfig.clustermapStateModelDefinition);
}
return stateModelToReturn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import com.github.ambry.utils.Utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -43,25 +43,26 @@
/**
* An implementation of {@link ClusterParticipant} that registers as a participant to a Helix cluster.
*/
class HelixParticipant implements ClusterParticipant, PartitionStateChangeListener {
private final Logger logger = LoggerFactory.getLogger(getClass());
public class HelixParticipant implements ClusterParticipant, PartitionStateChangeListener {
private final String clusterName;
private final String zkConnectStr;
private final HelixFactory helixFactory;
Copy link
Collaborator

@ankagrawal ankagrawal Dec 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINOR: since we are already in this file, looks like helixfactory is not being used. We can remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, removed.

private final Object helixAdministrationLock = new Object();
private final ClusterMapConfig clusterMapConfig;
private final Map<StateModelListenerType, PartitionStateChangeListener> partitionStateChangeListeners;
private HelixManager manager;
private String instanceName;
private HelixAdmin helixAdmin;
private List<PartitionStateChangeListener> partitionStateChangeListeners;

private static final Logger logger = LoggerFactory.getLogger(HelixParticipant.class);

/**
* Instantiate a HelixParticipant.
* @param clusterMapConfig the {@link ClusterMapConfig} associated with this participant.
* @param helixFactory the {@link HelixFactory} to use to get the {@link HelixManager}.
* @throws IOException if there is an error in parsing the JSON serialized ZK connect string config.
*/
HelixParticipant(ClusterMapConfig clusterMapConfig, HelixFactory helixFactory) throws IOException {
public HelixParticipant(ClusterMapConfig clusterMapConfig, HelixFactory helixFactory) throws IOException {
this.clusterMapConfig = clusterMapConfig;
clusterName = clusterMapConfig.clusterMapClusterName;
instanceName =
Expand All @@ -81,7 +82,7 @@ class HelixParticipant implements ClusterParticipant, PartitionStateChangeListen
throw new IOException("Received JSON exception while parsing ZKInfo json string", e);
}
manager = helixFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkConnectStr);
partitionStateChangeListeners = new LinkedList<>();
partitionStateChangeListeners = new HashMap<>();
}

/**
Expand All @@ -96,7 +97,7 @@ public void participate(List<AmbryHealthReport> ambryHealthReports) throws IOExc
clusterMapConfig.clustermapStateModelDefinition);
StateMachineEngine stateMachineEngine = manager.getStateMachineEngine();
stateMachineEngine.registerStateModelFactory(clusterMapConfig.clustermapStateModelDefinition,
new AmbryStateModelFactory(clusterMapConfig.clustermapStateModelDefinition, this));
new AmbryStateModelFactory(clusterMapConfig, this));
registerHealthReportTasks(stateMachineEngine, ambryHealthReports);
try {
synchronized (helixAdministrationLock) {
Expand All @@ -117,8 +118,9 @@ public void participate(List<AmbryHealthReport> ambryHealthReports) throws IOExc
}

@Override
public void registerPartitionStateChangeListener(PartitionStateChangeListener partitionStateChangeListener) {
partitionStateChangeListeners.add(partitionStateChangeListener);
public void registerPartitionStateChangeListener(StateModelListenerType listenerType,
PartitionStateChangeListener partitionStateChangeListener) {
partitionStateChangeListeners.put(listenerType, partitionStateChangeListener);
}

@Override
Expand Down Expand Up @@ -212,6 +214,13 @@ public List<String> getStoppedReplicas() {
return ClusterMapUtils.getStoppedReplicas(instanceConfig);
}

/**
* @return a snapshot of registered state change listeners.
*/
public Map<StateModelListenerType, PartitionStateChangeListener> getPartitionStateChangeListeners() {
return Collections.unmodifiableMap(partitionStateChangeListeners);
}

/**
* Register {@link HelixHealthReportAggregatorTask}s for appropriate {@link AmbryHealthReport}s.
* @param engine the {@link StateMachineEngine} to register the task state model.
Expand Down Expand Up @@ -275,16 +284,38 @@ boolean setStoppedReplicas(List<String> stoppedReplicas) {
}

@Override
public void onPartitionStateChangeToLeaderFromStandby(String partitionName) {
for (PartitionStateChangeListener partitionStateChangeListener : partitionStateChangeListeners) {
partitionStateChangeListener.onPartitionStateChangeToLeaderFromStandby(partitionName);
public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
PartitionStateChangeListener storageManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener);
if (storageManagerListener != null) {
storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName);
}
}

@Override
public void onPartitionBecomeStandbyFromBootstrap(String partitionName) {
PartitionStateChangeListener storageManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener);
if (storageManagerListener != null) {
storageManagerListener.onPartitionBecomeStandbyFromBootstrap(partitionName);
}
}

@Override
public void onPartitionBecomeLeaderFromStandby(String partitionName) {
PartitionStateChangeListener cloudToStoreReplicationListener =
partitionStateChangeListeners.get(StateModelListenerType.CloudToStoreReplicationManagerListener);
if (cloudToStoreReplicationListener != null) {
cloudToStoreReplicationListener.onPartitionBecomeLeaderFromStandby(partitionName);
}
}

@Override
public void onPartitionStateChangeToStandbyFromLeader(String partitionName) {
for (PartitionStateChangeListener partitionStateChangeListener : partitionStateChangeListeners) {
partitionStateChangeListener.onPartitionStateChangeToStandbyFromLeader(partitionName);
public void onPartitionBecomeStandbyFromLeader(String partitionName) {
Copy link
Collaborator

@ankagrawal ankagrawal Dec 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINOR: 

maybe we can replace this by
if(partitionstateChangeListeners.containsKey(StateModelListenerType.CloudToStoreReplicationManagerListener) {
partitionStateChangeListeners.get(StateModelListenerType.CloudToStoreReplicationManagerListener).onPartitionBecomeStandbyFromLeader(partitionName)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appreciate your suggestion but I feel like it loses a little bit of readability. What do you think?

PartitionStateChangeListener cloudToStoreReplicationListener =
partitionStateChangeListeners.get(StateModelListenerType.CloudToStoreReplicationManagerListener);
if (cloudToStoreReplicationListener != null) {
cloudToStoreReplicationListener.onPartitionBecomeStandbyFromLeader(partitionName);
}
}
}
Loading