Skip to content

Commit

Permalink
Decouple ClusterStateTaskListener & ClusterApplier (#30809)
Browse files Browse the repository at this point in the history
Today, the `ClusterApplier` and `MasterService` both use the
`ClusterStateTaskListener` interface to notify their callers when asynchronous
activities have completed. However, this is not wholly appropriate: none of the
callers into the `ClusterApplier` care about the `ClusterState` arguments that
they receive.  This change introduces a dedicated ClusterApplyListener
interface for callers into the `ClusterApplier`, to distinguish these listeners
from the real `ClusterStateTaskListener`s that are waiting for responses from
the `MasterService`.
  • Loading branch information
DaveCTurner committed May 24, 2018
1 parent 0380d93 commit 9dbcb4f
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.cluster.service;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;

import java.util.function.Supplier;

Expand All @@ -38,11 +37,29 @@ public interface ClusterApplier {
* @param clusterStateSupplier the cluster state supplier which provides the latest cluster state to apply
* @param listener callback that is invoked after cluster state is applied
*/
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener);
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener);

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
ClusterState.Builder newClusterStateBuilder();

/**
* Listener for results of cluster state application
*/
interface ClusterApplyListener {
/**
* Called on successful cluster state application
* @param source information where the cluster state came from
*/
default void onSuccess(String source) {
}

/**
* Called on failure during cluster state application
* @param source information where the cluster state came from
* @param e exception that occurred
*/
void onFailure(String source, Exception e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
Expand Down Expand Up @@ -141,10 +140,10 @@ protected synchronized void doStart() {
}

class UpdateTask extends SourcePrioritizedRunnable implements Function<ClusterState, ClusterState> {
final ClusterStateTaskListener listener;
final ClusterApplyListener listener;
final Function<ClusterState, ClusterState> updateFunction;

UpdateTask(Priority priority, String source, ClusterStateTaskListener listener,
UpdateTask(Priority priority, String source, ClusterApplyListener listener,
Function<ClusterState, ClusterState> updateFunction) {
super(priority, source);
this.listener = listener;
Expand Down Expand Up @@ -301,7 +300,7 @@ public void run() {
}

public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
final ClusterStateTaskListener listener, Priority priority) {
final ClusterApplyListener listener, Priority priority) {
submitStateUpdateTask(source, ClusterStateTaskConfig.build(priority),
(clusterState) -> {
clusterStateConsumer.accept(clusterState);
Expand All @@ -311,13 +310,13 @@ public void runOnApplierThread(final String source, Consumer<ClusterState> clust
}

public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
final ClusterStateTaskListener listener) {
final ClusterApplyListener listener) {
runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
}

@Override
public void onNewClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier,
final ClusterStateTaskListener listener) {
final ClusterApplyListener listener) {
Function<ClusterState, ClusterState> applyFunction = currentState -> {
ClusterState nextState = clusterStateSupplier.get();
if (nextState != null) {
Expand All @@ -331,12 +330,12 @@ public void onNewClusterState(final String source, final Supplier<ClusterState>

private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
final Function<ClusterState, ClusterState> executor,
final ClusterStateTaskListener listener) {
final ClusterApplyListener listener) {
if (!lifecycle.started()) {
return;
}
try {
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterStateTaskListener(listener, logger), executor);
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
if (config.timeout() != null) {
threadPoolExecutor.execute(updateTask, config.timeout(),
() -> threadPool.generic().execute(
Expand Down Expand Up @@ -417,7 +416,7 @@ protected void runTask(UpdateTask task) {
}

if (previousClusterState == newClusterState) {
task.listener.clusterStateProcessed(task.source, newClusterState, newClusterState);
task.listener.onSuccess(task.source);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
Expand Down Expand Up @@ -486,7 +485,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl

callClusterStateListeners(clusterChangedEvent);

task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
task.listener.onSuccess(task.source);
}

private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
Expand All @@ -511,11 +510,11 @@ private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent)
});
}

private static class SafeClusterStateTaskListener implements ClusterStateTaskListener {
private final ClusterStateTaskListener listener;
private static class SafeClusterApplyListener implements ClusterApplyListener {
private final ClusterApplyListener listener;
private final Logger logger;

SafeClusterStateTaskListener(ClusterStateTaskListener listener, Logger logger) {
SafeClusterApplyListener(ClusterApplyListener listener, Logger logger) {
this.listener = listener;
this.logger = logger;
}
Expand All @@ -532,14 +531,12 @@ public void onFailure(String source, Exception e) {
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
try {
listener.clusterStateProcessed(source, oldState, newState);
listener.onSuccess(source);
} catch (Exception e) {
logger.error(new ParameterizedMessage(
"exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n" +
"{}\nnew cluster state:\n{}",
source, oldState, newState), e);
"exception thrown by listener while notifying of cluster state processed from [{}]", source), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -65,9 +63,9 @@ public synchronized void publish(final ClusterChangedEvent event,
clusterState = event.state();
CountDownLatch latch = new CountDownLatch(1);

ClusterStateTaskListener listener = new ClusterStateTaskListener() {
ClusterApplyListener listener = new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
ackListener.onNodeAck(transportService.getLocalNode(), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
Expand All @@ -34,12 +33,11 @@
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -789,9 +787,9 @@ boolean processNextCommittedClusterState(String reason) {

clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
this::clusterState,
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
try {
pendingStatesQueue.markAsProcessed(newClusterState);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -135,9 +136,9 @@ public void testClusterStateUpdateLogging() throws Exception {
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand All @@ -151,9 +152,9 @@ public void onFailure(String source, Exception e) {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
fail();
}

Expand All @@ -166,9 +167,9 @@ public void onFailure(String source, Exception e) {
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterApplierService.runOnApplierThread("test3",
currentState -> {},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand Down Expand Up @@ -216,9 +217,9 @@ public void testLongClusterStateUpdateLogging() throws Exception {
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
processedFirstTask.countDown();
}
Expand All @@ -234,9 +235,9 @@ public void onFailure(String source, Exception e) {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
fail();
}

Expand All @@ -247,9 +248,9 @@ public void onFailure(String source, Exception e) {
});
clusterApplierService.runOnApplierThread("test3",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand All @@ -262,9 +263,9 @@ public void onFailure(String source, Exception e) {
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterApplierService.runOnApplierThread("test4",
currentState -> {},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand Down Expand Up @@ -340,10 +341,10 @@ public void testClusterStateApplierCantSampleClusterState() throws InterruptedEx

CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand Down Expand Up @@ -390,9 +391,9 @@ public void onTimeout(TimeValue timeout) {

CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
Expand Down Expand Up @@ -72,9 +71,9 @@ public ClusterState.Builder newClusterStateBuilder() {

@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier,
ClusterStateTaskListener listener) {
ClusterApplyListener listener) {
clusterState.set(clusterStateSupplier.get());
listener.clusterStateProcessed(source, clusterState.get(), clusterState.get());
listener.onSuccess(source);
}
});
discovery.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -314,8 +313,8 @@ public ClusterState.Builder newClusterStateBuilder() {
}

@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener) {
listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get());
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
listener.onSuccess(source);
}
};
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
Expand Down
Loading

0 comments on commit 9dbcb4f

Please sign in to comment.