diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index 1a1d9beb85613..6fdc0b43aaa2d 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeMetaData; @@ -291,7 +290,9 @@ public void setCurrentTerm(long currentTerm) { } else { logger.trace("queuing term update (setting term to {})", currentTerm); newCurrentTermQueued = true; - scheduleUpdate(); + if (newStateQueued == false) { + scheduleUpdate(); + } } } } @@ -305,55 +306,57 @@ public void setLastAcceptedState(ClusterState clusterState) { } else { logger.trace("queuing cluster state update (setting cluster state to {})", clusterState.version()); newStateQueued = true; - scheduleUpdate(); + if (newCurrentTermQueued == false) { + scheduleUpdate(); + } } } } private void scheduleUpdate() { assert Thread.holdsLock(mutex); - try { - threadPoolExecutor.execute(new AbstractRunnable() { + assert threadPoolExecutor.getQueue().isEmpty() : "threadPoolExecutor queue not empty"; + threadPoolExecutor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.error("Exception occurred when storing new meta data", e); - } + @Override + public void onFailure(Exception e) { + logger.error("Exception occurred when storing new meta data", e); + } - @Override - protected void doRun() { - final Long term; - final ClusterState clusterState; - synchronized (mutex) { - if (newCurrentTermQueued) { - term = getCurrentTerm(); - newCurrentTermQueued = false; - } else { - term = null; - } - if (newStateQueued) { - clusterState = getLastAcceptedState(); - newStateQueued = false; - } else { - clusterState = null; - } - } - // write current term before last accepted state so that it is never below term in last accepted state - if (term != null) { - persistedState.setCurrentTerm(term); + @Override + public void onRejection(Exception e) { + assert threadPoolExecutor.isShutdown() : "only expect rejections when shutting down"; + } + + @Override + protected void doRun() { + final Long term; + final ClusterState clusterState; + synchronized (mutex) { + if (newCurrentTermQueued) { + term = getCurrentTerm(); + logger.trace("resetting newCurrentTermQueued"); + newCurrentTermQueued = false; + } else { + term = null; } - if (clusterState != null) { - persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState)); + if (newStateQueued) { + clusterState = getLastAcceptedState(); + logger.trace("resetting newStateQueued"); + newStateQueued = false; + } else { + clusterState = null; } } - }); - } catch (EsRejectedExecutionException e) { - // ignore cases where we are shutting down..., there is really nothing interesting to be done here... - if (threadPoolExecutor.isShutdown() == false) { - assert false : "only expect rejections when shutting down"; - throw e; + // write current term before last accepted state so that it is never below term in last accepted state + if (term != null) { + persistedState.setCurrentTerm(term); + } + if (clusterState != null) { + persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState)); + } } - } + }); } static final CoordinationMetaData.VotingConfiguration staleStateConfiguration =