From 299e2f86486942540e2a646a1974887b8a837b2b Mon Sep 17 00:00:00 2001 From: Riley Laine Date: Mon, 13 May 2024 10:57:50 -0700 Subject: [PATCH] Test adding vtgates to the pool at runtime Signed-off-by: Riley Laine --- .../vtgateproxytest/rebalance_test_test.go | 170 ++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 go/test/endtoend/vtgateproxytest/rebalance_test_test.go diff --git a/go/test/endtoend/vtgateproxytest/rebalance_test_test.go b/go/test/endtoend/vtgateproxytest/rebalance_test_test.go new file mode 100644 index 00000000000..c5fa92afa1c --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/rebalance_test_test.go @@ -0,0 +1,170 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package vtgateproxytest + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "math" + "net" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +func TestVtgateProxyRebalance(t *testing.T) { + defer cluster.PanicHandler(t) + + const targetAffinity = "use1-az1" + const vtgateCount = 10 + const vtgateproxyConnections = 4 + + vtgates, err := startAdditionalVtgates(vtgateCount) + if err != nil { + t.Fatal(err) + } + defer teardownVtgates(vtgates) + + var config []map[string]string + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + + for i := 0; i < vtgateproxyConnections; i++ { + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", i), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgates[i].GrpcPort), + "az_id": targetAffinity, + "type": "pool1", + }) + } + + b, err := json.Marshal(config) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + vtgateHostsFile, + targetAffinity, + vtgateproxyConnections, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + // Use the go mysql driver since the vitess mysql client does not support + // connectionAttributes. + dsn := fmt.Sprintf("tcp(%v)/ks?connectionAttributes=type:pool1,az_id:use1-az1", net.JoinHostPort(clusterInstance.Hostname, strconv.Itoa(vtgateproxyProcInstance.MySQLPort))) + log.Infof("Using DSN %v", dsn) + + conn, err := sql.Open("mysql", dsn) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + log.Info("Inserting test value") + tx, err := conn.BeginTx(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + + _, err = tx.Exec("insert into customer(id, email) values(1, 'email1')") + if err != nil { + t.Fatal(err) + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + + log.Info("Reading test value") + + const totalQueries = 1000 + // Queries should be routed with even distribution, with a 25% buffer to + // prevent test flakiness. + minQueryCount := int(math.Round(float64(totalQueries/vtgateCount) * 0.75)) + + remainingVtgates := len(vtgates) - vtgateproxyConnections + + for i := 0; i < totalQueries; i++ { + addVtgate := i % (totalQueries / remainingVtgates) + if addVtgate == 0 { + j := (i / (totalQueries / remainingVtgates)) + vtgateproxyConnections - 1 + log.Infof("Adding vtgate %v", j) + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", j), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgates[j].GrpcPort), + "az_id": targetAffinity, + "type": "pool1", + }) + b, err = json.Marshal(config) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + time.Sleep(1 * time.Second) + } + + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) + } + + for i, vtgate := range vtgates { + queryCount, err := getVtgateQueryCount(vtgate) + if err != nil { + t.Fatal(err) + } + + log.Infof("vtgate %v query counts: %+v", i, queryCount) + + assert.GreaterOrEqual(t, queryCount.Sum(), minQueryCount, "vtgate %v did not recieve the expected number of queries", i) + } +}