diff --git a/go/test/endtoend/vreplication/multi_tenant_test.go b/go/test/endtoend/vreplication/multi_tenant_test.go index 7133b731f48..7e8a01a7c7f 100644 --- a/go/test/endtoend/vreplication/multi_tenant_test.go +++ b/go/test/endtoend/vreplication/multi_tenant_test.go @@ -1,6 +1,34 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Consists of two tests. Both tests are for multi-tenant migration scenarios. + +1. TestMultiTenantSimple: migrates a single tenant to a target keyspace. + +2. TestMultiTenantComplex: migrates multiple tenants to a single target keyspace, with concurrent migrations. + +The tests use the MoveTables workflow to migrate the tenants. They are designed to simulate a real-world multi-tenant +migration scenario, where each tenant is in a separate database. +*/ + package vreplication import ( + "encoding/json" "fmt" "math/rand/v2" "strconv" @@ -11,8 +39,10 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) type tenantMigrationStatus int @@ -72,6 +102,126 @@ const ( ` ) +// TestMultiTenantSimple tests a single tenant migration. 
The aim here is to test all the steps of the migration process +// including keyspace routing rules, addition of tenant filters to the forward and reverse vreplication streams, and +// verifying that the data is migrated correctly. +func TestMultiTenantSimple(t *testing.T) { + setSidecarDBName("_vt") + // Don't create RDONLY tablets to reduce number of tablets created to reduce resource requirements for the test. + origDefaultRdonly := defaultRdonly + defer func() { + defaultRdonly = origDefaultRdonly + }() + defaultRdonly = 0 + vc = setupMinimalCluster(t) + defer vc.TearDown() + + targetKeyspace := "mt" + _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, targetKeyspace, "0", mtVSchema, mtSchema, 1, 0, 200, nil) + require.NoError(t, err) + + tenantId := int64(1) + sourceKeyspace := getSourceKeyspace(tenantId) + sourceAliasKeyspace := getSourceAliasKeyspace(tenantId) + _, err = vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, sourceKeyspace, "0", mtVSchema, mtSchema, 1, 0, getInitialTabletIdForTenant(tenantId), nil) + require.NoError(t, err) + + targetPrimary := vc.getPrimaryTablet(t, targetKeyspace, "0") + sourcePrimary := vc.getPrimaryTablet(t, sourceKeyspace, "0") + primaries := map[string]*cluster.VttabletProcess{ + "target": targetPrimary, + "source": sourcePrimary, + } + + vtgateConn, closeConn := getVTGateConn() + defer closeConn() + numRows := 10 + lastIndex := int64(0) + insertRows := func(lastIndex int64, keyspace string) int64 { + for i := 1; i <= numRows; i++ { + execQueryWithRetry(t, vtgateConn, + fmt.Sprintf("insert into %s.t1(id, tenant_id) values(%d, %d)", keyspace, int64(i)+lastIndex, tenantId), queryTimeout) + } + return int64(numRows) + lastIndex + } + lastIndex = insertRows(lastIndex, sourceKeyspace) + + mt := newVtctldMoveTables(&moveTablesWorkflow{ + workflowInfo: &workflowInfo{ + vc: vc, + workflowName: fmt.Sprintf("wf%d", tenantId), + targetKeyspace: targetKeyspace, + }, + sourceKeyspace: sourceKeyspace, + createFlags: []string{ + 
"--tenant-id", strconv.FormatInt(tenantId, 10), + "--source-keyspace-alias", sourceAliasKeyspace, + }, + }) + + preSwitchRules := &vschemapb.KeyspaceRoutingRules{ + Rules: []*vschemapb.KeyspaceRoutingRule{ + {FromKeyspace: "a1", ToKeyspace: "s1"}, + {FromKeyspace: "s1", ToKeyspace: "s1"}, + }, + } + postSwitchRules := &vschemapb.KeyspaceRoutingRules{ + Rules: []*vschemapb.KeyspaceRoutingRule{ + {FromKeyspace: "a1", ToKeyspace: "mt"}, + {FromKeyspace: "s1", ToKeyspace: "mt"}, + }, + } + rulesMap := map[string]*vschemapb.KeyspaceRoutingRules{ + "pre": preSwitchRules, + "post": postSwitchRules, + } + require.Zero(t, len(getKeyspaceRoutingRules(t, vc).Rules)) + mt.Create() + validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, false) + for _, ks := range []string{sourceKeyspace, sourceAliasKeyspace, targetKeyspace} { + lastIndex = insertRows(lastIndex, ks) + } + mt.SwitchReadsAndWrites() + validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, true) + for _, ks := range []string{sourceKeyspace, sourceAliasKeyspace, targetKeyspace} { + lastIndex = insertRows(lastIndex, ks) + } + mt.Complete() + validateKeyspaceRoutingRules(t, vc, primaries, rulesMap, true) + actualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1")) + require.Equal(t, lastIndex, int64(actualRowsInserted)) +} + +// validateKeyspaceRoutingRules confirms that the routing rules are as expected and that each query executes on the +// expected tablet: if switched, queries with source/alias qualifiers should execute on the target, else on the source. 
+func validateKeyspaceRoutingRules(t *testing.T, vc *VitessCluster, primaries map[string]*cluster.VttabletProcess, rulesMap map[string]*vschemapb.KeyspaceRoutingRules, switched bool) { + currentRules := getKeyspaceRoutingRules(t, vc) + vtgateConn, closeConn := getVTGateConn() + defer closeConn() + queryTemplate := "select count(*) from %s.t1" + matchQuery := "select count(*) from t1" + + validateQueryRoute := func(qualifier, dest string) { + query := fmt.Sprintf(queryTemplate, qualifier) + assertQueryExecutesOnTablet(t, vtgateConn, primaries[dest], "", query, matchQuery) + log.Infof("query %s executed on %s", query, dest) + } + + if switched { + require.ElementsMatch(t, rulesMap["post"].Rules, currentRules.Rules) + validateQueryRoute("mt", "target") + validateQueryRoute("s1", "target") + validateQueryRoute("a1", "target") + } else { + require.ElementsMatch(t, rulesMap["pre"].Rules, currentRules.Rules) + // Note that with multi-tenant migration, we cannot redirect the target keyspace since + // there are multiple source keyspaces and the target has the aggregate of all the tenants. 
+ validateQueryRoute("mt", "target") + validateQueryRoute("s1", "source") + validateQueryRoute("a1", "source") + } +} + func getSourceKeyspace(tenantId int64) string { return fmt.Sprintf(sourceKeyspaceTemplate, tenantId) } @@ -80,6 +230,26 @@ func getSourceAliasKeyspace(tenantId int64) string { return fmt.Sprintf(sourceAliasKeyspaceTemplate, tenantId) } +func (mtm *multiTenantMigration) insertSomeData(t *testing.T, tenantId int64, keyspace string, numRows int64) { + vtgateConn, closeConn := getVTGateConn() + defer closeConn() + idx := mtm.getLastID(tenantId) + for i := idx + 1; i <= idx+numRows; i++ { + execQueryWithRetry(t, vtgateConn, + fmt.Sprintf("insert into %s.t1(id, tenant_id) values(%d, %d)", keyspace, i, tenantId), queryTimeout) + } + mtm.setLastID(tenantId, idx+numRows) +} + +func getKeyspaceRoutingRules(t *testing.T, vc *VitessCluster) *vschemapb.KeyspaceRoutingRules { + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetKeyspaceRoutingRules") + require.NoError(t, err) + rules := &vschemapb.KeyspaceRoutingRules{} + err = json.Unmarshal([]byte(output), rules) + require.NoError(t, err) + return rules +} + // printKeyspaceRoutingRules is used for debugging purposes. func printKeyspaceRoutingRules(t *testing.T, vc *VitessCluster, msg string) { output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetKeyspaceRoutingRules") @@ -87,6 +257,46 @@ func printKeyspaceRoutingRules(t *testing.T, vc *VitessCluster, msg string) { log.Infof("%s: Keyspace routing rules are: %s", msg, output) } +// TestMultiTenantComplex tests a multi-tenant migration scenario where each tenant is in a separate database. +// It uses MoveTables to migrate all tenants to the same target keyspace. The test creates a separate source keyspace +// for each tenant. It then steps through the migration process for each tenant, and verifies that the data is migrated +// correctly. The migration steps are done concurrently and randomly to simulate an actual multi-tenant migration. 
+func TestMultiTenantComplex(t *testing.T) { + setSidecarDBName("_vt") + // Don't create RDONLY tablets to reduce number of tablets created to reduce resource requirements for the test. + origDefaultRdonly := defaultRdonly + defer func() { + defaultRdonly = origDefaultRdonly + }() + defaultRdonly = 0 + vc = setupMinimalCluster(t) + defer vc.TearDown() + + mtm := newMultiTenantMigration(t) + numTenantsMigrated := 0 + mtm.run() // Start the migration process for all tenants. + timer := time.NewTimer(waitTimeout) + for numTenantsMigrated < numTenants { + select { + case tenantId := <-chCompleted: + mtm.setTenantMigrationStatus(tenantId, tenantMigrationStatusMigrated) + numTenantsMigrated++ + timer.Reset(waitTimeout) + case <-timer.C: + require.FailNow(t, "Timed out waiting for all tenants to complete") + } + } + vtgateConn, closeConn := getVTGateConn() + defer closeConn() + t.Run("Verify all rows have been migrated", func(t *testing.T) { + numAdditionalInsertSets := 2 // during the SwitchTraffic stop + totalRowsInsertedPerTenant := numInitialRowsPerTenant + numAdditionalRowsPerTenant*numAdditionalInsertSets + totalRowsInserted := totalRowsInsertedPerTenant * numTenants + totalActualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", mtm.targetKeyspace, "t1")) + require.Equal(t, totalRowsInserted, totalActualRowsInserted) + }) +} + func newMultiTenantMigration(t *testing.T) *multiTenantMigration { _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, targetKeyspaceName, "0", mtVSchema, mtSchema, 1, 0, 200, nil) require.NoError(t, err) @@ -185,17 +395,6 @@ func (mtm *multiTenantMigration) start(tenantId int64) { mt.Create() } -func (mtm *multiTenantMigration) insertSomeData(t *testing.T, tenantId int64, sourceAliasKeyspace string, numRows int64) { - vtgateConn, closeConn := getVTGateConn() - defer closeConn() - idx := mtm.getLastID(tenantId) - for i := idx + 1; i <= idx+numRows; i++ { - execQueryWithRetry(t, vtgateConn, - fmt.Sprintf("insert into %s.t1(id, 
tenant_id) values(%d, %d)", sourceAliasKeyspace, i, tenantId), queryTimeout) - } - mtm.setLastID(tenantId, idx+numRows) -} - func (mtm *multiTenantMigration) switchTraffic(tenantId int64) { t := mtm.t sourceAliasKeyspace := getSourceAliasKeyspace(tenantId) @@ -256,99 +455,3 @@ func (mtm *multiTenantMigration) run() { go mtm.doStuff("Switch Traffic", chInProgress, chSwitched, &numSwitched, mtm.switchTraffic) go mtm.doStuff("Mark Migrations Complete", chSwitched, chCompleted, &numCompleted, mtm.complete) } - -// TestMultiTenant tests a multi-tenant migration scenario where each tenant is in a separate database. -// It uses MoveTables to migrate all tenants to the same target keyspace. The test creates a separate source keyspace -// for each tenant. It then steps through the migration process for each tenant, and verifies that the data is migrated -// correctly. The migration steps are done concurrently and randomly to simulate an actual multi-tenant migration. -func TestMultiTenantComplex(t *testing.T) { - setSidecarDBName("_vt") - // Don't create RDONLY tablets to reduce number of tablets created to reduce resource requirements for the test. - origDefaultRdonly := defaultRdonly - defer func() { - defaultRdonly = origDefaultRdonly - }() - defaultRdonly = 0 - vc = setupMinimalCluster(t) - defer vc.TearDown() - - mtm := newMultiTenantMigration(t) - numTenantsMigrated := 0 - mtm.run() // Start the migration process for all tenants. 
- timer := time.NewTimer(waitTimeout) - for numTenantsMigrated < numTenants { - select { - case tenantId := <-chCompleted: - mtm.setTenantMigrationStatus(tenantId, tenantMigrationStatusMigrated) - numTenantsMigrated++ - timer.Reset(waitTimeout) - case <-timer.C: - require.FailNow(t, "Timed out waiting for all tenants to complete") - } - } - vtgateConn, closeConn := getVTGateConn() - defer closeConn() - t.Run("Verify all rows have been migrated", func(t *testing.T) { - numAdditionalInsertSets := 2 // during the SwitchTraffic stop - totalRowsInsertedPerTenant := numInitialRowsPerTenant + numAdditionalRowsPerTenant*numAdditionalInsertSets - totalRowsInserted := totalRowsInsertedPerTenant * numTenants - totalActualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", mtm.targetKeyspace, "t1")) - require.Equal(t, totalRowsInserted, totalActualRowsInserted) - }) -} - -func TestMultiTenantSimple(t *testing.T) { - setSidecarDBName("_vt") - // Don't create RDONLY tablets to reduce number of tablets created to reduce resource requirements for the test. 
- origDefaultRdonly := defaultRdonly - defer func() { - defaultRdonly = origDefaultRdonly - }() - defaultRdonly = 0 - vc = setupMinimalCluster(t) - defer vc.TearDown() - targetKeyspace := "mt" - _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, targetKeyspace, "0", mtVSchema, mtSchema, 1, 0, 200, nil) - require.NoError(t, err) - tenantId := int64(1) - sourceKeyspace := getSourceKeyspace(tenantId) - sourceAliasKeyspace := getSourceAliasKeyspace(tenantId) - _, err = vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, sourceKeyspace, "0", mtVSchema, mtSchema, 1, 0, getInitialTabletIdForTenant(tenantId), nil) - require.NoError(t, err) - - vtgateConn, closeConn := getVTGateConn() - defer closeConn() - numRows := 10 - lastIndex := int64(0) - insRows := func(lastIndex int64, keyspace string) int64 { - for i := 1; i <= numRows; i++ { - execQueryWithRetry(t, vtgateConn, - fmt.Sprintf("insert into %s.t1(id, tenant_id) values(%d, %d)", keyspace, int64(i)+lastIndex, tenantId), queryTimeout) - } - return int64(numRows) + lastIndex - } - lastIndex = insRows(lastIndex, sourceKeyspace) - mt := newVtctldMoveTables(&moveTablesWorkflow{ - workflowInfo: &workflowInfo{ - vc: vc, - workflowName: fmt.Sprintf("wf%d", tenantId), - targetKeyspace: targetKeyspace, - }, - sourceKeyspace: sourceKeyspace, - createFlags: []string{ - "--tenant-id", strconv.FormatInt(tenantId, 10), - "--source-keyspace-alias", sourceAliasKeyspace, - }, - }) - mt.Create() - lastIndex = insRows(lastIndex, sourceKeyspace) - lastIndex = insRows(lastIndex, sourceAliasKeyspace) - lastIndex = insRows(lastIndex, targetKeyspace) - mt.SwitchReadsAndWrites() - lastIndex = insRows(lastIndex, sourceKeyspace) - lastIndex = insRows(lastIndex, sourceAliasKeyspace) - lastIndex = insRows(lastIndex, targetKeyspace) - mt.Complete() - actualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1")) - require.Equal(t, lastIndex, int64(actualRowsInserted)) -} diff --git a/go/vt/vtgate/vindexes/vschema.go 
b/go/vt/vtgate/vindexes/vschema.go index 82e13b6259d..59d36fb39f5 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -1113,6 +1113,7 @@ func (vschema *VSchema) FirstKeyspace() *Keyspace { func (vschema *VSchema) FindRoutedTable(keyspace, tablename string, tabletType topodatapb.TabletType) (*Table, error) { routedKeyspace, ok := vschema.KeyspaceRoutingRules[keyspace] if ok { + // FIXME: remove before merging log.Infof("Found keyspace routing rule for %s routed to %s", keyspace, routedKeyspace) keyspace = routedKeyspace }