Request Buffering during Sharding #8462
More thoughts on proactively detecting re-sharding operations and failovers: as explained in the previous comment, right now there's a separate buffer for every keyspace+shard pair. I think the solution we're looking for is a global map (for each keyspace) of the shard events that require buffering. Does this seem reasonable?

I think the hardest problem about this is the case where we don't proactively detect an event in the cluster, and instead we receive a failover error from one of the tablets. Converting this tablet- (and hence shard-) specific error into a global event that could potentially affect more than one shard seems complicated. How would we handle this?
There are currently two code paths for query execution (Execute and StreamExecute); both should be covered by buffering. It is OK to just create the event for the shard whose tablet received the error; this way it will accumulate as errors are received from more tablets.
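To make that accumulation idea concrete, here is a minimal sketch of what such a keyspace-scoped structure could look like. All names here (keyspaceEvents, shardEvent, recordFailover, needsBuffering) are hypothetical illustrations of the shape of the data structure, not existing Vitess code:

package buffer

import (
	"sync"
	"time"
)

// shardEvent is a hypothetical record of an ongoing failover or resharding
// cutover affecting one shard.
type shardEvent struct {
	shard     string
	startedAt time.Time
}

// keyspaceEvents is a hypothetical global map, keyed by keyspace, that
// accumulates shard events as failover errors arrive from individual tablets.
type keyspaceEvents struct {
	mu     sync.Mutex
	events map[string]map[string]*shardEvent // keyspace -> shard -> event
}

// recordFailover registers an event for the shard whose tablet returned a
// failover error; errors from other tablets/shards of the same keyspace
// accumulate under the same keyspace entry.
func (ke *keyspaceEvents) recordFailover(keyspace, shard string) {
	ke.mu.Lock()
	defer ke.mu.Unlock()
	if ke.events == nil {
		ke.events = make(map[string]map[string]*shardEvent)
	}
	if ke.events[keyspace] == nil {
		ke.events[keyspace] = make(map[string]*shardEvent)
	}
	if _, ok := ke.events[keyspace][shard]; !ok {
		ke.events[keyspace][shard] = &shardEvent{shard: shard, startedAt: time.Now()}
	}
}

// needsBuffering reports whether any shard of the keyspace currently has an
// ongoing event, i.e. whether new requests for that keyspace should buffer.
func (ke *keyspaceEvents) needsBuffering(keyspace string) bool {
	ke.mu.Lock()
	defer ke.mu.Unlock()
	return len(ke.events[keyspace]) > 0
}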
Thank you for pointing this out! Yes, the logic for retries looks copy-and-pasted between the two Gateways, so let's scope this down to TabletGateway.
OK, I've spent this afternoon looking at the health checking code, and I have some questions about it. Using this comment from @rohit-nayak-ps as a reference, here's how he believes we could model the two events (the start and end of a resharding):
srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, keyspace)
if err != nil {
return err
}
for _, partition := range srvKeyspace.GetPartitions() {
for _, shardTabletControl := range partition.GetShardTabletControls() {
if shardTabletControl.QueryServiceDisabled {
// this query needs to buffer
}
}
}

Questions about this: we clearly know the keyspace for a plan, but what's its cell? Do we need to use
The only place where
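To make the question about cells concrete: if we did go through the topo server, the check could look roughly like the sketch below. This is only an illustration of the shape of the lookup, under the assumption that enumerating cells with GetCellInfoNames is acceptable here; whether a topo round-trip like this belongs anywhere near the query path is exactly the open question.

package bufferdetect

import (
	"context"

	"vitess.io/vitess/go/vt/topo"
)

// anyShardBuffering checks, across every cell we can enumerate, whether any
// shard of the keyspace has its query service disabled (i.e. a cutover is in
// progress). Doing this per query is likely too expensive; this is a sketch.
func anyShardBuffering(ctx context.Context, ts *topo.Server, keyspace string) (bool, error) {
	// Assumption: enumerating cells via GetCellInfoNames is acceptable here.
	cells, err := ts.GetCellInfoNames(ctx)
	if err != nil {
		return false, err
	}
	for _, cell := range cells {
		srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, keyspace)
		if err != nil {
			return false, err
		}
		for _, partition := range srvKeyspace.GetPartitions() {
			for _, shardTabletControl := range partition.GetShardTabletControls() {
				if shardTabletControl.QueryServiceDisabled {
					return true, nil
				}
			}
		}
	}
	return false, nil
}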
There might be a simpler way to detect cutover start and stop. In the original comment @rohit-nayak-ps says that when cutover starts we transition the primary of the old shard to NON_SERVING, and when cutover ends we transition the primaries of new shards to SERVING.
I've been playing with this and it seems complicated because the primaries of the new shards are transitioned asynchronously.

Say that we're attempting to run a query that targets shard X: we see that the primary of shard X is currently not serving, so we start buffering the query. Since we receive shard health updates one by one, we'll eventually receive a health update for a shard we've never seen before (the result of the re-sharding), which won't be serving, and then another update that marks that shard as serving. The issue here is that re-sharding will usually (always?) result in more than one new shard, but we do not know how many new shards we are expecting to see in the health check stream.

The best way to work around this issue is trying to guess when we have access to the whole keyspace for a shard by parsing and putting together the names of the shards -- I am just not sure whether this is always safe to do for all sharding cases (and whether we have existing code in Vitess to parse shard names and see if they add up to the whole keyspace).
There are some KeyRange/Shard utilities here:
Eventually we also need to worry about "Merging" shards where we may end up with fewer shards (as few as one).
Right, I thought about this use case, and from my understanding, the resulting shard names for the merge should also add up to the whole keyspace. Is there any other sharding operation that could result in a partial keyspace and hence couldn't be detected by this heuristic?
At the keyspace level it should be safe to decide when we have access to the whole keyspace by parsing and putting together the names of the shards.
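For illustration, here is a rough, self-contained sketch of that heuristic. It assumes shard names of the form "-80", "80-c0", "c0-" with fixed-width hex boundaries; in practice we would want to reuse the KeyRange utilities in go/vt/key mentioned above rather than re-implementing the parsing:

package sharding

import (
	"sort"
	"strings"
)

// keyRange holds the hex start/end boundaries of a shard name such as
// "80-c0"; an empty start means the beginning of the keyspace, an empty
// end means its end.
type keyRange struct {
	start, end string
}

// coversFullKeyspace reports whether the given shard names, put together,
// cover the whole keyspace with no gaps. It assumes all boundaries are
// written with the same hex width (e.g. always "80", never "8000"); the
// real KeyRange code in go/vt/key handles normalization properly.
func coversFullKeyspace(shards []string) bool {
	if len(shards) == 0 {
		return false
	}
	ranges := make([]keyRange, 0, len(shards))
	for _, s := range shards {
		if s == "0" || s == "-" {
			// An unsharded keyspace (or a single full-range shard) is
			// complete only if it is the sole shard.
			return len(shards) == 1
		}
		parts := strings.SplitN(s, "-", 2)
		if len(parts) != 2 {
			return false // not a keyrange-style shard name
		}
		ranges = append(ranges, keyRange{start: parts[0], end: parts[1]})
	}
	// Sort by start boundary; same-width hex strings sort lexicographically.
	sort.Slice(ranges, func(i, j int) bool { return ranges[i].start < ranges[j].start })
	// The first range must start at the beginning of the keyspace, the last
	// must run to its end, and every range must start where the previous ended.
	if ranges[0].start != "" || ranges[len(ranges)-1].end != "" {
		return false
	}
	for i := 1; i < len(ranges); i++ {
		if ranges[i].start != ranges[i-1].end {
			return false
		}
	}
	return true
}

With this sketch, coversFullKeyspace([]string{"-80", "80-c0", "c0-"}) reports true, while a partial set such as {"-80", "80-c0"} reports false.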
I think we should put this information upfront in the topo server, so that we do not have to assume it. Whenever such an operation happens, it is stored in the topo server to be discovered by the buffering code.
Following up on the feedback on #8514, I've been researching other approaches to detect these sharding operations proactively that do not depend on the topology server. @harshit-gangal suggested running the example resharding workflows from the Vitess examples locally and analyzing the resulting health check event patterns to see how much information we can gather from them. It's not looking good.

For shard initialization:

for i in 300 301 302; do
CELL=zone1 TABLET_UID=$i ./scripts/mysqlctl-up.sh
SHARD=-80 CELL=zone1 KEYSPACE=customer TABLET_UID=$i ./scripts/vttablet-up.sh
done
for i in 400 401 402; do
CELL=zone1 TABLET_UID=$i ./scripts/mysqlctl-up.sh
SHARD=80- CELL=zone1 KEYSPACE=customer TABLET_UID=$i ./scripts/vttablet-up.sh
done
vtctldclient InitShardPrimary --force customer/-80 zone1-300
vtctldclient InitShardPrimary --force customer/80- zone1-400

Here are the actual health status changes as seen on the vtgate:
For the resharding step:
There are no health events in vtgate. This is expected because we're only setting up the resharding streams, not doing a cutover.

When switching reads:
There are no health events in vtgate either. This is already a bit fishy. None of the tablets change state (at least not visibly to the vtgate).

When switching writes:
Here's the other not-cromulent thing I'm seeing: there's just a single event in the HealthCheck stream here, the old tablet 200 going from serving to not serving:
This is not quite what I heard from @rohit-nayak-ps / @deepthi in previous comments:
We can see how the primary of the old shard becomes non-serving, but the primaries of the new shards have been marked as SERVING since they were initially instantiated. Is this an issue with the way our examples are written (i.e. this behavior is not what we'd see in a production cluster)? Or is the previous description just not accurate at all?
Update on this: We had a meeting last week where we made some decisions on how to approach this design without over-relying on the topology server. The two key points we agreed on are:
I hope this is a good enough summary! Work on step 2 is over; I'm now tackling step 1, which will require changing both vttablet and vtgate.
Thank you for the update @vmg. This all makes sense to me.
I have started working on this functionality for Vitess. To elaborate on the problem, which is already listed in #7061: we want to improve the way we perform buffering to be able to handle more complex resharding situations.
Currently, the main entrypoint for buffering is in tabletgateway.go, in the TabletGateway.withRetry helper that wraps all the method calls for our QueryService:

vitess/go/vt/vtgate/tabletgateway.go, lines 216 to 234 in 4bebb8e
Here are a few important things to note:

- The retry logic is duplicated in DiscoveryGateway, but this whole API has been deprecated in favor of TabletGateway, so we can ignore it. 👌
- master tablet

Let's look at the two critical issues which we intend to fix:
The way buffering works right now is by keeping a table of "buffers" for every keyspace and shard pair. This "shard buffer" is created lazily the first time a query for a given keyspace+shard arrives at the vtgate, and by default it is disabled, meaning that requests pass through directly and are not buffered. The first time that a request to a given keyspace/shard returns an error, the retry code in TabletGateway will retry the buffering before retrying the request, this time passing the upstream error. If the buffering code detects that the upstream error is a failover error (this is explicitly checked at vitess/go/vt/vtgate/buffer/buffer.go, lines 263 to 290 in 30250f5), the request starts buffering.
This is what we mean by "lazy buffering": the only way a request will start buffering is once it has failed at least once for its keyspace+shard. The retry for this request, and any follow-up requests, will then buffer. In an ideal design for this system, the buffering code would proactively health check the different shards in the cluster so that we can buffer a request right away once we detect a failover, without having to use an error response as the detection mechanism for the failover.
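As a rough mental model of that lazy flow, here is a heavily simplified sketch; it is not the actual code in go/vt/vtgate/buffer, and all the types and helpers below are illustrative only:

package lazybuffer

import (
	"context"
	"sync"
)

// shardBuffer is a hypothetical, heavily simplified stand-in for the real
// per keyspace+shard buffer in go/vt/vtgate/buffer.
type shardBuffer struct {
	buffering bool
	done      chan struct{} // closed when the failover for this shard is over
}

// Buffer keeps one shardBuffer per keyspace+shard, created lazily.
type Buffer struct {
	mu      sync.Mutex
	buffers map[string]*shardBuffer
}

// isFailoverError stands in for the explicit check referenced above
// (buffer.go lines 263 to 290); the real check inspects the error details.
func isFailoverError(err error) bool {
	return err != nil // assumption: simplified for the sketch
}

// WaitForFailoverEnd sketches the "lazy" flow: nothing is buffered until a
// request has already failed with a failover error for this keyspace+shard;
// after that, the retry and follow-up requests park here until the failover ends.
func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, lastErr error) error {
	key := keyspace + "/" + shard
	b.mu.Lock()
	if b.buffers == nil {
		b.buffers = make(map[string]*shardBuffer)
	}
	sb, ok := b.buffers[key]
	if !ok {
		// Created lazily the first time this keyspace+shard is seen,
		// and disabled (pass-through) by default.
		sb = &shardBuffer{done: make(chan struct{})}
		b.buffers[key] = sb
	}
	if lastErr != nil && isFailoverError(lastErr) {
		sb.buffering = true
	}
	buffering := sb.buffering
	b.mu.Unlock()

	if !buffering {
		return nil // no failover detected for this shard: do not buffer
	}
	select {
	case <-sb.done: // a health-check update would close this when the failover ends
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}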
The other problem with this design is caused by the scoping to keyspace+shard which we've already discussed: this works fine for a master failover in a given shard, but fails critically during re-sharding operations. When we're performing a cutover after re-sharding a Vitess cluster, we're not only handling a master-slave failover for a specific shard; we're potentially changing the number of shards in the cluster and their respective masters. There is no clear way to detect when the failover is finished in a specific shard, because the re-sharding operation necessarily affects more than one shard, and the resulting topology for our cluster can potentially force us to retry the buffered query in a different shard than the one we had initially planned for before we started buffering.
The suggested fix
The fix that has been agreed on in our discussions so far implies lifting the buffering code so it happens way before the current checks in TabletGateway, and consequently, so it is not specifically scoped to a keyspace+shard pair.

A suggested location to move our buffering logic would be in the Executor for the vtgates, roughly after the initial planning has succeeded, but before the plan is executed:

vitess/go/vt/vtgate/plan_execute.go, lines 52 to 82 in 30250f5
The idea would be to provide an API that takes the suggested plan and verifies whether the shards it affects need buffering (i.e. because they're being re-sharded or are in the middle of a failover). If that's the case, we'd buffer the request until we detect that the whole re-sharding operation is complete, and then we would retry the planning to ensure the final plan we execute is updated based on the cluster's topology after resharding.
This design assumes that we can implement a proactive API for the buffering code that can tell us whether a given plan needs buffering before execution and, ideally, also whether the given plan would need to be re-planned after the buffering because it's been affected by a topology change. This second part is not mandatory for correctness, but feels like an important optimization for V2; for V1 of this implementation, we can assume that any plan that has been buffered before executing always needs to be re-planned, which is a safe assumption.
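To illustrate the shape of the API this assumes, here is a hypothetical sketch of the Executor-level flow: plan, buffer proactively if the target shards are being resharded, then re-plan before executing. None of these types or methods exist today; they only describe the proposal:

package vtgatesketch

import "context"

// plan and reshardingWatcher are hypothetical stand-ins used only to show
// the shape of the proposed API; they are not existing vtgate types.
type plan struct {
	keyspace string
	shards   []string
}

type reshardingWatcher interface {
	// NeedsBuffering reports whether any of the given shards is in the middle
	// of a failover or a resharding cutover.
	NeedsBuffering(keyspace string, shards []string) bool
	// WaitForConsistentKeyspace blocks until the keyspace has settled, i.e.
	// the cutover is finished and the (possibly new) shards are serving.
	WaitForConsistentKeyspace(ctx context.Context, keyspace string) error
}

// executeWithBuffering sketches the proposed flow: plan first, buffer the
// request proactively if its target shards are being resharded, then re-plan
// before executing (the V1 assumption: a buffered plan is always re-planned).
func executeWithBuffering(ctx context.Context, w reshardingWatcher,
	planQuery func() (*plan, error), executePlan func(*plan) error) error {

	p, err := planQuery()
	if err != nil {
		return err
	}
	if w.NeedsBuffering(p.keyspace, p.shards) {
		if err := w.WaitForConsistentKeyspace(ctx, p.keyspace); err != nil {
			return err
		}
		// Re-plan: the topology may have changed while we were buffering, so
		// the original plan may target shards that no longer exist.
		if p, err = planQuery(); err != nil {
			return err
		}
	}
	return executePlan(p)
}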
However, this raises an issue with actual upstream retries. Right now this code is implemented in the withRetry helper for TabletGateway which we've already discussed, and, as part of the retrying logic, it includes the potential buffering when the error being retried is detected as a failover error. What happens to this retry code once we lift the buffering logic out of it?

My assumption is that TabletGateway needs to keep its retry logic (mostly because this logic includes the shuffling between the different tablets in a shard so they can be retried when one of them is down), but now we also need to add retrying logic higher in the stack (i.e. where we're now buffering), so that we can enable the buffering based on the returned error in cases where we didn't detect a re-sharding or failover proactively. Does this sound correct? Will this extra retry logic introduce any unforeseen consequences? For starters, it seems like any failover-related errors in TabletGateway should be changed to be bubbled up immediately and not retried, but are there any other changes that need to be introduced?

@sougou @harshit-gangal @rohit-nayak-ps: this is everything I understand about the problem so far. Could you please read through this and correct me where I'm wrong? I'll keep updating the issue in follow-up comments as I research further.
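To make the proposed split of responsibilities concrete, here is a rough sketch of the two retry layers discussed above: the gateway keeps its per-tablet retries but bubbles failover errors up immediately, while a new retry at the buffering layer catches the failovers we didn't detect proactively. All names are hypothetical and this is only a sketch of the idea:

package retrysketch

import "context"

// isFailoverError is a placeholder for however we classify failover errors;
// the real check lives in the buffering code.
func isFailoverError(err error) bool {
	return err != nil // assumption: simplified for the sketch
}

// gatewayWithRetry keeps the per-tablet retries (moving on to another tablet
// of the shard when one is down), but bubbles failover errors up immediately
// instead of retrying them, so that the layer above can decide to buffer.
func gatewayWithRetry(ctx context.Context, attemptOnNextTablet func() error, retries int) error {
	var lastErr error
	for i := 0; i < retries; i++ {
		if lastErr = attemptOnNextTablet(); lastErr == nil {
			return nil
		}
		if isFailoverError(lastErr) {
			return lastErr // do not retry here: let the buffering layer handle it
		}
		// Otherwise, loop and try the next tablet in the shard.
	}
	return lastErr
}

// executorRetry is the new, higher-level retry: if the error that came back
// looks like a failover we did not detect proactively, buffer, re-plan, and
// retry the whole query once.
func executorRetry(ctx context.Context, execute func() error, bufferAndReplan func(context.Context) error) error {
	err := execute()
	if err == nil || !isFailoverError(err) {
		return err
	}
	if berr := bufferAndReplan(ctx); berr != nil {
		return berr
	}
	return execute()
}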