From d4538df89320c09469d9d76f09dbfe3af79b36c2 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 3 Apr 2018 11:57:58 +0200 Subject: [PATCH] Improve exception handling on TransportMasterNodeAction (#29314) We have seen exceptions bubble up to the uncaught exception handler. Checking the blocks can lead for example to IndexNotFoundException when the indices are resolved. In order to make TransportMasterNodeAction more resilient against such expected exceptions, this code change wraps the execution of doStart() into a try catch and informs the listener in case of failures. --- .../master/TransportMasterNodeAction.java | 120 ++++++++++-------- .../TransportMasterNodeActionTests.java | 33 +++++ 2 files changed, 98 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index f4a26e723dc0a..42d7da118460e 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -145,69 +145,79 @@ public void start() { } protected void doStart(ClusterState clusterState) { - final Predicate masterChangePredicate = MasterNodeChangePredicate.build(clusterState); - final DiscoveryNodes nodes = clusterState.nodes(); - if (nodes.isLocalNodeElectedMaster() || localExecute(request)) { - // check for block, if blocked, retry, else, execute locally - final ClusterBlockException blockException = checkBlock(request, clusterState); - if (blockException != null) { - if (!blockException.retryable()) { - listener.onFailure(blockException); - } else { - logger.trace("can't execute due to a cluster block, retrying", blockException); - retry(blockException, newState -> { - ClusterBlockException newException = checkBlock(request, newState); - return (newException == null || !newException.retryable()); - }); - } - } else { - ActionListener delegate = new ActionListener() { - @Override - public void onResponse(Response response) { - listener.onResponse(response); + try { + final Predicate masterChangePredicate = MasterNodeChangePredicate.build(clusterState); + final DiscoveryNodes nodes = clusterState.nodes(); + if (nodes.isLocalNodeElectedMaster() || localExecute(request)) { + // check for block, if blocked, retry, else, execute locally + final ClusterBlockException blockException = checkBlock(request, clusterState); + if (blockException != null) { + if (!blockException.retryable()) { + listener.onFailure(blockException); + } else { + logger.trace("can't execute due to a cluster block, retrying", blockException); + retry(blockException, newState -> { + try { + ClusterBlockException newException = checkBlock(request, newState); + return (newException == null || !newException.retryable()); + } catch (Exception e) { + // accept state as block will be rechecked by doStart() and listener.onFailure() then called + logger.trace("exception occurred during cluster block checking, accepting state", e); + return true; + } + }); } + } else { + ActionListener delegate = new ActionListener() { + @Override + public void onResponse(Response response) { + listener.onResponse(response); + } - @Override - public void onFailure(Exception t) { - if (t instanceof Discovery.FailedToCommitClusterStateException + @Override + public void onFailure(Exception t) { + if (t instanceof Discovery.FailedToCommitClusterStateException || (t instanceof NotMasterException)) { - logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t); - retry(t, masterChangePredicate); - } else { - listener.onFailure(t); + logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t); + retry(t, masterChangePredicate); + } else { + listener.onFailure(t); + } } - } - }; - threadPool.executor(executor).execute(new ActionRunnable(delegate) { - @Override - protected void doRun() throws Exception { - masterOperation(task, request, clusterState, delegate); - } - }); - } - } else { - if (nodes.getMasterNode() == null) { - logger.debug("no known master node, scheduling a retry"); - retry(null, masterChangePredicate); + }; + threadPool.executor(executor).execute(new ActionRunnable(delegate) { + @Override + protected void doRun() throws Exception { + masterOperation(task, request, clusterState, delegate); + } + }); + } } else { - DiscoveryNode masterNode = nodes.getMasterNode(); - final String actionName = getMasterActionName(masterNode); - transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler(listener, - TransportMasterNodeAction.this::newResponse) { - @Override - public void handleException(final TransportException exp) { - Throwable cause = exp.unwrapCause(); - if (cause instanceof ConnectTransportException) { - // we want to retry here a bit to see if a new master is elected - logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]", + if (nodes.getMasterNode() == null) { + logger.debug("no known master node, scheduling a retry"); + retry(null, masterChangePredicate); + } else { + DiscoveryNode masterNode = nodes.getMasterNode(); + final String actionName = getMasterActionName(masterNode); + transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler(listener, + TransportMasterNodeAction.this::newResponse) { + @Override + public void handleException(final TransportException exp) { + Throwable cause = exp.unwrapCause(); + if (cause instanceof ConnectTransportException) { + // we want to retry here a bit to see if a new master is elected + logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]", actionName, nodes.getMasterNode(), exp.getDetailedMessage()); - retry(cause, masterChangePredicate); - } else { - listener.onFailure(exp); + retry(cause, masterChangePredicate); + } else { + listener.onFailure(exp); + } } - } - }); + }); + } } + } catch (Exception e) { + listener.onFailure(e); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index de65d2a3f9240..f2b18a8c8f561 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -242,6 +242,39 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) } } + public void testCheckBlockThrowsException() throws InterruptedException { + boolean throwExceptionOnRetry = randomBoolean(); + Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60)); + PlainActionFuture listener = new PlainActionFuture<>(); + + ClusterBlock block = new ClusterBlock(1, "", true, true, + false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL); + ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes)) + .blocks(ClusterBlocks.builder().addGlobalBlock(block)).build(); + setState(clusterService, stateWithBlock); + + new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + Set blocks = state.blocks().global(); + if (throwExceptionOnRetry == false || blocks.isEmpty()) { + throw new RuntimeException("checkBlock has thrown exception"); + } + return new ClusterBlockException(blocks); + + } + }.execute(request, listener); + + if (throwExceptionOnRetry == false) { + assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class); + } else { + assertFalse(listener.isDone()); + setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes)) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build()); + assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class); + } + } + public void testForceLocalOperation() throws ExecutionException, InterruptedException { Request request = new Request(); PlainActionFuture listener = new PlainActionFuture<>();