Skip to content

Commit

Permalink
Propagate auto_id_timestamp in primary-replica resync (#33964)
Browse files Browse the repository at this point in the history
A follow-up of #33693 to propagate max_seen_auto_id_timestamp in a
primary-replica resync.

Relates #33693
  • Loading branch information
dnhatn authored and kcm committed Oct 30, 2018
1 parent 2122632 commit ea7bca8
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.action.resync;

import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -28,6 +29,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

/**
* Represents a batch of operations sent from the primary to its replicas during the primary-replica resync.
Expand All @@ -36,22 +38,28 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn

private long trimAboveSeqNo;
private Translog.Operation[] operations;
private long maxSeenAutoIdTimestampOnPrimary;

ResyncReplicationRequest() {
super();
}

/**
 * Creates a resync request carrying a batch of translog operations for the given shard.
 *
 * @param shardId                         the shard this request targets (handed to the superclass)
 * @param trimAboveSeqNo                  the value later exposed by {@link #getTrimAboveSeqNo()}
 * @param maxSeenAutoIdTimestampOnPrimary the max seen auto_id_timestamp captured on the primary, later exposed by
 *                                        {@link #getMaxSeenAutoIdTimestampOnPrimary()}
 * @param operations                      the operations in this batch; the array is stored as-is, not copied
 */
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo, final long maxSeenAutoIdTimestampOnPrimary,
                                final Translog.Operation[] operations) {
    super(shardId);
    this.trimAboveSeqNo = trimAboveSeqNo;
    this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
    this.operations = operations;
}

/**
 * @return the {@code trimAboveSeqNo} value this request was constructed with or read from the stream
 */
public long getTrimAboveSeqNo() {
    return this.trimAboveSeqNo;
}

/**
 * @return the max seen auto_id_timestamp carried by this request; when the request was read from a stream older than
 *         7.0.0-alpha1 this is {@code IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP}
 */
public long getMaxSeenAutoIdTimestampOnPrimary() {
    return this.maxSeenAutoIdTimestampOnPrimary;
}

/**
 * @return the operations in this batch; this is the internal array itself, not a copy
 */
public Translog.Operation[] getOperations() {
    return this.operations;
}
Expand All @@ -73,6 +81,11 @@ public void readFrom(final StreamInput in) throws IOException {
} else {
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
} else {
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

Expand All @@ -82,6 +95,9 @@ public void writeTo(final StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
out.writeZLong(trimAboveSeqNo);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
}
out.writeArray(Translog.Operation::writeOperation, operations);
}

Expand All @@ -90,13 +106,13 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return trimAboveSeqNo == that.trimAboveSeqNo
return trimAboveSeqNo == that.trimAboveSeqNo && maxSeenAutoIdTimestampOnPrimary == that.maxSeenAutoIdTimestampOnPrimary
&& Arrays.equals(operations, that.operations);
}

/**
 * Computes a hash code consistent with {@code equals}.
 *
 * {@code equals} compares {@code operations} element-wise via {@link Arrays#equals(Object[], Object[])}, so the array
 * must be hashed by content with {@link Arrays#hashCode(Object[])}. Passing the array reference straight to
 * {@link Objects#hash(Object...)} would hash it by identity and give equal requests different hash codes.
 */
@Override
public int hashCode() {
    return Objects.hash(trimAboveSeqNo, maxSeenAutoIdTimestampOnPrimary, Arrays.hashCode(operations));
}

@Override
Expand All @@ -106,6 +122,7 @@ public String toString() {
", timeout=" + timeout +
", index='" + index + '\'' +
", trimAboveSeqNo=" + trimAboveSeqNo +
", maxSeenAutoIdTimestampOnPrimary=" + maxSeenAutoIdTimestampOnPrimary +
", ops=" + operations.length +
"}";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest re

public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
/*
* Operations received from resync do not carry an auto_id_timestamp individually, so we need to bootstrap this max_seen_timestamp
* (at least the highest timestamp of any of these operations) to make sure that we disable the optimization for the same
* append-only requests with timestamps (the sources of these operations) that are replicated; otherwise we may have duplicates.
*/
replica.updateMaxUnsafeAutoIdTimestamp(request.getMaxSeenAutoIdTimestampOnPrimary());
for (Translog.Operation operation : request.getOperations()) {
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,11 @@ public void onFailure(final Exception e) {
}
}
};

// We must capture the timestamp after taking the snapshot of operations to make sure
// that the auto_id_timestamp of every operation in the snapshot is at most this value.
final long maxSeenAutoIdTimestamp = indexShard.getMaxSeenAutoIdTimestamp();
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPendingPrimaryTerm(), wrappedSnapshot,
startingSeqNo, maxSeqNo, resyncListener);
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, resyncListener);
} catch (Exception e) {
try {
IOUtils.close(snapshot);
Expand All @@ -150,7 +152,7 @@ public void onFailure(final Exception e) {
}

private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener<ResyncTask> listener) {
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
Expand All @@ -170,7 +172,7 @@ public void onFailure(Exception e) {
};
try {
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
startingSeqNo, maxSeqNo, wrappedListener).run();
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
}
Expand All @@ -191,6 +193,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
private final Translog.Snapshot snapshot;
private final long startingSeqNo;
private final long maxSeqNo;
private final long maxSeenAutoIdTimestamp;
private final int chunkSizeInBytes;
private final ActionListener<Void> listener;
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
Expand All @@ -199,7 +202,8 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
private AtomicBoolean closed = new AtomicBoolean();

SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo,
long maxSeenAutoIdTimestamp, ActionListener<Void> listener) {
this.logger = logger;
this.syncAction = syncAction;
this.task = task;
Expand All @@ -210,6 +214,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
this.chunkSizeInBytes = chunkSizeInBytes;
this.startingSeqNo = startingSeqNo;
this.maxSeqNo = maxSeqNo;
this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
this.listener = listener;
task.setTotalOperations(snapshot.totalOperations());
}
Expand Down Expand Up @@ -260,7 +265,7 @@ protected void doRun() throws Exception {
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
task.setPhase("sending_ops");
ResyncReplicationRequest request =
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, maxSeenAutoIdTimestamp, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
firstMessage.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testSerialization() throws IOException {
final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(),
randomNonNegativeLong(), bytes, null, -1);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index});
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, 100, new Translog.Operation[]{index});

final BytesStreamOutput out = new BytesStreamOutput();
before.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -633,6 +634,49 @@ public long indexTranslogOperations(final List<Translog.Operation> operations, f
}
}

/**
 * Indexes append-only requests on the primary, replicates each one to exactly one of two replicas, then promotes the
 * first replica. Checks that the second replica picks up the first replica's max seen auto_id_timestamp after the
 * promotion, and that once the copied requests are delivered through normal replication every shard reports the
 * larger of the two replica timestamps.
 */
public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception {
    try (ReplicationGroup shards = createGroup(2)) {
        shards.startAll();
        final IndexShard primary = shards.getPrimary();
        final IndexShard firstReplica = shards.getReplicas().get(0);
        final IndexShard secondReplica = shards.getReplicas().get(1);
        long maxOnFirst = -1;
        long maxOnSecond = -1;
        final List<IndexRequest> pendingCopies = new ArrayList<>();

        final int numDocs = between(1, 10);
        for (int doc = 0; doc < numDocs; doc++) {
            final IndexRequest original = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
            original.process(Version.CURRENT, null, index.getName());

            // Draw the random choice before copying so the sequence of random draws is unchanged.
            final boolean retryOnOriginal = randomBoolean();
            final IndexRequest copy = copyIndexRequest(original);
            if (retryOnOriginal) {
                original.onRetry();
            } else {
                copy.onRetry();
            }
            pendingCopies.add(copy);

            final BulkShardRequest bulkShardRequest = indexOnPrimary(original, primary);
            // Each request reaches exactly one of the two replicas; track the expected max per replica.
            if (randomBoolean()) {
                indexOnReplica(bulkShardRequest, shards, firstReplica);
                maxOnFirst = Math.max(maxOnFirst, original.getAutoGeneratedTimestamp());
            } else {
                indexOnReplica(bulkShardRequest, shards, secondReplica);
                maxOnSecond = Math.max(maxOnSecond, original.getAutoGeneratedTimestamp());
            }
        }

        assertThat(firstReplica.getMaxSeenAutoIdTimestamp(), equalTo(maxOnFirst));
        assertThat(secondReplica.getMaxSeenAutoIdTimestamp(), equalTo(maxOnSecond));

        shards.promoteReplicaToPrimary(firstReplica).get();
        assertThat(secondReplica.getMaxSeenAutoIdTimestamp(), equalTo(maxOnFirst));

        for (IndexRequest pending : pendingCopies) {
            shards.index(pending); // deliver via normal replication
        }

        final long expectedMax = Math.max(maxOnFirst, maxOnSecond);
        for (IndexShard shard : shards) {
            assertThat(shard.getMaxSeenAutoIdTimestamp(), equalTo(expectedMax));
        }
    }
}

public static class BlockingTarget extends RecoveryTarget {

private final CountDownLatch recoveryBlocked;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
// Index the doc without advancing the local checkpoint.
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true);
}

long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
Expand Down Expand Up @@ -105,6 +105,8 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
.findFirst()
.isPresent(),
is(false));

assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
}
if (syncNeeded && globalCheckPoint < numDocs - 1) {
if (shard.indexSettings.isSoftDeleteEnabled()) {
Expand Down

0 comments on commit ea7bca8

Please sign in to comment.