diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 78c427766a1..dc5b10e0512 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -28,6 +28,8 @@ Flags: --allow-kill-statement Allows the execution of kill statement --allowed_tablet_types strings Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types. --alsologtostderr log to standard error as well as files + --balancer-keyspaces strings When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional) + --balancer-vtgate-cells strings When in balanced mode, a comma-separated list of cells that contain vtgates (required) --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. --buffer_drain_concurrency int Maximum number of requests retried simultaneously. More concurrency will increase the load on the PRIMARY vttablet when draining the buffer. (default 1) --buffer_keyspace_shards string If not empty, limit buffering to these entries (comma separated). Entry format: keyspace or keyspace/shard. Requires --enable_buffer=true. @@ -53,6 +55,7 @@ Flags: --discovery_high_replication_lag_minimum_serving duration Threshold above which replication lag is considered too high when applying the min_number_serving_vttablets flag. (default 2h0m0s) --discovery_low_replication_lag duration Threshold below which replication lag is considered low enough to be healthy. (default 30s) --emit_stats If set, emit stats to push-based monitoring and stats backends + --enable-balancer Enable the tablet balancer to evenly spread query load for a given tablet type --enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. 
(default false) --enable-views Enable views support in vtgate. --enable_buffer Enable buffering (stalling) of primary traffic during failovers. diff --git a/go/vt/vtgate/balancer/balancer.go b/go/vt/vtgate/balancer/balancer.go new file mode 100644 index 00000000000..bfe85194c05 --- /dev/null +++ b/go/vt/vtgate/balancer/balancer.go @@ -0,0 +1,367 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package balancer + +import ( + "encoding/json" + "fmt" + "math/rand/v2" + "net/http" + "sync" + + "vitess.io/vitess/go/vt/discovery" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +/* + +The tabletBalancer probabalistically orders the list of available tablets into +a ranked order of preference in order to satisfy two high-level goals: + +1. Balance the load across the available replicas +2. Prefer a replica in the same cell as the vtgate if possible + +In some topologies this is trivial to accomplish by simply preferring tablets in the +local cell, assuming there are a proportional number of local tablets in each cell to +satisfy the inbound traffic to the vtgates in that cell. + +However, for topologies with a relatively small number of tablets in each cell, a simple +affinity algorithm does not effectively balance the load. + +As a simple example: + + Given three cells with vtgates, four replicas spread into those cells, where each vtgate + receives an equal query share. 
If each routes only to its local cell, the tablets will be + unbalanced since two of them receive 1/3 of the queries, but the two replicas in the same + cell will only receive 1/6 of the queries. + + Cell A: 1/3 --> vtgate --> 1/3 => vttablet + + Cell B: 1/3 --> vtgate --> 1/3 => vttablet + + Cell C: 1/3 --> vtgate --> 1/6 => vttablet + \-> 1/6 => vttablet + +Other topologies that can cause similar pathologies include cases where there may be cells +containing replicas but no local vtgates, and/or cells that have only vtgates but no replicas. + +For these topologies, the tabletBalancer proportionally assigns the output flow to each tablet, +preferring the local cell where possible, but only as long as the global query balance is +maintained. + +To accomplish this goal, the balancer is given: + +* The list of cells that receive inbound traffic to vtgates +* The local cell where the vtgate exists +* The set of tablets and their cells (learned from discovery) + +The model assumes there is an equal probability of a query coming from each vtgate cell, i.e. +traffic is effectively load balanced between the cells with vtgates. + +Given that information, the balancer builds a simple model to determine how much query load +would go to each tablet if vtgate only routed to its local cell. Then if any tablets are +unbalanced, it shifts the desired allocation away from the local cell preference in order to +even out the query load. + +Based on this global model, the vtgate then probabilistically picks a destination for each +query to be sent and uses these weights to order the available tablets accordingly. + +Assuming each vtgate is configured with and discovers the same information about the topology, +and the input flow is balanced across the vtgate cells (as mentioned above), then each vtgate +should come to the same conclusion about the global flows, and cooperatively should +converge on the desired balanced query load. 
+
+*/
+
+// TabletBalancer selects the tablet that should serve a query so that, over many
+// picks, the query load converges on an even spread across the tablets of a target.
+type TabletBalancer interface {
+	// Pick is the main entry point to the balancer. Returns the best tablet out of the list
+	// for a given query to maintain the desired balanced allocation over multiple executions.
+	// It returns nil when the list of tablets is empty.
+	Pick(target *querypb.Target, tablets []*discovery.TabletHealth) *discovery.TabletHealth
+
+	// DebugHandler provides a summary of tablet balancer state
+	DebugHandler(w http.ResponseWriter, r *http.Request)
+}
+
+// NewTabletBalancer returns a balancer for a vtgate running in localCell.
+// vtGateCells is the list of cells that contain vtgates receiving traffic; the
+// slice is retained, so the caller must not modify it afterwards.
+func NewTabletBalancer(localCell string, vtGateCells []string) TabletBalancer {
+	return &tabletBalancer{
+		localCell:   localCell,
+		vtGateCells: vtGateCells,
+		allocations: map[discovery.KeyspaceShardTabletType]*targetAllocation{},
+	}
+}
+
+type tabletBalancer struct {
+	//
+	// Configuration
+	//
+
+	// The local cell for the vtgate
+	localCell string
+
+	// The set of cells that have vtgates
+	vtGateCells []string
+
+	// mu protects the allocation map
+	mu sync.Mutex
+
+	//
+	// Allocations for balanced mode, calculated once per target and invalidated
+	// whenever the topology changes.
+	//
+	allocations map[discovery.KeyspaceShardTabletType]*targetAllocation
+}
+
+// targetAllocation is the flow model computed for one target. It is built once by
+// allocateFlows and never modified afterwards, so its maps may be read without
+// holding the balancer mutex once a pointer to it has been obtained.
+type targetAllocation struct {
+	// Target flow per cell based on the number of tablets discovered in the cell
+	Target map[string]int
+
+	// Input flows allocated for each cell
+	Inflows map[string]int
+
+	// Output flows from each vtgate cell to each target cell
+	Outflows map[string]map[string]int
+
+	// Allocation routed to each tablet from the local cell used for ranking
+	Allocation map[uint32]int
+
+	// Tablets that local cell does not route to
+	Unallocated map[uint32]struct{}
+
+	// Total allocation which is basically 1,000,000 / len(vtgatecells)
+	TotalAllocation int
+}
+
+// covers reports whether the allocation was computed for exactly this set of
+// tablets, i.e. every tablet is either allocated or explicitly unallocated and
+// there are no leftover entries.
+func (a *targetAllocation) covers(tablets []*discovery.TabletHealth) bool {
+	if len(a.Allocation)+len(a.Unallocated) != len(tablets) {
+		return false
+	}
+	for _, th := range tablets {
+		uid := th.Tablet.Alias.Uid
+		if _, ok := a.Allocation[uid]; ok {
+			continue
+		}
+		if _, ok := a.Unallocated[uid]; !ok {
+			return false
+		}
+	}
+	return true
+}
+
+// shortfall returns, for every cell whose inflow is below its target, the missing
+// flow, together with the sum of all missing flow.
+func (a *targetAllocation) shortfall() (map[string]int, int) {
+	underAllocated := make(map[string]int)
+	unbalancedFlow := 0
+	for cell, want := range a.Target {
+		if got := a.Inflows[cell]; got < want {
+			underAllocated[cell] = want - got
+			unbalancedFlow += want - got
+		}
+	}
+	return underAllocated, unbalancedFlow
+}
+
+// print renders the balancer configuration and cached allocations on one line.
+// It takes the mutex because it reads the allocation map.
+func (b *tabletBalancer) print() string {
+	b.mu.Lock()
+	allocations, err := json.Marshal(&b.allocations)
+	b.mu.Unlock()
+	if err != nil {
+		// Surface the failure instead of silently printing an empty value.
+		allocations = []byte(fmt.Sprintf("<marshal error: %v>", err))
+	}
+	return fmt.Sprintf("LocalCell: %s, VtGateCells: %s, allocations: %s",
+		b.localCell, b.vtGateCells, string(allocations))
+}
+
+// DebugHandler writes the balancer configuration and the cached allocations as
+// plain text.
+func (b *tabletBalancer) DebugHandler(w http.ResponseWriter, _ *http.Request) {
+	w.Header().Set("Content-Type", "text/plain")
+	fmt.Fprintf(w, "Local Cell: %v\r\n", b.localCell)
+	fmt.Fprintf(w, "Vtgate Cells: %v\r\n", b.vtGateCells)
+
+	// Only hold the lock while reading the map, not while writing the response.
+	b.mu.Lock()
+	allocations, err := json.MarshalIndent(b.allocations, "", " ")
+	b.mu.Unlock()
+	if err != nil {
+		fmt.Fprintf(w, "Allocations: <marshal error: %v>\r\n", err)
+		return
+	}
+	fmt.Fprintf(w, "Allocations: %v\r\n", string(allocations))
+}
+
+// Pick is the main entry point to the balancer.
+//
+// Given the total allocation for the set of tablets, choose the best target
+// by a weighted random sample so that over time the system will achieve the
+// desired balanced allocation.
+//
+// Pick returns nil when tablets is empty. When the model assigns no flow at all
+// to the local cell (for example the local cell is missing from the configured
+// vtgate cells), it falls back to a uniform random choice instead of panicking.
+func (b *tabletBalancer) Pick(target *querypb.Target, tablets []*discovery.TabletHealth) *discovery.TabletHealth {
+	numTablets := len(tablets)
+	if numTablets == 0 {
+		return nil
+	}
+
+	allocationMap, totalAllocation := b.getAllocation(target, tablets)
+
+	// rand.IntN panics for a non-positive argument, so a zero total must not reach it.
+	if totalAllocation <= 0 {
+		return tablets[rand.IntN(numTablets)]
+	}
+
+	r := rand.IntN(totalAllocation)
+	for _, th := range tablets {
+		flow := allocationMap[th.Tablet.Alias.Uid]
+		if r < flow {
+			return th
+		}
+		r -= flow
+	}
+
+	return tablets[0]
+}
+
+// To stick with integer arithmetic, use 1,000,000 as the full load
+const ALLOCATION = 1000000
+
+// allocateFlows builds the flow model for the given tablets: how much of each
+// vtgate cell's traffic should go to each tablet cell, and from that how much the
+// local vtgate should send to each individual tablet.
+//
+// With no vtgate cells or no tablets there is nothing to model; the result then
+// has a zero TotalAllocation and lists every tablet as unallocated.
+func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *targetAllocation {
+	// Initialization: Set up some data structures and derived values
+	a := targetAllocation{
+		Target:      map[string]int{},
+		Inflows:     map[string]int{},
+		Outflows:    map[string]map[string]int{},
+		Allocation:  map[uint32]int{},
+		Unallocated: map[uint32]struct{}{},
+	}
+
+	// Avoid dividing by zero below.
+	if len(b.vtGateCells) == 0 || len(allTablets) == 0 {
+		for _, th := range allTablets {
+			a.Unallocated[th.Tablet.Alias.Uid] = struct{}{}
+		}
+		return &a
+	}
+
+	flowPerVtgateCell := ALLOCATION / len(b.vtGateCells)
+	flowPerTablet := ALLOCATION / len(allTablets)
+	cellExistsWithNoTablets := false
+
+	for _, th := range allTablets {
+		a.Target[th.Tablet.Alias.Cell] += flowPerTablet
+	}
+
+	//
+	// First pass: Allocate vtgate flow to the local cell where the vtgate exists
+	// and along the way figure out if there are any vtgates with no local tablets.
+	//
+	for _, cell := range b.vtGateCells {
+		outflow := map[string]int{}
+
+		if a.Target[cell] > 0 {
+			a.Inflows[cell] += flowPerVtgateCell
+			outflow[cell] = flowPerVtgateCell
+		} else {
+			cellExistsWithNoTablets = true
+		}
+
+		a.Outflows[cell] = outflow
+	}
+
+	//
+	// Figure out if there is a shortfall
+	//
+	underAllocated, unbalancedFlow := a.shortfall()
+
+	//
+	// Second pass: if there are any vtgates with no local tablets, allocate the underallocated amount
+	// proportionally to all cells that may need it
+	//
+	if cellExistsWithNoTablets {
+		for _, vtgateCell := range b.vtGateCells {
+			if a.Target[vtgateCell] != 0 {
+				continue
+			}
+
+			// underAllocated is empty whenever unbalancedFlow is zero, so the
+			// division below is never reached with a zero divisor.
+			for underAllocatedCell, underAllocatedFlow := range underAllocated {
+				allocation := flowPerVtgateCell * underAllocatedFlow / unbalancedFlow
+				a.Inflows[underAllocatedCell] += allocation
+				a.Outflows[vtgateCell][underAllocatedCell] += allocation
+			}
+		}
+
+		// Recompute underallocated after these flows were assigned
+		underAllocated, unbalancedFlow = a.shortfall()
+	}
+
+	//
+	// Third pass: Shift remaining imbalance if any cell is over/under allocated after
+	// assigning local cell traffic and distributing load from cells without tablets.
+	//
+	if /* fudge for integer arithmetic */ unbalancedFlow > 10 {
+
+		// cells which are overallocated
+		overAllocated := make(map[string]int)
+		for cell, allocation := range a.Target {
+			if a.Inflows[cell] > allocation {
+				overAllocated[cell] = a.Inflows[cell] - allocation
+			}
+		}
+
+		//
+		// For each overallocated cell, proportionally shift flow from targets that are overallocated
+		// to targets that are underallocated.
+		//
+		// Note this is an O(N^3) loop, but only over the cells which need adjustment.
+		//
+		for _, vtgateCell := range b.vtGateCells {
+			for underAllocatedCell, underAllocatedFlow := range underAllocated {
+				for overAllocatedCell, overAllocatedFlow := range overAllocated {
+
+					currentFlow := a.Outflows[vtgateCell][overAllocatedCell]
+					if currentFlow == 0 {
+						continue
+					}
+
+					// Shift a proportional fraction of the amount that the cell is currently allocated weighted
+					// by the fraction that this vtgate cell is already sending to the overallocated cell, and the
+					// fraction that the new target is underallocated
+					//
+					// Note that the operator order matters -- multiplications need to occur before divisions
+					// to avoid truncating the integer values.
+					shiftFlow := overAllocatedFlow * currentFlow * underAllocatedFlow / a.Inflows[overAllocatedCell] / unbalancedFlow
+
+					a.Outflows[vtgateCell][overAllocatedCell] -= shiftFlow
+					a.Inflows[overAllocatedCell] -= shiftFlow
+
+					a.Inflows[underAllocatedCell] += shiftFlow
+					a.Outflows[vtgateCell][underAllocatedCell] += shiftFlow
+				}
+			}
+		}
+	}
+
+	//
+	// Finally, once the cell flows are all adjusted, figure out the local allocation to each
+	// tablet in the target cells
+	//
+	outflow := a.Outflows[b.localCell]
+	for _, tablet := range allTablets {
+		cell := tablet.Tablet.Alias.Cell
+		flow := outflow[cell]
+		if flow > 0 {
+			share := flow * flowPerTablet / a.Target[cell]
+			a.Allocation[tablet.Tablet.Alias.Uid] = share
+			a.TotalAllocation += share
+		} else {
+			a.Unallocated[tablet.Tablet.Alias.Uid] = struct{}{}
+		}
+	}
+
+	return &a
+}
+
+// getAllocation returns the per-tablet allocation map and the total allocation for
+// the target, recomputing and caching them when the set of tablets has changed.
+//
+// The returned map is the cached map itself, not a copy; callers must treat it as
+// read-only.
+func (b *tabletBalancer) getAllocation(target *querypb.Target, tablets []*discovery.TabletHealth) (map[uint32]int, int) {
+	key := discovery.KeyFromTarget(target)
+
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if cached, exists := b.allocations[key]; exists && cached.covers(tablets) {
+		// No change in tablets for this target. Return computed allocation
+		return cached.Allocation, cached.TotalAllocation
+	}
+
+	allocation := b.allocateFlows(tablets)
+	b.allocations[key] = allocation
+
+	return allocation.Allocation, allocation.TotalAllocation
+}
diff --git a/go/vt/vtgate/balancer/balancer_test.go b/go/vt/vtgate/balancer/balancer_test.go
new file mode 100644
index 00000000000..1c6a72421fc
--- /dev/null
+++ b/go/vt/vtgate/balancer/balancer_test.go
@@ -0,0 +1,371 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package balancer
+
+import (
+	"strconv"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"vitess.io/vitess/go/vt/discovery"
+	querypb "vitess.io/vitess/go/vt/proto/query"
+	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+	"vitess.io/vitess/go/vt/topo"
+)
+
+// nextTestTabletUID hands out a fresh uid to every tablet created by the tests.
+var nextTestTabletUID int
+
+// createTestTablet returns a non-serving replica tablet for keyspace "k", shard
+// "s" in the given cell, with a uid that is unique within the test binary.
+func createTestTablet(cell string) *discovery.TabletHealth {
+	nextTestTabletUID++
+	tablet := topo.NewTablet(uint32(nextTestTabletUID), cell, strconv.Itoa(nextTestTabletUID))
+	tablet.PortMap["vt"] = 1
+	tablet.PortMap["grpc"] = 2
+	tablet.Keyspace = "k"
+	tablet.Shard = "s"
+
+	return &discovery.TabletHealth{
+		Tablet:               tablet,
+		Target:               &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
+		Serving:              false,
+		Stats:                nil,
+		PrimaryTermStartTime: 0,
+	}
+}
+
+// createTestTablets returns one test tablet per listed cell, in the given order.
+func createTestTablets(cells ...string) []*discovery.TabletHealth {
+	tablets := make([]*discovery.TabletHealth, 0, len(cells))
+	for _, cell := range cells {
+		tablets = append(tablets, createTestTablet(cell))
+	}
+	return tablets
+}
+
+// TestAllocateFlows checks, for each topology, that the flow model computed from
+// every vtgate cell sends the expected share of traffic to each cell and tablet.
+func TestAllocateFlows(t *testing.T) {
+	cases := []struct {
+		test        string
+		tablets     []*discovery.TabletHealth
+		vtgateCells []string
+	}{
+		{
+			"balanced one tablet per cell",
+			createTestTablets("a", "b", "c", "d"),
+			[]string{"a", "b", "c", "d"},
+		},
+		{
+			"balanced multiple tablets per cell",
+			createTestTablets("a", "b", "c", "d", "a", "b", "c", "d"),
+			[]string{"a", "b", "c", "d"},
+		},
+		{
+			"vtgate in cell with no tablets",
+			createTestTablets("a", "b", "c", "d"),
+			[]string{"a", "b", "c", "d", "e"},
+		},
+		{
+			"vtgates in multiple cells with no tablets",
+			createTestTablets("a", "b", "c", "d"),
+			[]string{"a", "b", "c", "d", "e", "f", "g"},
+		},
+		{
+			"imbalanced multiple tablets in one cell",
+			createTestTablets("a", "a", "b", "c"),
+			[]string{"a", "b", "c"},
+		},
+		{
+			"imbalanced multiple tablets in multiple cells",
+			createTestTablets("a", "a", "a", "a", "a", "a", "b", "b", "c", "d", "d", "d", "d"),
+			[]string{"a", "b", "c", "d"},
+		},
+		{
+			"heavy imbalance",
+			createTestTablets("a", "a", "a", "a", "a", "a", "b", "c", "c"),
+			[]string{"a", "b", "c", "d"},
+		},
+	}
+
+	target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}
+
+	for _, c := range cases {
+		// Run each topology as a subtest so a failure names the case it belongs to.
+		t.Run(c.test, func(t *testing.T) {
+			tablets := c.tablets
+			vtGateCells := c.vtgateCells
+
+			tabletsByCell := make(map[string][]*discovery.TabletHealth)
+			for _, tablet := range tablets {
+				cell := tablet.Tablet.Alias.Cell
+				tabletsByCell[cell] = append(tabletsByCell[cell], tablet)
+			}
+
+			allocationPerTablet := make(map[uint32]int)
+			expectedPerTablet := ALLOCATION / len(tablets)
+
+			expectedPerCell := make(map[string]int)
+			for cell := range tabletsByCell {
+				expectedPerCell[cell] = ALLOCATION / len(tablets) * len(tabletsByCell[cell])
+			}
+
+			// Run the balancer over each vtgate cell
+			for _, localCell := range vtGateCells {
+				b := NewTabletBalancer(localCell, vtGateCells).(*tabletBalancer)
+				a := b.allocateFlows(tablets)
+				b.allocations[discovery.KeyFromTarget(target)] = a
+
+				t.Logf("Target Flows %v, Balancer: %s, Allocations: %v \n", expectedPerCell, b.print(), b.allocations)
+
+				// Accumulate all the output per tablet cell
+				outflowPerCell := make(map[string]int)
+				for _, outflow := range a.Outflows {
+					for tabletCell, flow := range outflow {
+						assert.GreaterOrEqual(t, flow, 0, b.print())
+						outflowPerCell[tabletCell] += flow
+					}
+				}
+
+				// Check in / out flow to each tablet cell
+				for cell := range tabletsByCell {
+					expectedForCell := expectedPerCell[cell]
+
+					assert.InEpsilonf(t, expectedForCell, a.Inflows[cell], 0.01,
+						"did not allocate correct inflow to cell %s. Balancer {%s} ExpectedPerCell {%v}",
+						cell, b.print(), expectedPerCell)
+					assert.InEpsilonf(t, expectedForCell, outflowPerCell[cell], 0.01,
+						"did not allocate correct outflow to cell %s. Balancer {%s} ExpectedPerCell {%v}",
+						cell, b.print(), expectedPerCell)
+				}
+
+				// Accumulate the allocations for all runs to compare what the system does as a whole
+				// when routing from all vtgate cells
+				for uid, flow := range a.Allocation {
+					allocationPerTablet[uid] += flow
+				}
+			}
+
+			// Check that the allocations all add up
+			for _, tablet := range tablets {
+				uid := tablet.Tablet.Alias.Uid
+
+				allocation := allocationPerTablet[uid]
+				assert.InEpsilonf(t, expectedPerTablet, allocation, 0.01,
+					"did not allocate full allocation to tablet %d", uid)
+			}
+		})
+	}
+}
+
+// TestBalancedPick samples Pick from every vtgate cell and checks that, summed
+// over all cells, each tablet receives an equal share of the picks.
+func TestBalancedPick(t *testing.T) {
+	cases := []struct {
+		test        string
+		tablets     []*discovery.TabletHealth
+		vtgateCells []string
+	}{
+		{
+			"simple balanced",
+			createTestTablets("a", "b", "c", "d"),
+			[]string{"a", "b", "c", "d"},
+		},
+		{
+			"simple unbalanced",
+			createTestTablets("a", "a", "a", "b", "c", "d"),
+			[]string{"a", "b", "c", "d"},
+		},
+		{
+			"mixed unbalanced",
+			createTestTablets("a", "a", "a", "a", "a", "b", "c", "c"),
+			[]string{"a", "b", "c", "d"},
+		},
+		{
+			"one target same cell",
+			createTestTablets("a"),
+			[]string{"a"},
+		},
+		{
+			"one target other cell",
+			createTestTablets("a"),
+			[]string{"b", "c", "d"},
+		},
+	}
+
+	target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}
+	for _, c := range cases {
+		// Run each topology as a subtest so a failure names the case it belongs to.
+		t.Run(c.test, func(t *testing.T) {
+			tablets := c.tablets
+			vtGateCells := c.vtgateCells
+
+			// test unbalanced distribution
+
+			routed := make(map[uint32]int)
+
+			expectedPerCell := make(map[string]int)
+			for _, tablet := range tablets {
+				cell := tablet.Tablet.Alias.Cell
+				expectedPerCell[cell] += ALLOCATION / len(tablets)
+			}
+
+			// Run the algorithm a bunch of times to get a random enough sample
+			N := 1000000
+			for _, localCell := range vtGateCells {
+				b := NewTabletBalancer(localCell, vtGateCells).(*tabletBalancer)
+
+				for i := 0; i < N/len(vtGateCells); i++ {
+					th := b.Pick(target, tablets)
+					if i == 0 {
+						t.Logf("Target Flows %v, Balancer: %s\n", expectedPerCell, b.print())
+					}
+
+					routed[th.Tablet.Alias.Uid]++
+				}
+			}
+
+			expected := N / len(tablets)
+			for _, tablet := range tablets {
+				got := routed[tablet.Tablet.Alias.Uid]
+				assert.InEpsilonf(t, expected, got, 0.01,
+					"routing to tablet %d", tablet.Tablet.Alias.Uid)
+			}
+		})
+	}
+}
+
+// TestTopologyChanged checks that the cached allocation is recomputed when the
+// set of tablets passed to the balancer changes.
+func TestTopologyChanged(t *testing.T) {
+	allTablets := createTestTablets("a", "a", "b", "b")
+	target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}
+
+	b := NewTabletBalancer("b", []string{"a", "b"}).(*tabletBalancer)
+
+	N := 1
+
+	// initially create a slice of tablets with just the two in cell a
+	tablets := allTablets
+	tablets = tablets[0:2]
+
+	for i := 0; i < N; i++ {
+		th := b.Pick(target, tablets)
+		allocation, totalAllocation := b.getAllocation(target, tablets)
+
+		assert.Equalf(t, ALLOCATION/2, totalAllocation, "totalAllocation mismatch %s", b.print())
+		assert.Equalf(t, ALLOCATION/4, allocation[th.Tablet.Alias.Uid], "allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell)
+		assert.Equalf(t, "a", th.Tablet.Alias.Cell, "shuffle promoted wrong tablet from cell %s", allTablets[0].Tablet.Alias.Cell)
+	}
+
+	// Run again with the full topology. Now traffic should go to cell b
+	for i := 0; i < N; i++ {
+		th := b.Pick(target, allTablets)
+
+		allocation, totalAllocation := b.getAllocation(target, allTablets)
+
+		assert.Equalf(t, ALLOCATION/2, totalAllocation, "totalAllocation mismatch %s", b.print())
+		assert.Equalf(t, ALLOCATION/4, allocation[th.Tablet.Alias.Uid], "allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell)
+		assert.Equalf(t, "b", th.Tablet.Alias.Cell, "shuffle promoted wrong tablet from cell %s", allTablets[0].Tablet.Alias.Cell)
+	}
+
+	// Run again with a node in the topology replaced.
+	newTablet := createTestTablet("b")
+	allTablets[2] = newTablet
+	for i := 0; i < N; i++ {
+		th := b.Pick(target, allTablets)
+
+		allocation, totalAllocation := b.getAllocation(target, allTablets)
+
+		assert.Equalf(t, ALLOCATION/2, totalAllocation, "totalAllocation mismatch %s", b.print())
+		assert.Equalf(t, ALLOCATION/4, allocation[th.Tablet.Alias.Uid], "allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell)
+		assert.Equalf(t, "b", th.Tablet.Alias.Cell, "shuffle promoted wrong tablet from cell %s", allTablets[0].Tablet.Alias.Cell)
+	}
+
+}
diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index a5e180c4ddf..2e224b6379e 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -19,8 +19,10 @@ package vtgate import ( "context" "fmt" - "math/rand" + "math/rand/v2" + "net/http" "runtime/debug" + "slices" "sort" "sync" "sync/atomic" @@ -31,11 +33,13 @@ import ( "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/balancer" "vitess.io/vitess/go/vt/vtgate/buffer" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -52,6 +56,13 @@ var ( initialTabletTimeout = 30 * time.Second // retryCount is the number of times a query will be retried on error retryCount = 2 + + // configuration flags for the tablet balancer + balancerEnabled bool + balancerVtgateCells []string + balancerKeyspaces []string + + logCollations = logutil.NewThrottledLogger("CollationInconsistent", 1*time.Minute) ) func init() { @@ -59,6 +70,9 @@ func init() { fs.StringVar(&CellsToWatch, "cells_to_watch", "", "comma-separated list of cells for watching tablets") fs.DurationVar(&initialTabletTimeout, "gateway_initial_tablet_timeout", 30*time.Second, "At 
startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type") fs.IntVar(&retryCount, "retry-count", 2, "retry count") + fs.BoolVar(&balancerEnabled, "enable-balancer", false, "Enable the tablet balancer to evenly spread query load for a given tablet type") + fs.StringSliceVar(&balancerVtgateCells, "balancer-vtgate-cells", []string{}, "When in balanced mode, a comma-separated list of cells that contain vtgates (required)") + fs.StringSliceVar(&balancerKeyspaces, "balancer-keyspaces", []string{}, "When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional)") }) } @@ -81,6 +95,9 @@ type TabletGateway struct { // buffer, if enabled, buffers requests during a detected PRIMARY failover. buffer *buffer.Buffer + + // balancer used for routing to tablets + balancer balancer.TabletBalancer } func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck { @@ -109,6 +126,9 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop statusAggregators: make(map[string]*TabletStatusAggregator), } gw.setupBuffering(ctx) + if balancerEnabled { + gw.setupBalancer(ctx) + } gw.QueryService = queryservice.Wrap(nil, gw.withRetry) return gw } @@ -142,6 +162,13 @@ func (gw *TabletGateway) setupBuffering(ctx context.Context) { }(bufferCtx, ksChan, gw.buffer) } +func (gw *TabletGateway) setupBalancer(ctx context.Context) { + if len(balancerVtgateCells) == 0 { + log.Exitf("balancer-vtgate-cells is required for balanced mode") + } + gw.balancer = balancer.NewTabletBalancer(gw.localCell, balancerVtgateCells) +} + // QueryServiceByAlias satisfies the Gateway interface func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { qs, err := gw.hc.TabletConnection(alias, target) @@ -217,6 +244,15 @@ func (gw *TabletGateway) 
CacheStatus() TabletCacheStatusList { return res } +func (gw *TabletGateway) DebugBalancerHandler(w http.ResponseWriter, r *http.Request) { + if balancerEnabled { + gw.balancer.DebugHandler(w, r) + } else { + w.Header().Set("Content-Type", "text/plain") + w.Write([]byte("not enabled")) + } +} + // withRetry gets available connections and executes the action. If there are retryable errors, // it retries retryCount times before failing. It does not retry if the connection is in // the middle of a transaction. While returning the error check if it maybe a result of @@ -303,16 +339,35 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, break } - gw.shuffleTablets(gw.localCell, tablets) - var th *discovery.TabletHealth - // skip tablets we tried before - for _, t := range tablets { - if _, ok := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]; !ok { - th = t - break + + useBalancer := balancerEnabled + if balancerEnabled && len(balancerKeyspaces) > 0 { + useBalancer = slices.Contains(balancerKeyspaces, target.Keyspace) + } + if useBalancer { + // filter out the tablets that we've tried before (if any), then pick the best one + if len(invalidTablets) > 0 { + tablets = slices.DeleteFunc(tablets, func(t *discovery.TabletHealth) bool { + _, isInvalid := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)] + return isInvalid + }) + } + + th = gw.balancer.Pick(target, tablets) + + } else { + gw.shuffleTablets(gw.localCell, tablets) + + // skip tablets we tried before + for _, t := range tablets { + if _, ok := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]; !ok { + th = t + break + } } } + if th == nil { // do not override error from last attempt. 
if err == nil { @@ -400,13 +455,13 @@ func (gw *TabletGateway) shuffleTablets(cell string, tablets []*discovery.Tablet // shuffle in same cell tablets for i := sameCellMax; i > 0; i-- { - swap := rand.Intn(i + 1) + swap := rand.IntN(i + 1) tablets[i], tablets[swap] = tablets[swap], tablets[i] } // shuffle in diff cell tablets for i, diffCellMin := length-1, sameCellMax+1; i > diffCellMin; i-- { - swap := rand.Intn(i-sameCellMax) + diffCellMin + swap := rand.IntN(i-sameCellMax) + diffCellMin tablets[i], tablets[swap] = tablets[swap], tablets[i] } } diff --git a/go/vt/vtgate/tabletgateway_test.go b/go/vt/vtgate/tabletgateway_test.go index 32d18dcc9ab..b1e79b7803d 100644 --- a/go/vt/vtgate/tabletgateway_test.go +++ b/go/vt/vtgate/tabletgateway_test.go @@ -34,6 +34,7 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/sandboxconn" ) func TestTabletGatewayExecute(t *testing.T) { @@ -41,7 +42,10 @@ func TestTabletGatewayExecute(t *testing.T) { testTabletGatewayGeneric(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { _, err := tg.Execute(ctx, target, "query", nil, 0, 0, nil) return err - }) + }, + func(t *testing.T, sc *sandboxconn.SandboxConn, want int64) { + assert.Equal(t, want, sc.ExecCount.Load()) + }) testTabletGatewayTransact(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { _, err := tg.Execute(ctx, target, "query", nil, 1, 0, nil) return err @@ -55,7 +59,10 @@ func TestTabletGatewayExecuteStream(t *testing.T) { return nil }) return err - }) + }, + func(t *testing.T, sc *sandboxconn.SandboxConn, want int64) { + assert.Equal(t, want, sc.ExecCount.Load()) + }) } func TestTabletGatewayBegin(t *testing.T) { @@ -63,7 +70,10 @@ func TestTabletGatewayBegin(t *testing.T) { testTabletGatewayGeneric(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { _, err := 
tg.Begin(ctx, target, nil) return err - }) + }, + func(t *testing.T, sc *sandboxconn.SandboxConn, want int64) { + assert.Equal(t, want, sc.BeginCount.Load()) + }) } func TestTabletGatewayCommit(t *testing.T) { @@ -87,7 +97,11 @@ func TestTabletGatewayBeginExecute(t *testing.T) { testTabletGatewayGeneric(t, ctx, func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error { _, _, err := tg.BeginExecute(ctx, target, nil, "query", nil, 0, nil) return err - }) + }, + func(t *testing.T, sc *sandboxconn.SandboxConn, want int64) { + t.Helper() + assert.Equal(t, want, sc.BeginCount.Load()) + }) } func TestTabletGatewayShuffleTablets(t *testing.T) { @@ -177,7 +191,20 @@ func TestTabletGatewayReplicaTransactionError(t *testing.T) { verifyContainsError(t, err, "query service can only be used for non-transactional queries on replicas", vtrpcpb.Code_INTERNAL) } -func testTabletGatewayGeneric(t *testing.T, ctx context.Context, f func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error) { +func testTabletGatewayGeneric(t *testing.T, ctx context.Context, f func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error, verifyExpectedCount func(t *testing.T, sc *sandboxconn.SandboxConn, want int64)) { + t.Helper() + testTabletGatewayGenericHelper(t, ctx, f, verifyExpectedCount) + + // test again with the balancer enabled assuming vtgates in both cells where there + // are tablets, so that it will still route to the local cell always, but this way + // it will test both implementations of skipping invalid tablets for retry + balancerEnabled = true + balancerVtgateCells = []string{"cell", "cell2"} + testTabletGatewayGenericHelper(t, ctx, f, verifyExpectedCount) + balancerEnabled = false +} + +func testTabletGatewayGenericHelper(t *testing.T, ctx context.Context, f func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error, verifyExpectedCount func(t *testing.T, sc *sandboxconn.SandboxConn, want int64)) { t.Helper() keyspace 
:= "ks" shard := "0" @@ -193,7 +220,6 @@ func testTabletGatewayGeneric(t *testing.T, ctx context.Context, f func(ctx cont ts := &fakeTopoServer{} tg := NewTabletGateway(ctx, hc, ts, "cell") defer tg.Close(ctx) - // no tablet want := []string{"target: ks.0.replica", `no healthy tablet available for 'keyspace:"ks" shard:"0" tablet_type:REPLICA`} err := f(ctx, tg, target) @@ -217,31 +243,50 @@ func testTabletGatewayGeneric(t *testing.T, ctx context.Context, f func(ctx cont sc2 := hc.AddTestTablet("cell", host, port+1, keyspace, shard, tabletType, true, 10, nil) sc1.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 sc2.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 - err = f(ctx, tg, target) verifyContainsError(t, err, "target: ks.0.replica", vtrpcpb.Code_FAILED_PRECONDITION) + verifyExpectedCount(t, sc1, 1) + verifyExpectedCount(t, sc2, 1) // fatal error hc.Reset() sc1 = hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) - sc2 = hc.AddTestTablet("cell", host, port+1, keyspace, shard, tabletType, true, 10, nil) + sc2 = hc.AddTestTablet("cell2", host, port+1, keyspace, shard, tabletType, true, 10, nil) sc1.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 sc2.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 err = f(ctx, tg, target) verifyContainsError(t, err, "target: ks.0.replica", vtrpcpb.Code_FAILED_PRECONDITION) + verifyExpectedCount(t, sc1, 1) + verifyExpectedCount(t, sc2, 1) // server error - no retry hc.Reset() sc1 = hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) + sc2 = hc.AddTestTablet("cell2", host, port+1, keyspace, shard, tabletType, true, 10, nil) sc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 err = f(ctx, tg, target) assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err)) + verifyExpectedCount(t, sc1, 1) + verifyExpectedCount(t, sc2, 0) // no failure hc.Reset() - hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) + sc1 = 
hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) + sc2 = hc.AddTestTablet("cell2", host, port, keyspace, shard, tabletType, true, 10, nil) + err = f(ctx, tg, target) + assert.NoError(t, err) + verifyExpectedCount(t, sc1, 0) + verifyExpectedCount(t, sc2, 1) + + // retry successful to other cell + hc.Reset() + sc1 = hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) + sc2 = hc.AddTestTablet("cell2", host, port+1, keyspace, shard, tabletType, true, 10, nil) + sc1.MustFailCodes[vtrpcpb.Code_FAILED_PRECONDITION] = 1 err = f(ctx, tg, target) assert.NoError(t, err) + verifyExpectedCount(t, sc1, 1) + verifyExpectedCount(t, sc2, 1) } func testTabletGatewayTransact(t *testing.T, ctx context.Context, f func(ctx context.Context, tg *TabletGateway, target *querypb.Target) error) { diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index fa895f10913..726cf527ed2 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -372,6 +372,7 @@ func Init( }) vtgateInst.registerDebugHealthHandler() vtgateInst.registerDebugEnvHandler() + vtgateInst.registerDebugBalancerHandler() initAPI(gw.hc) return vtgateInst @@ -436,6 +437,12 @@ func (vtg *VTGate) registerDebugHealthHandler() { }) } +func (vtg *VTGate) registerDebugBalancerHandler() { + http.HandleFunc("/debug/balancer", func(w http.ResponseWriter, r *http.Request) { + vtg.Gateway().DebugBalancerHandler(w, r) + }) +} + // IsHealthy returns nil if server is healthy. // Otherwise, it returns an error indicating the reason. 
func (vtg *VTGate) IsHealthy() error { diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index ad45487e893..c4aef5ba0eb 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -397,7 +397,7 @@ func TestGetKeyspaceShardsToWatch(t *testing.T) { res, err := getKeyspaceShardsToWatch() assert.NoError(t, err) - assert.EqualValues(t, testcase.expected, res) + assert.ElementsMatch(t, testcase.expected, res) }) } }