Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve exception handling on TransportMasterNodeAction #29314

Merged
merged 1 commit into from
Apr 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,69 +145,79 @@ public void start() {
}

protected void doStart(ClusterState clusterState) {
final Predicate<ClusterState> masterChangePredicate = MasterNodeChangePredicate.build(clusterState);
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
if (!blockException.retryable()) {
listener.onFailure(blockException);
} else {
logger.trace("can't execute due to a cluster block, retrying", blockException);
retry(blockException, newState -> {
ClusterBlockException newException = checkBlock(request, newState);
return (newException == null || !newException.retryable());
});
}
} else {
ActionListener<Response> delegate = new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
listener.onResponse(response);
try {
final Predicate<ClusterState> masterChangePredicate = MasterNodeChangePredicate.build(clusterState);
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
if (!blockException.retryable()) {
listener.onFailure(blockException);
} else {
logger.trace("can't execute due to a cluster block, retrying", blockException);
retry(blockException, newState -> {
try {
ClusterBlockException newException = checkBlock(request, newState);
return (newException == null || !newException.retryable());
} catch (Exception e) {
// accept state as block will be rechecked by doStart() and listener.onFailure() then called
logger.trace("exception occurred during cluster block checking, accepting state", e);
return true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it might be tricky to see in this diff what's actually changed: The three lines above here have been added.

}
});
}
} else {
ActionListener<Response> delegate = new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
listener.onResponse(response);
}

@Override
public void onFailure(Exception t) {
if (t instanceof Discovery.FailedToCommitClusterStateException
@Override
public void onFailure(Exception t) {
if (t instanceof Discovery.FailedToCommitClusterStateException
|| (t instanceof NotMasterException)) {
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);
} else {
listener.onFailure(t);
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);
} else {
listener.onFailure(t);
}
}
}
};
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
@Override
protected void doRun() throws Exception {
masterOperation(task, request, clusterState, delegate);
}
});
}
} else {
if (nodes.getMasterNode() == null) {
logger.debug("no known master node, scheduling a retry");
retry(null, masterChangePredicate);
};
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
@Override
protected void doRun() throws Exception {
masterOperation(task, request, clusterState, delegate);
}
});
}
} else {
DiscoveryNode masterNode = nodes.getMasterNode();
final String actionName = getMasterActionName(masterNode);
transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener,
TransportMasterNodeAction.this::newResponse) {
@Override
public void handleException(final TransportException exp) {
Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
if (nodes.getMasterNode() == null) {
logger.debug("no known master node, scheduling a retry");
retry(null, masterChangePredicate);
} else {
DiscoveryNode masterNode = nodes.getMasterNode();
final String actionName = getMasterActionName(masterNode);
transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener,
TransportMasterNodeAction.this::newResponse) {
@Override
public void handleException(final TransportException exp) {
Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
actionName, nodes.getMasterNode(), exp.getDetailedMessage());
retry(cause, masterChangePredicate);
} else {
listener.onFailure(exp);
retry(cause, masterChangePredicate);
} else {
listener.onFailure(exp);
}
}
}
});
});
}
}
} catch (Exception e) {
listener.onFailure(e);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it might be tricky to see in this diff what's actually changed: The two lines above here have been added.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,39 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
}
}

public void testCheckBlockThrowsException() throws InterruptedException {
boolean throwExceptionOnRetry = randomBoolean();
Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60));
PlainActionFuture<Response> listener = new PlainActionFuture<>();

ClusterBlock block = new ClusterBlock(1, "", true, true,
false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
.blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
setState(clusterService, stateWithBlock);

new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
Set<ClusterBlock> blocks = state.blocks().global();
if (throwExceptionOnRetry == false || blocks.isEmpty()) {
throw new RuntimeException("checkBlock has thrown exception");
}
return new ClusterBlockException(blocks);

}
}.execute(request, listener);

if (throwExceptionOnRetry == false) {
assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class);
} else {
assertFalse(listener.isDone());
setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class);
}
}

public void testForceLocalOperation() throws ExecutionException, InterruptedException {
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
Expand Down