From 8850dc169008a4cb44dab8d7b43bfd4178fbb633 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Tue, 1 Mar 2022 08:20:46 -0500
Subject: [PATCH] changefeedccl: Fix flaky test.

Fix the flaky TestChangefeedHandlesDrainingNodes test.

The source of the flake was that cluster setting updates propagate
asynchronously to the other nodes in the cluster. Thus, it was possible
for the test to flake because some of the nodes were still observing
the old value for the setting.

The flake is fixed by introducing a testing utility function that sets
the setting and ensures it has propagated to all nodes in the test
cluster.

Fixes #76806

Release Notes: none
Release Justification: test-only change.
---
 pkg/ccl/changefeedccl/changefeed_test.go      | 37 +++++++-------
 pkg/testutils/serverutils/BUILD.bazel         |  2 +
 .../serverutils/test_cluster_utils.go         | 50 +++++++++++++++++++
 3 files changed, 72 insertions(+), 17 deletions(-)
 create mode 100644 pkg/testutils/serverutils/test_cluster_utils.go

diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 04f1ccb6abcf..baf05076005f 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -4227,8 +4227,8 @@ func TestChangefeedNodeShutdown(t *testing.T) {
 	defer tc.Stopper().Stop(context.Background())
 
 	db := tc.ServerConn(1)
+	serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", time.Millisecond)
 	sqlDB := sqlutils.MakeSQLRunner(db)
-	sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '0ns'`)
 	sqlDB.Exec(t, `CREATE DATABASE d`)
 	sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
 	sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
@@ -4439,19 +4439,22 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
 	skip.UnderRace(t, "Takes too long with race enabled")
 
 	shouldDrain := true
-	knobs := base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{
-		DrainFast:  true,
-		Changefeed: &TestingKnobs{},
-		Flowinfra: &flowinfra.TestingKnobs{
-			FlowRegistryDraining: func() bool {
-				if shouldDrain {
-					shouldDrain = false
-					return true
-				}
-				return false
+	knobs := base.TestingKnobs{
+		DistSQL: &execinfra.TestingKnobs{
+			DrainFast:  true,
+			Changefeed: &TestingKnobs{},
+			Flowinfra: &flowinfra.TestingKnobs{
+				FlowRegistryDraining: func() bool {
+					if shouldDrain {
+						shouldDrain = false
+						return true
+					}
+					return false
+				},
 			},
 		},
-	}}
+		JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+	}
 
 	sinkDir, cleanupFn := testutils.TempDir(t)
 	defer cleanupFn()
@@ -4466,9 +4469,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
 	db := tc.ServerConn(1)
 	sqlDB := sqlutils.MakeSQLRunner(db)
 
-	sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
-	sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`)
-	sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`)
+	serverutils.SetClusterSetting(t, tc, "kv.rangefeed.enabled", true)
+	serverutils.SetClusterSetting(t, tc, "kv.closed_timestamp.target_duration", time.Second)
+	serverutils.SetClusterSetting(t, tc, "changefeed.experimental_poll_interval", 10*time.Millisecond)
 
 	sqlutils.CreateTable(
 		t, db, "foo",
@@ -4491,9 +4494,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
 	defer closeFeed(t, feed)
 
 	// At this point, the job created by feed will fail to start running on node 0 due to draining
-	// registry. However, this job will be retried, and it should succeeded.
+	// registry. However, this job will be retried, and it should succeed.
 	// Note: This test is a bit unrealistic in that if the registry is draining, that
-	// means that the server is draining (i.e being shut down). We don't do a full shutdown
+	// means that the server is draining (i.e. being shut down). We don't do a full shutdown
 	// here, but we are simulating a restart by failing to start a flow the first time around.
 	assertPayloads(t, feed, []string{
 		`foo: [1]->{"after": {"k": 1, "v": 1}}`,
diff --git a/pkg/testutils/serverutils/BUILD.bazel b/pkg/testutils/serverutils/BUILD.bazel
index a4c79352e774..f8492f82c684 100644
--- a/pkg/testutils/serverutils/BUILD.bazel
+++ b/pkg/testutils/serverutils/BUILD.bazel
@@ -4,6 +4,7 @@ go_library(
     name = "serverutils",
     srcs = [
         "test_cluster_shim.go",
+        "test_cluster_utils.go",
         "test_server_shim.go",
         "test_tenant_shim.go",
     ],
@@ -20,6 +21,7 @@ go_library(
        "//pkg/server/status",
        "//pkg/settings/cluster",
        "//pkg/storage",
+       "//pkg/testutils",
        "//pkg/testutils/sqlutils",
        "//pkg/util/hlc",
        "//pkg/util/httputil",
diff --git a/pkg/testutils/serverutils/test_cluster_utils.go b/pkg/testutils/serverutils/test_cluster_utils.go
new file mode 100644
index 000000000000..1e3e92013eee
--- /dev/null
+++ b/pkg/testutils/serverutils/test_cluster_utils.go
@@ -0,0 +1,50 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package serverutils
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+)
+
+// SetClusterSetting executes a SET CLUSTER SETTING statement and then
+// ensures that all nodes in the test cluster see that setting update.
+func SetClusterSetting(t testutils.TB, c TestClusterInterface, name string, value interface{}) {
+	t.Helper()
+	strVal := func() string {
+		switch v := value.(type) {
+		case string:
+			return v
+		case int, int32, int64:
+			return fmt.Sprintf("%d", v)
+		case bool:
+			return strconv.FormatBool(v)
+		case float32, float64:
+			return fmt.Sprintf("%f", v)
+		case fmt.Stringer:
+			return v.String()
+		default:
+			return fmt.Sprintf("%v", value)
+		}
+	}()
+	query := fmt.Sprintf("SET CLUSTER SETTING %s='%s'", name, strVal)
+	// Executing SET CLUSTER SETTING on a node ensures the update reaches that
+	// node's local registry, so simply run the statement against every node.
+	for i := 0; i < c.NumServers(); i++ {
+		_, err := c.ServerConn(i).ExecContext(context.Background(), query)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+}
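
--
Usage sketch (editorial note, not part of the patch): a minimal example of
how a test might call the new helper. The test name and the three-node
cluster below are illustrative, not from this change; the helper calls
mirror the ones the changefeed_test.go hunks above add. Note that a bool
argument renders as 'true', and a time.Duration hits the fmt.Stringer case
(time.Second renders as '1s'), which is what those hunks rely on.

    package serverutils_test

    import (
    	"context"
    	"testing"
    	"time"

    	"github.com/cockroachdb/cockroach/pkg/base"
    	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
    )

    // TestSetClusterSettingExample is a hypothetical test demonstrating the
    // helper. Unlike issuing SET CLUSTER SETTING through a single node's
    // connection, the helper runs the statement against every node, so no
    // node can still observe the stale value once the call returns.
    func TestSetClusterSettingExample(t *testing.T) {
    	tc := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{})
    	defer tc.Stopper().Stop(context.Background())

    	// Each call executes the statement on all three nodes.
    	serverutils.SetClusterSetting(t, tc, "kv.rangefeed.enabled", true)
    	serverutils.SetClusterSetting(t, tc, "kv.closed_timestamp.target_duration", time.Second)
    }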