-
Notifications
You must be signed in to change notification settings - Fork 24.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replica recovery could go into an endless flushing loop #28350
Conversation
@dnhatn great find. I think that we would actually want the flush to happen in this case so that the translog can be cleaned up. The current approach here says: There's more than 500mb worth of uncommitted data (which is actually all committed), but no uncommitted change to Lucene, so let's ignore this. If we would forcibly flush even though there are no changes to Lucene, this would allow us to free the translog. |
This is a great find. I'm not sure though that this is the right fix. The main problem is that the uncommitted bytes stats is off. All ops in the translog are actually in lucene. The problem is that uncommitted bytes is calculated based on the translog gen file, which is indeed pointed to by lucene. This is amplified by the fact that we now ship more of the translog to create a history on the replica, which is not relevant for the flushing logic. I wonder if we should always force flush at the end of recovery as an easy fix. Another option is to flush when lucene doesn't point to the right generation, even if there are no pending ops. I want to think about this some more.
Agreed. It is a broader issue that has implication for the entire replication group. Last we talked about it we thought of having a fail safe of in line with "if a specific in sync shard lags behind with more than x ops, fail it". x can be something large like 10K ops or something. The downside of course is that it will hide bugs. |
I agreed. I am not sure if this is a right approach either. I was trying to fix this by only sending translog operations after the local checkpoint in peer-recovery. However, this can happen in other cases hence I switched to this approach. |
We don't do this by design - we need to build a translog with history on the replica. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx Nhat. Reviewing this and talking things through with @ywelsch we came up with a model that conceptually simpler to digest and we feel better about than we came up with yesterday.
Here's the idea:
- Remove shouldFlush from the translog only have these decisions made in should in the Engine
- The shouldFlush check in the engine shouldn't rely on translog generations but rather only work with uncommitted bytes. Concretely:
a) if uncommittedBytes is < 512, return false
b) expose the Translog#sizeOfGensAboveSeqNoInBytes method that's currently unused.
c) check if sizeOfGensAboveSeqNoInBytes(localCheckpoint + 1) > uncommittedBytes . If it is , return true (as we will gain some bytes), if not return false.
WDYT?
@@ -1492,7 +1511,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti | |||
logger.trace("acquired flush lock immediately"); | |||
} | |||
try { | |||
if (indexWriter.hasUncommittedChanges() || force) { | |||
if (indexWriter.hasUncommittedChanges() || force || shouldFlush()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a comment explaining why we have 3 things? Basically something like - we check if:
- We're forced.
- There are uncommitted lucene docs in lucene
- There are translog related reasons to create a new commit which point to a different place in the translog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
/* | ||
* We should only flush ony if the shouldFlush condition can become false after flushing. This condition will change if: | ||
* 1. The min translog gen of the next commit points to a different translog gen than the last commit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this deserves a comment why we don't take the IW#hasUncommittedChanges() into account.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we call ensureOpen()
here as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -817,6 +817,12 @@ public final boolean refreshNeeded() { | |||
// NOTE: do NOT rename this to something containing flush or refresh! | |||
public abstract void writeIndexingBuffer() throws EngineException; | |||
|
|||
/** | |||
* Checks if this engine should be flushed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain that this can return false even if there are uncommitted changes. It's more of a maintainance function. maybe we should call it differently something like shouldFlushForMaintainance
or maintainanceFlushPending()
just suggestions to make it more clear
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yannick and I came up with shouldFlushToFreeTranslog
I've addressed your feedbacks. Could you please take another look? Thank you! |
@@ -306,4 +307,26 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { | |||
} | |||
} | |||
|
|||
public void testShouldFlushAfterPeerRecovery() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add Javadoc to this method to explain what the goal of this test is?
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); | ||
final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); | ||
// If flushThreshold is too small, we may continuously flush even there is no uncommitted operations. | ||
if (uncommittedSizeOfCurrentCommit < flushThreshold || translog.uncommittedOperations() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe put the check translog.uncommittedOperations() == 0
at the beginning of the shouldFlushToFreeTranslog
method.
shards.startAll(); | ||
long translogSizeOnPrimary = 0; | ||
int numDocs = shards.indexDocs(between(10, 100)); | ||
translogSizeOnPrimary += shards.getPrimary().getTranslog().uncommittedSizeInBytes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just define translogSizeOnPrimary here (no need to initialize)
Today after writing an operation to an engine, we will call `IndexShard#afterWriteOperation` to flush a new commit if needed. The `shouldFlush` condition is purely based on the uncommitted translog size and the translog flush threshold size setting. However this can cause a replica execute an infinite loop of flushing in the following situation. 1. Primary has a fully baked index commit with its local checkpoint equals to max_seqno 2. Primary sends that fully baked commit, then replays all retained translog operations to the replica 3. No operations are added to Lucence on the replica as seqno of these operations are at most the local checkpoint 4. Once translog operations are replayed, the target calls `IndexShard#afterWriteOperation` to flush. If the total size of the replaying operations exceeds the flush threshold size, this call will `Engine#flush`. However the engine won't flush as its index writer does not have any uncommitted operations. The method `IndexShard#afterWriteOperation` will keep flushing as the condition `shouldFlush` is still true. This issue can be avoided if we always flush if the `shouldFlush` condition is true.
Today after writing an operation to an engine, we will call `IndexShard#afterWriteOperation` to flush a new commit if needed. The `shouldFlush` condition is purely based on the uncommitted translog size and the translog flush threshold size setting. However this can cause a replica execute an infinite loop of flushing in the following situation. 1. Primary has a fully baked index commit with its local checkpoint equals to max_seqno 2. Primary sends that fully baked commit, then replays all retained translog operations to the replica 3. No operations are added to Lucence on the replica as seqno of these operations are at most the local checkpoint 4. Once translog operations are replayed, the target calls `IndexShard#afterWriteOperation` to flush. If the total size of the replaying operations exceeds the flush threshold size, this call will `Engine#flush`. However the engine won't flush as its index writer does not have any uncommitted operations. The method `IndexShard#afterWriteOperation` will keep flushing as the condition `shouldFlush` is still true. This issue can be avoided if we always flush if the `shouldFlush` condition is true.
Today after writing an operation to an engine, we will call `IndexShard#afterWriteOperation` to flush a new commit if needed. The `shouldFlush` condition is purely based on the uncommitted translog size and the translog flush threshold size setting. However this can cause a replica execute an infinite loop of flushing in the following situation. 1. Primary has a fully baked index commit with its local checkpoint equals to max_seqno 2. Primary sends that fully baked commit, then replays all retained translog operations to the replica 3. No operations are added to Lucence on the replica as seqno of these operations are at most the local checkpoint 4. Once translog operations are replayed, the target calls `IndexShard#afterWriteOperation` to flush. If the total size of the replaying operations exceeds the flush threshold size, this call will `Engine#flush`. However the engine won't flush as its index writer does not have any uncommitted operations. The method `IndexShard#afterWriteOperation` will keep flushing as the condition `shouldFlush` is still true. This issue can be avoided if we always flush if the `shouldFlush` condition is true.
If the translog flush threshold is too small (eg. smaller than the translog header), we may repeatedly flush even there is no uncommitted operation because the shouldFlush condition can still be true after flushing. This is currently avoided by adding an extra guard against the uncommitted operations. However, this extra guard makes the shouldFlush complicated. This commit replaces that extra guard by a lower bound for translog flush threshold. We keep the lower bound small for convenience in testing. Relates elastic#28350 Relates elastic#23606
* master: (23 commits) Update Netty to 4.1.16.Final (elastic#28345) Fix peer recovery flushing loop (elastic#28350) REST high-level client: add support for exists alias (elastic#28332) REST high-level client: move to POST when calling API to retrieve which support request body (elastic#28342) Add Indices Aliases API to the high level REST client (elastic#27876) Java Api clean up: remove deprecated `isShardsAcked` (elastic#28311) [Docs] Fix explanation for `from` and `size` example (elastic#28320) Adapt bwc version after backport elastic#28358 Always return the after_key in composite aggregation response (elastic#28358) Adds test name to MockPageCacheRecycler exception (elastic#28359) Adds a note in the `terms` aggregation docs regarding pagination (elastic#28360) [Test] Fix DiscoveryNodesTests.testDeltas() (elastic#28361) Update packaging tests to work with meta plugins (elastic#28336) Remove Painless Type from MethodWriter in favor of Java Class. (elastic#28346) [Doc] Fixs typo in reverse-nested-aggregation.asciidoc (elastic#28348) Reindex: Shore up rethrottle test Only assert single commit iff index created on 6.2 isHeldByCurrentThread should return primitive bool [Docs] Clarify `html` encoder in highlighting.asciidoc (elastic#27766) Fix GeoDistance query example (elastic#28355) ...
good change and catch @dnhatn quite some insight into the system needed to get there, the dark side of the force is strong down there ;) |
If the translog flush threshold is too small (eg. smaller than the translog header), we may repeatedly flush even there is no uncommitted operation because the shouldFlush condition can still be true after flushing. This is currently avoided by adding an extra guard against the uncommitted operations. However, this extra guard makes the shouldFlush complicated. This commit replaces that extra guard by a lower bound for translog flush threshold. We keep the lower bound small for convenience in testing. Relates #28350 Relates #23606
If the translog flush threshold is too small (eg. smaller than the translog header), we may repeatedly flush even there is no uncommitted operation because the shouldFlush condition can still be true after flushing. This is currently avoided by adding an extra guard against the uncommitted operations. However, this extra guard makes the shouldFlush complicated. This commit replaces that extra guard by a lower bound for translog flush threshold. We keep the lower bound small for convenience in testing. Relates #28350 Relates #23606
In elastic#28350, we fixed an endless flushing loop which can happen on replicas by tightening the relation between the flush action and the periodically flush condition. 1. The periodically flush condition is enabled only if it will be disabled after a flush. 2. If the periodically flush condition is true then a flush will actually happen regardless of Lucene state. (1) and (2) guarantee a flushing loop will be terminated. Sadly, the condition elastic#1 can be violated in edge cases as we used two different algorithms to evaluate the current and future uncommitted size. - We use method `uncommittedSizeInBytes` to calculate current uncommitted size. It is the sum of translogs whose generation at least the minGen (determined by a given seqno). We pick a continuous range of translogs since the minGen to evaluate the current uncommitted size. - We use method `sizeOfGensAboveSeqNoInBytes` to calculate the future uncommitted size. It is the sum of translogs whose maxSeqNo at least the given seqNo. Here we don't pick a range but select translog one by one. Suppose we have 3 translogs gen1={elastic#1,elastic#2}, gen2={}, gen3={elastic#3} and seqno=elastic#1, uncommittedSizeInBytes is the sum of gen1, gen2, and gen3 while sizeOfGensAboveSeqNoInBytes is sum of gen1 and gen3. Gen2 is excluded because its maxSeqno is still -1. This commit ensures sizeOfGensAboveSeqNoInBytes use the same algorithm from uncommittedSizeInBytes Closes elastic#29097
In #28350, we fixed an endless flushing loop which may happen on replicas by tightening the relation between the flush action and the periodically flush condition. 1. The periodically flush condition is enabled only if it is disabled after a flush. 2. If the periodically flush condition is enabled then a flush will actually happen regardless of Lucene state. (1) and (2) guarantee that a flushing loop will be terminated. Sadly, the condition 1 can be violated in edge cases as we used two different algorithms to evaluate the current and future uncommitted translog size. - We use method `uncommittedSizeInBytes` to calculate current uncommitted size. It is the sum of translogs whose generation at least the minGen (determined by a given seqno). We pick a continuous range of translogs since the minGen to evaluate the current uncommitted size. - We use method `sizeOfGensAboveSeqNoInBytes` to calculate the future uncommitted size. It is the sum of translogs whose maxSeqNo at least the given seqNo. Here we don't pick a range but select translog one by one. Suppose we have 3 translogs `gen1={#1,#2}, gen2={}, gen3={#3} and seqno=#1`, `uncommittedSizeInBytes` is the sum of gen1, gen2, and gen3 while `sizeOfGensAboveSeqNoInBytes` is the sum of gen1 and gen3. Gen2 is excluded because its maxSeqno is still -1. This commit removes both `sizeOfGensAboveSeqNoInBytes` and `uncommittedSizeInBytes` methods, then enforces an engine to use only `sizeInBytesByMinGen` method to evaluate the periodically flush condition. Closes #29097 Relates ##28350
In #28350, we fixed an endless flushing loop which may happen on replicas by tightening the relation between the flush action and the periodically flush condition. 1. The periodically flush condition is enabled only if it is disabled after a flush. 2. If the periodically flush condition is enabled then a flush will actually happen regardless of Lucene state. (1) and (2) guarantee that a flushing loop will be terminated. Sadly, the condition 1 can be violated in edge cases as we used two different algorithms to evaluate the current and future uncommitted translog size. - We use method `uncommittedSizeInBytes` to calculate current uncommitted size. It is the sum of translogs whose generation at least the minGen (determined by a given seqno). We pick a continuous range of translogs since the minGen to evaluate the current uncommitted size. - We use method `sizeOfGensAboveSeqNoInBytes` to calculate the future uncommitted size. It is the sum of translogs whose maxSeqNo at least the given seqNo. Here we don't pick a range but select translog one by one. Suppose we have 3 translogs `gen1={#1,#2}, gen2={}, gen3={#3} and seqno=#1`, `uncommittedSizeInBytes` is the sum of gen1, gen2, and gen3 while `sizeOfGensAboveSeqNoInBytes` is the sum of gen1 and gen3. Gen2 is excluded because its maxSeqno is still -1. This commit removes both `sizeOfGensAboveSeqNoInBytes` and `uncommittedSizeInBytes` methods, then enforces an engine to use only `sizeInBytesByMinGen` method to evaluate the periodically flush condition. Closes #29097 Relates ##28350
In #28350, we fixed an endless flushing loop which may happen on replicas by tightening the relation between the flush action and the periodically flush condition. 1. The periodically flush condition is enabled only if it is disabled after a flush. 2. If the periodically flush condition is enabled then a flush will actually happen regardless of Lucene state. (1) and (2) guarantee that a flushing loop will be terminated. Sadly, the condition 1 can be violated in edge cases as we used two different algorithms to evaluate the current and future uncommitted translog size. - We use method `uncommittedSizeInBytes` to calculate current uncommitted size. It is the sum of translogs whose generation at least the minGen (determined by a given seqno). We pick a continuous range of translogs since the minGen to evaluate the current uncommitted size. - We use method `sizeOfGensAboveSeqNoInBytes` to calculate the future uncommitted size. It is the sum of translogs whose maxSeqNo at least the given seqNo. Here we don't pick a range but select translog one by one. Suppose we have 3 translogs `gen1={#1,#2}, gen2={}, gen3={#3} and seqno=#1`, `uncommittedSizeInBytes` is the sum of gen1, gen2, and gen3 while `sizeOfGensAboveSeqNoInBytes` is the sum of gen1 and gen3. Gen2 is excluded because its maxSeqno is still -1. This commit removes both `sizeOfGensAboveSeqNoInBytes` and `uncommittedSizeInBytes` methods, then enforces an engine to use only `sizeInBytesByMinGen` method to evaluate the periodically flush condition. Closes #29097 Relates ##28350
testShouldFlushAfterPeerRecovery was added #28350 to make sure the flushing loop triggered by afterWriteOperation eventually terminates. This test relies on the fact that we call afterWriteOperation after making changes in translog. In #44756, we roll a new generation in RecoveryTarget#finalizeRecovery but do not call afterWriteOperation. Relates #28350 Relates #45073
testShouldFlushAfterPeerRecovery was added #28350 to make sure the flushing loop triggered by afterWriteOperation eventually terminates. This test relies on the fact that we call afterWriteOperation after making changes in translog. In #44756, we roll a new generation in RecoveryTarget#finalizeRecovery but do not call afterWriteOperation. Relates #28350 Relates #45073
Today after writing an operation to an engine, we will call
IndexShard#afterWriteOperation
to flush a new commit if needed. TheshouldFlush
condition is purely based on the uncommitted translog size and the translog flush threshold size setting. However this can cause a replica execute an infinite loop of flushing in the following situation.IndexShard#afterWriteOperation
to flush. If the total size of the replaying operations exceeds the flush threshold size, this call willEngine#flush
. However the engine won't flush as its index writer does not have any uncommitted operations. The methodIndexShard#afterWriteOperation
will keep flushing as the conditionshouldFlush
is still true.This issue can be avoided if we always flush if the
shouldFlush
condition is true.