Skip to content

Commit

Permalink
Enhance simple multi tenant test to validate routing better
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Mar 19, 2024
1 parent e9b9ab2 commit 434d5c0
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 107 deletions.
317 changes: 210 additions & 107 deletions go/test/endtoend/vreplication/multi_tenant_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Consists of two tests. Both tests are for multi-tenant migration scenarios.
1. TestMultiTenantSimple: migrates a single tenant to a target keyspace.
2. TestMultiTenantComplex: migrates multiple tenants to a single target keyspace, with concurrent migrations.
The tests use the MoveTables workflow to migrate the tenants. They are designed to simulate a real-world multi-tenant
migration scenario, where each tenant is in a separate database.
*/

package vreplication

import (
"encoding/json"
"fmt"
"math/rand/v2"
"strconv"
Expand All @@ -11,8 +39,10 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

type tenantMigrationStatus int
Expand Down Expand Up @@ -72,6 +102,126 @@ const (
`
)

// TestMultiTenantSimple tests a single tenant migration. The aim here is to test all the steps of the migration process
// including keyspace routing rules, addition of tenant filters to the forward and reverse vreplication streams, and
// verifying that the data is migrated correctly.
func TestMultiTenantSimple(t *testing.T) {
setSidecarDBName("_vt")
// Don't create RDONLY tablets to reduce number of tablets created to reduce resource requirements for the test.
origDefaultRdonly := defaultRdonly
defer func() {
defaultRdonly = origDefaultRdonly
}()
defaultRdonly = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

targetKeyspace := "mt"
_, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, targetKeyspace, "0", mtVSchema, mtSchema, 1, 0, 200, nil)
require.NoError(t, err)

tenantId := int64(1)
sourceKeyspace := getSourceKeyspace(tenantId)
sourceAliasKeyspace := getSourceAliasKeyspace(tenantId)
_, err = vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, sourceKeyspace, "0", mtVSchema, mtSchema, 1, 0, getInitialTabletIdForTenant(tenantId), nil)
require.NoError(t, err)

targetPrimary := vc.getPrimaryTablet(t, targetKeyspace, "0")
sourcePrimary := vc.getPrimaryTablet(t, sourceKeyspace, "0")
primaries := map[string]*cluster.VttabletProcess{
"target": targetPrimary,
"source": sourcePrimary,
}

vtgateConn, closeConn := getVTGateConn()
defer closeConn()
numRows := 10
lastIndex := int64(0)
insertRows := func(lastIndex int64, keyspace string) int64 {
for i := 1; i <= numRows; i++ {
execQueryWithRetry(t, vtgateConn,
fmt.Sprintf("insert into %s.t1(id, tenant_id) values(%d, %d)", keyspace, int64(i)+lastIndex, tenantId), queryTimeout)
}
return int64(numRows) + lastIndex
}
lastIndex = insertRows(lastIndex, sourceKeyspace)

mt := newVtctldMoveTables(&moveTablesWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
workflowName: fmt.Sprintf("wf%d", tenantId),
targetKeyspace: targetKeyspace,
},
sourceKeyspace: sourceKeyspace,
createFlags: []string{
"--tenant-id", strconv.FormatInt(tenantId, 10),
"--source-keyspace-alias", sourceAliasKeyspace,
},
})

preSwitchRules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "a1", ToKeyspace: "s1"},
{FromKeyspace: "s1", ToKeyspace: "s1"},
},
}
postSwitchRules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
{FromKeyspace: "a1", ToKeyspace: "mt"},
{FromKeyspace: "s1", ToKeyspace: "mt"},
},
}
rulesMap := map[string]*vschemapb.KeyspaceRoutingRules{
"pre": preSwitchRules,
"post": postSwitchRules,
}
require.Zero(t, len(getKeyspaceRoutingRules(t, vc).Rules))
mt.Create()
validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, false)
for _, ks := range []string{sourceKeyspace, sourceAliasKeyspace, targetKeyspace} {
lastIndex = insertRows(lastIndex, ks)
}
mt.SwitchReadsAndWrites()
validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, true)
for _, ks := range []string{sourceKeyspace, sourceAliasKeyspace, targetKeyspace} {
lastIndex = insertRows(lastIndex, ks)
}
mt.Complete()
validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, true)
actualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1"))
require.Equal(t, lastIndex, int64(actualRowsInserted))
}

// If switched queries with source/alias qualifiers should execute on target, else on source. Confirm that
// the routing rules are as expected and that the query executes on the expected tablet.
func validateKeyspaceRoutingRules(t *testing.T, vc *VitessCluster, primaries map[string]*cluster.VttabletProcess, rulesMap map[string]*vschemapb.KeyspaceRoutingRules, switched bool) {
currentRules := getKeyspaceRoutingRules(t, vc)
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
queryTemplate := "select count(*) from %s.t1"
matchQuery := "select count(*) from t1"

validateQueryRoute := func(qualifier, dest string) {
query := fmt.Sprintf(queryTemplate, qualifier)
assertQueryExecutesOnTablet(t, vtgateConn, primaries[dest], "", query, matchQuery)
log.Infof("query %s executed on %s", query, dest)
}

if switched {
require.ElementsMatch(t, rulesMap["post"].Rules, currentRules.Rules)
validateQueryRoute("mt", "target")
validateQueryRoute("s1", "target")
validateQueryRoute("a1", "target")
} else {
require.ElementsMatch(t, rulesMap["pre"].Rules, currentRules.Rules)
// Note that with multi-tenant migration, we cannot redirect the target keyspace since
// there are multiple source keyspaces and the target has the aggregate of all the tenants.
validateQueryRoute("mt", "target")
validateQueryRoute("s1", "source")
validateQueryRoute("a1", "source")
}
}

func getSourceKeyspace(tenantId int64) string {
return fmt.Sprintf(sourceKeyspaceTemplate, tenantId)
}
Expand All @@ -80,13 +230,73 @@ func getSourceAliasKeyspace(tenantId int64) string {
return fmt.Sprintf(sourceAliasKeyspaceTemplate, tenantId)
}

func (mtm *multiTenantMigration) insertSomeData(t *testing.T, tenantId int64, keyspace string, numRows int64) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
idx := mtm.getLastID(tenantId)
for i := idx + 1; i <= idx+numRows; i++ {
execQueryWithRetry(t, vtgateConn,
fmt.Sprintf("insert into %s.t1(id, tenant_id) values(%d, %d)", keyspace, i, tenantId), queryTimeout)
}
mtm.setLastID(tenantId, idx+numRows)
}

func getKeyspaceRoutingRules(t *testing.T, vc *VitessCluster) *vschemapb.KeyspaceRoutingRules {
output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetKeyspaceRoutingRules")
require.NoError(t, err)
rules := &vschemapb.KeyspaceRoutingRules{}
err = json.Unmarshal([]byte(output), rules)
require.NoError(t, err)
return rules
}

// printKeyspaceRoutingRules is used for debugging purposes.
func printKeyspaceRoutingRules(t *testing.T, vc *VitessCluster, msg string) {
output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetKeyspaceRoutingRules")
require.NoError(t, err)
log.Infof("%s: Keyspace routing rules are: %s", msg, output)
}

// TestMultiTenant tests a multi-tenant migration scenario where each tenant is in a separate database.
// It uses MoveTables to migrate all tenants to the same target keyspace. The test creates a separate source keyspace
// for each tenant. It then steps through the migration process for each tenant, and verifies that the data is migrated
// correctly. The migration steps are done concurrently and randomly to simulate an actual multi-tenant migration.
func TestMultiTenantComplex(t *testing.T) {
setSidecarDBName("_vt")
// Don't create RDONLY tablets to reduce number of tablets created to reduce resource requirements for the test.
origDefaultRdonly := defaultRdonly
defer func() {
defaultRdonly = origDefaultRdonly
}()
defaultRdonly = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

mtm := newMultiTenantMigration(t)
numTenantsMigrated := 0
mtm.run() // Start the migration process for all tenants.
timer := time.NewTimer(waitTimeout)
for numTenantsMigrated < numTenants {
select {
case tenantId := <-chCompleted:
mtm.setTenantMigrationStatus(tenantId, tenantMigrationStatusMigrated)
numTenantsMigrated++
timer.Reset(waitTimeout)
case <-timer.C:
require.FailNow(t, "Timed out waiting for all tenants to complete")
}
}
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
t.Run("Verify all rows have been migrated", func(t *testing.T) {
numAdditionalInsertSets := 2 // during the SwitchTraffic stop
totalRowsInsertedPerTenant := numInitialRowsPerTenant + numAdditionalRowsPerTenant*numAdditionalInsertSets
totalRowsInserted := totalRowsInsertedPerTenant * numTenants
totalActualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", mtm.targetKeyspace, "t1"))
require.Equal(t, totalRowsInserted, totalActualRowsInserted)
})
}

func newMultiTenantMigration(t *testing.T) *multiTenantMigration {
_, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, targetKeyspaceName, "0", mtVSchema, mtSchema, 1, 0, 200, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -185,17 +395,6 @@ func (mtm *multiTenantMigration) start(tenantId int64) {
mt.Create()
}

func (mtm *multiTenantMigration) insertSomeData(t *testing.T, tenantId int64, sourceAliasKeyspace string, numRows int64) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
idx := mtm.getLastID(tenantId)
for i := idx + 1; i <= idx+numRows; i++ {
execQueryWithRetry(t, vtgateConn,
fmt.Sprintf("insert into %s.t1(id, tenant_id) values(%d, %d)", sourceAliasKeyspace, i, tenantId), queryTimeout)
}
mtm.setLastID(tenantId, idx+numRows)
}

func (mtm *multiTenantMigration) switchTraffic(tenantId int64) {
t := mtm.t
sourceAliasKeyspace := getSourceAliasKeyspace(tenantId)
Expand Down Expand Up @@ -256,99 +455,3 @@ func (mtm *multiTenantMigration) run() {
go mtm.doStuff("Switch Traffic", chInProgress, chSwitched, &numSwitched, mtm.switchTraffic)
go mtm.doStuff("Mark Migrations Complete", chSwitched, chCompleted, &numCompleted, mtm.complete)
}

// TestMultiTenant tests a multi-tenant migration scenario where each tenant is in a separate database.
// It uses MoveTables to migrate all tenants to the same target keyspace. The test creates a separate source keyspace
// for each tenant. It then steps through the migration process for each tenant, and verifies that the data is migrated
// correctly. The migration steps are done concurrently and randomly to simulate an actual multi-tenant migration.
func TestMultiTenantComplex(t *testing.T) {
setSidecarDBName("_vt")
// Don't create RDONLY tablets to reduce number of tablets created to reduce resource requirements for the test.
origDefaultRdonly := defaultRdonly
defer func() {
defaultRdonly = origDefaultRdonly
}()
defaultRdonly = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

mtm := newMultiTenantMigration(t)
numTenantsMigrated := 0
mtm.run() // Start the migration process for all tenants.
timer := time.NewTimer(waitTimeout)
for numTenantsMigrated < numTenants {
select {
case tenantId := <-chCompleted:
mtm.setTenantMigrationStatus(tenantId, tenantMigrationStatusMigrated)
numTenantsMigrated++
timer.Reset(waitTimeout)
case <-timer.C:
require.FailNow(t, "Timed out waiting for all tenants to complete")
}
}
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
t.Run("Verify all rows have been migrated", func(t *testing.T) {
numAdditionalInsertSets := 2 // during the SwitchTraffic stop
totalRowsInsertedPerTenant := numInitialRowsPerTenant + numAdditionalRowsPerTenant*numAdditionalInsertSets
totalRowsInserted := totalRowsInsertedPerTenant * numTenants
totalActualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", mtm.targetKeyspace, "t1"))
require.Equal(t, totalRowsInserted, totalActualRowsInserted)
})
}

func TestMultiTenantSimple(t *testing.T) {
setSidecarDBName("_vt")
// Don't create RDONLY tablets to reduce number of tablets created to reduce resource requirements for the test.
origDefaultRdonly := defaultRdonly
defer func() {
defaultRdonly = origDefaultRdonly
}()
defaultRdonly = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()
targetKeyspace := "mt"
_, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, targetKeyspace, "0", mtVSchema, mtSchema, 1, 0, 200, nil)
require.NoError(t, err)
tenantId := int64(1)
sourceKeyspace := getSourceKeyspace(tenantId)
sourceAliasKeyspace := getSourceAliasKeyspace(tenantId)
_, err = vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, sourceKeyspace, "0", mtVSchema, mtSchema, 1, 0, getInitialTabletIdForTenant(tenantId), nil)
require.NoError(t, err)

vtgateConn, closeConn := getVTGateConn()
defer closeConn()
numRows := 10
lastIndex := int64(0)
insRows := func(lastIndex int64, keyspace string) int64 {
for i := 1; i <= numRows; i++ {
execQueryWithRetry(t, vtgateConn,
fmt.Sprintf("insert into %s.t1(id, tenant_id) values(%d, %d)", keyspace, int64(i)+lastIndex, tenantId), queryTimeout)
}
return int64(numRows) + lastIndex
}
lastIndex = insRows(lastIndex, sourceKeyspace)
mt := newVtctldMoveTables(&moveTablesWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
workflowName: fmt.Sprintf("wf%d", tenantId),
targetKeyspace: targetKeyspace,
},
sourceKeyspace: sourceKeyspace,
createFlags: []string{
"--tenant-id", strconv.FormatInt(tenantId, 10),
"--source-keyspace-alias", sourceAliasKeyspace,
},
})
mt.Create()
lastIndex = insRows(lastIndex, sourceKeyspace)
lastIndex = insRows(lastIndex, sourceAliasKeyspace)
lastIndex = insRows(lastIndex, targetKeyspace)
mt.SwitchReadsAndWrites()
lastIndex = insRows(lastIndex, sourceKeyspace)
lastIndex = insRows(lastIndex, sourceAliasKeyspace)
lastIndex = insRows(lastIndex, targetKeyspace)
mt.Complete()
actualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1"))
require.Equal(t, lastIndex, int64(actualRowsInserted))
}
1 change: 1 addition & 0 deletions go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,7 @@ func (vschema *VSchema) FirstKeyspace() *Keyspace {
func (vschema *VSchema) FindRoutedTable(keyspace, tablename string, tabletType topodatapb.TabletType) (*Table, error) {
routedKeyspace, ok := vschema.KeyspaceRoutingRules[keyspace]
if ok {
// FIXME: remove before merging
log.Infof("Found keyspace routing rule for %s routed to %s", keyspace, routedKeyspace)
keyspace = routedKeyspace
}
Expand Down

0 comments on commit 434d5c0

Please sign in to comment.