diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index b39ebd51f2bc8..58a50b71f14cc 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.tasks.Task; @@ -81,47 +82,27 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests public void resync(final IndexShard indexShard, final ActionListener listener) { ActionListener resyncListener = null; + Translog.Snapshot snapshot = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; - Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); - resyncListener = new ActionListener() { - @Override - public void onResponse(final ResyncTask resyncTask) { - try { - snapshot.close(); - listener.onResponse(resyncTask); - } catch (final Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(final Exception e) { - try { - snapshot.close(); - } catch (final Exception inner) { - e.addSuppressed(inner); - } finally { - listener.onFailure(e); - } - } - }; - ShardId shardId = indexShard.shardId(); - + final ShardId shardId = indexShard.shardId(); // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { - + snapshot = new Translog.Snapshot() { + final AtomicBoolean closed = new AtomicBoolean(); // closed once + final Translog.Snapshot originalSnapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); @Override public synchronized void close() throws IOException { - snapshot.close(); + if (closed.compareAndSet(false, true)) { + originalSnapshot.close(); + } } @Override public synchronized int totalOperations() { - return snapshot.totalOperations(); + return originalSnapshot.totalOperations(); } @Override @@ -132,12 +113,41 @@ public synchronized Translog.Operation next() throws IOException { } else { assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state; } - return snapshot.next(); + return originalSnapshot.next(); } }; + Translog.Snapshot wrappedSnapshot = snapshot; + resyncListener = new ActionListener() { + @Override + public void onResponse(final ResyncTask resyncTask) { + try { + wrappedSnapshot.close(); + listener.onResponse(resyncTask); + } catch (final Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + try { + wrappedSnapshot.close(); + } catch (final Exception inner) { + e.addSuppressed(inner); + } finally { + listener.onFailure(e); + } + } + }; + resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot, startingSeqNo, maxSeqNo, resyncListener); } catch (Exception e) { + try { + IOUtils.close(snapshot); + } catch (IOException inner) { + e.addSuppressed(inner); + } if (resyncListener != null) { resyncListener.onFailure(e); } else { diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index b290f4d45597b..48286a33db61a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -123,12 +123,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { public void testSyncerOnClosingShard() throws Exception { IndexShard shard = newStartedShard(true); AtomicBoolean syncActionCalled = new AtomicBoolean(); - CountDownLatch syncCalledLatch = new CountDownLatch(1); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { logger.info("Sending off {} operations", request.getOperations().length); syncActionCalled.set(true); - syncCalledLatch.countDown(); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); }; PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, @@ -147,13 +145,27 @@ public void testSyncerOnClosingShard() throws Exception { shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet()); - PlainActionFuture fut = new PlainActionFuture<>(); - threadPool.generic().execute(() -> { - try { - syncer.resync(shard, fut); - } catch (AlreadyClosedException ace) { - fut.onFailure(ace); + CountDownLatch syncCalledLatch = new CountDownLatch(1); + PlainActionFuture fut = new PlainActionFuture() { + @Override + public void onFailure(Exception e) { + try { + super.onFailure(e); + }finally { + syncCalledLatch.countDown(); + } + } + @Override + public void onResponse(PrimaryReplicaSyncer.ResyncTask result) { + try { + super.onResponse(result); + }finally { + syncCalledLatch.countDown(); + } } + }; + threadPool.generic().execute(() -> { + syncer.resync(shard, fut); }); if (randomBoolean()) { syncCalledLatch.await();