Avoid concurrent snapshot finalization when deleting an initializing snapshot

With the current snapshot/restore logic, a newly created snapshot is added by the
SnapshotsService.createSnapshot() method as a SnapshotsInProgress entry in the
cluster state. This snapshot has the INIT state. Once the cluster state update is
processed, the beginSnapshot() method is executed on the SNAPSHOT thread pool.
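
As a rough illustration of that sequence, here is a minimal, self-contained sketch in
plain Java. Every type in it (SnapshotState, SnapshotCreation, the in-memory map standing
in for the cluster state) is a hypothetical simplification, not the real SnapshotsService
implementation:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical stand-ins for illustration only (not the real SnapshotsService classes).
    enum SnapshotState { INIT, STARTED, ABORTED, SUCCESS, FAILED }

    class SnapshotCreation {
        // Simplified "cluster state": snapshot name -> current snapshot state.
        final Map<String, SnapshotState> snapshotsInProgress = new ConcurrentHashMap<>();
        // Stand-in for the dedicated SNAPSHOT thread pool.
        final ExecutorService snapshotPool = Executors.newFixedThreadPool(4);

        void createSnapshot(String name) {
            // 1. The creation registers the snapshot in the cluster state with the INIT state.
            snapshotsInProgress.put(name, SnapshotState.INIT);
            // 2. Once that cluster state update has been processed, initialization is handed
            //    off to the snapshot pool, where it may run for a long time.
            snapshotPool.submit(() -> beginSnapshot(name));
        }

        void beginSnapshot(String name) {
            // Calls initializeSnapshot(), which reads the repository data and writes
            // the global and per-index metadata files (see the next sketch).
        }
    }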

The beginSnapshot() method starts the initialization of the snapshot using the
initializeSnapshot() method. This method reads the repository data and then writes
the global metadata file and one index metadata file per index to be snapshotted.
These operations can take some time to complete (many minutes).
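
A sketch of roughly what that initialization amounts to, again with hypothetical
stand-ins (the BlobStore interface below stands in for the repository implementation and
the blob names are illustrative), assuming one global metadata blob plus one metadata
blob per snapshotted index:

    import java.util.List;
    import java.util.function.Function;

    // Hypothetical stand-in for the blob store repository; the real interface differs.
    interface BlobStore {
        byte[] readRepositoryData();                  // e.g. the current index-N generation
        void writeBlob(String path, byte[] content);  // may be slow on a remote store
    }

    class SnapshotInitializer {
        private final BlobStore repository;

        SnapshotInitializer(BlobStore repository) {
            this.repository = repository;
        }

        void initializeSnapshot(String snapshotId, List<String> indices,
                                byte[] globalMetaData, Function<String, byte[]> indexMetaData) {
            // Read the current repository data first ...
            repository.readRepositoryData();
            // ... then write the global metadata file for this snapshot ...
            repository.writeBlob("meta-" + snapshotId + ".dat", globalMetaData);
            // ... and one metadata file per index to be snapshotted.
            for (String index : indices) {
                repository.writeBlob("indices/" + index + "/meta-" + snapshotId + ".dat",
                        indexMetaData.apply(index));
            }
            // Each of these writes can take a while, so the whole step may last minutes.
        }
    }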

At this stage, if the master node is disconnected, the snapshot can get stuck in the
INIT state on versions 5.6.4/6.0.0 or lower (pull request elastic#27214 fixed this on
5.6.5/6.0.1 and higher).

If the snapshot is not stuck but the initialization takes some time and the user
decides to abort the snapshot, a delete snapshot request can sneak in. The deletion
updates the cluster state, checking the state of the SnapshotsInProgress entry. When
the snapshot is in the INIT state, it executes the endSnapshot() method (which returns
immediately) and then the snapshot's state is updated to ABORTED in the cluster state.
The deletion then listens for the snapshot completion in order to continue with the
deletion of the snapshot.

But before returning, the endSnapshot() method adds a new Runnable to the SNAPSHOT
thread pool that forces the finalization of the initializing snapshot. This
finalization writes the snapshot metadata file and updates the index-N file in the
repository.
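
To make the pre-fix sequencing concrete, here is another minimal sketch with the same
kind of hypothetical stand-ins (not the real SnapshotsService code): the deletion of an
INIT snapshot marks the entry as ABORTED, but endSnapshot() also queues a finalization
task on the snapshot pool, so a second finalization can end up in flight while the
initialization is still running:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical stand-ins; the real logic lives in cluster state update tasks.
    enum SnapshotState { INIT, STARTED, ABORTED, SUCCESS, FAILED }

    class PreFixDeletion {
        final Map<String, SnapshotState> snapshotsInProgress = new ConcurrentHashMap<>();
        final ExecutorService snapshotPool = Executors.newFixedThreadPool(4); // stand-in for the SNAPSHOT pool

        void deleteSnapshot(String name, Runnable continueDeletion) {
            if (snapshotsInProgress.get(name) == SnapshotState.INIT) {
                endSnapshot(name);                                   // returns immediately ...
                snapshotsInProgress.put(name, SnapshotState.ABORTED); // ... then the entry is marked ABORTED
            }
            // The deletion waits for the snapshot completion before touching the repository.
            registerCompletionListener(name, continueDeletion);
        }

        void endSnapshot(String name) {
            // Pre-fix behavior: queue a finalization task, even though initializeSnapshot()
            // may still be running for the same snapshot on this very pool.
            snapshotPool.submit(() -> finalizeSnapshot(name));
        }

        void finalizeSnapshot(String name) {
            // Writes the snapshot metadata file and updates the index-N file in the repository.
        }

        void registerCompletionListener(String name, Runnable listener) {
            // Omitted: invoked once the snapshot entry is removed from the cluster state.
        }
    }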

At this stage two things can potentially be executed concurrently: the initialization
of the snapshot and the finalization of the snapshot. When initializeSnapshot()
completes, the cluster state is updated to start the snapshot and move it to the
STARTED state (this is before elastic#27931, which prevents an ABORTED snapshot from
being started at all). The snapshot is started and shards start to be snapshotted, but
they quickly fail because the snapshot was ABORTED by the deletion. All shards are
reported as FAILED to the master node, which executes endSnapshot() too (using
SnapshotStateExecutor).

Then many things can happen, depending on the execution of tasks by the SNAPSHOT
thread pool and the time taken by each read/write/delete operation of the repository
implementation. This is especially true on S3, where operations can take time
(disconnections, retries, timeouts) and where the data consistency model allows
reading stale data or requires some time for objects to be replicated.

Here are some scenarios seen in cluster logs:

a) the snapshot is finalized by the snapshot deletion. The snapshot metadata file then
exists in the repository, so the later finalization by the snapshot creation fails with
a "fail to finalize snapshot" message in the logs. The deletion process continues.

b) the snapshot is finalized by the snapshot creation. The snapshot metadata file then
exists in the repository, so the later finalization by the snapshot deletion fails with
a "fail to finalize snapshot" message in the logs. The deletion process continues.

c) both finalizations are executed concurrently, and things can fail at different read
or write operations. Shard failures can be lost, as can the final snapshot state,
depending on which SnapshotsInProgress.Entry is used to finalize the snapshot.

d) the snapshot is finalized by the snapshot deletion and the snapshot in progress is
removed from the cluster state, triggering the execution of the completion listeners.
The deletion process continues and deleteSnapshotFromRepository() is executed on the
SNAPSHOT thread pool. This method reads the repository data, the snapshot metadata and
the index metadata for all indices included in the snapshot before updating the index-N
file in the repository. This can also take some time, and I think these operations could
potentially be executed concurrently with the finalization of the snapshot by the
snapshot creation, leading to corrupted data.
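
Sketched with the same kind of hypothetical stand-ins (this is not the real Repository
interface and the method names are illustrative), the repository-side deletion roughly
does the following, which is why it can overlap with a still-running finalization:

    import java.util.List;

    // Hypothetical stand-in for the blob store repository; method names are illustrative only.
    interface SnapshotRepository {
        byte[] readRepositoryData();                        // current index-N generation
        byte[] readSnapshotMetaData(String snapshotId);
        byte[] readIndexMetaData(String snapshotId, String index);
        void writeRepositoryData(byte[] updatedRepositoryData); // writes the next index-N file
        void deleteSnapshotBlobs(String snapshotId, List<String> indices);
    }

    class RepositoryDeletion {
        void deleteSnapshotFromRepository(SnapshotRepository repo, String snapshotId, List<String> indices) {
            // Several potentially slow reads happen before the repository index is rewritten;
            // this is the window in which a concurrent finalization of the same snapshot
            // by the snapshot creation could interleave with the deletion.
            byte[] repositoryData = repo.readRepositoryData();
            repo.readSnapshotMetaData(snapshotId);
            for (String index : indices) {
                repo.readIndexMetaData(snapshotId, index);
            }
            // Only then is index-N updated and are the snapshot blobs removed.
            repo.writeRepositoryData(withSnapshotRemoved(repositoryData, snapshotId));
            repo.deleteSnapshotBlobs(snapshotId, indices);
        }

        private byte[] withSnapshotRemoved(byte[] repositoryData, String snapshotId) {
            return repositoryData; // placeholder: drop the snapshot entry from the repository data
        }
    }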

This commit does not solve all the issues reported here, but it removes the finalization
of the snapshot by the snapshot deletion. This way, the deletion marks the snapshot as
ABORTED in the cluster state and waits for the snapshot completion. It is the
responsibility of the snapshot execution to detect the abortion and terminate itself
correctly. This avoids concurrent snapshot finalizations and also orders the operations:
the deletion aborts the snapshot and waits for the snapshot completion, the creation
detects the abortion, stops by itself and finalizes the snapshot, and then the deletion
resumes and continues the deletion process.
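
The intended ordering can be sketched as follows, using the same hypothetical stand-ins
as the earlier sketches (the actual change is in the diff below):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stand-ins for illustration only.
    enum SnapshotState { INIT, STARTED, ABORTED, SUCCESS, FAILED }

    class OrderedAbortAndDelete {
        final Map<String, SnapshotState> snapshotsInProgress = new ConcurrentHashMap<>();

        // Deletion side (after this change): only mark the snapshot as ABORTED and wait.
        // No finalization is scheduled here any more.
        void deleteSnapshot(String name, Runnable continueDeletion) {
            snapshotsInProgress.put(name, SnapshotState.ABORTED);
            registerCompletionListener(name, continueDeletion);
        }

        // Creation side: when initialization finishes, it checks the state, detects the
        // abort, terminates itself and performs the single finalization of the snapshot.
        void onInitializationFinished(String name, Runnable finalizeSnapshot) {
            if (snapshotsInProgress.get(name) == SnapshotState.ABORTED) {
                finalizeSnapshot.run();          // the one and only finalization
                snapshotsInProgress.remove(name);
                notifyCompletionListeners(name); // the deletion now resumes
            } else {
                snapshotsInProgress.put(name, SnapshotState.STARTED);
            }
        }

        void registerCompletionListener(String name, Runnable listener) { /* omitted */ }
        void notifyCompletionListeners(String name) { /* omitted */ }
    }
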
tlrx committed Jan 8, 2018
1 parent fd45a46 commit 76806b0
Showing 2 changed files with 37 additions and 22 deletions.
@@ -381,8 +381,13 @@ public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) {
// Replace the snapshot that was just created
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
entries.add(entry);
continue;
}

if (entry.state() != State.ABORTED) {
// Replace the snapshot that was just initialized
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
@@ -409,11 +414,13 @@ public ClusterState execute(ClusterState currentState) {
}
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
entries.add(updatedSnapshot);
if (!completed(shards.values())) {
if (completed(shards.values()) == false) {
accepted = true;
}
} else {
entries.add(entry);
failure = "snapshot state changed to " + entry.state() + " during initialization";
updatedSnapshot = entry;
entries.add(updatedSnapshot);
}
}
return ClusterState.builder(currentState)
@@ -750,6 +757,11 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}
entries.add(updatedSnapshot);
} else if (snapshot.state() == State.INIT && newMaster) {
changed = true;
// Mark the snapshot as aborted as it failed to start from the previous master
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards());
entries.add(updatedSnapshot);

// Clean up the snapshot that failed to start from the old master
deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() {
@Override
@@ -935,7 +947,7 @@ private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableOpenMap
*
* @param entry snapshot
*/
void endSnapshot(SnapshotsInProgress.Entry entry) {
void endSnapshot(final SnapshotsInProgress.Entry entry) {
endSnapshot(entry, null);
}

@@ -1144,24 +1156,26 @@ public ClusterState execute(ClusterState currentState) throws Exception {
} else {
// This snapshot is currently running - stopping shards first
waitForSnapshot = true;
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
if (snapshotEntry.state() == State.STARTED && snapshotEntry.shards() != null) {
// snapshot is currently running - stop started shards
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();

final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;

final State state = snapshotEntry.state();
if (state == State.INIT) {
// snapshot is still initializing, mark it as aborted
shards = snapshotEntry.shards();

} else if (snapshotEntry.state() == State.STARTED) {
// snapshot is started - mark every non completed shard as aborted
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (!status.state().completed()) {
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED,
"aborted by snapshot deletion"));
} else {
shardsBuilder.put(shardEntry.key, status);
if (status.state().completed() == false) {
status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion");
}
shardsBuilder.put(shardEntry.key, status);
}
shards = shardsBuilder.build();
} else if (snapshotEntry.state() == State.INIT) {
// snapshot hasn't started yet - end it
shards = snapshotEntry.shards();
endSnapshot(snapshotEntry);

} else {
boolean hasUncompletedShards = false;
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
@@ -1178,7 +1192,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes");
return currentState;
} else {
// no shards to wait for - finish the snapshot
// no shards to wait for but a node is gone - this is the only case
// where we force to finish the snapshot
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
shards = snapshotEntry.shards();
endSnapshot(snapshotEntry);
@@ -3151,7 +3151,7 @@ public void testSnapshottingWithMissingSequenceNumbers() {
assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/27974")
@TestLogging("org.elasticsearch.snapshots:TRACE")
public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
final Client client = client();

@@ -3163,11 +3163,11 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
));

createIndex("test-idx");
final int nbDocs = scaledRandomIntBetween(1, 100);
final int nbDocs = scaledRandomIntBetween(100, 500);
for (int i = 0; i < nbDocs; i++) {
index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
flushAndRefresh("test-idx");
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));

// Create a snapshot
