CCR: Do not minimize requesting range on leader #30980
Conversation
Today, before reading operations on the leading shard, we minimize the requesting range with the global checkpoint. However, this might generate an invalid range if the following shard builds a requesting range based on the global checkpoint from a primary shard and sends that request to a replica whose global checkpoint is lagging. I see two possible solutions for this:
1. Remove the minimization, as the requesting ranges are safely generated by the following shards.
2. Apply the minimization to both min_seqno and max_seqno.
I pick the first approach in this PR.
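To illustrate the first issue, here is a minimal standalone sketch with made-up numbers (plain longs instead of the real request and IndexShard objects; the values are hypothetical):

// Standalone model of the problem; not Elasticsearch code.
public class MinimizationBug {
    public static void main(String[] args) {
        // The follower builds the range from the leader primary's global checkpoint (100).
        long minSeqNo = 95, maxSeqNo = 100;

        // The leading replica that serves the read has a lagging view of the global
        // checkpoint, but its local checkpoint already covers the range.
        long replicaGlobalCheckpoint = 90;
        long replicaLocalCheckpoint = 102;

        // Old behavior: minimize the upper bound with the replica's global checkpoint.
        long minimizedMax = Math.min(maxSeqNo, replicaGlobalCheckpoint); // 90
        System.out.println("minimized range: " + minSeqNo + ".." + minimizedMax
            + ", valid=" + (minSeqNo <= minimizedMax)); // 95..90 -> invalid

        // This PR: keep the range and only check it against the local checkpoint.
        System.out.println("original range covered by local checkpoint: "
            + (maxSeqNo <= replicaLocalCheckpoint)); // true
    }
}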
Pinging @elastic/es-distributed
Thanks @dnhatn. Although this option seems reasonable to me, it doesn't explain what I have been seeing in the latest benchmark run. It looks like the invalid range occurred on all shard copies when retrieving write operations via the shard changes API. The error happened on the follower node:
And this is the corresponding error on the coordinating node in the follower cluster:
So I'm afraid that this change will hide the underlying issue that we're trying to find?
I agree with @martijnvg. The issue that we looked at together earlier this week occurred on the primary shard; this shard copy cannot be lagging in any knowledge here.
I talked to @martijnvg before working on this but did not gather enough information. I will close this and dig again. Thanks for looking @martijnvg and @jasontedor!
@martijnvg and @jasontedor This is ready. Can you please have a look? Thank you!
💯
request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint());
// The following shard generates the request based on the global checkpoint which may not be synced to all leading copies.
// However, this guarantees that the requesting range always be below the local-checkpoint of any leading copies.
final long localCheckpoint = indexShard.getLocalCheckpoint();
+1 to using the local checkpoint instead of the global checkpoint here. It should be just as safe as using the global checkpoint.
I'm sorry, but I don't think we should use the local checkpoint. I understand that it's safe because of the follower semantics, and I also understand that the request is not supposed to go above the local checkpoint (this can be an assertion, as Jason noted), but I don't think we should rely on it. It's too subtle and difficult to understand. If there's no good reason to use the local checkpoint here (please share if there is, I can't see it), can we please go back to using the global checkpoint?
We also don't really need to fail the request here but rather return what we have, if we have it (as before).
PS - can you also add a comment that this is all best effort and that the true check is done when creating the snapshot? (merge policies etc. can change the availability of operations)
We also don't really need to fail the request here but rather return what we have, if we have it (as before).
This will make the logic a bit more complex in the shard follow task. I prefer to fail here, knowing that the primary copy will have the requested range.
Alternatively, the shard follow task can maybe use the global checkpoint of the shard copy with the lowest global checkpoint (like was discussed in the es-ccr channel last night). Then this problem shouldn't occur either.
This will make the logic a bit more complex in the shard follow task.
Why is that? We already account for partial results due to byte size limits.
That is true. I guess I reacted too quickly. We already do this correctly; the assumption right now is that the byte size limit has been reached.
I made this an assertion in 2a9a200#diff-64da2b915e53a36fdc911178059a02e5R242. The only purpose of this assertion is to make sure that the follower never requests a wrong range. However, we cannot use the global checkpoint here, so I "loosened" the condition by using the local checkpoint as a best effort. I am okay with removing this assertion. WDYT?
@bleskes I do not agree that using the local checkpoint is too subtle and difficult to understand; we are relying on fundamental relationships between local and global checkpoints here? The problem is the global checkpoint on the replica is not the global checkpoint, it's only local knowledge (say "local global checkpoint" three times fast) that could be out of date but we know:
local checkpoint on replica >= actual global checkpoint >= global checkpoint on request
and that's why we can have an assertion here.
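A sketch of such an assertion, reusing only the request fields and shard accessor shown in the diff above (the exact placement and message are illustrative):

// request.maxSeqNo was derived from a (possibly stale) sample of the global checkpoint,
// which can never exceed the local checkpoint of an in-sync copy.
assert request.maxSeqNo <= indexShard.getLocalCheckpoint() :
    "to_seqno [" + request.maxSeqNo + "] is above the local checkpoint ["
        + indexShard.getLocalCheckpoint() + "]";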
And I also don't think we should use the global checkpoint and return partial results here.
I do not agree that using the local checkpoint is too subtle and difficult to understand;
Let me unpack a bit what I meant by subtle. The current approach relies on a well-behaved client that follows the pattern of formulating its requests based on some knowledge of the global checkpoint. That behavior is not clear when you look at the ShardChangesAction. It is part of the shard following task, which is not trivial to follow. That's what I meant by subtle. The system is complex, and the more we can understand by reading a single file the better.
we are relying on fundamental relationships between local and global checkpoints here?
It is true that if you sample the global checkpoint from somewhere, all local checkpoints of in-sync shards are above it, and therefore it is safe to trim any request that uses a global checkpoint as an upper bound by the local checkpoint of an in-sync shard. It is also true that search requests should never be routed to not-in-sync shards, if we could manage it. Sadly that's not true, and I'm not sure how to achieve it without other drawbacks that are worse or some schemes that are complicated and will take time to bake.
Search requests are routed based on a cluster state they sample, which may be stale. They use a list of shard copies and prefer to go to active shards, but if those fail they will go and try initializing shards. We don't know at what phase of recovery those are. We also don't know what their local checkpoint means. It is highly likely it will be lower than the local checkpoint of the primary and thus will be safe (based on the behavior of the client), but maybe it's not? Maybe it was constructed by a primary that has since failed, and it has transferred operations that weren't safe and those weren't rolled back yet? I'm not saying that's necessarily broken. I am saying that this gets complicated very quickly and I'm not sure it's right.
Using the local knowledge of the global checkpoint is always safe and is simple to understand. The complicated part is how the global checkpoint is maintained but you don't need to know that.
PS I want to go back to the notion of a well-behaved client, from a different angle than complexity. It's true that we are currently building CCR and not the changes API, but we do plan to build infrastructure that will power the Changes API (which CCR would be based on if we had it). With that in mind, I would rather avoid adding assumptions to the code that rely on some correctness aspects of the request. The logic can hopefully stay simple - you can ask for anything you want but we're not exposing unsafe ops. Also, this is why the original API was designed to say "give me X operations starting at this point and up" (X being a number or size) rather than the current API of "give me this range please". To be clear - I'm OK with the range change (for now - we'll see how the changes API develops) but I want to be conscious of the Changes API and potential implications to it.
Thanks @martijnvg and @jasontedor
Good catch @dnhatn!
I am sorry that I missed this before it was merged, but I left some comments.
// The following shard generates the request based on the global checkpoint which may not be synced to all leading copies.
// However, this guarantees that the requesting range always be below the local-checkpoint of any leading copies.
final long localCheckpoint = indexShard.getLocalCheckpoint();
if (localCheckpoint < request.minSeqNo || localCheckpoint < request.maxSeqNo) { |
I think that all we need here is that the local history at least covers the requested range, so the opposite of localCheckpoint >= request.maxSeqNo should be sufficient here, as we already validate on the request that minSeqNo < maxSeqNo. Therefore, I think that this condition can be indexShard.getLocalCheckpoint() < request.maxSeqNo.
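Spelled out as a sketch (the exception message here is illustrative; the diff below truncates the real one):

// Request validation already ensures minSeqNo is not above maxSeqNo (per the comment above),
// so it is enough to check that the local history covers the upper bound of the range.
final long localCheckpoint = indexShard.getLocalCheckpoint();
if (localCheckpoint < request.maxSeqNo) {
    throw new IllegalStateException("invalid request from_seqno=[" + request.minSeqNo + "], "
        + "to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + localCheckpoint + "]");
}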
// However, this guarantees that the requesting range always be below the local-checkpoint of any leading copies.
final long localCheckpoint = indexShard.getLocalCheckpoint();
if (localCheckpoint < request.minSeqNo || localCheckpoint < request.maxSeqNo) {
throw new IllegalStateException("invalid request from_seqno=[" + request.minSeqNo + "], " + |
I think that we should add an assertion here? This should never happen in production because the global checkpoint on the primary is, by definition, never more than the local checkpoints of all of the in-sync shard copies. This shard copy must be in-sync or it would not be receiving this request, and therefore I think we should treat this as a fatal condition? I am not sure if we are being harsh enough here.
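One way to read that suggestion, sketched with the same fields as the diff above (whether the hard failure stays alongside the assertion is exactly what is being debated here):

final long localCheckpoint = indexShard.getLocalCheckpoint();
// Trip assertions in tests: an in-sync copy's local checkpoint should always cover the range.
assert localCheckpoint >= request.maxSeqNo :
    "local checkpoint [" + localCheckpoint + "] is below the requested to_seqno [" + request.maxSeqNo + "]";
// Still refuse the request in production builds if the invariant is ever violated.
if (localCheckpoint < request.maxSeqNo) {
    throw new IllegalStateException("invalid request, to_seqno=[" + request.maxSeqNo
        + "] is above the local checkpoint [" + localCheckpoint + "]");
}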
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint());
// The following shard generates the request based on the global checkpoint which may not be synced to all leading copies. |
Would you amend this comment to say that the primary copy on the follower generates the request based on its knowledge of the global checkpoint on the primary copy on the leader?
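A possible wording along those lines (a sketch only, not the exact comment that was committed):

// The following shard's primary copy generates this request based on its knowledge of the
// global checkpoint of the primary copy on the leader. That value may not yet have been
// synced to every leading copy, but it never exceeds the local checkpoint of any in-sync
// leading copy, so the requested range is always covered here.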
This commit clarifies the origin of the global checkpoint that the following shard uses and replaces the IllegalStateException with an assertion. Relates #30980
@jasontedor I've pushed 2a9a200 to address all your comments. Thanks for an extra look.
Today, before reading operations on the leading shard, we minimize the requesting range with the global checkpoint. However, this might make the request invalid if the following shard generates a requesting range based on the global checkpoint from a primary shard and sends that request to a replica whose global checkpoint is lagging. Another issue is that we are mutating the request when applying the minimization. If the request becomes invalid on a replica, we will reroute the mutated request, instead of the original one, to the primary. This commit removes the minimization and replaces it with a range check against the local checkpoint.
That looks good to me @dnhatn. Thank you. ❤️
Today, before reading operations on the leading shard, we minimize
the requesting range with the global checkpoint. However, this might
make the request invalid if the following shard generates a requesting
range based on the global checkpoint from a primary shard and sends that
request to a replica whose global checkpoint is lagging.
Another issue is that we are mutating the request when applying the
minimization. If the request becomes invalid on a replica, we will
retry that mutated request on the primary instead of the original one.
This commit removes the minimization and replaces it with a range check
against the local checkpoint.