diff --git a/pkg/ccl/partitionccl/partition_test.go b/pkg/ccl/partitionccl/partition_test.go
index e88a39059233..3f03e2a5430f 100644
--- a/pkg/ccl/partitionccl/partition_test.go
+++ b/pkg/ccl/partitionccl/partition_test.go
@@ -45,6 +45,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/gogo/protobuf/proto"
 	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 	yaml "gopkg.in/yaml.v2"
 )
 
@@ -974,6 +976,7 @@ func verifyScansOnNode(
 
 type partitioningTestClusterArgs struct {
 	numDCs, nodesPerDC int
+	initialVersionKey  cluster.VersionKey
 	numReplicas        int
 }
 
@@ -1000,7 +1003,17 @@ func setupPartitioningTestCluster(
 	serverKnobs := &server.TestingKnobs{
 		DefaultZoneConfigOverride: &cfg,
 	}
+	var settings *cluster.Settings
+	if args.initialVersionKey != 0 {
+		version := cluster.VersionByKey(args.initialVersionKey)
+		serverKnobs.DisableAutomaticVersionUpgrade = 1
+		storeKnobs.BootstrapVersion = &cluster.ClusterVersion{
+			Version: version,
+		}
+		settings = cluster.MakeTestingClusterSettingsWithVersion(version, cluster.BinaryServerVersion)
+	}
 	return base.TestServerArgs{
+		Settings: settings,
 		Knobs: base.TestingKnobs{
 			Store:  storeKnobs,
 			Server: serverKnobs,
@@ -1205,14 +1218,26 @@ type repartitioningTest struct {
 	index         string
 	old, new      partitioningTest
 	clusterConfig partitioningTestClusterArgs
+	hooks         repartitioningTestHooks
 
 	// ran is used to detect whether this subtest was skipped due to the test.run
 	// flag.
 	ran bool
 }
 
+type repartitioningTestHooks struct {
+	name                 string
+	beforeRepartitioning func(t *testing.T, db *gosql.DB)
+	afterTest            func(t *testing.T, db *gosql.DB)
+}
+
 func (rt *repartitioningTest) String() string {
-	return fmt.Sprintf("%s/%s/%s", rt.clusterConfig, rt.old.name, rt.new.name)
+	var parts []string
+	if rt.hooks.name != "" {
+		parts = append(parts, rt.hooks.name)
+	}
+	parts = append(parts, rt.clusterConfig.String(), rt.old.name, rt.new.name)
+	return strings.Join(parts, "/")
 }
 
 func (rt *repartitioningTest) run(t *testing.T) {
@@ -1437,10 +1462,13 @@ func allRepartitioningTests(
 	// useLeaseholderPrefs is false, 3 nodes.
 	const numDCs = 3
 
+	// TODO(ajwerner): remove the upgrade logic after 19.2 is released.
+
 	setupRepartitioningTest := func(
 		base repartitioningTest,
 		useLeaseholderPrefs bool, // skipped if nodesPerDC != 1
 		nodesPerDC int,
+		shouldPerformPreemptiveSnapshotUpgrade bool,
 	) (repartitioningTest, error) {
 		t := base
 		t.clusterConfig.numReplicas = nodesPerDC
@@ -1463,9 +1491,32 @@ func allRepartitioningTests(
 		if err := t.new.parse(useLeaseholderPrefs); err != nil {
 			return repartitioningTest{}, err
 		}
+		if shouldPerformPreemptiveSnapshotUpgrade {
+			var errGroup errgroup.Group
+			t.hooks = repartitioningTestHooks{
+				name: "upgrade",
+				beforeRepartitioning: func(t *testing.T, db *gosql.DB) {
+					errGroup.Go(func() error {
+						// This sleep is arbitrary but the idea is that it's non-zero so
+						// some snapshots should be in flight.
+						const waitBeforeUpgrade = 500 * time.Millisecond
+						time.Sleep(waitBeforeUpgrade)
+						_, err := db.Exec("SET CLUSTER SETTING version = crdb_internal.node_executable_version();")
+						return err
+					})
+				},
+				afterTest: func(t *testing.T, db *gosql.DB) {
+					require.NoError(t, errGroup.Wait())
+				},
+			}
+			t.clusterConfig.initialVersionKey = cluster.Version19_1
+		}
 		return t, nil
 	}
-
+	upgradeFromPreemptiveSnapshotEnabled := cluster.BinaryServerVersion.Major == 19 &&
+		// Only try to upgrade from 19.1 if we're an unstable 19.1 or 19.2 proper.
+		((cluster.BinaryServerVersion.Minor == 1 && cluster.BinaryServerVersion.Unstable > 0) ||
+			(cluster.BinaryServerVersion.Minor == 2 && cluster.BinaryServerVersion.Unstable == 0))
 	for i := range base {
 		for _, useLeaseholderPrefs := range []bool{true, false} {
 			for _, nodesPerDC := range []int{1, 3, 5} {
@@ -1474,11 +1525,23 @@ func allRepartitioningTests(
 				if !useLeaseholderPrefs && nodesPerDC != 1 {
 					continue
 				}
-				t, err := setupRepartitioningTest(base[i], useLeaseholderPrefs, nodesPerDC)
-				if err != nil {
-					return nil, err
+				if upgradeFromPreemptiveSnapshotEnabled {
+					for _, shouldPerformPreemptiveSnapshotUpgrade := range []bool{true, false} {
+						t, err := setupRepartitioningTest(base[i], useLeaseholderPrefs, nodesPerDC,
+							shouldPerformPreemptiveSnapshotUpgrade)
+						if err != nil {
+							return nil, err
+						}
+						tests = append(tests, t)
+					}
+				} else {
+					t, err := setupRepartitioningTest(base[i], useLeaseholderPrefs, nodesPerDC,
+						false)
+					if err != nil {
+						return nil, err
+					}
+					tests = append(tests, t)
 				}
-				tests = append(tests, t)
 			}
 		}
 	}
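The hunk that actually invokes the new hooks from (*repartitioningTest).run is not part of this diff, so the call sites are an assumption; below is a minimal sketch of how the before/after hooks are presumably wired around the repartitioning work. The wrapper name runWithHooks and the db plumbing are hypothetical, not taken from the patch.

// Hypothetical wiring sketch: start the "upgrade" hook before issuing the
// repartitioning statements and join it after the test body finishes.
func (rt *repartitioningTest) runWithHooks(t *testing.T, db *gosql.DB) {
	if rt.hooks.beforeRepartitioning != nil {
		// Starts the errgroup goroutine that bumps the cluster version while
		// snapshots for the repartitioned ranges should still be in flight.
		rt.hooks.beforeRepartitioning(t, db)
	}
	rt.run(t) // repartition and verify scans as before
	if rt.hooks.afterTest != nil {
		// Waits on the errgroup and asserts the SET CLUSTER SETTING succeeded.
		rt.hooks.afterTest(t, db)
	}
}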