From 503e932cb12ecafc43a5e6020b8373c640ff8d04 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 1 May 2018 21:46:26 +0200 Subject: [PATCH 1/2] fix inSync on cancelled allocation --- .../index/seqno/ReplicationTracker.java | 10 ++++++++ .../index/seqno/ReplicationTrackerTests.java | 24 +++++++++++++------ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index dcca3d48254e5..0a491e7d2a01a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -339,6 +339,11 @@ private boolean invariant() { "shard copy " + entry.getKey() + " is in-sync but not tracked"; } + // all pending in sync shards are tracked + for (String aId: pendingInSync) { + assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked"; + } + return true; } @@ -521,6 +526,9 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync)); } } + if (removedEntries) { + pendingInSync.removeIf(aId -> checkpoints.containsKey(aId) == false); + } } else { for (String initializingId : initializingAllocationIds) { if (shardAllocationId.equals(initializingId) == false) { @@ -549,6 +557,8 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion replicationGroup = calculateReplicationGroup(); if (primaryMode && removedEntries) { updateGlobalCheckpointOnPrimary(); + // notify any waiter for local checkpoint advancement to recheck that their shard is still being tracked. + notifyAllWaiters(); } } assert invariant(); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index d89e4289e1a19..6fdce76912e04 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -305,7 +305,8 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { final AllocationId inSyncAllocationId = AllocationId.newInitializing(); final AllocationId trackingAllocationId = AllocationId.newInitializing(); final ReplicationTracker tracker = newTracker(inSyncAllocationId); - tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()), + final long clusterStateVersion = randomNonNegativeLong(); + tracker.updateFromMaster(clusterStateVersion, Collections.singleton(inSyncAllocationId.getId()), routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet()); tracker.activatePrimaryMode(globalCheckpoint); final Thread thread = new Thread(() -> { @@ -336,13 +337,22 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { assertBusy(() -> assertTrue(tracker.pendingInSync.contains(trackingAllocationId.getId()))); } - tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64)); - // synchronize with the waiting thread to mark that it is complete - barrier.await(); - assertTrue(complete.get()); - assertTrue(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync); + if (randomBoolean()) { + // normal path, shard catches up + tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64)); + // synchronize with the waiting thread to mark that it is complete + barrier.await(); + assertTrue(complete.get()); + assertTrue(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync); + } else { + // master changes its mind and cancels the allocation + tracker.updateFromMaster(clusterStateVersion + 1, Collections.singleton(inSyncAllocationId.getId()), + routingTable(emptySet(), inSyncAllocationId), emptySet()); + barrier.await(); + assertTrue(complete.get()); + assertNull(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId())); + } assertFalse(tracker.pendingInSync.contains(trackingAllocationId.getId())); - thread.join(); } From cef12b1bab1062d8853c0afd167195784c6dc171 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 2 May 2018 13:51:16 +0200 Subject: [PATCH 2/2] spaces --- .../java/org/elasticsearch/index/seqno/ReplicationTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 0a491e7d2a01a..6548aad767006 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -340,7 +340,7 @@ private boolean invariant() { } // all pending in sync shards are tracked - for (String aId: pendingInSync) { + for (String aId : pendingInSync) { assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked"; }