diff --git a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index d4e2c652fa875..ca003bfc07bcd 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.resync; import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,6 +29,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Objects; /** * Represents a batch of operations sent from the primary to its replicas during the primary-replica resync. @@ -36,15 +38,17 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest listener) { + long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener listener) { ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId); ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-) ActionListener wrappedListener = new ActionListener() { @@ -170,7 +172,7 @@ public void onFailure(Exception e) { }; try { new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(), - startingSeqNo, maxSeqNo, wrappedListener).run(); + startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run(); } catch (Exception e) { wrappedListener.onFailure(e); } @@ -191,6 +193,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener listener; private final AtomicBoolean firstMessage = new AtomicBoolean(true); @@ -199,7 +202,8 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener listener) { + Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, + long maxSeenAutoIdTimestamp, ActionListener listener) { this.logger = logger; this.syncAction = syncAction; this.task = task; @@ -210,6 +214,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener operations, f } } + public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception { + try (ReplicationGroup shards = createGroup(2)) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + IndexShard replica1 = shards.getReplicas().get(0); + IndexShard replica2 = shards.getReplicas().get(1); + long maxTimestampOnReplica1 = -1; + long maxTimestampOnReplica2 = -1; + List replicationRequests = new ArrayList<>(); + for (int numDocs = between(1, 10), i = 0; i < numDocs; i++) { + final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); + indexRequest.process(Version.CURRENT, null, index.getName()); + final IndexRequest copyRequest; + if (randomBoolean()) { + copyRequest = copyIndexRequest(indexRequest); + indexRequest.onRetry(); + } else { + copyRequest = copyIndexRequest(indexRequest); + copyRequest.onRetry(); + } + replicationRequests.add(copyRequest); + final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, primary); + if (randomBoolean()) { + indexOnReplica(bulkShardRequest, shards, replica1); + maxTimestampOnReplica1 = Math.max(maxTimestampOnReplica1, indexRequest.getAutoGeneratedTimestamp()); + } else { + indexOnReplica(bulkShardRequest, shards, replica2); + maxTimestampOnReplica2 = Math.max(maxTimestampOnReplica2, indexRequest.getAutoGeneratedTimestamp()); + } + } + assertThat(replica1.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1)); + assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica2)); + shards.promoteReplicaToPrimary(replica1).get(); + assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1)); + for (IndexRequest request : replicationRequests) { + shards.index(request); // deliver via normal replication + } + for (IndexShard shard : shards) { + assertThat(shard.getMaxSeenAutoIdTimestamp(), equalTo(Math.max(maxTimestampOnReplica1, maxTimestampOnReplica2))); + } + } + } + public static class BlockingTarget extends RecoveryTarget { private final CountDownLatch recoveryBlocked; diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index c5fc0e4b03477..28e625b34dfd6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -76,7 +76,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { // Index doc but not advance local checkpoint. shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true); } long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0; @@ -105,6 +105,8 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { .findFirst() .isPresent(), is(false)); + + assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp())); } if (syncNeeded && globalCheckPoint < numDocs - 1) { if (shard.indexSettings.isSoftDeleteEnabled()) {