Skip to content

Commit

Permalink
Upgrade Helix lib to 0.9.1 and add metrics for state transition (#1374)
Browse files Browse the repository at this point in the history
1. Upgrade Helix lib which should fix missing ZK callback issues
2. Introduce participant metrics to track partitions in each state
  • Loading branch information
jsjtzyy authored Feb 14, 2020
1 parent 6f407d1 commit bcab57e
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ void registerPartitionStateChangeListener(StateModelListenerType listenerType,
*/
boolean updateDataNodeInfoInCluster(ReplicaId replicaId, boolean shouldExist);

/**
 * Initialize participant related metrics if needed.
 * The default implementation is intentionally a no-op, so implementations that do not track participant metrics
 * are not required to override it.
 * @param localPartitionCount total number of partitions on local node.
 */
default void initializeParticipantMetrics(int localPartitionCount) {
}

/**
* Terminate the participant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ public class AmbryPartitionStateModel extends StateModel {
private final String partitionName;
private final PartitionStateChangeListener partitionStateChangeListener;
private final ClusterMapConfig clusterMapConfig;
private final HelixParticipantMetrics helixParticipantMetrics;

AmbryPartitionStateModel(String resourceName, String partitionName,
PartitionStateChangeListener partitionStateChangeListener, ClusterMapConfig clusterMapConfig) {
PartitionStateChangeListener partitionStateChangeListener, ClusterMapConfig clusterMapConfig,
HelixParticipantMetrics helixParticipantMetrics) {
this.resourceName = resourceName;
this.partitionName = partitionName;
this.partitionStateChangeListener = Objects.requireNonNull(partitionStateChangeListener);
this.clusterMapConfig = Objects.requireNonNull(clusterMapConfig);
this.helixParticipantMetrics = Objects.requireNonNull(helixParticipantMetrics);
StateModelParser parser = new StateModelParser();
_currentState = parser.getInitialState(AmbryPartitionStateModel.class);
}
Expand All @@ -50,6 +53,8 @@ public void onBecomeBootstrapFromOffline(Message message, NotificationContext co
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeBootstrapFromOffline(message.getPartitionName());
}
helixParticipantMetrics.offlineCount.addAndGet(-1);
helixParticipantMetrics.bootstrapCount.addAndGet(1);
}

@Transition(to = "STANDBY", from = "BOOTSTRAP")
Expand All @@ -60,20 +65,26 @@ public void onBecomeStandbyFromBootstrap(Message message, NotificationContext co
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeStandbyFromBootstrap(partitionName);
}
helixParticipantMetrics.bootstrapCount.addAndGet(-1);
helixParticipantMetrics.standbyCount.addAndGet(1);
}

/**
 * Handles the STANDBY -> LEADER transition: logs it, notifies the partition state change listener and then moves
 * one partition from the standby tally to the leader tally.
 * The listener is invoked unconditionally here (no {@code clustermapEnableStateModelListener} check), and the
 * tallies are only touched after the listener call has returned.
 * @param message the state transition message; supplies the partition and resource names.
 * @param context the notification context of this transition (not used by this method).
 */
@Transition(to = "LEADER", from = "STANDBY")
public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
  logger.info("Partition {} in resource {} is becoming LEADER from STANDBY", message.getPartitionName(),
      message.getResourceName());
  partitionStateChangeListener.onPartitionBecomeLeaderFromStandby(message.getPartitionName());
  // one partition leaves STANDBY and enters LEADER
  helixParticipantMetrics.standbyCount.decrementAndGet();
  helixParticipantMetrics.leaderCount.incrementAndGet();
}

/**
 * Handles the LEADER -> STANDBY transition: logs it, notifies the partition state change listener and then moves
 * one partition from the leader tally back to the standby tally.
 * The listener is invoked unconditionally here (no {@code clustermapEnableStateModelListener} check), and the
 * tallies are only touched after the listener call has returned.
 * @param message the state transition message; supplies the partition and resource names.
 * @param context the notification context of this transition (not used by this method).
 */
@Transition(to = "STANDBY", from = "LEADER")
public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
  logger.info("Partition {} in resource {} is becoming STANDBY from LEADER", message.getPartitionName(),
      message.getResourceName());
  partitionStateChangeListener.onPartitionBecomeStandbyFromLeader(message.getPartitionName());
  // one partition leaves LEADER and enters STANDBY
  helixParticipantMetrics.leaderCount.decrementAndGet();
  helixParticipantMetrics.standbyCount.incrementAndGet();
}

@Transition(to = "INACTIVE", from = "STANDBY")
Expand All @@ -84,6 +95,8 @@ public void onBecomeInactiveFromStandby(Message message, NotificationContext con
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeInactiveFromStandby(partitionName);
}
helixParticipantMetrics.standbyCount.addAndGet(-1);
helixParticipantMetrics.inactiveCount.addAndGet(1);
}

@Transition(to = "OFFLINE", from = "INACTIVE")
Expand All @@ -94,6 +107,8 @@ public void onBecomeOfflineFromInactive(Message message, NotificationContext con
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeOfflineFromInactive(partitionName);
}
helixParticipantMetrics.inactiveCount.addAndGet(-1);
helixParticipantMetrics.offlineCount.addAndGet(1);
}

@Transition(to = "DROPPED", from = "OFFLINE")
Expand All @@ -104,16 +119,20 @@ public void onBecomeDroppedFromOffline(Message message, NotificationContext cont
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeDroppedFromOffline(partitionName);
}
helixParticipantMetrics.offlineCount.addAndGet(-1);
helixParticipantMetrics.partitionDroppedCount.inc();
}

/**
 * Handles the ERROR -> DROPPED transition: logs it and records one partition-dropped event.
 * No per-state tally is adjusted and no listener is notified by this method.
 * @param message the state transition message; supplies the partition and resource names for logging.
 * @param context the notification context of this transition (not used by this method).
 */
@Transition(to = "DROPPED", from = "ERROR")
public void onBecomeDroppedFromError(Message message, NotificationContext context) {
  final String droppedPartition = message.getPartitionName();
  final String owningResource = message.getResourceName();
  logger.info("Partition {} in resource {} is becoming DROPPED from ERROR", droppedPartition, owningResource);
  helixParticipantMetrics.partitionDroppedCount.inc();
}

/**
 * Logs that this partition is being reset to OFFLINE and adds one to the offline partition tally.
 * NOTE(review): only {@code offlineCount} is incremented here; the tally of whichever state the partition was in
 * before the reset is not decremented by this method - confirm this matches when the framework invokes reset.
 */
@Override
public void reset() {
  logger.info("Reset method invoked. Partition {} in resource {} is reset to OFFLINE", partitionName, resourceName);
  helixParticipantMetrics.offlineCount.incrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
class AmbryStateModelFactory extends StateModelFactory<StateModel> {
private final ClusterMapConfig clustermapConfig;
private final PartitionStateChangeListener partitionStateChangeListener;
private final HelixParticipantMetrics helixParticipantMetrics;

AmbryStateModelFactory(ClusterMapConfig clusterMapConfig, PartitionStateChangeListener partitionStateChangeListener) {
AmbryStateModelFactory(ClusterMapConfig clusterMapConfig, PartitionStateChangeListener partitionStateChangeListener,
HelixParticipantMetrics helixParticipantMetrics) {
this.clustermapConfig = clusterMapConfig;
this.partitionStateChangeListener = partitionStateChangeListener;
this.helixParticipantMetrics = helixParticipantMetrics;
}

/**
Expand All @@ -43,7 +46,8 @@ public StateModel createNewStateModel(String resourceName, String partitionName)
switch (clustermapConfig.clustermapStateModelDefinition) {
case AmbryStateModelDefinition.AMBRY_LEADER_STANDBY_MODEL:
stateModelToReturn =
new AmbryPartitionStateModel(resourceName, partitionName, partitionStateChangeListener, clustermapConfig);
new AmbryPartitionStateModel(resourceName, partitionName, partitionStateChangeListener, clustermapConfig,
helixParticipantMetrics);
break;
case LeaderStandbySMD.name:
stateModelToReturn = new DefaultLeaderStandbyStateModel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public HelixClusterManager getClusterMap() throws IOException {
@Override
public HelixParticipant getClusterParticipant() throws IOException {
if (helixParticipant == null) {
helixParticipant = new HelixParticipant(clusterMapConfig, helixFactory);
helixParticipant = new HelixParticipant(clusterMapConfig, helixFactory, metricRegistry);
}
return helixParticipant;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.clustermap;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.server.AmbryHealthReport;
import com.github.ambry.utils.Utils;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class HelixParticipant implements ClusterParticipant, PartitionStateChang
private final String zkConnectStr;
private final Object helixAdministrationLock = new Object();
private final ClusterMapConfig clusterMapConfig;
private final HelixParticipantMetrics participantMetrics;
private HelixManager manager;
private String instanceName;
private HelixAdmin helixAdmin;
Expand All @@ -63,10 +65,13 @@ public class HelixParticipant implements ClusterParticipant, PartitionStateChang
* Instantiate a HelixParticipant.
* @param clusterMapConfig the {@link ClusterMapConfig} associated with this participant.
* @param helixFactory the {@link HelixFactory} to use to get the {@link HelixManager}.
* @param metricRegistry the {@link MetricRegistry} to instantiate {@link HelixParticipantMetrics}.
* @throws IOException if there is an error in parsing the JSON serialized ZK connect string config.
*/
public HelixParticipant(ClusterMapConfig clusterMapConfig, HelixFactory helixFactory) throws IOException {
public HelixParticipant(ClusterMapConfig clusterMapConfig, HelixFactory helixFactory, MetricRegistry metricRegistry)
throws IOException {
this.clusterMapConfig = clusterMapConfig;
participantMetrics = new HelixParticipantMetrics(metricRegistry);
clusterName = clusterMapConfig.clusterMapClusterName;
instanceName =
ClusterMapUtils.getInstanceName(clusterMapConfig.clusterMapHostName, clusterMapConfig.clusterMapPort);
Expand All @@ -88,6 +93,11 @@ public HelixParticipant(ClusterMapConfig clusterMapConfig, HelixFactory helixFac
partitionStateChangeListeners = new HashMap<>();
}

/**
 * Seeds the participant metrics with the number of partitions hosted on the local node by forwarding the value
 * to {@link HelixParticipantMetrics#setLocalPartitionCount(int)}.
 * @param localPartitionCount total number of partitions on local node.
 */
@Override
public void initializeParticipantMetrics(int localPartitionCount) {
participantMetrics.setLocalPartitionCount(localPartitionCount);
}

/**
* Initiate the participation by registering via the {@link HelixManager} as a participant to the associated
* Helix cluster.
Expand All @@ -100,7 +110,7 @@ public void participate(List<AmbryHealthReport> ambryHealthReports) throws IOExc
clusterMapConfig.clustermapStateModelDefinition);
StateMachineEngine stateMachineEngine = manager.getStateMachineEngine();
stateMachineEngine.registerStateModelFactory(clusterMapConfig.clustermapStateModelDefinition,
new AmbryStateModelFactory(clusterMapConfig, this));
new AmbryStateModelFactory(clusterMapConfig, this, participantMetrics));
registerHealthReportTasks(stateMachineEngine, ambryHealthReports);
try {
synchronized (helixAdministrationLock) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2020 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.clustermap;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * Metrics for {@link HelixParticipant} that track partition state transitions: one integer tally per partition
 * state, each exposed through a gauge, plus a counter of partition-dropped events.
 */
class HelixParticipantMetrics {
  // Per-state tallies. They are package-visible so that state model callbacks can adjust them directly; the gauges
  // registered in the constructor simply report their current values.
  final AtomicInteger bootstrapCount = new AtomicInteger();
  final AtomicInteger standbyCount = new AtomicInteger();
  final AtomicInteger leaderCount = new AtomicInteger();
  final AtomicInteger inactiveCount = new AtomicInteger();
  final AtomicInteger offlineCount = new AtomicInteger();
  // The exact number of "dropped" partitions is not tracked; counting partition-dropped events is sufficient.
  final Counter partitionDroppedCount;

  /**
   * Registers one gauge per state tally and the partition-dropped counter in the given registry. All metric names
   * are scoped under {@link HelixParticipant}.
   * @param metricRegistry the {@link MetricRegistry} in which the metrics are registered.
   */
  HelixParticipantMetrics(MetricRegistry metricRegistry) {
    registerStateGauge(metricRegistry, "bootstrapPartitionCount", bootstrapCount);
    registerStateGauge(metricRegistry, "standbyPartitionCount", standbyCount);
    registerStateGauge(metricRegistry, "leaderPartitionCount", leaderCount);
    registerStateGauge(metricRegistry, "inactivePartitionCount", inactiveCount);
    registerStateGauge(metricRegistry, "offlinePartitionCount", offlineCount);
    partitionDroppedCount =
        metricRegistry.counter(MetricRegistry.name(HelixParticipant.class, "partitionDroppedCount"));
  }

  /**
   * Registers a gauge that reports the current value of the supplied tally.
   * @param metricRegistry the {@link MetricRegistry} to register the gauge in.
   * @param gaugeName the metric name, relative to {@link HelixParticipant}.
   * @param tally the tally whose value the gauge reads.
   */
  private static void registerStateGauge(MetricRegistry metricRegistry, String gaugeName, AtomicInteger tally) {
    Gauge<Integer> stateGauge = tally::get;
    metricRegistry.register(MetricRegistry.name(HelixParticipant.class, gaugeName), stateGauge);
  }

  /**
   * Set number of partitions on current node. This is invoked during startup.
   * @param partitionCount number of partitions on current node
   * @throws IllegalStateException if the offline tally is no longer 0 when this method is called.
   */
  void setLocalPartitionCount(int partitionCount) {
    // This is expected to run before participation, so the offline tally must still hold its initial value of 0.
    boolean seeded = offlineCount.compareAndSet(0, partitionCount);
    if (seeded) {
      return;
    }
    throw new IllegalStateException(
        "Number of OFFLINE partitions has changed before initializing participant metrics");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.clustermap;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.utils.TestUtils;
Expand Down Expand Up @@ -81,7 +82,8 @@ public AmbryReplicaSyncUpManagerTest() throws IOException {
mockHelixParticipant.currentReplica = currentReplica;
mockHelixParticipant.replicaSyncUpService = replicaSyncUpService;
stateModel =
new AmbryPartitionStateModel(RESOURCE_NAME, partition.toPathString(), mockHelixParticipant, clusterMapConfig);
new AmbryPartitionStateModel(RESOURCE_NAME, partition.toPathString(), mockHelixParticipant, clusterMapConfig,
new HelixParticipantMetrics(new MetricRegistry()));
mockMessage = Mockito.mock(Message.class);
when(mockMessage.getPartitionName()).thenReturn(partition.toPathString());
when(mockMessage.getResourceName()).thenReturn(RESOURCE_NAME);
Expand Down
Loading

0 comments on commit bcab57e

Please sign in to comment.