From f3d21a2ded8eb7c8c92ea3d340dfb5be04ad502e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 19 Sep 2018 15:13:40 -0400 Subject: [PATCH 1/2] Propagate auto_id_timestamp in primary-replica resync A follow-up of #33693 to propagate max_seen_auto_id_timestamp in a primary-replica resync. --- .../resync/ResyncReplicationRequest.java | 24 +++++++++-- .../TransportResyncReplicationAction.java | 6 +++ .../index/shard/PrimaryReplicaSyncer.java | 17 +++++--- .../resync/ResyncReplicationRequestTests.java | 2 +- .../RecoveryDuringReplicationTests.java | 41 +++++++++++++++++++ .../shard/PrimaryReplicaSyncerTests.java | 4 +- 6 files changed, 82 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index d4e2c652fa875..cc3495856a3dc 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Objects; /** * Represents a batch of operations sent from the primary to its replicas during the primary-replica resync. @@ -36,15 +37,17 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest listener) { + long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener listener) { ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId); ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-) ActionListener wrappedListener = new ActionListener() { @@ -170,7 +172,7 @@ public void onFailure(Exception e) { }; try { new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(), - startingSeqNo, maxSeqNo, wrappedListener).run(); + startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run(); } catch (Exception e) { wrappedListener.onFailure(e); } @@ -191,6 +193,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener listener; private final AtomicBoolean firstMessage = new AtomicBoolean(true); @@ -199,7 +202,8 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener listener) { + Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, + long maxSeenAutoIdTimestamp, ActionListener listener) { this.logger = logger; this.syncAction = syncAction; this.task = task; @@ -210,6 +214,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener operations, f } } + public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception { + try (ReplicationGroup shards = createGroup(2)) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + IndexShard replica1 = shards.getReplicas().get(0); + IndexShard replica2 = shards.getReplicas().get(1); + long maxTimestampOnReplica1 = -1; + long maxTimestampOnReplica2 = -1; + List replicationRequests = new ArrayList<>(); + for (int numDocs = between(1, 10), i = 0; i < numDocs; i++) { + final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); + indexRequest.process(Version.CURRENT, null, index.getName()); + final IndexRequest copyRequest; + if (randomBoolean()) { + copyRequest = copyIndexRequest(indexRequest); + indexRequest.onRetry(); + } else { + copyRequest = copyIndexRequest(indexRequest); + copyRequest.onRetry(); + } + replicationRequests.add(copyRequest); + final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, primary); + if (randomBoolean()) { + indexOnReplica(bulkShardRequest, shards, replica1); + maxTimestampOnReplica1 = Math.max(maxTimestampOnReplica1, indexRequest.getAutoGeneratedTimestamp()); + } else { + indexOnReplica(bulkShardRequest, shards, replica2); + maxTimestampOnReplica2 = Math.max(maxTimestampOnReplica2, indexRequest.getAutoGeneratedTimestamp()); + } + } + assertThat(replica1.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1)); + assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica2)); + shards.promoteReplicaToPrimary(replica1).get(); + assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1)); + for (IndexRequest request : replicationRequests) { + shards.index(request); // deliver via normal replication + } + } + } + public static class BlockingTarget extends RecoveryTarget { private final CountDownLatch recoveryBlocked; diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index c5fc0e4b03477..ad0df98e8fd9e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -76,7 +76,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { // Index doc but not advance local checkpoint. shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + randomNonNegativeLong(), randomBoolean()); } long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0; @@ -105,6 +105,8 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { .findFirst() .isPresent(), is(false)); + + assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp())); } if (syncNeeded && globalCheckPoint < numDocs - 1) { if (shard.indexSettings.isSoftDeleteEnabled()) { From bf642d63f2430c9106166fb326652e5eeea21757 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 22 Sep 2018 08:29:47 -0400 Subject: [PATCH 2/2] feedback --- .../elasticsearch/action/resync/ResyncReplicationRequest.java | 3 ++- .../index/replication/RecoveryDuringReplicationTests.java | 3 +++ .../elasticsearch/index/shard/PrimaryReplicaSyncerTests.java | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index cc3495856a3dc..ca003bfc07bcd 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.resync; import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -83,7 +84,7 @@ public void readFrom(final StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { maxSeenAutoIdTimestampOnPrimary = in.readZLong(); } else { - maxSeenAutoIdTimestampOnPrimary = -1; + maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; } operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index fda9e7315e84a..c38fb8a495650 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -671,6 +671,9 @@ public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception { for (IndexRequest request : replicationRequests) { shards.index(request); // deliver via normal replication } + for (IndexShard shard : shards) { + assertThat(shard.getMaxSeenAutoIdTimestamp(), equalTo(Math.max(maxTimestampOnReplica1, maxTimestampOnReplica2))); + } } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index ad0df98e8fd9e..28e625b34dfd6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -76,7 +76,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { // Index doc but not advance local checkpoint. shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON), - randomNonNegativeLong(), randomBoolean()); + randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true); } long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;