Skip to content

Commit

Permalink
Implement Offline-To-Bootstrap transition in Replication/Stats Manage…
Browse files Browse the repository at this point in the history
…rs (#1327)

Introduced ClusterParticipant into ReplicationManager and StatsManager to register state change listeners.
Implemented Offline-To-Bootstrap logic in Replication/Stats Manager.
(Note: this change is the remaining part of PR #1326.)
  • Loading branch information
jsjtzyy authored and zzmao committed Dec 6, 2019
1 parent c93821b commit ca1c9be
Show file tree
Hide file tree
Showing 19 changed files with 492 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface PartitionStateChangeListener {
void onPartitionBecomeBootstrapFromOffline(String partitionName);

/**
* Action to take when partition becomes bootstrap from offline.
* Action to take when partition becomes standby from bootstrap.
* @param partitionName of the partition.
*/
void onPartitionBecomeStandbyFromBootstrap(String partitionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public enum TransitionErrorCode {
*/
ReplicaNotFound,
/**
* If failure occurs during store operation (i.e. store addition/removal in StoreManager).
* If failure occurs during replica operation (i.e. replica addition/removal in StoreManager, ReplicationManager).
*/
StoreOperationFailure,
ReplicaOperationFailure,
/**
* If store is not started and unavailable for specific operations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ public interface StoreManager {
*/
Store getStore(PartitionId id);

/**
 * Get the {@link ReplicaId} on the current node by partition name. (There should be at most one replica belonging
 * to a specific partition on a single node.)
 * @param partitionName name of the {@link PartitionId}
 * @return the {@link ReplicaId} associated with the given partition name, or {@code null} if the replica is not
 *         found in the storage manager.
 */
ReplicaId getReplica(String partitionName);

/**
* Set BlobStore Stopped state with given {@link PartitionId} {@code id}.
* @param partitionIds a list {@link PartitionId} of the {@link Store} whose stopped state should be set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public List<PartitionId> setBlobStoreStoppedState(List<PartitionId> partitionIds
throw new UnsupportedOperationException("Method not supported");
}

/**
 * Replica lookup by partition name is not supported by this implementation.
 * @param partitionName name of the partition whose replica is requested (unused).
 * @return never returns normally.
 * @throws UnsupportedOperationException always.
 */
@Override
public ReplicaId getReplica(String partitionName) {
  final String reason = "Method not supported";
  throw new UnsupportedOperationException(reason);
}

@Override
public boolean removeBlobStore(PartitionId id) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ public class HelixParticipant implements ClusterParticipant, PartitionStateChang
private final String zkConnectStr;
private final Object helixAdministrationLock = new Object();
private final ClusterMapConfig clusterMapConfig;
private final Map<StateModelListenerType, PartitionStateChangeListener> partitionStateChangeListeners;
private HelixManager manager;
private String instanceName;
private HelixAdmin helixAdmin;
final Map<StateModelListenerType, PartitionStateChangeListener> partitionStateChangeListeners;

private static final Logger logger = LoggerFactory.getLogger(HelixParticipant.class);

Expand Down Expand Up @@ -283,11 +283,24 @@ boolean setStoppedReplicas(List<String> stoppedReplicas) {

/**
 * Forwards the Offline-To-Bootstrap transition of a partition to the registered state change listeners.
 * Listeners are notified in a fixed order: storage manager, then replication manager, then stats manager.
 * A listener type with no registered listener is skipped. Nothing is caught here, so an exception thrown by a
 * listener propagates to the caller and the listeners after it are not notified.
 * @param partitionName of the partition.
 */
@Override
public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
  // Each manager adds the new replica if necessary; the order below is the order in which they are invoked.
  StateModelListenerType[] notificationOrder = {
      StateModelListenerType.StorageManagerListener,
      StateModelListenerType.ReplicationManagerListener,
      StateModelListenerType.StatsManagerListener
  };
  for (StateModelListenerType listenerType : notificationOrder) {
    // The listener is looked up right before it is invoked, i.e. after the previous listener has finished.
    PartitionStateChangeListener listener = partitionStateChangeListeners.get(listenerType);
    if (listener != null) {
      listener.onPartitionBecomeBootstrapFromOffline(partitionName);
    }
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.clustermap;

import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.server.AmbryHealthReport;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;


/**
 * A {@link HelixParticipant} built on a {@link MockHelixManagerFactory} that tracks sealed and stopped replicas in
 * in-memory sets. {@link #participate(List)} and {@link #close()} are no-ops.
 */
public class MockHelixParticipant extends HelixParticipant {
  Set<ReplicaId> sealedReplicas = new HashSet<>();
  Set<ReplicaId> stoppedReplicas = new HashSet<>();

  /**
   * @param clusterMapConfig the {@link ClusterMapConfig} handed to the parent {@link HelixParticipant}.
   * @throws IOException if the parent constructor fails.
   */
  public MockHelixParticipant(ClusterMapConfig clusterMapConfig) throws IOException {
    super(clusterMapConfig, new MockHelixManagerFactory());
  }

  /**
   * Does nothing.
   * @param ambryHealthReports ignored.
   */
  @Override
  public void participate(List<AmbryHealthReport> ambryHealthReports) throws IOException {
    // no op
  }

  /**
   * Records (or clears) the sealed state of a replica in memory.
   * @param replicaId the replica whose sealed state is updated.
   * @param isSealed {@code true} to mark the replica sealed, {@code false} to unmark it.
   * @return always {@code true}.
   */
  @Override
  public boolean setReplicaSealedState(ReplicaId replicaId, boolean isSealed) {
    if (!isSealed) {
      sealedReplicas.remove(replicaId);
      return true;
    }
    sealedReplicas.add(replicaId);
    return true;
  }

  /**
   * Records (or clears) the stopped state of the given replicas in memory.
   * @param replicaIds the replicas whose stopped state is updated.
   * @param markStop {@code true} to mark the replicas stopped, {@code false} to unmark them.
   * @return always {@code true}.
   */
  @Override
  public boolean setReplicaStoppedState(List<ReplicaId> replicaIds, boolean markStop) {
    if (!markStop) {
      stoppedReplicas.removeAll(replicaIds);
      return true;
    }
    stoppedReplicas.addAll(replicaIds);
    return true;
  }

  /**
   * @return partition path strings of all replicas currently recorded as sealed.
   */
  @Override
  public List<String> getSealedReplicas() {
    return toPartitionPaths(sealedReplicas);
  }

  /**
   * @return partition path strings of all replicas currently recorded as stopped.
   */
  @Override
  public List<String> getStoppedReplicas() {
    return toPartitionPaths(stoppedReplicas);
  }

  /**
   * Does nothing.
   */
  @Override
  public void close() {
    // no op
  }

  /**
   * @return a read-only view of the currently registered state change listeners.
   */
  public Map<StateModelListenerType, PartitionStateChangeListener> getPartitionStateChangeListeners() {
    return Collections.unmodifiableMap(partitionStateChangeListeners);
  }

  /**
   * Maps each replica to the path string of its partition.
   * @param replicas the replicas to convert.
   * @return a list with one partition path string per replica.
   */
  private static List<String> toPartitionPaths(Set<ReplicaId> replicas) {
    return replicas.stream()
        .map(replica -> replica.getPartitionId().toPathString())
        .collect(Collectors.toList());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@


public class MockReplicaId implements ReplicaId {

public static final long MOCK_REPLICA_CAPACITY = 100000000;
private String mountPath;
private String replicaPath;
private List<ReplicaId> peerReplicas;
Expand Down Expand Up @@ -91,7 +91,7 @@ public void setPeerReplicas(List<ReplicaId> peerReplicas) {

@Override
public long getCapacityInBytes() {
return 100000000;
return MOCK_REPLICA_CAPACITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterParticipant;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.PartitionStateChangeListener;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.clustermap.StateModelListenerType;
import com.github.ambry.clustermap.StateTransitionException;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.config.StoreConfig;
Expand Down Expand Up @@ -48,7 +52,8 @@ public ReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig
StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap,
ScheduledExecutorService scheduler, DataNodeId dataNode, ConnectionPool connectionPool,
MetricRegistry metricRegistry, NotificationSystem requestNotification,
StoreKeyConverterFactory storeKeyConverterFactory, String transformerClassName) throws ReplicationException {
StoreKeyConverterFactory storeKeyConverterFactory, String transformerClassName,
ClusterParticipant clusterParticipant) throws ReplicationException {
super(replicationConfig, clusterMapConfig, storeKeyFactory, clusterMap, scheduler, dataNode,
clusterMap.getReplicaIds(dataNode), connectionPool, metricRegistry, requestNotification,
storeKeyConverterFactory, transformerClassName);
Expand All @@ -70,6 +75,12 @@ public ReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig
logger.error("Not replicating to partition " + partition + " because an initialized store could not be found");
}
}
// register replication manager's state change listener if clusterParticipant is not null
if (clusterParticipant != null) {
clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.ReplicationManagerListener,
new PartitionStateChangeListenerImpl());
logger.info("Replication manager's state change listener registered!");
}
persistor = new DiskTokenPersistor(replicaTokenFileName, mountPathToPartitionInfos, replicationMetrics, clusterMap,
tokenHelper, storeManager);
}
Expand Down Expand Up @@ -98,7 +109,6 @@ public void start() throws ReplicationException {
}
}

// start background persistent thread
// start scheduler thread to persist replica token in the background
if (persistor != null) {
this.scheduler.scheduleAtFixedRate(persistor, replicationConfig.replicationTokenFlushDelaySeconds,
Expand Down Expand Up @@ -201,4 +211,51 @@ private void updatePartitionInfoMaps(List<RemoteReplicaInfo> remoteReplicaInfos,
mountPathToPartitionInfos.computeIfAbsent(replicaId.getMountPath(), key -> ConcurrentHashMap.newKeySet())
.add(partitionInfo);
}

/**
 * {@link PartitionStateChangeListener} that reacts to partition state changes on behalf of the replication manager.
 * Only the Offline-To-Bootstrap transition performs work; the remaining transitions are logged.
 */
private class PartitionStateChangeListenerImpl implements PartitionStateChangeListener {

  /**
   * Ensures the local replica of the given partition is tracked by the replication manager.
   * @param partitionName of the partition.
   * @throws StateTransitionException with {@code ReplicaNotFound} if the store manager has no replica for the
   *         partition, or with {@code ReplicaOperationFailure} if adding the replica fails.
   */
  @Override
  public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
    // Existing or newly added, the replica must already be known to the storage manager, because a new replica is
    // added into the storage manager first.
    ReplicaId localReplica = storeManager.getReplica(partitionName);
    if (localReplica == null) {
      throw new StateTransitionException("Replica " + partitionName + " is not found on current node",
          StateTransitionException.TransitionErrorCode.ReplicaNotFound);
    }

    if (partitionToPartitionInfo.containsKey(localReplica.getPartitionId())) {
      // the partition is already present in the replication manager; nothing to add
      return;
    }

    // Absent from partitionToPartitionInfo means the partition was just added in the storage manager, so the next
    // step is to add it into the replication manager.
    logger.info("Didn't find replica {} in replication manager, starting to add it.", partitionName);
    boolean added = addReplica(localReplica);
    if (!added) {
      throw new StateTransitionException("Failed to add new replica " + partitionName + " into replication manager",
          StateTransitionException.TransitionErrorCode.ReplicaOperationFailure);
    }
  }

  /**
   * Logs the Bootstrap-To-Standby notification.
   * @param partitionName of the partition.
   */
  @Override
  public void onPartitionBecomeStandbyFromBootstrap(String partitionName) {
    logger.info("Partition state change notification from Bootstrap to Standby received for partition {}",
        partitionName);
    // TODO implement replication catchup logic if this is a new replica
  }

  /**
   * Logs the Standby-To-Leader notification.
   * @param partitionName of the partition.
   */
  @Override
  public void onPartitionBecomeLeaderFromStandby(String partitionName) {
    logger.info("Partition state change notification from Standby to Leader received for partition {}",
        partitionName);
  }

  /**
   * Logs the Leader-To-Standby notification.
   * @param partitionName of the partition.
   */
  @Override
  public void onPartitionBecomeStandbyFromLeader(String partitionName) {
    logger.info("Partition state change notification from Leader to Standby received for partition {}",
        partitionName);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package com.github.ambry.replication;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterParticipant;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.config.StoreConfig;
Expand All @@ -39,6 +41,7 @@ public class MockReplicationManager extends ReplicationManager {
public RuntimeException exceptionToThrow = null;
// Variables for controlling and examining the values provided to controlReplicationForPartitions()
public Boolean controlReplicationReturnVal;
public Boolean addReplicaReturnVal = null;
public Collection<PartitionId> idsVal;
public List<String> originsVal;
public Boolean enableVal;
Expand All @@ -63,12 +66,13 @@ public static MockReplicationManager getReplicationManager(VerifiableProperties
ClusterMapConfig clusterMapConfig = new ClusterMapConfig(verifiableProperties);
StoreConfig storeConfig = new StoreConfig(verifiableProperties);
return new MockReplicationManager(replicationConfig, clusterMapConfig, storeConfig, storageManager, clusterMap,
dataNodeId, storeKeyConverterFactory);
dataNodeId, storeKeyConverterFactory, null);
}

MockReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig clusterMapConfig,
StoreConfig storeConfig, StorageManager storageManager, ClusterMap clusterMap, DataNodeId dataNodeId,
StoreKeyConverterFactory storeKeyConverterFactory) throws ReplicationException {
StoreKeyConverterFactory storeKeyConverterFactory, ClusterParticipant clusterParticipant)
throws ReplicationException {
super(replicationConfig, clusterMapConfig, storeConfig, storageManager, new StoreKeyFactory() {
@Override
public StoreKey getStoreKey(DataInputStream stream) {
Expand All @@ -80,7 +84,7 @@ public StoreKey getStoreKey(String input) {
return null;
}
}, clusterMap, null, dataNodeId, null, clusterMap.getMetricRegistry(), null, storeKeyConverterFactory,
BlobIdTransformer.class.getName());
BlobIdTransformer.class.getName(), clusterParticipant);
reset();
}

Expand Down Expand Up @@ -109,6 +113,11 @@ public long getRemoteReplicaLagFromLocalInBytes(PartitionId partitionId, String
return lag;
}

/**
 * Adds a replica, unless a canned result has been set in {@code addReplicaReturnVal}.
 * @param replicaId the {@link ReplicaId} to add.
 * @return {@code addReplicaReturnVal} if it is non-null (the parent implementation is not invoked in that case),
 *         otherwise the result of the parent implementation.
 */
@Override
public boolean addReplica(ReplicaId replicaId) {
  if (addReplicaReturnVal != null) {
    return addReplicaReturnVal;
  }
  return super.addReplica(replicaId);
}

/**
* Resets all state
*/
Expand Down
Loading

0 comments on commit ca1c9be

Please sign in to comment.