From ca1c9be80ebe978e78b540a20e3869c48f2798a0 Mon Sep 17 00:00:00 2001 From: Yingyi Zhang Date: Thu, 5 Dec 2019 16:22:45 -0800 Subject: [PATCH] Implement Offline-To-Bootstrap transition in Replication/Stats Managers (#1327) introduced ClusterParticipant into ReplicationManager and StatsManager to register state change listeners implemented Offline-To-Bootstrap logic in Replication/Stats Manager. (Note: this change is remaining part of PR #1326) --- .../PartitionStateChangeListener.java | 2 +- .../clustermap/StateTransitionException.java | 4 +- .../com.github.ambry/server/StoreManager.java | 8 + .../CloudStorageManager.java | 5 + .../HelixParticipant.java | 15 +- .../MockHelixParticipant.java | 81 +++++++++++ .../MockReplicaId.java | 4 +- .../ReplicationManager.java | 61 +++++++- .../MockReplicationManager.java | 15 +- .../ReplicationTest.java | 137 +++++++++++++++++- .../com.github.ambry.server/AmbryServer.java | 5 +- .../com.github.ambry.server/StatsManager.java | 59 +++++++- .../AmbryServerRequestsTest.java | 2 +- .../AmbryStatsReportTest.java | 2 +- .../MockStatsManager.java | 9 +- .../MockStorageManager.java | 6 + .../StatsManagerTest.java | 105 ++++++++++++-- .../StorageManager.java | 7 +- .../StorageManagerTest.java | 2 +- 19 files changed, 492 insertions(+), 37 deletions(-) create mode 100644 ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockHelixParticipant.java diff --git a/ambry-api/src/main/java/com.github.ambry/clustermap/PartitionStateChangeListener.java b/ambry-api/src/main/java/com.github.ambry/clustermap/PartitionStateChangeListener.java index b9c262abbf..58edb82b8c 100644 --- a/ambry-api/src/main/java/com.github.ambry/clustermap/PartitionStateChangeListener.java +++ b/ambry-api/src/main/java/com.github.ambry/clustermap/PartitionStateChangeListener.java @@ -25,7 +25,7 @@ public interface PartitionStateChangeListener { void onPartitionBecomeBootstrapFromOffline(String partitionName); /** - * Action to take when partition becomes bootstrap from offline. + * Action to take when partition becomes standby from bootstrap. * @param partitionName of the partition. */ void onPartitionBecomeStandbyFromBootstrap(String partitionName); diff --git a/ambry-api/src/main/java/com.github.ambry/clustermap/StateTransitionException.java b/ambry-api/src/main/java/com.github.ambry/clustermap/StateTransitionException.java index fc20dc0415..f6fae1bb47 100644 --- a/ambry-api/src/main/java/com.github.ambry/clustermap/StateTransitionException.java +++ b/ambry-api/src/main/java/com.github.ambry/clustermap/StateTransitionException.java @@ -32,9 +32,9 @@ public enum TransitionErrorCode { */ ReplicaNotFound, /** - * If failure occurs during store operation (i.e. store addition/removal in StoreManager). + * If failure occurs during replica operation (i.e. replica addition/removal in StoreManager, ReplicationManager). */ - StoreOperationFailure, + ReplicaOperationFailure, /** * If store is not started and unavailable for specific operations. */ diff --git a/ambry-api/src/main/java/com.github.ambry/server/StoreManager.java b/ambry-api/src/main/java/com.github.ambry/server/StoreManager.java index 5826deed1f..c7ee42ac11 100644 --- a/ambry-api/src/main/java/com.github.ambry/server/StoreManager.java +++ b/ambry-api/src/main/java/com.github.ambry/server/StoreManager.java @@ -59,6 +59,14 @@ public interface StoreManager { */ Store getStore(PartitionId id); + /** + * Get replicaId on current node by partition name. (There should be at most one replica belonging to specific + * partition on single node) + * @param partitionName name of {@link PartitionId} + * @return {@link ReplicaId} associated with given partition name. {@code null} if replica is not found in storage manager. + */ + ReplicaId getReplica(String partitionName); + /** * Set BlobStore Stopped state with given {@link PartitionId} {@code id}. * @param partitionIds a list {@link PartitionId} of the {@link Store} whose stopped state should be set. diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudStorageManager.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudStorageManager.java index 9b2ac3a594..4ddfe83681 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudStorageManager.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudStorageManager.java @@ -112,6 +112,11 @@ public List setBlobStoreStoppedState(List partitionIds throw new UnsupportedOperationException("Method not supported"); } + @Override + public ReplicaId getReplica(String partitionName) { + throw new UnsupportedOperationException("Method not supported"); + } + @Override public boolean removeBlobStore(PartitionId id) { try { diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipant.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipant.java index 9a7ce19add..630397a7c9 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipant.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipant.java @@ -48,10 +48,10 @@ public class HelixParticipant implements ClusterParticipant, PartitionStateChang private final String zkConnectStr; private final Object helixAdministrationLock = new Object(); private final ClusterMapConfig clusterMapConfig; - private final Map partitionStateChangeListeners; private HelixManager manager; private String instanceName; private HelixAdmin helixAdmin; + final Map partitionStateChangeListeners; private static final Logger logger = LoggerFactory.getLogger(HelixParticipant.class); @@ -283,11 +283,24 @@ boolean setStoppedReplicas(List stoppedReplicas) { @Override public void onPartitionBecomeBootstrapFromOffline(String partitionName) { + // 1. take actions in storage manager (add new replica if necessary) PartitionStateChangeListener storageManagerListener = partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener); if (storageManagerListener != null) { storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); } + // 2. take actions in replication manager (add new replica if necessary) + PartitionStateChangeListener replicationManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener); + if (replicationManagerListener != null) { + replicationManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + } + // 3. take actions in stats manager (add new replica if necessary) + PartitionStateChangeListener statsManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.StatsManagerListener); + if (statsManagerListener != null) { + statsManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + } } @Override diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockHelixParticipant.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockHelixParticipant.java new file mode 100644 index 0000000000..69a1017fef --- /dev/null +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockHelixParticipant.java @@ -0,0 +1,81 @@ +/* + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.clustermap; + +import com.github.ambry.config.ClusterMapConfig; +import com.github.ambry.server.AmbryHealthReport; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + + +public class MockHelixParticipant extends HelixParticipant { + Set sealedReplicas = new HashSet<>(); + Set stoppedReplicas = new HashSet<>(); + + public MockHelixParticipant(ClusterMapConfig clusterMapConfig) throws IOException { + super(clusterMapConfig, new MockHelixManagerFactory()); + } + + @Override + public void participate(List ambryHealthReports) throws IOException { + // no op + } + + @Override + public boolean setReplicaSealedState(ReplicaId replicaId, boolean isSealed) { + if (isSealed) { + sealedReplicas.add(replicaId); + } else { + sealedReplicas.remove(replicaId); + } + return true; + } + + @Override + public boolean setReplicaStoppedState(List replicaIds, boolean markStop) { + if (markStop) { + stoppedReplicas.addAll(replicaIds); + } else { + stoppedReplicas.removeAll(replicaIds); + } + return true; + } + + @Override + public List getSealedReplicas() { + return sealedReplicas.stream().map(r -> r.getPartitionId().toPathString()).collect(Collectors.toList()); + } + + @Override + public List getStoppedReplicas() { + return stoppedReplicas.stream().map(r -> r.getPartitionId().toPathString()).collect(Collectors.toList()); + } + + @Override + public void close() { + // no op + } + + /** + * @return a snapshot of current state change listeners. + */ + public Map getPartitionStateChangeListeners() { + return Collections.unmodifiableMap(partitionStateChangeListeners); + } +} diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockReplicaId.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockReplicaId.java index a60061b719..d899187d65 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockReplicaId.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockReplicaId.java @@ -22,7 +22,7 @@ public class MockReplicaId implements ReplicaId { - + public static final long MOCK_REPLICA_CAPACITY = 100000000; private String mountPath; private String replicaPath; private List peerReplicas; @@ -91,7 +91,7 @@ public void setPeerReplicas(List peerReplicas) { @Override public long getCapacityInBytes() { - return 100000000; + return MOCK_REPLICA_CAPACITY; } @Override diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationManager.java b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationManager.java index b70f82445f..f346756db6 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationManager.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/ReplicationManager.java @@ -15,9 +15,13 @@ import com.codahale.metrics.MetricRegistry; import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.ClusterParticipant; import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.PartitionStateChangeListener; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.clustermap.StateModelListenerType; +import com.github.ambry.clustermap.StateTransitionException; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.ReplicationConfig; import com.github.ambry.config.StoreConfig; @@ -48,7 +52,8 @@ public ReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, ScheduledExecutorService scheduler, DataNodeId dataNode, ConnectionPool connectionPool, MetricRegistry metricRegistry, NotificationSystem requestNotification, - StoreKeyConverterFactory storeKeyConverterFactory, String transformerClassName) throws ReplicationException { + StoreKeyConverterFactory storeKeyConverterFactory, String transformerClassName, + ClusterParticipant clusterParticipant) throws ReplicationException { super(replicationConfig, clusterMapConfig, storeKeyFactory, clusterMap, scheduler, dataNode, clusterMap.getReplicaIds(dataNode), connectionPool, metricRegistry, requestNotification, storeKeyConverterFactory, transformerClassName); @@ -70,6 +75,12 @@ public ReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig logger.error("Not replicating to partition " + partition + " because an initialized store could not be found"); } } + // register replication manager's state change listener if clusterParticipant is not null + if (clusterParticipant != null) { + clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.ReplicationManagerListener, + new PartitionStateChangeListenerImpl()); + logger.info("Replication manager's state change listener registered!"); + } persistor = new DiskTokenPersistor(replicaTokenFileName, mountPathToPartitionInfos, replicationMetrics, clusterMap, tokenHelper, storeManager); } @@ -98,7 +109,6 @@ public void start() throws ReplicationException { } } - // start background persistent thread // start scheduler thread to persist replica token in the background if (persistor != null) { this.scheduler.scheduleAtFixedRate(persistor, replicationConfig.replicationTokenFlushDelaySeconds, @@ -201,4 +211,51 @@ private void updatePartitionInfoMaps(List remoteReplicaInfos, mountPathToPartitionInfos.computeIfAbsent(replicaId.getMountPath(), key -> ConcurrentHashMap.newKeySet()) .add(partitionInfo); } + + /** + * {@link PartitionStateChangeListener} to capture changes in partition state. + */ + private class PartitionStateChangeListenerImpl implements PartitionStateChangeListener { + + @Override + public void onPartitionBecomeBootstrapFromOffline(String partitionName) { + // check if partition exists + ReplicaId replica = storeManager.getReplica(partitionName); + if (replica == null) { + // no matter this is an existing replica or new added one, it should be present in storage manager because new + // replica is added into storage manager first. + throw new StateTransitionException("Replica " + partitionName + " is not found on current node", + StateTransitionException.TransitionErrorCode.ReplicaNotFound); + } + + if (!partitionToPartitionInfo.containsKey(replica.getPartitionId())) { + // if partition is not present in partitionToPartitionInfo map, it means this partition was just added in storage + // manager and next step is to add it into replication manager + logger.info("Didn't find replica {} in replication manager, starting to add it.", partitionName); + if (!addReplica(replica)) { + throw new StateTransitionException("Failed to add new replica " + partitionName + " into replication manager", + StateTransitionException.TransitionErrorCode.ReplicaOperationFailure); + } + } + } + + @Override + public void onPartitionBecomeStandbyFromBootstrap(String partitionName) { + logger.info("Partition state change notification from Bootstrap to Standby received for partition {}", + partitionName); + // TODO implement replication catchup logic if this is a new replica + } + + @Override + public void onPartitionBecomeLeaderFromStandby(String partitionName) { + logger.info("Partition state change notification from Standby to Leader received for partition {}", + partitionName); + } + + @Override + public void onPartitionBecomeStandbyFromLeader(String partitionName) { + logger.info("Partition state change notification from Leader to Standby received for partition {}", + partitionName); + } + } } diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/MockReplicationManager.java b/ambry-replication/src/test/java/com.github.ambry.replication/MockReplicationManager.java index e215e6c841..56c1d2ae9e 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/MockReplicationManager.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/MockReplicationManager.java @@ -14,8 +14,10 @@ package com.github.ambry.replication; import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.ClusterParticipant; import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.ReplicationConfig; import com.github.ambry.config.StoreConfig; @@ -39,6 +41,7 @@ public class MockReplicationManager extends ReplicationManager { public RuntimeException exceptionToThrow = null; // Variables for controlling and examining the values provided to controlReplicationForPartitions() public Boolean controlReplicationReturnVal; + public Boolean addReplicaReturnVal = null; public Collection idsVal; public List originsVal; public Boolean enableVal; @@ -63,12 +66,13 @@ public static MockReplicationManager getReplicationManager(VerifiableProperties ClusterMapConfig clusterMapConfig = new ClusterMapConfig(verifiableProperties); StoreConfig storeConfig = new StoreConfig(verifiableProperties); return new MockReplicationManager(replicationConfig, clusterMapConfig, storeConfig, storageManager, clusterMap, - dataNodeId, storeKeyConverterFactory); + dataNodeId, storeKeyConverterFactory, null); } MockReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig clusterMapConfig, StoreConfig storeConfig, StorageManager storageManager, ClusterMap clusterMap, DataNodeId dataNodeId, - StoreKeyConverterFactory storeKeyConverterFactory) throws ReplicationException { + StoreKeyConverterFactory storeKeyConverterFactory, ClusterParticipant clusterParticipant) + throws ReplicationException { super(replicationConfig, clusterMapConfig, storeConfig, storageManager, new StoreKeyFactory() { @Override public StoreKey getStoreKey(DataInputStream stream) { @@ -80,7 +84,7 @@ public StoreKey getStoreKey(String input) { return null; } }, clusterMap, null, dataNodeId, null, clusterMap.getMetricRegistry(), null, storeKeyConverterFactory, - BlobIdTransformer.class.getName()); + BlobIdTransformer.class.getName(), clusterParticipant); reset(); } @@ -109,6 +113,11 @@ public long getRemoteReplicaLagFromLocalInBytes(PartitionId partitionId, String return lag; } + @Override + public boolean addReplica(ReplicaId replicaId) { + return addReplicaReturnVal == null ? super.addReplica(replicaId) : addReplicaReturnVal; + } + /** * Resets all state */ diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java index 35d7900fb7..6e42714511 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java @@ -18,9 +18,12 @@ import com.github.ambry.clustermap.ClusterMapUtils; import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.MockClusterMap; +import com.github.ambry.clustermap.MockHelixParticipant; import com.github.ambry.clustermap.MockReplicaId; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.clustermap.StateModelListenerType; +import com.github.ambry.clustermap.StateTransitionException; import com.github.ambry.commons.BlobId; import com.github.ambry.commons.BlobIdFactory; import com.github.ambry.commons.CommonTestUtils; @@ -79,12 +82,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import org.json.JSONObject; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.mockito.Mockito; +import static com.github.ambry.clustermap.TestUtils.*; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -123,7 +128,10 @@ public static List data() { /** * Constructor to set the configs */ - public ReplicationTest(short requestVersion, short responseVersion) { + public ReplicationTest(short requestVersion, short responseVersion) throws Exception { + List zkInfoList = new ArrayList<>(); + zkInfoList.add(new com.github.ambry.utils.TestUtils.ZkInfo(null, "DC1", (byte) 0, 2199, false)); + JSONObject zkJson = constructZkLayoutJSON(zkInfoList); Properties properties = new Properties(); properties.setProperty("replication.metadata.request.version", Short.toString(requestVersion)); properties.setProperty("replication.metadataresponse.version", Short.toString(responseVersion)); @@ -131,10 +139,12 @@ public ReplicationTest(short requestVersion, short responseVersion) { properties.setProperty("clustermap.cluster.name", "test"); properties.setProperty("clustermap.datacenter.name", "DC1"); properties.setProperty("clustermap.host.name", "localhost"); + properties.setProperty("clustermap.dcs.zk.connect.strings", zkJson.toString(2)); properties.setProperty("replication.synced.replica.backoff.duration.ms", "3000"); properties.setProperty("replication.intra.replica.thread.throttle.sleep.duration.ms", "100"); properties.setProperty("replication.inter.replica.thread.throttle.sleep.duration.ms", "200"); properties.setProperty("replication.replica.thread.idle.sleep.duration.ms", "1000"); + properties.put("store.segment.size.in.bytes", Long.toString(MockReplicaId.MOCK_REPLICA_CAPACITY / 2L)); verifiableProperties = new VerifiableProperties(properties); replicationConfig = new ReplicationConfig(verifiableProperties); } @@ -205,7 +215,7 @@ public void addAndRemoveReplicaTest() throws Exception { storageManager.start(); MockReplicationManager replicationManager = new MockReplicationManager(replicationConfig, clusterMapConfig, storeConfig, storageManager, clusterMap, - dataNodeId, storeKeyConverterFactory); + dataNodeId, storeKeyConverterFactory, null); ReplicaId replicaToTest = clusterMap.getReplicaIds(dataNodeId).get(0); // Attempting to add replica that already exists should fail assertFalse("Adding an existing replica should fail", replicationManager.addReplica(replicaToTest)); @@ -265,6 +275,101 @@ public void addAndRemoveReplicaTest() throws Exception { storageManager.shutdown(); } + /** + * Test that state transition in replication manager from OFFLINE to BOOTSTRAP + * @throws Exception + */ + @Test + public void replicaFromOfflineToBootstrapTest() throws Exception { + MockClusterMap clusterMap = new MockClusterMap(); + ClusterMapConfig clusterMapConfig = new ClusterMapConfig(verifiableProperties); + MockHelixParticipant mockHelixParticipant = new MockHelixParticipant(clusterMapConfig); + DataNodeId currentNode = clusterMap.getDataNodeIds().get(0); + Pair managers = + createStorageManagerAndReplicationManager(clusterMap, clusterMapConfig, mockHelixParticipant); + StorageManager storageManager = managers.getFirst(); + MockReplicationManager replicationManager = (MockReplicationManager) managers.getSecond(); + assertTrue("State change listener in cluster participant should contain replication manager listener", + mockHelixParticipant.getPartitionStateChangeListeners() + .containsKey(StateModelListenerType.ReplicationManagerListener)); + // 1. test partition not found case (should throw exception) + try { + mockHelixParticipant.onPartitionBecomeBootstrapFromOffline("invalidPartition"); + fail("should fail because replica is not found"); + } catch (StateTransitionException e) { + assertEquals("Transition error doesn't match", StateTransitionException.TransitionErrorCode.ReplicaNotFound, + e.getErrorCode()); + } + // 2. create a new partition and test replica addition success case + PartitionId newPartition = clusterMap.createNewPartition(clusterMap.getDataNodes()); + ReplicaId replicaToAdd = newPartition.getReplicaIds() + .stream() + .filter(r -> ((ReplicaId) r).getDataNodeId() == currentNode) + .findFirst() + .get(); + assertTrue("Adding new replica to Storage Manager should succeed", storageManager.addBlobStore(replicaToAdd)); + assertFalse("partitionToPartitionInfo should not contain new partition", + replicationManager.partitionToPartitionInfo.containsKey(newPartition)); + mockHelixParticipant.onPartitionBecomeBootstrapFromOffline(replicaToAdd.getPartitionId().toPathString()); + assertTrue("partitionToPartitionInfo should contain new partition", + replicationManager.partitionToPartitionInfo.containsKey(newPartition)); + // 3. test replica addition failure case + replicationManager.partitionToPartitionInfo.remove(newPartition); + replicationManager.addReplicaReturnVal = false; + try { + mockHelixParticipant.onPartitionBecomeBootstrapFromOffline(replicaToAdd.getPartitionId().toPathString()); + fail("should fail due to replica addition failure"); + } catch (StateTransitionException e) { + assertEquals("Transition error doesn't match", + StateTransitionException.TransitionErrorCode.ReplicaOperationFailure, e.getErrorCode()); + } + replicationManager.addReplicaReturnVal = null; + // 4. test OFFLINE -> BOOTSTRAP on existing replica (should be no-op) + ReplicaId existingReplica = clusterMap.getReplicaIds(currentNode).get(0); + assertTrue("partitionToPartitionInfo should contain existing partition", + replicationManager.partitionToPartitionInfo.containsKey(existingReplica.getPartitionId())); + mockHelixParticipant.onPartitionBecomeBootstrapFromOffline(existingReplica.getPartitionId().toPathString()); + storageManager.shutdown(); + } + + /** + * Test state transition in replication manager from STANDBY to LEADER (right now it is no-op in prod code, but we + * keep test here for future use) + * @throws Exception + */ + @Test + public void replicaFromStandbyToLeaderTest() throws Exception { + MockClusterMap clusterMap = new MockClusterMap(); + ClusterMapConfig clusterMapConfig = new ClusterMapConfig(verifiableProperties); + MockHelixParticipant mockHelixParticipant = new MockHelixParticipant(clusterMapConfig); + Pair managers = + createStorageManagerAndReplicationManager(clusterMap, clusterMapConfig, mockHelixParticipant); + StorageManager storageManager = managers.getFirst(); + MockReplicationManager replicationManager = (MockReplicationManager) managers.getSecond(); + PartitionId existingPartition = replicationManager.partitionToPartitionInfo.keySet().iterator().next(); + mockHelixParticipant.onPartitionBecomeLeaderFromStandby(existingPartition.toPathString()); + storageManager.shutdown(); + } + + /** + * Test state transition in replication manager from LEADER to STANDBY (right now it is no-op in prod code, but we + * keep test here for future use) + * @throws Exception + */ + @Test + public void replicaFromLeaderToStandbyTest() throws Exception { + MockClusterMap clusterMap = new MockClusterMap(); + ClusterMapConfig clusterMapConfig = new ClusterMapConfig(verifiableProperties); + MockHelixParticipant mockHelixParticipant = new MockHelixParticipant(clusterMapConfig); + Pair managers = + createStorageManagerAndReplicationManager(clusterMap, clusterMapConfig, mockHelixParticipant); + StorageManager storageManager = managers.getFirst(); + MockReplicationManager replicationManager = (MockReplicationManager) managers.getSecond(); + PartitionId existingPartition = replicationManager.partitionToPartitionInfo.keySet().iterator().next(); + mockHelixParticipant.onPartitionBecomeStandbyFromLeader(existingPartition.toPathString()); + storageManager.shutdown(); + } + /** * Tests pausing all partitions and makes sure that the replica thread pauses. Also tests that it resumes when one * eligible partition is reenabled and that replication completes successfully. @@ -1429,6 +1534,32 @@ public void replicaTokenTest() throws InterruptedException { remoteReplicaInfo.onTokenPersisted(); } + // helpers + + /** + * Helper method to create storage manager and replication manager + * @param clusterMap {@link ClusterMap} to use + * @param clusterMapConfig {@link ClusterMapConfig} to use + * @param clusterParticipant {@link com.github.ambry.clustermap.ClusterParticipant} for listener registration. + * @return a pair of storage manager and replication manager + * @throws Exception + */ + private Pair createStorageManagerAndReplicationManager(ClusterMap clusterMap, + ClusterMapConfig clusterMapConfig, MockHelixParticipant clusterParticipant) throws Exception { + StoreConfig storeConfig = new StoreConfig(verifiableProperties); + DataNodeId dataNodeId = clusterMap.getDataNodeIds().get(0); + MockStoreKeyConverterFactory storeKeyConverterFactory = new MockStoreKeyConverterFactory(null, null); + storeKeyConverterFactory.setConversionMap(new HashMap<>()); + StorageManager storageManager = + new StorageManager(storeConfig, new DiskManagerConfig(verifiableProperties), Utils.newScheduler(1, true), + new MetricRegistry(), null, clusterMap, dataNodeId, null, null, new MockTime(), null); + storageManager.start(); + MockReplicationManager replicationManager = + new MockReplicationManager(replicationConfig, clusterMapConfig, storeConfig, storageManager, clusterMap, + dataNodeId, storeKeyConverterFactory, clusterParticipant); + return new Pair<>(storageManager, replicationManager); + } + /** * Creates and gets the remote replicas that the local host will deal with and the {@link ReplicaThread} to perform * replication with. @@ -2043,7 +2174,7 @@ private class ReplicationTestSetup { * A class holds the results generated by {@link ReplicationTest#createPutMessage(StoreKey, short, short, boolean)}. */ public static class PutMsgInfoAndBuffer { - public ByteBuffer byteBuffer; + ByteBuffer byteBuffer; MessageInfo messageInfo; PutMsgInfoAndBuffer(ByteBuffer bytebuffer, MessageInfo messageInfo) { diff --git a/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java b/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java index 73e519796d..309f2c9291 100644 --- a/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java +++ b/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java @@ -165,7 +165,7 @@ public void startup() throws InstantiationException { replicationManager = new ReplicationManager(replicationConfig, clusterMapConfig, storeConfig, storageManager, storeKeyFactory, clusterMap, scheduler, nodeId, connectionPool, registry, notificationSystem, storeKeyConverterFactory, - serverConfig.serverMessageTransformer); + serverConfig.serverMessageTransformer, clusterParticipant); replicationManager.start(); if (replicationConfig.replicationEnabledWithVcrCluster) { @@ -180,7 +180,8 @@ public void startup() throws InstantiationException { } logger.info("Creating StatsManager to publish stats"); - statsManager = new StatsManager(storageManager, clusterMap.getReplicaIds(nodeId), registry, statsConfig, time); + statsManager = new StatsManager(storageManager, clusterMap.getReplicaIds(nodeId), registry, statsConfig, time, + clusterParticipant); if (serverConfig.serverStatsPublishLocalEnabled) { statsManager.start(); } diff --git a/ambry-server/src/main/java/com.github.ambry.server/StatsManager.java b/ambry-server/src/main/java/com.github.ambry.server/StatsManager.java index b724557fbc..89d86cf104 100644 --- a/ambry-server/src/main/java/com.github.ambry.server/StatsManager.java +++ b/ambry-server/src/main/java/com.github.ambry.server/StatsManager.java @@ -15,8 +15,12 @@ package com.github.ambry.server; import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.ClusterParticipant; import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.PartitionStateChangeListener; import com.github.ambry.clustermap.ReplicaId; +import com.github.ambry.clustermap.StateModelListenerType; +import com.github.ambry.clustermap.StateTransitionException; import com.github.ambry.config.StatsManagerConfig; import com.github.ambry.store.StorageManager; import com.github.ambry.store.Store; @@ -57,12 +61,12 @@ class StatsManager { private final File statsOutputFile; private final long publishPeriodInSecs; private final int initialDelayInSecs; - private final ConcurrentMap partitionToReplicaMap; private final StatsManagerMetrics metrics; private final Time time; private final ObjectMapper mapper = new ObjectMapper(); private ScheduledExecutorService scheduler = null; private StatsAggregator statsAggregator = null; + final ConcurrentMap partitionToReplicaMap; /** * Constructs a {@link StatsManager}. @@ -71,9 +75,10 @@ class StatsManager { * @param registry the {@link MetricRegistry} to be used for {@link StatsManagerMetrics} * @param config the {@link StatsManagerConfig} to be used to configure the output file path and publish period * @param time the {@link Time} instance to be used for reporting + * @param clusterParticipant the {@link ClusterParticipant} to register state change listener. */ StatsManager(StorageManager storageManager, List replicaIds, MetricRegistry registry, - StatsManagerConfig config, Time time) { + StatsManagerConfig config, Time time, ClusterParticipant clusterParticipant) { this.storageManager = storageManager; statsOutputFile = new File(config.outputFilePath); publishPeriodInSecs = config.publishPeriodInSecs; @@ -82,6 +87,11 @@ class StatsManager { partitionToReplicaMap = replicaIds.stream().collect(Collectors.toConcurrentMap(ReplicaId::getPartitionId, Function.identity())); this.time = time; + if (clusterParticipant != null) { + clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.StatsManagerListener, + new PartitionStateChangeListenerImpl()); + logger.info("Stats Manager's state change listener registered!"); + } } /** @@ -356,4 +366,49 @@ List examineUnreachablePartitions(List unreachablePartition } return unreachableStores; } + + /** + * {@link PartitionStateChangeListener} to capture changes in partition state. + */ + private class PartitionStateChangeListenerImpl implements PartitionStateChangeListener { + + @Override + public void onPartitionBecomeBootstrapFromOffline(String partitionName) { + // check if partition exists + ReplicaId replica = storageManager.getReplica(partitionName); + if (replica == null) { + // no matter this is an existing replica or new added one, it should be present in storage manager because new + // replica is added into storage manager first. + throw new StateTransitionException("Replica " + partitionName + " is not found on current node", + StateTransitionException.TransitionErrorCode.ReplicaNotFound); + } + if (!partitionToReplicaMap.containsKey(replica.getPartitionId())) { + // if replica is not present in partitionToReplicaMap, it means this new replica was just added into storage + // manager. Here we add it into stats manager accordingly. + logger.info("Didn't find replica {} in stats manager, starting to add it.", partitionName); + if (!addReplica(replica)) { + throw new StateTransitionException("Failed to add new replica into stats manager", + StateTransitionException.TransitionErrorCode.ReplicaOperationFailure); + } + } + } + + @Override + public void onPartitionBecomeStandbyFromBootstrap(String partitionName) { + logger.info("Partition state change notification from Bootstrap to Standby received for partition {}", + partitionName); + } + + @Override + public void onPartitionBecomeLeaderFromStandby(String partitionName) { + logger.info("Partition state change notification from Standby to Leader received for partition {}", + partitionName); + } + + @Override + public void onPartitionBecomeStandbyFromLeader(String partitionName) { + logger.info("Partition state change notification from Leader to Standby received for partition {}", + partitionName); + } + } } diff --git a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java index 124c9570ee..6bbec08918 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java @@ -172,7 +172,7 @@ public AmbryServerRequestsTest(boolean putRequestShareMemory, boolean validateRe storeKeyConverterFactory); statsManager = new MockStatsManager(storageManager, clusterMap.getReplicaIds(dataNodeId), clusterMap.getMetricRegistry(), - statsManagerConfig); + statsManagerConfig, null); ServerMetrics serverMetrics = new ServerMetrics(clusterMap.getMetricRegistry(), AmbryRequests.class, AmbryServer.class); ambryRequests = new AmbryServerRequests(storageManager, requestResponseChannel, clusterMap, dataNodeId, diff --git a/ambry-server/src/test/java/com.github.ambry.server/AmbryStatsReportTest.java b/ambry-server/src/test/java/com.github.ambry.server/AmbryStatsReportTest.java index 1a0c78c794..96f67a86b8 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/AmbryStatsReportTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/AmbryStatsReportTest.java @@ -42,7 +42,7 @@ public void testAmbryStatsReport() throws Exception { StatsManager testStatsManager = new StatsManager(new MockStorageManager(Collections.emptyMap(), new MockDataNodeId(Collections.singletonList(new Port(6667, PortType.PLAINTEXT)), Collections.singletonList("/tmp"), "DC1")), Collections.emptyList(), new MetricRegistry(), config, - new MockTime()); + new MockTime(), null); // test account stats report AmbryStatsReport ambryStatsReport = new AmbryStatsReport(testStatsManager, AGGREGATE_INTERVAL_MINS, StatsReportType.ACCOUNT_REPORT); diff --git a/ambry-server/src/test/java/com.github.ambry.server/MockStatsManager.java b/ambry-server/src/test/java/com.github.ambry.server/MockStatsManager.java index f939f8579a..dd2be3e859 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/MockStatsManager.java +++ b/ambry-server/src/test/java/com.github.ambry.server/MockStatsManager.java @@ -14,6 +14,7 @@ package com.github.ambry.server; import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.ClusterParticipant; import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.config.StatsManagerConfig; import com.github.ambry.store.StorageManager; @@ -25,15 +26,15 @@ * An extension of {@link StatsManager} to help with tests. */ class MockStatsManager extends StatsManager { - boolean returnValOfAddReplica = true; + Boolean returnValOfAddReplica = null; MockStatsManager(StorageManager storageManager, List replicaIds, MetricRegistry metricRegistry, - StatsManagerConfig statsManagerConfig) { - super(storageManager, replicaIds, metricRegistry, statsManagerConfig, new MockTime()); + StatsManagerConfig statsManagerConfig, ClusterParticipant clusterParticipant) { + super(storageManager, replicaIds, metricRegistry, statsManagerConfig, new MockTime(), clusterParticipant); } @Override boolean addReplica(ReplicaId id) { - return returnValOfAddReplica; + return returnValOfAddReplica == null ? super.addReplica(id) : returnValOfAddReplica; } } diff --git a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java index f3ed09dfd8..6a622fe999 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java +++ b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java @@ -325,6 +325,7 @@ private void checkValidityOfIds(Collection ids) throws Store */ PartitionId startedPartitionId = null; PartitionId addedPartitionId = null; + ReplicaId getReplicaReturnVal = null; CountDownLatch waitOperationCountdown = new CountDownLatch(0); boolean firstCall = true; List unreachablePartitions = new ArrayList<>(); @@ -376,6 +377,11 @@ public Store getStore(PartitionId id, boolean skipStateCheck) { return storeToReturn; } + @Override + public ReplicaId getReplica(String partitionName) { + return getReplicaReturnVal; + } + @Override public boolean scheduleNextForCompaction(PartitionId id) { if (exceptionToThrowOnSchedulingCompaction != null) { diff --git a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java index c44e918376..9d9c0f7899 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/StatsManagerTest.java @@ -18,10 +18,14 @@ import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockDataNodeId; +import com.github.ambry.clustermap.MockHelixParticipant; import com.github.ambry.clustermap.MockPartitionId; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.clustermap.ReplicaState; +import com.github.ambry.clustermap.StateModelListenerType; +import com.github.ambry.clustermap.StateTransitionException; +import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.StatsManagerConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.network.Port; @@ -58,9 +62,11 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import org.codehaus.jackson.map.ObjectMapper; +import org.json.JSONObject; import org.junit.After; import org.junit.Test; +import static com.github.ambry.clustermap.TestUtils.*; import static org.junit.Assert.*; @@ -73,6 +79,8 @@ public class StatsManagerTest { private static final int MAX_CONTAINER_COUNT = 6; private static final int MIN_CONTAINER_COUNT = 3; private final StatsManager statsManager; + private final StorageManager storageManager; + private final MockHelixParticipant clusterParticipant; private final String outputFileString; private final File tempDir; private final StatsSnapshot preAggregatedSnapshot; @@ -81,7 +89,8 @@ public class StatsManagerTest { private final List replicas; private final Random random = new Random(); private final ObjectMapper mapper = new ObjectMapper(); - private final StatsManagerConfig config; + private final StatsManagerConfig statsManagerConfig; + private DataNodeId dataNodeId; /** * Deletes the temporary directory. @@ -102,13 +111,24 @@ public StatsManagerTest() throws Exception { tempDir = Files.createTempDirectory("nodeStatsDir-" + UtilsTest.getRandomString(10)).toFile(); tempDir.deleteOnExit(); outputFileString = (new File(tempDir.getAbsolutePath(), "stats_output.json")).getAbsolutePath(); + List zkInfoList = new ArrayList<>(); + zkInfoList.add(new com.github.ambry.utils.TestUtils.ZkInfo(null, "DC1", (byte) 0, 2199, false)); + JSONObject zkJson = constructZkLayoutJSON(zkInfoList); + Properties properties = new Properties(); + properties.put("stats.output.file.path", outputFileString); + properties.setProperty("clustermap.cluster.name", "test"); + properties.setProperty("clustermap.datacenter.name", "DC1"); + properties.setProperty("clustermap.host.name", "localhost"); + properties.setProperty("clustermap.dcs.zk.connect.strings", zkJson.toString(2)); + statsManagerConfig = new StatsManagerConfig(new VerifiableProperties(properties)); + ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(properties)); storeMap = new HashMap<>(); partitionToSnapshot = new HashMap<>(); preAggregatedSnapshot = generateRandomSnapshot().get(StatsReportType.ACCOUNT_REPORT); Pair baseSliceAndNewSlice = new Pair<>(preAggregatedSnapshot, null); replicas = new ArrayList<>(); PartitionId partitionId; - DataNodeId dataNodeId = new MockDataNodeId(Collections.singletonList(new Port(6667, PortType.PLAINTEXT)), + dataNodeId = new MockDataNodeId(Collections.singletonList(new Port(6667, PortType.PLAINTEXT)), Collections.singletonList("/tmp"), "DC1"); for (int i = 0; i < 2; i++) { partitionId = new MockPartitionId(i, MockClusterMap.DEFAULT_PARTITION_CLASS, @@ -126,11 +146,10 @@ public StatsManagerTest() throws Exception { partitionId = new MockPartitionId(2, MockClusterMap.DEFAULT_PARTITION_CLASS); storeMap.put(partitionId, new MockStore(new MockStoreStats(snapshotsByType, false))); partitionToSnapshot.put(partitionId, snapshotsByType.get(StatsReportType.ACCOUNT_REPORT)); - StorageManager storageManager = new MockStorageManager(storeMap, dataNodeId); - Properties properties = new Properties(); - properties.put("stats.output.file.path", outputFileString); - config = new StatsManagerConfig(new VerifiableProperties(properties)); - statsManager = new StatsManager(storageManager, replicas, new MetricRegistry(), config, new MockTime()); + storageManager = new MockStorageManager(storeMap, dataNodeId); + clusterParticipant = new MockHelixParticipant(clusterMapConfig); + statsManager = + new StatsManager(storageManager, replicas, new MetricRegistry(), statsManagerConfig, new MockTime(), null); } /** @@ -180,7 +199,7 @@ public void testStatsManagerWithProblematicStores() throws Exception { problematicStoreMap.put(partitionId2, exceptionStore); StatsManager testStatsManager = new StatsManager(new MockStorageManager(problematicStoreMap, dataNodeId), Arrays.asList(partitionId1.getReplicaIds().get(0), partitionId2.getReplicaIds().get(0)), new MetricRegistry(), - config, new MockTime()); + statsManagerConfig, new MockTime(), null); List unreachablePartitions = new ArrayList<>(); StatsSnapshot actualSnapshot = new StatsSnapshot(0L, null); for (PartitionId partitionId : problematicStoreMap.keySet()) { @@ -204,7 +223,7 @@ public void testStatsManagerWithProblematicStores() throws Exception { mixedStoreMap.put(partitionId4, exceptionStore); testStatsManager = new StatsManager(new MockStorageManager(mixedStoreMap, dataNodeId), Arrays.asList(partitionId3.getReplicaIds().get(0), partitionId4.getReplicaIds().get(0)), new MetricRegistry(), - config, new MockTime()); + statsManagerConfig, new MockTime(), null); actualSnapshot = new StatsSnapshot(0L, null); for (PartitionId partitionId : mixedStoreMap.keySet()) { testStatsManager.collectAndAggregate(actualSnapshot, partitionId, unreachablePartitions); @@ -247,7 +266,8 @@ public void testAddAndRemoveReplica() throws Exception { } StorageManager mockStorageManager = new MockStorageManager(testStoreMap, dataNodeId); StatsManager testStatsManager = - new StatsManager(mockStorageManager, testReplicas, new MetricRegistry(), config, new MockTime()); + new StatsManager(mockStorageManager, testReplicas, new MetricRegistry(), statsManagerConfig, new MockTime(), + null); // verify that adding an existing store to StatsManager should fail assertFalse("Adding a store which already exists should fail", testStatsManager.addReplica(testReplicas.get(0))); @@ -359,6 +379,69 @@ public void testAddAndRemoveReplica() throws Exception { statsWrapper.getSnapshot().getSubMap().containsKey(partitionId4.toPathString())); } + /** + * Test state transition in stats manager from OFFLINE to BOOTSTRAP + */ + @Test + public void testReplicaFromOfflineToBootstrap() { + MockStatsManager mockStatsManager = + new MockStatsManager(storageManager, replicas, new MetricRegistry(), statsManagerConfig, clusterParticipant); + // 1. verify stats manager's listener is registered + assertTrue("Stats manager listener is found in cluster participant", + clusterParticipant.getPartitionStateChangeListeners().containsKey(StateModelListenerType.StatsManagerListener)); + // 2. test partition not found + try { + clusterParticipant.onPartitionBecomeBootstrapFromOffline("InvalidPartition"); + fail("should fail because partition is not found"); + } catch (StateTransitionException e) { + assertEquals("Transition error doesn't match", StateTransitionException.TransitionErrorCode.ReplicaNotFound, + e.getErrorCode()); + } + // 3. create a new partition and test replica addition failure + PartitionId newPartition = new MockPartitionId(3, MockClusterMap.DEFAULT_PARTITION_CLASS, + Collections.singletonList((MockDataNodeId) dataNodeId), 0); + ((MockStorageManager) storageManager).getReplicaReturnVal = newPartition.getReplicaIds().get(0); + mockStatsManager.returnValOfAddReplica = false; + try { + clusterParticipant.onPartitionBecomeBootstrapFromOffline(newPartition.toPathString()); + fail("should fail because adding replica to stats manager failed"); + } catch (StateTransitionException e) { + assertEquals("Transition error code doesn't match", + StateTransitionException.TransitionErrorCode.ReplicaOperationFailure, e.getErrorCode()); + } + // 4. test replica addition success during Offline-To-Bootstrap transition + assertFalse("Before adding new replica, in-mem data structure should not contain new partition", + mockStatsManager.partitionToReplicaMap.containsKey(newPartition)); + mockStatsManager.returnValOfAddReplica = null; + clusterParticipant.onPartitionBecomeBootstrapFromOffline(newPartition.toPathString()); + assertTrue("After adding new replica, in-mem data structure should contain new partition", + mockStatsManager.partitionToReplicaMap.containsKey(newPartition)); + // 5. state transition on existing replica should be no-op + clusterParticipant.onPartitionBecomeBootstrapFromOffline(replicas.get(0).getPartitionId().toPathString()); + } + + /** + * Test state transition in stats manager from STANDBY to LEADER + */ + @Test + public void testReplicaFromStandbyToLeader() { + MockStatsManager mockStatsManager = + new MockStatsManager(storageManager, replicas, new MetricRegistry(), statsManagerConfig, clusterParticipant); + // state transition on existing replica should be no-op + clusterParticipant.onPartitionBecomeLeaderFromStandby(replicas.get(0).getPartitionId().toPathString()); + } + + /** + * Test state transition in stats manager from LEADER to STANDBY + */ + @Test + public void testReplicaFromLeaderToStandby() { + MockStatsManager mockStatsManager = + new MockStatsManager(storageManager, replicas, new MetricRegistry(), statsManagerConfig, clusterParticipant); + // state transition on existing replica should be no-op + clusterParticipant.onPartitionBecomeStandbyFromLeader(replicas.get(0).getPartitionId().toPathString()); + } + /** * Test that the {@link StatsManager} can correctly collect and aggregate all type of stats on the node. This * test is using randomly generated account snapshot and partitionClass snapshot in mock {@link StoreStats}. @@ -386,7 +469,7 @@ public void testGetNodeStatsInJSON() throws Exception { } StorageManager storageManager = new MockStorageManager(storeMap, dataNodeId); StatsManager statsManager = - new StatsManager(storageManager, replicaIds, new MetricRegistry(), config, new MockTime()); + new StatsManager(storageManager, replicaIds, new MetricRegistry(), statsManagerConfig, new MockTime(), null); StatsSnapshot expectAccountSnapshot = new StatsSnapshot(0L, new HashMap<>()); StatsSnapshot expectPartitionClassSnapshot = new StatsSnapshot(0L, new HashMap<>()); diff --git a/ambry-store/src/main/java/com.github.ambry.store/StorageManager.java b/ambry-store/src/main/java/com.github.ambry.store/StorageManager.java index 9ce359daa1..bcd16636dc 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/StorageManager.java +++ b/ambry-store/src/main/java/com.github.ambry.store/StorageManager.java @@ -195,6 +195,11 @@ public Store getStore(PartitionId id, boolean skipStateCheck) { return diskManager != null ? diskManager.getStore(id, skipStateCheck) : null; } + @Override + public ReplicaId getReplica(String partitionName) { + return partitionNameToReplicaId.get(partitionName); + } + @Override public ServerErrorCode checkLocalPartitionStatus(PartitionId partition, ReplicaId localReplica) { if (getStore(partition) == null) { @@ -389,7 +394,7 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) { if (!addBlobStore(replicaToAdd)) { logger.error("Failed to add store {} into storage manager", partitionName); throw new StateTransitionException("Failed to add store " + partitionName + " into storage manager", - StateTransitionException.TransitionErrorCode.StoreOperationFailure); + StateTransitionException.TransitionErrorCode.ReplicaOperationFailure); } // TODO, update InstanceConfig in Helix // note that partitionNameToReplicaId should be updated if addBlobStore succeeds, so replicationManager should be diff --git a/ambry-store/src/test/java/com.github.ambry.store/StorageManagerTest.java b/ambry-store/src/test/java/com.github.ambry.store/StorageManagerTest.java index 9561210dc9..347f7ac743 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/StorageManagerTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/StorageManagerTest.java @@ -282,7 +282,7 @@ public void replicaFromOfflineToBootstrapTest() throws Exception { try { mockHelixParticipant.onPartitionBecomeBootstrapFromOffline(newPartition.toPathString()); } catch (StateTransitionException e) { - assertEquals("Error code doesn't match", StateTransitionException.TransitionErrorCode.StoreOperationFailure, + assertEquals("Error code doesn't match", StateTransitionException.TransitionErrorCode.ReplicaOperationFailure, e.getErrorCode()); } // restart disk manager to test case where new replica(store) is successfully added into StorageManager