Skip to content

Commit

Permalink
Handle lower retaining seqno retention lease error (#46420)
Browse files Browse the repository at this point in the history
We renew the CCR retention lease at a fixed interval, therefore it's
possible to have more than one in-flight renewal requests at the same
time. If requests arrive out of order, then the assertion is violated.
  • Loading branch information
dnhatn committed Oct 31, 2019
1 parent d5eb460 commit 784739d
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,21 +384,27 @@ private RetentionLease innerAddRetentionLease(String id, long retainingSequenceN
* @param source the source of the retention lease
* @return the renewed retention lease
* @throws RetentionLeaseNotFoundException if the specified retention lease does not exist
* @throws IllegalArgumentException if the new retaining sequence number is lower than
* the retaining sequence number of the current retention lease.
*/
public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
if (retentionLeases.contains(id) == false) {
final RetentionLease existingRetentionLease = retentionLeases.get(id);
if (existingRetentionLease == null) {
throw new RetentionLeaseNotFoundException(id);
}
if (retainingSequenceNumber < existingRetentionLease.retainingSequenceNumber()) {
assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(source) == false :
"renewing peer recovery retention lease [" + existingRetentionLease + "]" +
" with a lower retaining sequence number [" + retainingSequenceNumber + "]";
throw new IllegalArgumentException(
"the current retention lease with [" + id + "]" +
" is retaining a higher sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]" +
" than the new retaining sequence number [" + retainingSequenceNumber + "] from [" + source + "]"
);
}
final RetentionLease retentionLease =
new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
final RetentionLease existingRetentionLease = retentionLeases.get(id);
assert existingRetentionLease != null;
assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() :
"retention lease renewal for [" + id + "]"
+ " from [" + source + "]"
+ " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]"
+ " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]";
new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,36 @@ public void testPersistRetentionLeasesUnderConcurrency() throws IOException {
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
}

public void testRenewLeaseWithLowerRetainingSequenceNumber() throws Exception {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {},
OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
replicationTracker.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.wrap(() -> {}));
final long lowerRetainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, retainingSequenceNumber - 1);
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> replicationTracker.renewRetentionLease(id, lowerRetainingSequenceNumber, source));
assertThat(e, hasToString(containsString("the current retention lease with [" + id + "]" +
" is retaining a higher sequence number [" + retainingSequenceNumber + "]" +
" than the new retaining sequence number [" + lowerRetainingSequenceNumber + "] from [" + source + "]")));
}

private void assertRetentionLeases(
final ReplicationTracker replicationTracker,
final int size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,13 @@ public static void asyncRemoveRetentionLease(
remoteClient.execute(RetentionLeaseActions.Remove.INSTANCE, request, listener);
}

/**
* Checks if the given exception is an error from the leader shard where the retaining sequence number of a renewal request
* is lower than the retaining sequence number of the current retention lease. This method is merely used to avoid logging
* warning errors which are expected as we can have multiple outstanding renewal retention leases requests.
*/
public static boolean isInvalidRetainingSequenceNumberError(String retentionLeaseId, Throwable cause) {
final String message = "the current retention lease with [" + retentionLeaseId + "] is retaining a higher sequence number";
return cause instanceof IllegalArgumentException && cause.getMessage().startsWith(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -470,11 +470,13 @@ protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final Lo

private void logRetentionLeaseFailure(final String retentionLeaseId, final Throwable cause) {
assert cause instanceof ElasticsearchSecurityException == false : cause;
logger.warn(new ParameterizedMessage(
"{} background management of retention lease [{}] failed while following",
params.getFollowShardId(),
retentionLeaseId),
if (CcrRetentionLeases.isInvalidRetainingSequenceNumberError(retentionLeaseId, cause) == false) {
logger.warn(new ParameterizedMessage(
"{} background management of retention lease [{}] failed while following",
params.getFollowShardId(),
retentionLeaseId),
cause);
}
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.isInvalidRetainingSequenceNumberError;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncRenewRetentionLease;
Expand Down Expand Up @@ -336,10 +337,13 @@ public void restoreShard(Store store, SnapshotId snapshotId,
ActionListener.wrap(
r -> {},
e -> {
assert e instanceof ElasticsearchSecurityException == false : e;
logger.warn(new ParameterizedMessage(
"{} background renewal of retention lease [{}] failed during restore", shardId,
retentionLeaseId), e);
final Throwable cause = ExceptionsHelper.unwrapCause(e);
assert cause instanceof ElasticsearchSecurityException == false : cause;
if (isInvalidRetainingSequenceNumberError(retentionLeaseId, cause) == false) {
logger.warn(new ParameterizedMessage(
"{} background renewal of retention lease [{}] failed during restore", shardId,
retentionLeaseId), cause);
}
}));
}
},
Expand Down

0 comments on commit 784739d

Please sign in to comment.