From 2cd685a879928516189e387170105f4626556cd2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 13 Jul 2018 12:08:20 -0400 Subject: [PATCH 1/4] Ensure to release translog snapshot in primary-replica resync We create a translog snapshot inside the resync method, and that snapshot will be closed by the resync listener. However, if the resync method throws an exception before the resync listener is initialized, the translog snapshot won't be released. Closes #32030 --- .../index/shard/PrimaryReplicaSyncer.java | 70 +++++++++++-------- .../shard/PrimaryReplicaSyncerTests.java | 28 +++++--- 2 files changed, 60 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index b39ebd51f2bc8..58a50b71f14cc 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.tasks.Task; @@ -81,47 +82,27 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests public void resync(final IndexShard indexShard, final ActionListener listener) { ActionListener resyncListener = null; + Translog.Snapshot snapshot = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; - Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); - resyncListener = new ActionListener() { - @Override - public void onResponse(final ResyncTask resyncTask) { - try { - snapshot.close(); - listener.onResponse(resyncTask); - } catch (final Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(final Exception e) { - try { - snapshot.close(); - } catch (final Exception inner) { - e.addSuppressed(inner); - } finally { - listener.onFailure(e); - } - } - }; - ShardId shardId = indexShard.shardId(); - + final ShardId shardId = indexShard.shardId(); // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { - + snapshot = new Translog.Snapshot() { + final AtomicBoolean closed = new AtomicBoolean(); // closed once + final Translog.Snapshot originalSnapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); @Override public synchronized void close() throws IOException { - snapshot.close(); + if (closed.compareAndSet(false, true)) { + originalSnapshot.close(); + } } @Override public synchronized int totalOperations() { - return snapshot.totalOperations(); + return originalSnapshot.totalOperations(); } @Override @@ -132,12 +113,41 @@ public synchronized Translog.Operation next() throws IOException { } else { assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state; } - return snapshot.next(); + return originalSnapshot.next(); } }; + Translog.Snapshot wrappedSnapshot = snapshot; + resyncListener = new ActionListener() { + @Override + public void onResponse(final ResyncTask resyncTask) { + try { + wrappedSnapshot.close(); + listener.onResponse(resyncTask); + } catch (final Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + try { + wrappedSnapshot.close(); + } catch (final Exception inner) { + e.addSuppressed(inner); + } finally { + listener.onFailure(e); + } + } + }; + resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot, startingSeqNo, maxSeqNo, resyncListener); } catch (Exception e) { + try { + IOUtils.close(snapshot); + } catch (IOException inner) { + e.addSuppressed(inner); + } if (resyncListener != null) { resyncListener.onFailure(e); } else { diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index b290f4d45597b..48286a33db61a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -123,12 +123,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { public void testSyncerOnClosingShard() throws Exception { IndexShard shard = newStartedShard(true); AtomicBoolean syncActionCalled = new AtomicBoolean(); - CountDownLatch syncCalledLatch = new CountDownLatch(1); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { logger.info("Sending off {} operations", request.getOperations().length); syncActionCalled.set(true); - syncCalledLatch.countDown(); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); }; PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, @@ -147,13 +145,27 @@ public void testSyncerOnClosingShard() throws Exception { shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet()); - PlainActionFuture fut = new PlainActionFuture<>(); - threadPool.generic().execute(() -> { - try { - syncer.resync(shard, fut); - } catch (AlreadyClosedException ace) { - fut.onFailure(ace); + CountDownLatch syncCalledLatch = new CountDownLatch(1); + PlainActionFuture fut = new PlainActionFuture() { + @Override + public void onFailure(Exception e) { + try { + super.onFailure(e); + }finally { + syncCalledLatch.countDown(); + } + } + @Override + public void onResponse(PrimaryReplicaSyncer.ResyncTask result) { + try { + super.onResponse(result); + }finally { + syncCalledLatch.countDown(); + } } + }; + threadPool.generic().execute(() -> { + syncer.resync(shard, fut); }); if (randomBoolean()) { syncCalledLatch.await(); From d3135c6cef2a31d7a96493880945033a7c64b3a3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 13 Jul 2018 15:12:40 -0400 Subject: [PATCH 2/4] translog#snapshot is closed once already --- .../index/shard/PrimaryReplicaSyncer.java | 22 ++++++++----------- .../index/translog/TranslogTests.java | 18 +++++++++++++++ 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 58a50b71f14cc..477fe0470ca1d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -82,7 +82,7 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests public void resync(final IndexShard indexShard, final ActionListener listener) { ActionListener resyncListener = null; - Translog.Snapshot snapshot = null; + Translog.Snapshot wrappedSnapshot = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); @@ -90,19 +90,16 @@ public void resync(final IndexShard indexShard, final ActionListener // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - snapshot = new Translog.Snapshot() { - final AtomicBoolean closed = new AtomicBoolean(); // closed once - final Translog.Snapshot originalSnapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); + Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); + wrappedSnapshot = new Translog.Snapshot() { @Override public synchronized void close() throws IOException { - if (closed.compareAndSet(false, true)) { - originalSnapshot.close(); - } + snapshot.close(); } @Override public synchronized int totalOperations() { - return originalSnapshot.totalOperations(); + return snapshot.totalOperations(); } @Override @@ -113,15 +110,14 @@ public synchronized Translog.Operation next() throws IOException { } else { assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state; } - return originalSnapshot.next(); + return snapshot.next(); } }; - Translog.Snapshot wrappedSnapshot = snapshot; resyncListener = new ActionListener() { @Override public void onResponse(final ResyncTask resyncTask) { try { - wrappedSnapshot.close(); + snapshot.close(); listener.onResponse(resyncTask); } catch (final Exception e) { onFailure(e); @@ -131,7 +127,7 @@ public void onResponse(final ResyncTask resyncTask) { @Override public void onFailure(final Exception e) { try { - wrappedSnapshot.close(); + snapshot.close(); } catch (final Exception inner) { e.addSuppressed(inner); } finally { @@ -144,7 +140,7 @@ public void onFailure(final Exception e) { startingSeqNo, maxSeqNo, resyncListener); } catch (Exception e) { try { - IOUtils.close(snapshot); + IOUtils.close(wrappedSnapshot); } catch (IOException inner) { e.addSuppressed(inner); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index cf6e753684676..dbbb38090bc3b 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2932,6 +2932,24 @@ public void testSnapshotDedupOperations() throws Exception { } } + /** Make sure that it's ok to close a translog snapshot multiple times */ + public void testCloseSnapshotTwice() throws Exception { + int numOps = between(0, 10); + for (int i = 0; i < numOps; i++) { + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), i, primaryTerm.get(), new byte[]{1}); + translog.add(op); + if (randomBoolean()) { + translog.rollGeneration(); + } + } + for (int i = 0; i < 5; i++) { + Translog.Snapshot snapshot = translog.newSnapshot(); + assertThat(snapshot, SnapshotMatchers.size(numOps)); + snapshot.close(); + snapshot.close(); + } + } + static class SortedSnapshot implements Translog.Snapshot { private final Translog.Snapshot snapshot; private List operations = null; From 2bcba248303d5e0d0ddad90dc08831cd2efa1c34 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 16 Jul 2018 16:01:08 -0400 Subject: [PATCH 3/4] hold the original snapshot --- .../index/shard/PrimaryReplicaSyncer.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 477fe0470ca1d..d31c63b18adce 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -82,7 +82,7 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests public void resync(final IndexShard indexShard, final ActionListener listener) { ActionListener resyncListener = null; - Translog.Snapshot wrappedSnapshot = null; + Translog.Snapshot snapshot = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); @@ -90,16 +90,17 @@ public void resync(final IndexShard indexShard, final ActionListener // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); - wrappedSnapshot = new Translog.Snapshot() { + snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); + final Translog.Snapshot originalSnapshot = snapshot; + final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override public synchronized void close() throws IOException { - snapshot.close(); + originalSnapshot.close(); } @Override public synchronized int totalOperations() { - return snapshot.totalOperations(); + return originalSnapshot.totalOperations(); } @Override @@ -110,14 +111,14 @@ public synchronized Translog.Operation next() throws IOException { } else { assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state; } - return snapshot.next(); + return originalSnapshot.next(); } }; resyncListener = new ActionListener() { @Override public void onResponse(final ResyncTask resyncTask) { try { - snapshot.close(); + wrappedSnapshot.close(); listener.onResponse(resyncTask); } catch (final Exception e) { onFailure(e); @@ -127,7 +128,7 @@ public void onResponse(final ResyncTask resyncTask) { @Override public void onFailure(final Exception e) { try { - snapshot.close(); + wrappedSnapshot.close(); } catch (final Exception inner) { e.addSuppressed(inner); } finally { @@ -140,7 +141,7 @@ public void onFailure(final Exception e) { startingSeqNo, maxSeqNo, resyncListener); } catch (Exception e) { try { - IOUtils.close(wrappedSnapshot); + IOUtils.close(snapshot); } catch (IOException inner) { e.addSuppressed(inner); } From 1dbdaac770d7b6cbd09371d1c11f476a2cc7cf78 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 16 Jul 2018 17:50:57 -0400 Subject: [PATCH 4/4] remove the outer listener --- .../elasticsearch/index/shard/PrimaryReplicaSyncer.java | 8 ++------ .../index/shard/PrimaryReplicaSyncerTests.java | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index d31c63b18adce..e66d78f2e1a05 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -81,7 +81,6 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests } public void resync(final IndexShard indexShard, final ActionListener listener) { - ActionListener resyncListener = null; Translog.Snapshot snapshot = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; @@ -114,7 +113,7 @@ public synchronized Translog.Operation next() throws IOException { return originalSnapshot.next(); } }; - resyncListener = new ActionListener() { + final ActionListener resyncListener = new ActionListener() { @Override public void onResponse(final ResyncTask resyncTask) { try { @@ -144,10 +143,7 @@ public void onFailure(final Exception e) { IOUtils.close(snapshot); } catch (IOException inner) { e.addSuppressed(inner); - } - if (resyncListener != null) { - resyncListener.onFailure(e); - } else { + } finally { listener.onFailure(e); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 48286a33db61a..4444f475329b3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -151,7 +151,7 @@ public void testSyncerOnClosingShard() throws Exception { public void onFailure(Exception e) { try { super.onFailure(e); - }finally { + } finally { syncCalledLatch.countDown(); } } @@ -159,7 +159,7 @@ public void onFailure(Exception e) { public void onResponse(PrimaryReplicaSyncer.ResyncTask result) { try { super.onResponse(result); - }finally { + } finally { syncCalledLatch.countDown(); } }