From d006988edf9b7a78996f1cc1a4f6577fa6d2981f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Nov 2017 14:06:58 -0500 Subject: [PATCH] add the replication test --- .../IndexLevelReplicationTests.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index cf4dab733f237..a9810b09c5f8c 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -46,16 +46,23 @@ import org.hamcrest.Matcher; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase { @@ -299,6 +306,65 @@ public void testRequestFailureReplication() throws Exception { } } + public void testTranslogDedupOperations() throws Exception { + try (ReplicationGroup shards = createGroup(2)) { + shards.startAll(); + int initDocs = shards.indexDocs(randomInt(10)); + List replicas = shards.getReplicas(); + IndexShard replica1 = replicas.get(0); + IndexShard replica2 = replicas.get(1); + + logger.info("--> Isolate replica1"); + IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON); + BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary()); + for (int i = 1; i < replicas.size(); i++) { + indexOnReplica(replicationRequest, replicas.get(i)); + } + + final Translog.Operation op1; + final List initOperations = new ArrayList<>(initDocs); + try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + for (int i = 0; i < initDocs; i++) { + Translog.Operation op = snapshot.next(); + assertThat(op, is(notNullValue())); + initOperations.add(op); + } + op1 = snapshot.next(); + assertThat(op1, notNullValue()); + assertThat(snapshot.next(), nullValue()); + assertThat(snapshot.skippedOperations(), equalTo(0)); + } + + // Make sure that replica2 receives translog from replica1 and overwrites its stale operation (op1). + logger.info("--> Promote replica1 as the primary"); + shards.promoteReplicaToPrimary(replica1); + shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); + final Translog.Operation op2; + try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), greaterThanOrEqualTo(initDocs + 2)); + op2 = snapshot.next(); + assertThat(op2.seqNo(), equalTo(op1.seqNo())); + assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm())); + assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); + assertThat(snapshot.skippedOperations(), greaterThanOrEqualTo(1)); + } + + // Make sure that peer-recovery transfers all but non-duplicated operations. + IndexShard replica3 = shards.addReplica(); + logger.info("--> Promote replica2 as the primary"); + shards.promoteReplicaToPrimary(replica2); + logger.info("--> Recover replica3 from replica2"); + recoverReplica(replica3, replica2); + try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + assertThat(snapshot.next(), equalTo(op2)); + assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); + assertThat(snapshot.skippedOperations(), equalTo(0)); + } + } + } + /** Throws documentFailure on every indexing operation */ static class ThrowingDocumentFailureEngineFactory implements EngineFactory { final String documentFailureMessage;