From c8c1404516c2e74f9ecf515c8ab03f6de97d6316 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 6 May 2021 14:32:23 +0100 Subject: [PATCH] changefeedccl: add test for disabled outbound IO This adds a test to ensure that we do not allow sinkful changefeeds to be started when the --external-io-disabled flag has been set. Note that this only tests a single node setup. For the flag to be effective, it needs to be set on all nodes in a cluster. Release note: None --- pkg/base/test_server_args.go | 4 ++ pkg/ccl/changefeedccl/changefeed_test.go | 48 ++++++++++++++++++++++++ pkg/server/testserver.go | 1 + 3 files changed, 53 insertions(+) diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index eccb9d68c18f..dffd5adc3185 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -85,6 +85,10 @@ type TestServerArgs struct { // ExternalIODir is used to initialize field in cluster.Settings. ExternalIODir string + // ExternalIODirConfig is used to initialize the same-named + // field on the server.Config struct. + ExternalIODirConfig ExternalIODirConfig + // Fields copied to the server.Config. Insecure bool RetryOptions retry.Options // TODO(tbg): make testing knob. diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index c24b339f2238..a7a099049f2b 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -551,6 +551,54 @@ func TestChangefeedUserDefinedTypes(t *testing.T) { t.Run(`kafka`, kafkaTest(testFn)) } +func TestChangefeedExternalIODisabled(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + t.Run("sinkful changefeeds not allowed with disabled external io", func(t *testing.T) { + disallowedSinkProtos := []string{ + changefeedbase.SinkSchemeExperimentalSQL, + changefeedbase.SinkSchemeKafka, + changefeedbase.SinkSchemeNull, // Doesn't work because all sinkful changefeeds are disallowed + // Cloud sink schemes + "experimental-s3", + "experimental-gs", + "experimental-nodelocal", + "experimental-http", + "experimental-https", + "experimental-azure", + } + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + ExternalIODirConfig: base.ExternalIODirConfig{ + DisableOutbound: true, + }, + }) + defer s.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "CREATE TABLE target_table (pk INT PRIMARY KEY)") + for _, proto := range disallowedSinkProtos { + sqlDB.ExpectErr(t, "Outbound IO is disabled by configuration, cannot create changefeed", + "CREATE CHANGEFEED FOR target_table INTO $1", + fmt.Sprintf("%s://does-not-matter", proto), + ) + } + }) + + withDisabledOutbound := func(args *base.TestServerArgs) { args.ExternalIODirConfig.DisableOutbound = true } + t.Run("sinkless changfeeds are allowed with disabled external io", + sinklessTestWithServerArgs(withDisabledOutbound, + func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "CREATE TABLE target_table (pk INT PRIMARY KEY)") + sqlDB.Exec(t, "INSERT INTO target_table VALUES (1)") + feed := feed(t, f, "CREATE CHANGEFEED FOR target_table") + defer closeFeed(t, feed) + assertPayloads(t, feed, []string{ + `target_table: [1]->{"after": {"pk": 1}}`, + }) + })) +} + // Test how Changefeeds react to schema changes that do not require a backfill // operation. func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 439e341aa7ef..38945c680878 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -147,6 +147,7 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config { cfg.JoinList = []string{params.JoinAddr} } cfg.ClusterName = params.ClusterName + cfg.ExternalIODirConfig = params.ExternalIODirConfig cfg.Insecure = params.Insecure cfg.AutoInitializeCluster = !params.NoAutoInitializeCluster cfg.SocketFile = params.SocketFile