From bcab57e72fca02c567cf5927cffdcfd9fbf3a782 Mon Sep 17 00:00:00 2001 From: Yingyi Zhang Date: Thu, 13 Feb 2020 16:00:11 -0800 Subject: [PATCH] Upgrade Helix lib to 0.9.1 and add metrics for state transition (#1374) 1. Upgrade Helix lib which should fix missing ZK callback issues 2. Introduce participant metrics to track partitions in each state --- .../clustermap/ClusterParticipant.java | 7 ++ .../AmbryPartitionStateModel.java | 21 ++++- .../AmbryStateModelFactory.java | 8 +- .../HelixClusterAgentsFactory.java | 2 +- .../HelixParticipant.java | 14 ++- .../HelixParticipantMetrics.java | 64 +++++++++++++ .../AmbryReplicaSyncUpManagerTest.java | 4 +- .../AmbryStateModelFactoryTest.java | 89 ++++++++++++++++++- .../HelixParticipantTest.java | 21 +++-- .../MockHelixParticipant.java | 3 +- .../StorageManager.java | 1 + .../StorageManagerTest.java | 31 ++++++- gradle/dependency-versions.gradle | 2 +- 13 files changed, 245 insertions(+), 22 deletions(-) create mode 100644 ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipantMetrics.java diff --git a/ambry-api/src/main/java/com.github.ambry/clustermap/ClusterParticipant.java b/ambry-api/src/main/java/com.github.ambry/clustermap/ClusterParticipant.java index a0c4e9741e..53774498f4 100644 --- a/ambry-api/src/main/java/com.github.ambry/clustermap/ClusterParticipant.java +++ b/ambry-api/src/main/java/com.github.ambry/clustermap/ClusterParticipant.java @@ -84,6 +84,13 @@ void registerPartitionStateChangeListener(StateModelListenerType listenerType, */ boolean updateDataNodeInfoInCluster(ReplicaId replicaId, boolean shouldExist); + /** + * Initialize participant related metrics if needed. + * @param localPartitionCount total number of partitions on local node. + */ + default void initializeParticipantMetrics(int localPartitionCount) { + } + /** * Terminate the participant. */ diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryPartitionStateModel.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryPartitionStateModel.java index 9e418130d7..14ccbb8006 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryPartitionStateModel.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryPartitionStateModel.java @@ -32,13 +32,16 @@ public class AmbryPartitionStateModel extends StateModel { private final String partitionName; private final PartitionStateChangeListener partitionStateChangeListener; private final ClusterMapConfig clusterMapConfig; + private final HelixParticipantMetrics helixParticipantMetrics; AmbryPartitionStateModel(String resourceName, String partitionName, - PartitionStateChangeListener partitionStateChangeListener, ClusterMapConfig clusterMapConfig) { + PartitionStateChangeListener partitionStateChangeListener, ClusterMapConfig clusterMapConfig, + HelixParticipantMetrics helixParticipantMetrics) { this.resourceName = resourceName; this.partitionName = partitionName; this.partitionStateChangeListener = Objects.requireNonNull(partitionStateChangeListener); this.clusterMapConfig = Objects.requireNonNull(clusterMapConfig); + this.helixParticipantMetrics = Objects.requireNonNull(helixParticipantMetrics); StateModelParser parser = new StateModelParser(); _currentState = parser.getInitialState(AmbryPartitionStateModel.class); } @@ -50,6 +53,8 @@ public void onBecomeBootstrapFromOffline(Message message, NotificationContext co if (clusterMapConfig.clustermapEnableStateModelListener) { partitionStateChangeListener.onPartitionBecomeBootstrapFromOffline(message.getPartitionName()); } + helixParticipantMetrics.offlineCount.addAndGet(-1); + helixParticipantMetrics.bootstrapCount.addAndGet(1); } @Transition(to = "STANDBY", from = "BOOTSTRAP") @@ -60,6 +65,8 @@ public void onBecomeStandbyFromBootstrap(Message message, NotificationContext co if (clusterMapConfig.clustermapEnableStateModelListener) { partitionStateChangeListener.onPartitionBecomeStandbyFromBootstrap(partitionName); } + helixParticipantMetrics.bootstrapCount.addAndGet(-1); + helixParticipantMetrics.standbyCount.addAndGet(1); } @Transition(to = "LEADER", from = "STANDBY") @@ -67,6 +74,8 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte logger.info("Partition {} in resource {} is becoming LEADER from STANDBY", message.getPartitionName(), message.getResourceName()); partitionStateChangeListener.onPartitionBecomeLeaderFromStandby(message.getPartitionName()); + helixParticipantMetrics.standbyCount.addAndGet(-1); + helixParticipantMetrics.leaderCount.addAndGet(1); } @Transition(to = "STANDBY", from = "LEADER") @@ -74,6 +83,8 @@ public void onBecomeStandbyFromLeader(Message message, NotificationContext conte logger.info("Partition {} in resource {} is becoming STANDBY from LEADER", message.getPartitionName(), message.getResourceName()); partitionStateChangeListener.onPartitionBecomeStandbyFromLeader(message.getPartitionName()); + helixParticipantMetrics.leaderCount.addAndGet(-1); + helixParticipantMetrics.standbyCount.addAndGet(1); } @Transition(to = "INACTIVE", from = "STANDBY") @@ -84,6 +95,8 @@ public void onBecomeInactiveFromStandby(Message message, NotificationContext con if (clusterMapConfig.clustermapEnableStateModelListener) { partitionStateChangeListener.onPartitionBecomeInactiveFromStandby(partitionName); } + helixParticipantMetrics.standbyCount.addAndGet(-1); + helixParticipantMetrics.inactiveCount.addAndGet(1); } @Transition(to = "OFFLINE", from = "INACTIVE") @@ -94,6 +107,8 @@ public void onBecomeOfflineFromInactive(Message message, NotificationContext con if (clusterMapConfig.clustermapEnableStateModelListener) { partitionStateChangeListener.onPartitionBecomeOfflineFromInactive(partitionName); } + helixParticipantMetrics.inactiveCount.addAndGet(-1); + helixParticipantMetrics.offlineCount.addAndGet(1); } @Transition(to = "DROPPED", from = "OFFLINE") @@ -104,16 +119,20 @@ public void onBecomeDroppedFromOffline(Message message, NotificationContext cont if (clusterMapConfig.clustermapEnableStateModelListener) { partitionStateChangeListener.onPartitionBecomeDroppedFromOffline(partitionName); } + helixParticipantMetrics.offlineCount.addAndGet(-1); + helixParticipantMetrics.partitionDroppedCount.inc(); } @Transition(to = "DROPPED", from = "ERROR") public void onBecomeDroppedFromError(Message message, NotificationContext context) { logger.info("Partition {} in resource {} is becoming DROPPED from ERROR", message.getPartitionName(), message.getResourceName()); + helixParticipantMetrics.partitionDroppedCount.inc(); } @Override public void reset() { logger.info("Reset method invoked. Partition {} in resource {} is reset to OFFLINE", partitionName, resourceName); + helixParticipantMetrics.offlineCount.addAndGet(1); } } diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryStateModelFactory.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryStateModelFactory.java index 55e17aef7e..81925e91d9 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryStateModelFactory.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/AmbryStateModelFactory.java @@ -25,10 +25,13 @@ class AmbryStateModelFactory extends StateModelFactory { private final ClusterMapConfig clustermapConfig; private final PartitionStateChangeListener partitionStateChangeListener; + private final HelixParticipantMetrics helixParticipantMetrics; - AmbryStateModelFactory(ClusterMapConfig clusterMapConfig, PartitionStateChangeListener partitionStateChangeListener) { + AmbryStateModelFactory(ClusterMapConfig clusterMapConfig, PartitionStateChangeListener partitionStateChangeListener, + HelixParticipantMetrics helixParticipantMetrics) { this.clustermapConfig = clusterMapConfig; this.partitionStateChangeListener = partitionStateChangeListener; + this.helixParticipantMetrics = helixParticipantMetrics; } /** @@ -43,7 +46,8 @@ public StateModel createNewStateModel(String resourceName, String partitionName) switch (clustermapConfig.clustermapStateModelDefinition) { case AmbryStateModelDefinition.AMBRY_LEADER_STANDBY_MODEL: stateModelToReturn = - new AmbryPartitionStateModel(resourceName, partitionName, partitionStateChangeListener, clustermapConfig); + new AmbryPartitionStateModel(resourceName, partitionName, partitionStateChangeListener, clustermapConfig, + helixParticipantMetrics); break; case LeaderStandbySMD.name: stateModelToReturn = new DefaultLeaderStandbyStateModel(); diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterAgentsFactory.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterAgentsFactory.java index a823fab528..097b77de0a 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterAgentsFactory.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterAgentsFactory.java @@ -60,7 +60,7 @@ public HelixClusterManager getClusterMap() throws IOException { @Override public HelixParticipant getClusterParticipant() throws IOException { if (helixParticipant == null) { - helixParticipant = new HelixParticipant(clusterMapConfig, helixFactory); + helixParticipant = new HelixParticipant(clusterMapConfig, helixFactory, metricRegistry); } return helixParticipant; } diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipant.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipant.java index 3e87f6b25f..6c4d242eca 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipant.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipant.java @@ -13,6 +13,7 @@ */ package com.github.ambry.clustermap; +import com.codahale.metrics.MetricRegistry; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.server.AmbryHealthReport; import com.github.ambry.utils.Utils; @@ -51,6 +52,7 @@ public class HelixParticipant implements ClusterParticipant, PartitionStateChang private final String zkConnectStr; private final Object helixAdministrationLock = new Object(); private final ClusterMapConfig clusterMapConfig; + private final HelixParticipantMetrics participantMetrics; private HelixManager manager; private String instanceName; private HelixAdmin helixAdmin; @@ -63,10 +65,13 @@ public class HelixParticipant implements ClusterParticipant, PartitionStateChang * Instantiate a HelixParticipant. * @param clusterMapConfig the {@link ClusterMapConfig} associated with this participant. * @param helixFactory the {@link HelixFactory} to use to get the {@link HelixManager}. + * @param metricRegistry the {@link MetricRegistry} to instantiate {@link HelixParticipantMetrics}. * @throws IOException if there is an error in parsing the JSON serialized ZK connect string config. */ - public HelixParticipant(ClusterMapConfig clusterMapConfig, HelixFactory helixFactory) throws IOException { + public HelixParticipant(ClusterMapConfig clusterMapConfig, HelixFactory helixFactory, MetricRegistry metricRegistry) + throws IOException { this.clusterMapConfig = clusterMapConfig; + participantMetrics = new HelixParticipantMetrics(metricRegistry); clusterName = clusterMapConfig.clusterMapClusterName; instanceName = ClusterMapUtils.getInstanceName(clusterMapConfig.clusterMapHostName, clusterMapConfig.clusterMapPort); @@ -88,6 +93,11 @@ public HelixParticipant(ClusterMapConfig clusterMapConfig, HelixFactory helixFac partitionStateChangeListeners = new HashMap<>(); } + @Override + public void initializeParticipantMetrics(int localPartitionCount) { + participantMetrics.setLocalPartitionCount(localPartitionCount); + } + /** * Initiate the participation by registering via the {@link HelixManager} as a participant to the associated * Helix cluster. @@ -100,7 +110,7 @@ public void participate(List ambryHealthReports) throws IOExc clusterMapConfig.clustermapStateModelDefinition); StateMachineEngine stateMachineEngine = manager.getStateMachineEngine(); stateMachineEngine.registerStateModelFactory(clusterMapConfig.clustermapStateModelDefinition, - new AmbryStateModelFactory(clusterMapConfig, this)); + new AmbryStateModelFactory(clusterMapConfig, this, participantMetrics)); registerHealthReportTasks(stateMachineEngine, ambryHealthReports); try { synchronized (helixAdministrationLock) { diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipantMetrics.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipantMetrics.java new file mode 100644 index 0000000000..a68c61604a --- /dev/null +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixParticipantMetrics.java @@ -0,0 +1,64 @@ +/* + * Copyright 2020 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.clustermap; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * Metrics for {@link HelixParticipant} to monitor partition state transitions. + */ +class HelixParticipantMetrics { + final AtomicInteger bootstrapCount = new AtomicInteger(); + final AtomicInteger standbyCount = new AtomicInteger(); + final AtomicInteger leaderCount = new AtomicInteger(); + final AtomicInteger inactiveCount = new AtomicInteger(); + final AtomicInteger offlineCount = new AtomicInteger(); + // no need to record exact number of "dropped" partition, a counter to track partition-dropped events would suffice + final Counter partitionDroppedCount; + + HelixParticipantMetrics(MetricRegistry metricRegistry) { + Gauge bootstrapPartitionCount = bootstrapCount::get; + metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "bootstrapPartitionCount"), + bootstrapPartitionCount); + Gauge standbyPartitionCount = standbyCount::get; + metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "standbyPartitionCount"), + standbyPartitionCount); + Gauge leaderPartitionCount = leaderCount::get; + metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "leaderPartitionCount"), leaderPartitionCount); + Gauge inactivePartitionCount = inactiveCount::get; + metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "inactivePartitionCount"), + inactivePartitionCount); + Gauge offlinePartitionCount = offlineCount::get; + metricRegistry.register(MetricRegistry.name(HelixParticipant.class, "offlinePartitionCount"), + offlinePartitionCount); + partitionDroppedCount = + metricRegistry.counter(MetricRegistry.name(HelixParticipant.class, "partitionDroppedCount")); + } + + /** + * Set number of partitions on current node. This is invoked during startup. + * @param partitionCount number of partitions on current node + */ + void setLocalPartitionCount(int partitionCount) { + // this method should be invoked before participation, so the initial value is expected to be 0. + if (!offlineCount.compareAndSet(0, partitionCount)) { + throw new IllegalStateException( + "Number of OFFLINE partitions has changed before initializing participant metrics"); + } + } +} diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/AmbryReplicaSyncUpManagerTest.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/AmbryReplicaSyncUpManagerTest.java index f795b3d27e..44c9cd02be 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/AmbryReplicaSyncUpManagerTest.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/AmbryReplicaSyncUpManagerTest.java @@ -13,6 +13,7 @@ */ package com.github.ambry.clustermap; +import com.codahale.metrics.MetricRegistry; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.utils.TestUtils; @@ -81,7 +82,8 @@ public AmbryReplicaSyncUpManagerTest() throws IOException { mockHelixParticipant.currentReplica = currentReplica; mockHelixParticipant.replicaSyncUpService = replicaSyncUpService; stateModel = - new AmbryPartitionStateModel(RESOURCE_NAME, partition.toPathString(), mockHelixParticipant, clusterMapConfig); + new AmbryPartitionStateModel(RESOURCE_NAME, partition.toPathString(), mockHelixParticipant, clusterMapConfig, + new HelixParticipantMetrics(new MetricRegistry())); mockMessage = Mockito.mock(Message.class); when(mockMessage.getPartitionName()).thenReturn(partition.toPathString()); when(mockMessage.getResourceName()).thenReturn(RESOURCE_NAME); diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/AmbryStateModelFactoryTest.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/AmbryStateModelFactoryTest.java index 16c9124a46..c4f365e262 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/AmbryStateModelFactoryTest.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/AmbryStateModelFactoryTest.java @@ -13,25 +13,35 @@ */ package com.github.ambry.clustermap; +import com.codahale.metrics.MetricRegistry; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.utils.TestUtils; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; +import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModel; +import org.json.JSONObject; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mockito; +import static com.github.ambry.clustermap.TestUtils.*; import static org.junit.Assert.*; +import static org.junit.Assume.*; +import static org.mockito.Mockito.*; /** - * Test for {@link AmbryStateModelFactory} + * Test for {@link AmbryStateModelFactory} and {@link AmbryPartitionStateModel} */ @RunWith(Parameterized.class) public class AmbryStateModelFactoryTest { private final ClusterMapConfig config; + private final String stateModelDef; @Parameterized.Parameters public static List data() { @@ -39,13 +49,18 @@ public static List data() { new Object[][]{{ClusterMapConfig.OLD_STATE_MODEL_DEF}, {ClusterMapConfig.AMBRY_STATE_MODEL_DEF}}); } - public AmbryStateModelFactoryTest(String stateModelDef) { + public AmbryStateModelFactoryTest(String stateModelDef) throws Exception { + List zkInfoList = new ArrayList<>(); + zkInfoList.add(new TestUtils.ZkInfo(null, "DC0", (byte) 0, 2299, false)); + JSONObject zkJson = constructZkLayoutJSON(zkInfoList); Properties props = new Properties(); props.setProperty("clustermap.host.name", "localhost"); props.setProperty("clustermap.cluster.name", "AmbryTest"); props.setProperty("clustermap.datacenter.name", "DC0"); props.setProperty("clustermap.state.model.definition", stateModelDef); + props.setProperty("clustermap.dcs.zk.connect.strings", zkJson.toString(2)); config = new ClusterMapConfig(new VerifiableProperties(props)); + this.stateModelDef = stateModelDef; } @Test @@ -85,7 +100,7 @@ public void onPartitionBecomeOfflineFromInactive(String partitionName) { public void onPartitionBecomeDroppedFromOffline(String partitionName) { // no op } - }); + }, new HelixParticipantMetrics(new MetricRegistry())); StateModel stateModel; switch (config.clustermapStateModelDefinition) { case ClusterMapConfig.OLD_STATE_MODEL_DEF: @@ -100,4 +115,72 @@ public void onPartitionBecomeDroppedFromOffline(String partitionName) { // state model is already validated in clusterMapConfig, no need to test invalid state model here. } } + + /** + * Test that {@link HelixParticipantMetrics} keeps track of partition during state transition + * @throws Exception + */ + @Test + public void testAmbryPartitionStateModel() throws Exception { + assumeTrue(stateModelDef.equals(ClusterMapConfig.AMBRY_STATE_MODEL_DEF)); + MockHelixParticipant mockHelixParticipant = new MockHelixParticipant(config); + MetricRegistry metricRegistry = new MetricRegistry(); + HelixParticipantMetrics participantMetrics = new HelixParticipantMetrics(metricRegistry); + String resourceName = "0"; + String partitionName = "1"; + Message mockMessage = Mockito.mock(Message.class); + when(mockMessage.getPartitionName()).thenReturn(partitionName); + when(mockMessage.getResourceName()).thenReturn(resourceName); + AmbryPartitionStateModel stateModel = + new AmbryPartitionStateModel(resourceName, partitionName, mockHelixParticipant, config, participantMetrics); + participantMetrics.setLocalPartitionCount(1); + assertEquals("Offline count is not expected", 1, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".offlinePartitionCount").getValue()); + // OFFLINE -> BOOTSTRAP + stateModel.onBecomeBootstrapFromOffline(mockMessage, null); + assertEquals("Bootstrap count should be 1", 1, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".bootstrapPartitionCount").getValue()); + assertEquals("Offline count should be 0", 0, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".offlinePartitionCount").getValue()); + // BOOTSTRAP -> STANDBY + stateModel.onBecomeStandbyFromBootstrap(mockMessage, null); + assertEquals("Standby count should be 1", 1, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".standbyPartitionCount").getValue()); + assertEquals("Bootstrap count should be 0", 0, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".bootstrapPartitionCount").getValue()); + // STANDBY -> LEADER + stateModel.onBecomeLeaderFromStandby(mockMessage, null); + assertEquals("Leader count should be 1", 1, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".leaderPartitionCount").getValue()); + assertEquals("Standby count should be 0", 0, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".standbyPartitionCount").getValue()); + // LEADER -> STANDBY + stateModel.onBecomeStandbyFromLeader(mockMessage, null); + assertEquals("Standby count should be 1", 1, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".standbyPartitionCount").getValue()); + assertEquals("Leader count should be 0", 0, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".leaderPartitionCount").getValue()); + // STANDBY -> INACTIVE + stateModel.onBecomeInactiveFromStandby(mockMessage, null); + assertEquals("Inactive count should be 1", 1, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".inactivePartitionCount").getValue()); + assertEquals("Standby count should be 0", 0, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".standbyPartitionCount").getValue()); + // INACTIVE -> OFFLINE + stateModel.onBecomeOfflineFromInactive(mockMessage, null); + assertEquals("Offline count should be 1", 1, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".offlinePartitionCount").getValue()); + assertEquals("Inactive count should be 0", 0, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".inactivePartitionCount").getValue()); + // OFFLINE -> DROPPED + stateModel.onBecomeDroppedFromOffline(mockMessage, null); + assertEquals("Offline count should be 0", 0, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".offlinePartitionCount").getValue()); + assertEquals("Dropped count should be updated", 1, participantMetrics.partitionDroppedCount.getCount()); + + // reset method + stateModel.reset(); + assertEquals("Offline count should be 1 after reset", 1, + metricRegistry.getGauges().get(HelixParticipant.class.getName() + ".offlinePartitionCount").getValue()); + } } diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixParticipantTest.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixParticipantTest.java index 2ced99012b..e51b040455 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixParticipantTest.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/HelixParticipantTest.java @@ -13,6 +13,7 @@ */ package com.github.ambry.clustermap; +import com.codahale.metrics.MetricRegistry; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.VerifiableProperties; import java.io.IOException; @@ -91,7 +92,8 @@ public void testGetAndSetReplicaSealedState() throws IOException { int port = 2200; String instanceName = ClusterMapUtils.getInstanceName(hostname, port); HelixParticipant helixParticipant = - new HelixParticipant(new ClusterMapConfig(new VerifiableProperties(props)), helixManagerFactory); + new HelixParticipant(new ClusterMapConfig(new VerifiableProperties(props)), helixManagerFactory, + new MetricRegistry()); helixParticipant.participate(Collections.emptyList()); HelixManager helixManager = helixManagerFactory.getZKHelixManager(null, null, null, null); HelixAdmin helixAdmin = helixManager.getClusterManagmentTool(); @@ -180,9 +182,11 @@ public void testGetAndSetReplicaStoppedState() throws IOException, JSONException String instanceName = ClusterMapUtils.getInstanceName(hostname, port); String instanceNameDummy = ClusterMapUtils.getInstanceName("dummyHost", 2200); HelixParticipant helixParticipant = - new HelixParticipant(new ClusterMapConfig(new VerifiableProperties(props)), helixManagerFactory); + new HelixParticipant(new ClusterMapConfig(new VerifiableProperties(props)), helixManagerFactory, + new MetricRegistry()); HelixParticipant helixParticipantDummy = - new HelixParticipant(new ClusterMapConfig(new VerifiableProperties(propsDummy)), helixManagerFactory); + new HelixParticipant(new ClusterMapConfig(new VerifiableProperties(propsDummy)), helixManagerFactory, + new MetricRegistry()); HelixParticipant helixParticipantSpy = Mockito.spy(helixParticipant); helixParticipant.participate(Collections.emptyList()); helixParticipantDummy.participate(Collections.emptyList()); @@ -274,7 +278,8 @@ public void testBadCases() throws IOException { // Connect failure. ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); helixManagerFactory.getHelixManager().beBad = true; - HelixParticipant helixParticipant = new HelixParticipant(clusterMapConfig, helixManagerFactory); + HelixParticipant helixParticipant = + new HelixParticipant(clusterMapConfig, helixManagerFactory, new MetricRegistry()); try { helixParticipant.participate(Collections.emptyList()); fail("Participation should have failed"); @@ -286,7 +291,7 @@ public void testBadCases() throws IOException { props.setProperty("clustermap.cluster.name", ""); clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); try { - new HelixParticipant(clusterMapConfig, helixManagerFactory); + new HelixParticipant(clusterMapConfig, helixManagerFactory, new MetricRegistry()); fail("Instantiation should have failed"); } catch (IllegalStateException e) { // OK @@ -296,7 +301,7 @@ public void testBadCases() throws IOException { props.setProperty("clustermap.dcs.zk.connect.strings", ""); clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); try { - new HelixParticipant(clusterMapConfig, helixManagerFactory); + new HelixParticipant(clusterMapConfig, helixManagerFactory, new MetricRegistry()); fail("Instantiation should have failed"); } catch (IOException e) { // OK @@ -310,7 +315,7 @@ public void testBadCases() throws IOException { @Test public void testHelixParticipant() throws Exception { ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); - HelixParticipant participant = new HelixParticipant(clusterMapConfig, helixManagerFactory); + HelixParticipant participant = new HelixParticipant(clusterMapConfig, helixManagerFactory, new MetricRegistry()); participant.participate(Collections.emptyList()); MockHelixManagerFactory.MockHelixManager helixManager = helixManagerFactory.getHelixManager(); assertTrue(helixManager.isConnected()); @@ -335,7 +340,7 @@ public void testUpdateNodeInfoInCluster() throws Exception { props.setProperty("clustermap.update.datanode.info", Boolean.toString(true)); props.setProperty("clustermap.port", String.valueOf(localNode.getPort())); ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); - HelixParticipant participant = new HelixParticipant(clusterMapConfig, helixManagerFactory); + HelixParticipant participant = new HelixParticipant(clusterMapConfig, helixManagerFactory, new MetricRegistry()); // create InstanceConfig for local node. Also, put existing replica into sealed list List sealedList = new ArrayList<>(); sealedList.add(existingReplica.getPartitionId().toPathString()); diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockHelixParticipant.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockHelixParticipant.java index 086832d96c..82bdcd76df 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockHelixParticipant.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockHelixParticipant.java @@ -13,6 +13,7 @@ */ package com.github.ambry.clustermap; +import com.codahale.metrics.MetricRegistry; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.server.AmbryHealthReport; import java.io.IOException; @@ -39,7 +40,7 @@ public class MockHelixParticipant extends HelixParticipant { private PartitionStateChangeListener mockReplicationManagerListener; public MockHelixParticipant(ClusterMapConfig clusterMapConfig) throws IOException { - super(clusterMapConfig, new MockHelixManagerFactory()); + super(clusterMapConfig, new MockHelixManagerFactory(), new MetricRegistry()); // create mock state change listener for ReplicationManager mockReplicationManagerListener = Mockito.mock(PartitionStateChangeListener.class); // mock Bootstrap-To-Standby change diff --git a/ambry-store/src/main/java/com.github.ambry.store/StorageManager.java b/ambry-store/src/main/java/com.github.ambry.store/StorageManager.java index 1d0280c2e8..53eb985724 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/StorageManager.java +++ b/ambry-store/src/main/java/com.github.ambry.store/StorageManager.java @@ -177,6 +177,7 @@ public void start() throws InterruptedException { if (clusterParticipant != null) { clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.StorageManagerListener, new PartitionStateChangeListenerImpl()); + clusterParticipant.initializeParticipantMetrics(partitionNameToReplicaId.size()); } logger.info("Starting storage manager complete"); } finally { diff --git a/ambry-store/src/test/java/com.github.ambry.store/StorageManagerTest.java b/ambry-store/src/test/java/com.github.ambry.store/StorageManagerTest.java index b86157a9f3..fe893114ad 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/StorageManagerTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/StorageManagerTest.java @@ -348,6 +348,7 @@ public void replicaFromStandbyToInactiveTest() throws Exception { shutdownAndAssertStoresInaccessible(storageManager, localReplicas); // 5. mock disable compaction failure + mockHelixParticipant = new MockClusterParticipant(); MockStorageManager mockStorageManager = new MockStorageManager(localNode, mockHelixParticipant); mockStorageManager.start(); try { @@ -392,6 +393,32 @@ public void replicaFromInactiveToOfflineTest() throws Exception { shutdownAndAssertStoresInaccessible(storageManager, localReplicas); } + /** + * Test that initializing participant metrics fails because the initial offline partition count is not zero. + * @throws Exception + */ + @Test + public void initParticipantMetricsFailureTest() throws Exception { + generateConfigs(true, false); + MockDataNodeId localNode = clusterMap.getDataNodes().get(0); + List localReplicas = clusterMap.getReplicaIds(localNode); + MockClusterParticipant mockHelixParticipant = new MockClusterParticipant(); + // create first storage manager and start + StorageManager storageManager1 = createStorageManager(localNode, new MetricRegistry(), mockHelixParticipant); + storageManager1.start(); + shutdownAndAssertStoresInaccessible(storageManager1, localReplicas); + // create second storage manager with same mock helix participant + StorageManager storageManager2 = createStorageManager(localNode, new MetricRegistry(), mockHelixParticipant); + try { + storageManager2.start(); + fail("should fail because offline partition count is non-zero before initialization"); + } catch (IllegalStateException e) { + // expected + } finally { + shutdownAndAssertStoresInaccessible(storageManager2, localReplicas); + } + } + /** * Test failure cases when updating InstanceConfig in Helix for both Offline-To-Bootstrap and Inactive-To-Offline. */ @@ -1166,7 +1193,7 @@ private class MockClusterParticipant extends HelixParticipant { Set stoppedReplicas = new HashSet<>(); MockClusterParticipant() throws IOException { - super(clusterMapConfig, new MockHelixManagerFactory()); + super(clusterMapConfig, new MockHelixManagerFactory(), new MetricRegistry()); } @Override @@ -1223,7 +1250,7 @@ private class MockStorageManager extends StorageManager { boolean controlCompactionReturnVal = false; MockStorageManager(DataNodeId currentNode, ClusterParticipant clusterParticipant) throws Exception { - super(storeConfig, diskManagerConfig, Utils.newScheduler(1, false), metricRegistry, new MockIdFactory(), + super(storeConfig, diskManagerConfig, Utils.newScheduler(1, false), new MetricRegistry(), new MockIdFactory(), clusterMap, currentNode, new DummyMessageStoreHardDelete(), clusterParticipant, SystemTime.getInstance(), new DummyMessageStoreRecovery()); } diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index b3ed5cd587..69aa75c786 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -19,7 +19,7 @@ ext { javaxVersion = "3.0.1" nettyVersion = "4.1.42.Final" nettyTcnativeVersion = "2.0.26.Final" - helixVersion = "0.8.4" + helixVersion = "0.9.1" jacksonVersion = "1.8.5" jaydioVersion = "0.1" azureStorageVersion = "12.2.0"