Skip to content

Commit

Permalink
Line-up snapshot listeners resolution to avoid double calls (elastic#…
Browse files Browse the repository at this point in the history
…102439)

Snapshot listeners can be concurrently resolved from two different
*clusterApplierService* and *masterService* task threads. If a listener
is a **single action** listener, meaning that it has to be resolved only
once, the following traces can occur, see elastic#101876

```
at org.elasticsearch.action.ActionListener$4.assertFirstRun(ActionListener.java:324)
at org.elasticsearch.action.ActionListener$4.onFailure(ActionListener.java:335)
```

Fix

Line up *resolve listener and remove it from tracking collection*
operations over snapshot listeners in order to avoid double invocation
by separate threads

Fix for elastic#101876
  • Loading branch information
volodk85 authored Nov 27, 2023
1 parent cffb8c2 commit 3cf55db
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,6 @@ public void testEquivalentDeletesAreDeduplicated() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/101876")
public void testMasterFailoverOnFinalizationLoop() throws Exception {
internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,23 +827,34 @@ public void applyClusterState(ClusterChangedEvent event) {
event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)
);
} else {
if (snapshotCompletionListeners.isEmpty() == false) {
// We have snapshot listeners but are not the master any more. Fail all waiting listeners except for those that already
// have their snapshots finalizing (those that are already finalizing will fail on their own from to update the cluster
// state).
for (Snapshot snapshot : Set.copyOf(snapshotCompletionListeners.keySet())) {
final List<Runnable> readyToResolveListeners = new ArrayList<>();
// line-up mutating concurrent operations which can be in form of clusterApplierService and masterService tasks
// to completion and deletion listeners, see #failAllListenersOnMasterFailOver
synchronized (currentlyFinalizing) {
// We have snapshot listeners but are not the master anymore. Fail all waiting listeners except for those that
// already have their snapshots finalizing (those that are already finalizing will fail on their own from to update
// the cluster state).
for (final Snapshot snapshot : snapshotCompletionListeners.keySet()) {
if (endingSnapshots.add(snapshot)) {
failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, "no longer master"));
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "no longer master"),
readyToResolveListeners::add
);
assert endingSnapshots.contains(snapshot) == false : snapshot;
}
}
}
if (snapshotDeletionListeners.isEmpty() == false) {
final Exception e = new NotMasterException("no longer master");
for (String delete : Set.copyOf(snapshotDeletionListeners.keySet())) {
failListenersIgnoringException(snapshotDeletionListeners.remove(delete), e);
if (snapshotDeletionListeners.isEmpty() == false) {
final Exception cause = new NotMasterException("no longer master");
for (final Iterator<List<ActionListener<Void>>> it = snapshotDeletionListeners.values().iterator(); it.hasNext();) {
final List<ActionListener<Void>> listeners = it.next();
readyToResolveListeners.add(() -> failListenersIgnoringException(listeners, cause));
it.remove();
}
}
}
// fail snapshot listeners outside mutex
readyToResolveListeners.forEach(Runnable::run);
}
} catch (Exception e) {
assert false : new AssertionError(e);
Expand Down Expand Up @@ -1528,7 +1539,8 @@ private void handleFinalizationFailure(Exception e, Snapshot snapshot, Repositor
logger.debug(() -> "[" + snapshot + "] failed to update cluster state during snapshot finalization", e);
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e)
new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e),
Runnable::run
);
failAllListenersOnMasterFailOver(e);
} else {
Expand Down Expand Up @@ -1847,14 +1859,15 @@ public void onFailure(Exception e) {
);
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Failed to remove snapshot from cluster state", e)
new SnapshotException(snapshot, "Failed to remove snapshot from cluster state", e),
Runnable::run
);
failAllListenersOnMasterFailOver(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
failSnapshotCompletionListeners(snapshot, failure);
failSnapshotCompletionListeners(snapshot, failure, Runnable::run);
if (repositoryData != null) {
runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
}
Expand Down Expand Up @@ -1897,8 +1910,9 @@ private static SnapshotDeletionsInProgress deletionsWithoutSnapshots(
return changed ? SnapshotDeletionsInProgress.of(updatedEntries) : null;
}

private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
failListenersIgnoringException(endAndGetListenersToResolve(snapshot), e);
private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e, Consumer<Runnable> failingListenersConsumer) {
final List<ActionListener<SnapshotInfo>> listeners = endAndGetListenersToResolve(snapshot);
failingListenersConsumer.accept(() -> failListenersIgnoringException(listeners, e));
assert repositoryOperations.assertNotQueued(snapshot);
}

Expand Down Expand Up @@ -2093,7 +2107,11 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
logger.info("snapshots {} aborted", completedNoCleanup);
}
for (Snapshot snapshot : completedNoCleanup) {
failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, SnapshotsInProgress.ABORTED_FAILURE_TEXT));
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, SnapshotsInProgress.ABORTED_FAILURE_TEXT),
Runnable::run
);
}
if (newDelete == null) {
listener.onResponse(null);
Expand Down Expand Up @@ -2466,17 +2484,22 @@ protected void handleListeners(List<ActionListener<Void>> deleteListeners) {
*/
private void failAllListenersOnMasterFailOver(Exception e) {
logger.debug("Failing all snapshot operation listeners because this node is not master any longer", e);
final List<Runnable> readyToResolveListeners = new ArrayList<>();
synchronized (currentlyFinalizing) {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
repositoryOperations.clear();
for (Snapshot snapshot : Set.copyOf(snapshotCompletionListeners.keySet())) {
failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, "no longer master"));
for (final Snapshot snapshot : snapshotCompletionListeners.keySet()) {
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "no longer master"),
readyToResolveListeners::add
);
}
final Exception wrapped = new RepositoryException("_all", "Failed to update cluster state during repository operation", e);
for (Iterator<List<ActionListener<Void>>> iterator = snapshotDeletionListeners.values().iterator(); iterator.hasNext();) {
final List<ActionListener<Void>> listeners = iterator.next();
iterator.remove();
failListenersIgnoringException(listeners, wrapped);
for (final Iterator<List<ActionListener<Void>>> it = snapshotDeletionListeners.values().iterator(); it.hasNext();) {
final List<ActionListener<Void>> listeners = it.next();
readyToResolveListeners.add(() -> failListenersIgnoringException(listeners, wrapped));
it.remove();
}
assert snapshotDeletionListeners.isEmpty() : "No new listeners should have been added but saw " + snapshotDeletionListeners;
} else {
Expand All @@ -2486,6 +2509,8 @@ private void failAllListenersOnMasterFailOver(Exception e) {
}
currentlyFinalizing.clear();
}
// fail snapshot listeners outside mutex
readyToResolveListeners.forEach(Runnable::run);
}

/**
Expand Down Expand Up @@ -2539,8 +2564,13 @@ protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgres
@Override
public final void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
repositoryOperations.finishDeletion(deleteEntry.uuid());
final List<ActionListener<Void>> deleteListeners = snapshotDeletionListeners.remove(deleteEntry.uuid());
handleListeners(deleteListeners);
final List<Runnable> readyToResolveListeners = new ArrayList<>();
synchronized (currentlyFinalizing) {
final List<ActionListener<Void>> deleteListeners = snapshotDeletionListeners.remove(deleteEntry.uuid());
readyToResolveListeners.add(() -> handleListeners(deleteListeners));
}
// resolve listeners outside mutex
readyToResolveListeners.forEach(Runnable::run);
if (newFinalizations.isEmpty()) {
if (readyDeletions.isEmpty()) {
leaveRepoLoop(deleteEntry.repository());
Expand Down Expand Up @@ -3546,6 +3576,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
() -> format("Removed all snapshot tasks for repository [%s] from cluster state, now failing listeners", repository),
failure
);
final List<Runnable> readyToResolveListeners = new ArrayList<>();
synchronized (currentlyFinalizing) {
Tuple<Snapshot, Metadata> finalization;
while ((finalization = repositoryOperations.pollFinalization(repository)) != null) {
Expand All @@ -3554,13 +3585,16 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
}
leaveRepoLoop(repository);
for (Snapshot snapshot : snapshotsToFail) {
failSnapshotCompletionListeners(snapshot, failure);
failSnapshotCompletionListeners(snapshot, failure, readyToResolveListeners::add);
}
for (String delete : deletionsToFail) {
failListenersIgnoringException(snapshotDeletionListeners.remove(delete), failure);
final List<ActionListener<Void>> listeners = snapshotDeletionListeners.remove(delete);
readyToResolveListeners.add(() -> failListenersIgnoringException(listeners, failure));
repositoryOperations.finishDeletion(delete);
}
}
// fail snapshot listeners outside mutex
readyToResolveListeners.forEach(Runnable::run);
}
}

Expand Down

0 comments on commit 3cf55db

Please sign in to comment.