diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index f4a26e723dc0a..42d7da118460e 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -145,69 +145,79 @@ public void start() { } protected void doStart(ClusterState clusterState) { - final Predicate masterChangePredicate = MasterNodeChangePredicate.build(clusterState); - final DiscoveryNodes nodes = clusterState.nodes(); - if (nodes.isLocalNodeElectedMaster() || localExecute(request)) { - // check for block, if blocked, retry, else, execute locally - final ClusterBlockException blockException = checkBlock(request, clusterState); - if (blockException != null) { - if (!blockException.retryable()) { - listener.onFailure(blockException); - } else { - logger.trace("can't execute due to a cluster block, retrying", blockException); - retry(blockException, newState -> { - ClusterBlockException newException = checkBlock(request, newState); - return (newException == null || !newException.retryable()); - }); - } - } else { - ActionListener delegate = new ActionListener() { - @Override - public void onResponse(Response response) { - listener.onResponse(response); + try { + final Predicate masterChangePredicate = MasterNodeChangePredicate.build(clusterState); + final DiscoveryNodes nodes = clusterState.nodes(); + if (nodes.isLocalNodeElectedMaster() || localExecute(request)) { + // check for block, if blocked, retry, else, execute locally + final ClusterBlockException blockException = checkBlock(request, clusterState); + if (blockException != null) { + if (!blockException.retryable()) { + listener.onFailure(blockException); + } else { + logger.trace("can't execute due to a cluster block, retrying", blockException); + retry(blockException, newState -> { + try { + ClusterBlockException newException = checkBlock(request, newState); + return (newException == null || !newException.retryable()); + } catch (Exception e) { + // accept state as block will be rechecked by doStart() and listener.onFailure() then called + logger.trace("exception occurred during cluster block checking, accepting state", e); + return true; + } + }); } + } else { + ActionListener delegate = new ActionListener() { + @Override + public void onResponse(Response response) { + listener.onResponse(response); + } - @Override - public void onFailure(Exception t) { - if (t instanceof Discovery.FailedToCommitClusterStateException + @Override + public void onFailure(Exception t) { + if (t instanceof Discovery.FailedToCommitClusterStateException || (t instanceof NotMasterException)) { - logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t); - retry(t, masterChangePredicate); - } else { - listener.onFailure(t); + logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t); + retry(t, masterChangePredicate); + } else { + listener.onFailure(t); + } } - } - }; - threadPool.executor(executor).execute(new ActionRunnable(delegate) { - @Override - protected void doRun() throws Exception { - masterOperation(task, request, clusterState, delegate); - } - }); - } - } else { - if (nodes.getMasterNode() == null) { - logger.debug("no known master node, scheduling a retry"); - retry(null, masterChangePredicate); + }; + threadPool.executor(executor).execute(new ActionRunnable(delegate) { + @Override + protected void doRun() throws Exception { + masterOperation(task, request, clusterState, delegate); + } + }); + } } else { - DiscoveryNode masterNode = nodes.getMasterNode(); - final String actionName = getMasterActionName(masterNode); - transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler(listener, - TransportMasterNodeAction.this::newResponse) { - @Override - public void handleException(final TransportException exp) { - Throwable cause = exp.unwrapCause(); - if (cause instanceof ConnectTransportException) { - // we want to retry here a bit to see if a new master is elected - logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]", + if (nodes.getMasterNode() == null) { + logger.debug("no known master node, scheduling a retry"); + retry(null, masterChangePredicate); + } else { + DiscoveryNode masterNode = nodes.getMasterNode(); + final String actionName = getMasterActionName(masterNode); + transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler(listener, + TransportMasterNodeAction.this::newResponse) { + @Override + public void handleException(final TransportException exp) { + Throwable cause = exp.unwrapCause(); + if (cause instanceof ConnectTransportException) { + // we want to retry here a bit to see if a new master is elected + logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]", actionName, nodes.getMasterNode(), exp.getDetailedMessage()); - retry(cause, masterChangePredicate); - } else { - listener.onFailure(exp); + retry(cause, masterChangePredicate); + } else { + listener.onFailure(exp); + } } - } - }); + }); + } } + } catch (Exception e) { + listener.onFailure(e); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index de65d2a3f9240..f2b18a8c8f561 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -242,6 +242,39 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) } } + public void testCheckBlockThrowsException() throws InterruptedException { + boolean throwExceptionOnRetry = randomBoolean(); + Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60)); + PlainActionFuture listener = new PlainActionFuture<>(); + + ClusterBlock block = new ClusterBlock(1, "", true, true, + false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL); + ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes)) + .blocks(ClusterBlocks.builder().addGlobalBlock(block)).build(); + setState(clusterService, stateWithBlock); + + new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + Set blocks = state.blocks().global(); + if (throwExceptionOnRetry == false || blocks.isEmpty()) { + throw new RuntimeException("checkBlock has thrown exception"); + } + return new ClusterBlockException(blocks); + + } + }.execute(request, listener); + + if (throwExceptionOnRetry == false) { + assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class); + } else { + assertFalse(listener.isDone()); + setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes)) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build()); + assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class); + } + } + public void testForceLocalOperation() throws ExecutionException, InterruptedException { Request request = new Request(); PlainActionFuture listener = new PlainActionFuture<>();