From ca39dea6450c1ac3f42ba77c473a4175372b9914 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Wed, 24 Aug 2022 15:13:38 +0000 Subject: [PATCH 1/9] *: infuse `CollectionFactory` for storage and backup This is part of the migration of existing DDL statement with internal executor to `descs.CollectionFactory.TxnWithExecutor()`. DDL statements should only be run with an internal executor that's created via this function. Release justification: Low risk, high benefit changes to existing functionality Release note: none --- pkg/ccl/backupccl/backup_metadata_test.go | 7 +- pkg/ccl/backupccl/backup_test.go | 13 +- .../backupccl/restore_data_processor_test.go | 7 +- .../changefeedccl/sink_cloudstorage_test.go | 8 +- pkg/ccl/cliccl/debug_backup.go | 15 +- pkg/ccl/storageccl/BUILD.bazel | 1 + .../storageccl/external_sst_reader_test.go | 7 +- pkg/ccl/workloadccl/storage.go | 15 +- pkg/cloud/BUILD.bazel | 1 + pkg/cloud/amazon/s3_storage_test.go | 102 +++++++-- pkg/cloud/azure/azure_storage_test.go | 14 +- pkg/cloud/cloudtestutils/BUILD.bazel | 1 + .../cloudtestutils/cloud_test_helpers.go | 47 ++-- pkg/cloud/external_storage.go | 2 + pkg/cloud/externalconn/connection_storage.go | 2 +- pkg/cloud/gcp/gcs_storage_test.go | 207 +++++++++++------- pkg/cloud/httpsink/http_storage_test.go | 51 +++-- pkg/cloud/impl_registry.go | 7 +- pkg/cloud/nodelocal/nodelocal_storage_test.go | 10 +- pkg/cloud/nullsink/nullsink_storage_test.go | 9 +- pkg/cloud/userfile/BUILD.bazel | 1 + pkg/cloud/userfile/file_table_storage.go | 2 +- pkg/cloud/userfile/file_table_storage_test.go | 21 +- pkg/cloud/userfile/filetable/BUILD.bazel | 1 + .../filetable/file_table_read_writer.go | 6 +- .../filetable/filetabletest/BUILD.bazel | 1 + .../file_table_read_writer_test.go | 13 +- pkg/server/external_storage_builder.go | 9 +- pkg/server/server.go | 4 + pkg/server/server_sql.go | 3 + pkg/server/tenant.go | 4 + pkg/sql/importer/import_processor_test.go | 7 +- pkg/sql/importer/import_stmt_test.go | 9 +- 33 files changed, 444 insertions(+), 163 deletions(-) diff --git a/pkg/ccl/backupccl/backup_metadata_test.go b/pkg/ccl/backupccl/backup_metadata_test.go index 76d541f067d3..47febbea773a 100644 --- a/pkg/ccl/backupccl/backup_metadata_test.go +++ b/pkg/ccl/backupccl/backup_metadata_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -86,7 +87,11 @@ func checkMetadata( tc.Servers[0].ClusterSettings(), blobs.TestEmptyBlobClientFactory, username.RootUserName(), - tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil) + tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), + tc.Servers[0].CollectionFactory().(*descs.CollectionFactory), + tc.Servers[0].DB(), + nil, /* limiters */ + ) if err != nil { t.Fatal(err) } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index af2de50b8c60..e02733b04c35 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -572,7 +572,11 @@ func TestBackupRestoreAppend(t *testing.T) { tc.Servers[0].ClusterSettings(), blobs.TestEmptyBlobClientFactory, username.RootUserName(), - tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil) + tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), + tc.Servers[0].CollectionFactory().(*descs.CollectionFactory), + tc.Servers[0].DB(), + nil, /* limiters */ + ) require.NoError(t, err) defer store.Close() var files []string @@ -8021,7 +8025,12 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) { base.ExternalIODirConfig{}, st, blobs.TestBlobServiceClient(dir), - username.RootUserName(), nil, nil, nil) + username.RootUserName(), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.NoError(t, err) m := mon.NewMonitor("test-monitor", mon.MemoryResource, nil, nil, 0, 0, st) diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 6f61e5ced6c3..238339db3ec1 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -253,7 +253,12 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { DB: kvDB, ExternalStorage: func(ctx context.Context, dest cloudpb.ExternalStorage, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) { return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, - s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil, opts...) + s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + opts...) }, Settings: s.ClusterSettings(), Codec: keys.SystemSQLCodec, diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 4545d73c3730..a0e6e305f0af 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -169,7 +169,13 @@ func TestCloudStorageSink(t *testing.T) { externalStorageFromURI := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) { return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings, - clientFactory, user, nil, nil, nil, opts...) + clientFactory, + user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + opts...) } user := username.RootUserName() diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 3057ec621010..ada7b714b5cc 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -291,7 +291,12 @@ func externalStorageFromURIFactory( defaultSettings := &cluster.Settings{} defaultSettings.SV.Init(ctx, nil /* opaque */) return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, - defaultSettings, newBlobFactory, user, nil /*Internal Executor*/, nil /*kvDB*/, nil, opts...) + defaultSettings, newBlobFactory, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + opts...) } func getManifestFromURI(ctx context.Context, path string) (backuppb.BackupManifest, error) { @@ -588,7 +593,13 @@ func makeIters( var err error clusterSettings := cluster.MakeClusterSettings() dirStorage[i], err = cloud.MakeExternalStorage(ctx, file.Dir, base.ExternalIODirConfig{}, - clusterSettings, newBlobFactory, nil /*internal executor*/, nil /*kvDB*/, nil) + clusterSettings, + newBlobFactory, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) if err != nil { return nil, nil, errors.Wrapf(err, "making external storage") } diff --git a/pkg/ccl/storageccl/BUILD.bazel b/pkg/ccl/storageccl/BUILD.bazel index d54954e9d01e..6ed03b91393f 100644 --- a/pkg/ccl/storageccl/BUILD.bazel +++ b/pkg/ccl/storageccl/BUILD.bazel @@ -46,6 +46,7 @@ go_test( "//pkg/security/username", "//pkg/server", "//pkg/sql", + "//pkg/sql/catalog/descs", "//pkg/storage", "//pkg/testutils", "//pkg/testutils/serverutils", diff --git a/pkg/ccl/storageccl/external_sst_reader_test.go b/pkg/ccl/storageccl/external_sst_reader_test.go index 23fd41c74246..9525ad84fd75 100644 --- a/pkg/ccl/storageccl/external_sst_reader_test.go +++ b/pkg/ccl/storageccl/external_sst_reader_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" @@ -122,7 +123,11 @@ func TestNewExternalSSTReader(t *testing.T) { clusterSettings, blobs.TestBlobServiceClient(tempDir), username.RootUserName(), - tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil) + tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), + tc.Servers[0].CollectionFactory().(*descs.CollectionFactory), + tc.Servers[0].DB(), + nil, /* limiters */ + ) require.NoError(t, err) fileStores[i].Store = store diff --git a/pkg/ccl/workloadccl/storage.go b/pkg/ccl/workloadccl/storage.go index 4588cf64e479..6856a6e10f0d 100644 --- a/pkg/ccl/workloadccl/storage.go +++ b/pkg/ccl/workloadccl/storage.go @@ -35,9 +35,18 @@ func GetStorage(ctx context.Context, cfg FixtureConfig) (cloud.ExternalStorage, return nil, errors.AssertionFailedf("unsupported external storage provider; valid providers are gs, s3, and azure") } - s, err := cloud.ExternalStorageFromURI(ctx, cfg.ObjectPathToURI(), - base.ExternalIODirConfig{}, clustersettings.MakeClusterSettings(), - nil, username.SQLUsername{}, nil, nil, nil) + s, err := cloud.ExternalStorageFromURI( + ctx, + cfg.ObjectPathToURI(), + base.ExternalIODirConfig{}, + clustersettings.MakeClusterSettings(), + nil, /* blobClientFactory */ + username.SQLUsername{}, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) if err != nil { return nil, errors.Wrap(err, storageError) } diff --git a/pkg/cloud/BUILD.bazel b/pkg/cloud/BUILD.bazel index 130caa7f945e..d80a2d38922a 100644 --- a/pkg/cloud/BUILD.bazel +++ b/pkg/cloud/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//pkg/security/username", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql/catalog/descs", "//pkg/sql/sqlutil", "//pkg/util/ctxgroup", "//pkg/util/ioctx", diff --git a/pkg/cloud/amazon/s3_storage_test.go b/pkg/cloud/amazon/s3_storage_test.go index 2569b338b3a9..ac983b9ef0ea 100644 --- a/pkg/cloud/amazon/s3_storage_test.go +++ b/pkg/cloud/amazon/s3_storage_test.go @@ -46,7 +46,12 @@ func makeS3Storage( // Setup a sink for the given args. clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, - clientFactory, nil, nil, nil) + clientFactory, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) if err != nil { return nil, err } @@ -76,7 +81,12 @@ func TestPutS3(t *testing.T) { t.Run("auth-empty-no-cred", func(t *testing.T) { _, err := cloud.ExternalStorageFromURI(ctx, fmt.Sprintf("s3://%s/%s", bucket, "backup-test-default"), base.ExternalIODirConfig{}, testSettings, - blobs.TestEmptyBlobClientFactory, user, nil, nil, nil) + blobs.TestEmptyBlobClientFactory, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.EqualError(t, err, fmt.Sprintf( `%s is set to '%s', but %s is not set`, cloud.AuthParam, @@ -100,14 +110,23 @@ func TestPutS3(t *testing.T) { "s3://%s/%s?%s=%s", bucket, "backup-test-default", cloud.AuthParam, cloud.AuthParamImplicit, - ), false, user, nil, nil, testSettings) + ), false, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings) }) t.Run("auth-specified", func(t *testing.T) { uri := S3URI(bucket, "backup-test", &cloudpb.ExternalStorage_S3{AccessKey: creds.AccessKeyID, Secret: creds.SecretAccessKey, Region: "us-east-1"}, ) - cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, uri, user, nil, nil, testSettings) + cloudtestutils.CheckExportStore(t, uri, false, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) + cloudtestutils.CheckListFiles(t, uri, user, nil, nil, nil, testSettings) }) // Tests that we can put an object with server side encryption specified. @@ -128,7 +147,14 @@ func TestPutS3(t *testing.T) { bucket, "backup-test-sse-256", cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode, "AES256", - ), false, user, nil, nil, testSettings) + ), + false, + user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) v := os.Getenv("AWS_KMS_KEY_ARN") if v == "" { @@ -139,7 +165,13 @@ func TestPutS3(t *testing.T) { bucket, "backup-test-sse-kms", cloud.AuthParam, cloud.AuthParamImplicit, AWSServerSideEncryptionMode, "aws:kms", AWSServerSideEncryptionKMSID, v, - ), false, user, nil, nil, testSettings) + ), + false, + user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings) }) t.Run("server-side-encryption-invalid-params", func(t *testing.T) { @@ -208,16 +240,36 @@ func TestPutS3AssumeRole(t *testing.T) { uri := S3URI(bucket, "backup-test", &cloudpb.ExternalStorage_S3{Auth: cloud.AuthParamImplicit, RoleARN: roleArn, Region: "us-east-1"}, ) - cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, uri, user, nil, nil, testSettings) + cloudtestutils.CheckExportStore(t, uri, false, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) + cloudtestutils.CheckListFiles(t, uri, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) }) t.Run("auth-specified", func(t *testing.T) { uri := S3URI(bucket, "backup-test", &cloudpb.ExternalStorage_S3{Auth: cloud.AuthParamSpecified, RoleARN: roleArn, AccessKey: creds.AccessKeyID, Secret: creds.SecretAccessKey, Region: "us-east-1"}, ) - cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, uri, user, nil, nil, testSettings) + cloudtestutils.CheckExportStore(t, uri, false, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) + cloudtestutils.CheckListFiles(t, uri, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) }) t.Run("role-chaining", func(t *testing.T) { @@ -247,7 +299,12 @@ func TestPutS3AssumeRole(t *testing.T) { Region: "us-east-1", }, ) - cloudtestutils.CheckNoPermission(t, roleURI, user, nil, nil, testSettings) + cloudtestutils.CheckNoPermission(t, roleURI, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) } // Finally, check that the chain of roles can be used to access the storage. @@ -262,7 +319,12 @@ func TestPutS3AssumeRole(t *testing.T) { }, ) - cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, testSettings) + cloudtestutils.CheckExportStore(t, uri, false, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) }) } }) @@ -301,7 +363,12 @@ func TestPutS3Endpoint(t *testing.T) { testSettings := cluster.MakeTestingClusterSettings() - cloudtestutils.CheckExportStore(t, u.String(), false, user, nil, nil, testSettings) + cloudtestutils.CheckExportStore(t, u.String(), false, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) } func TestS3DisallowCustomEndpoints(t *testing.T) { @@ -372,7 +439,12 @@ func TestS3BucketDoesNotExist(t *testing.T) { // Setup a sink for the given args. clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, - clientFactory, nil, nil, nil) + clientFactory, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) if err != nil { t.Fatal(err) } diff --git a/pkg/cloud/azure/azure_storage_test.go b/pkg/cloud/azure/azure_storage_test.go index 11182c729a6d..9517afa25e16 100644 --- a/pkg/cloud/azure/azure_storage_test.go +++ b/pkg/cloud/azure/azure_storage_test.go @@ -67,9 +67,17 @@ func TestAzure(t *testing.T) { } testSettings := cluster.MakeTestingClusterSettings() cloudtestutils.CheckExportStore(t, cfg.filePath("backup-test"), - false, username.RootUserName(), nil, nil, testSettings) - cloudtestutils.CheckListFiles( - t, cfg.filePath("listing-test"), username.RootUserName(), nil, nil, testSettings, + false, username.RootUserName(), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) + cloudtestutils.CheckListFiles(t, cfg.filePath("listing-test"), username.RootUserName(), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, ) } diff --git a/pkg/cloud/cloudtestutils/BUILD.bazel b/pkg/cloud/cloudtestutils/BUILD.bazel index db24a23ab80b..d99cbebc2797 100644 --- a/pkg/cloud/cloudtestutils/BUILD.bazel +++ b/pkg/cloud/cloudtestutils/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/kv", "//pkg/security/username", "//pkg/settings/cluster", + "//pkg/sql/catalog/descs", "//pkg/sql/sqlutil", "//pkg/util/ioctx", "//pkg/util/randutil", diff --git a/pkg/cloud/cloudtestutils/cloud_test_helpers.go b/pkg/cloud/cloudtestutils/cloud_test_helpers.go index 09d48c6eb405..6a7028418569 100644 --- a/pkg/cloud/cloudtestutils/cloud_test_helpers.go +++ b/pkg/cloud/cloudtestutils/cloud_test_helpers.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -108,6 +109,7 @@ func storeFromURI( clientFactory blobs.BlobClientFactory, user username.SQLUsername, ie sqlutil.InternalExecutor, + cf *descs.CollectionFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) cloud.ExternalStorage { @@ -117,7 +119,7 @@ func storeFromURI( } // Setup a sink for the given args. s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, - clientFactory, ie, kvDB, nil) + clientFactory, ie, cf, kvDB, nil) if err != nil { t.Fatal(err) } @@ -131,6 +133,7 @@ func CheckExportStore( skipSingleFile bool, user username.SQLUsername, ie sqlutil.InternalExecutor, + cf *descs.CollectionFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) { @@ -144,7 +147,8 @@ func CheckExportStore( // Setup a sink for the given args. clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) - s, err := cloud.MakeExternalStorage(ctx, conf, ioConf, testSettings, clientFactory, ie, kvDB, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, ioConf, testSettings, clientFactory, + ie, cf, kvDB, nil) if err != nil { t.Fatal(err) } @@ -252,7 +256,7 @@ func CheckExportStore( t.Fatal(err) } singleFile := storeFromURI(ctx, t, appendPath(t, storeURI, testingFilename), clientFactory, - user, ie, kvDB, testSettings) + user, ie, cf, kvDB, testSettings) defer singleFile.Close() res, err := singleFile.ReadFile(ctx, "") @@ -273,7 +277,7 @@ func CheckExportStore( t.Run("write-single-file-by-uri", func(t *testing.T) { const testingFilename = "B" singleFile := storeFromURI(ctx, t, appendPath(t, storeURI, testingFilename), clientFactory, - user, ie, kvDB, testSettings) + user, ie, cf, kvDB, testSettings) defer singleFile.Close() if err := cloud.WriteFile(ctx, singleFile, "", bytes.NewReader([]byte("bbb"))); err != nil { @@ -304,7 +308,7 @@ func CheckExportStore( if err := cloud.WriteFile(ctx, s, testingFilename, bytes.NewReader([]byte("aaa"))); err != nil { t.Fatal(err) } - singleFile := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, kvDB, testSettings) + singleFile := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, cf, kvDB, testSettings) defer singleFile.Close() // Read a valid file. @@ -346,10 +350,11 @@ func CheckListFiles( storeURI string, user username.SQLUsername, ie sqlutil.InternalExecutor, + cf *descs.CollectionFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) { - CheckListFilesCanonical(t, storeURI, "", user, ie, kvDB, testSettings) + CheckListFilesCanonical(t, storeURI, "", user, ie, cf, kvDB, testSettings) } // CheckListFilesCanonical is like CheckListFiles but takes a canonical prefix @@ -361,6 +366,7 @@ func CheckListFilesCanonical( canonical string, user username.SQLUsername, ie sqlutil.InternalExecutor, + cf *descs.CollectionFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) { @@ -374,7 +380,7 @@ func CheckListFilesCanonical( clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) for _, fileName := range fileNames { - file := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, kvDB, testSettings) + file := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, cf, kvDB, testSettings) if err := cloud.WriteFile(ctx, file, fileName, bytes.NewReader([]byte("bbb"))); err != nil { t.Fatal(err) } @@ -462,7 +468,7 @@ func CheckListFilesCanonical( }, } { t.Run(tc.name, func(t *testing.T) { - s := storeFromURI(ctx, t, tc.uri, clientFactory, user, ie, kvDB, testSettings) + s := storeFromURI(ctx, t, tc.uri, clientFactory, user, ie, cf, kvDB, testSettings) var actual []string require.NoError(t, s.List(ctx, tc.prefix, tc.delimiter, func(f string) error { actual = append(actual, f) @@ -475,7 +481,7 @@ func CheckListFilesCanonical( }) for _, fileName := range fileNames { - file := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, kvDB, testSettings) + file := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, cf, kvDB, testSettings) if err := file.Delete(ctx, fileName); err != nil { t.Fatal(err) } @@ -493,9 +499,13 @@ func uploadData( data := randutil.RandBytes(rnd, 16<<20) ctx := context.Background() - s, err := cloud.MakeExternalStorage( - ctx, dest, base.ExternalIODirConfig{}, testSettings, - nil, nil, nil, nil) + s, err := cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, testSettings, + nil, /* blobClientFactory */ + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.NoError(t, err) require.NoError(t, cloud.WriteFile(ctx, s, basename, bytes.NewReader(data))) return data, func() { @@ -534,9 +544,13 @@ func CheckAntagonisticRead( }() ctx := context.Background() - s, err := cloud.MakeExternalStorage( - ctx, conf, base.ExternalIODirConfig{}, testSettings, - nil, nil, nil, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, + nil, /* blobClientFactory */ + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.NoError(t, err) defer s.Close() @@ -555,6 +569,7 @@ func CheckNoPermission( storeURI string, user username.SQLUsername, ie sqlutil.InternalExecutor, + cf *descs.CollectionFactory, kvDB *kv.DB, testSettings *cluster.Settings, ) { @@ -567,7 +582,7 @@ func CheckNoPermission( } clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) - s, err := cloud.MakeExternalStorage(ctx, conf, ioConf, testSettings, clientFactory, ie, kvDB, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, ioConf, testSettings, clientFactory, ie, cf, kvDB, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/cloud/external_storage.go b/pkg/cloud/external_storage.go index 48482ff45fcf..901118ce0a03 100644 --- a/pkg/cloud/external_storage.go +++ b/pkg/cloud/external_storage.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/errors" @@ -153,6 +154,7 @@ type ExternalStorageContext struct { Settings *cluster.Settings BlobClientFactory blobs.BlobClientFactory InternalExecutor sqlutil.InternalExecutor + CollectionFactory *descs.CollectionFactory DB *kv.DB Options []ExternalStorageOption Limiters Limiters diff --git a/pkg/cloud/externalconn/connection_storage.go b/pkg/cloud/externalconn/connection_storage.go index 902563ed9ec6..e98078e6e2eb 100644 --- a/pkg/cloud/externalconn/connection_storage.go +++ b/pkg/cloud/externalconn/connection_storage.go @@ -92,7 +92,7 @@ func makeExternalConnectionStorage( uri.Path = path.Join(uri.Path, cfg.Path) return cloud.ExternalStorageFromURI(ctx, uri.String(), args.IOConf, args.Settings, args.BlobClientFactory, username.MakeSQLUsernameFromPreNormalizedString(cfg.User), - args.InternalExecutor, args.DB, args.Limiters, args.Options...) + args.InternalExecutor, args.CollectionFactory, args.DB, args.Limiters, args.Options...) default: return nil, errors.Newf("cannot connect to %T; unsupported resource for an ExternalStorage connection", d) } diff --git a/pkg/cloud/gcp/gcs_storage_test.go b/pkg/cloud/gcp/gcs_storage_test.go index e573687697fb..cb0e302c451d 100644 --- a/pkg/cloud/gcp/gcs_storage_test.go +++ b/pkg/cloud/gcp/gcs_storage_test.go @@ -60,18 +60,20 @@ func TestPutGoogleCloud(t *testing.T) { if specified { uri += fmt.Sprintf("&%s=%s", cloud.AuthParam, cloud.AuthParamSpecified) } - cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, - fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s", - bucket, - "backup-test-specified", - "listing-test", - cloud.AuthParam, - cloud.AuthParamSpecified, - CredentialsParam, - url.QueryEscape(encoded), - ), - username.RootUserName(), nil, nil, testSettings, + cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, nil, testSettings) + cloudtestutils.CheckListFiles(t, fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s", + bucket, + "backup-test-specified", + "listing-test", + cloud.AuthParam, + cloud.AuthParamSpecified, + CredentialsParam, + url.QueryEscape(encoded), + ), username.RootUserName(), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, ) }) t.Run("auth-implicit", func(t *testing.T) { @@ -80,16 +82,18 @@ func TestPutGoogleCloud(t *testing.T) { } cloudtestutils.CheckExportStore(t, fmt.Sprintf("gs://%s/%s?%s=%s", bucket, "backup-test-implicit", - cloud.AuthParam, cloud.AuthParamImplicit), false, user, nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, - fmt.Sprintf("gs://%s/%s/%s?%s=%s", - bucket, - "backup-test-implicit", - "listing-test", - cloud.AuthParam, - cloud.AuthParamImplicit, - ), - username.RootUserName(), nil, nil, testSettings, + cloud.AuthParam, cloud.AuthParamImplicit), false, user, nil, nil, nil, testSettings) + cloudtestutils.CheckListFiles(t, fmt.Sprintf("gs://%s/%s/%s?%s=%s", + bucket, + "backup-test-implicit", + "listing-test", + cloud.AuthParam, + cloud.AuthParamImplicit, + ), username.RootUserName(), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, ) }) @@ -114,18 +118,20 @@ func TestPutGoogleCloud(t *testing.T) { token.AccessToken, ) uri += fmt.Sprintf("&%s=%s", cloud.AuthParam, cloud.AuthParamSpecified) - cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, - fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s", - bucket, - "backup-test-specified", - "listing-test", - cloud.AuthParam, - cloud.AuthParamSpecified, - BearerTokenParam, - token.AccessToken, - ), - username.RootUserName(), nil, nil, testSettings, + cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, nil, testSettings) + cloudtestutils.CheckListFiles(t, fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s", + bucket, + "backup-test-specified", + "listing-test", + cloud.AuthParam, + cloud.AuthParamSpecified, + BearerTokenParam, + token.AccessToken, + ), username.RootUserName(), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, ) }) } @@ -153,7 +159,12 @@ func TestGCSAssumeRole(t *testing.T) { // Verify that specified permissions with the credentials do not give us // access to the bucket. cloudtestutils.CheckNoPermission(t, fmt.Sprintf("gs://%s/%s?%s=%s", limitedBucket, "backup-test-assume-role", - CredentialsParam, url.QueryEscape(encoded)), user, nil, nil, testSettings) + CredentialsParam, url.QueryEscape(encoded)), user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) cloudtestutils.CheckExportStore(t, fmt.Sprintf("gs://%s/%s?%s=%s&%s=%s&%s=%s", limitedBucket, @@ -163,21 +174,22 @@ func TestGCSAssumeRole(t *testing.T) { AssumeRoleParam, assumedAccount, CredentialsParam, url.QueryEscape(encoded), - ), - false, user, nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, - fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s&%s=%s", - limitedBucket, - "backup-test-assume-role", - "listing-test", - cloud.AuthParam, - cloud.AuthParamSpecified, - AssumeRoleParam, - assumedAccount, - CredentialsParam, - url.QueryEscape(encoded), - ), - username.RootUserName(), nil, nil, testSettings, + ), false, user, nil, nil, nil, testSettings) + cloudtestutils.CheckListFiles(t, fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s&%s=%s", + limitedBucket, + "backup-test-assume-role", + "listing-test", + cloud.AuthParam, + cloud.AuthParamSpecified, + AssumeRoleParam, + assumedAccount, + CredentialsParam, + url.QueryEscape(encoded), + ), username.RootUserName(), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, ) }) @@ -189,21 +201,33 @@ func TestGCSAssumeRole(t *testing.T) { // Verify that implicit permissions with the credentials do not give us // access to the bucket. cloudtestutils.CheckNoPermission(t, fmt.Sprintf("gs://%s/%s?%s=%s", limitedBucket, "backup-test-assume-role", - cloud.AuthParam, cloud.AuthParamImplicit), user, nil, nil, testSettings) + cloud.AuthParam, cloud.AuthParamImplicit), user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) cloudtestutils.CheckExportStore(t, fmt.Sprintf("gs://%s/%s?%s=%s&%s=%s", limitedBucket, "backup-test-assume-role", - cloud.AuthParam, cloud.AuthParamImplicit, AssumeRoleParam, assumedAccount), false, user, nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, - fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s", - limitedBucket, - "backup-test-assume-role", - "listing-test", - cloud.AuthParam, - cloud.AuthParamImplicit, - AssumeRoleParam, - assumedAccount, - ), - username.RootUserName(), nil, nil, testSettings, + cloud.AuthParam, cloud.AuthParamImplicit, AssumeRoleParam, assumedAccount), false, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) + cloudtestutils.CheckListFiles(t, fmt.Sprintf("gs://%s/%s/%s?%s=%s&%s=%s", + limitedBucket, + "backup-test-assume-role", + "listing-test", + cloud.AuthParam, + cloud.AuthParamImplicit, + AssumeRoleParam, + assumedAccount, + ), username.RootUserName(), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, ) }) @@ -243,7 +267,12 @@ func TestGCSAssumeRole(t *testing.T) { "listing-test", q.Encode(), ) - cloudtestutils.CheckNoPermission(t, roleURI, user, nil, nil, testSettings) + cloudtestutils.CheckNoPermission(t, roleURI, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) } // Finally, check that the chain of roles can be used to access the storage. @@ -254,8 +283,18 @@ func TestGCSAssumeRole(t *testing.T) { "listing-test", q.Encode(), ) - cloudtestutils.CheckExportStore(t, uri, false, user, nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, uri, user, nil, nil, testSettings) + cloudtestutils.CheckExportStore(t, uri, false, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) + cloudtestutils.CheckListFiles(t, uri, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) }) } }) @@ -297,9 +336,13 @@ func TestFileDoesNotExist(t *testing.T) { conf, err := cloud.ExternalStorageConfFromURI(gsFile, user) require.NoError(t, err) - s, err := cloud.MakeExternalStorage( - context.Background(), conf, base.ExternalIODirConfig{}, testSettings, - nil, nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), conf, base.ExternalIODirConfig{}, testSettings, + nil, /* blobClientFactory */ + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.NoError(t, err) _, err = s.ReadFile(context.Background(), "") require.Error(t, err, "") @@ -312,9 +355,13 @@ func TestFileDoesNotExist(t *testing.T) { conf, err := cloud.ExternalStorageConfFromURI(gsFile, user) require.NoError(t, err) - s, err := cloud.MakeExternalStorage( - context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil, - nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), conf, base.ExternalIODirConfig{}, testSettings, + nil, /* blobClientFactory */ + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.NoError(t, err) _, err = s.ReadFile(context.Background(), "") require.Error(t, err, "") @@ -345,9 +392,21 @@ func TestCompressedGCS(t *testing.T) { conf2, err := cloud.ExternalStorageConfFromURI(gsFile2, user) require.NoError(t, err) - s1, err := cloud.MakeExternalStorage(ctx, conf1, base.ExternalIODirConfig{}, testSettings, nil, nil, nil, nil) + s1, err := cloud.MakeExternalStorage(ctx, conf1, base.ExternalIODirConfig{}, testSettings, + nil, /* blobClientFactory */ + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.NoError(t, err) - s2, err := cloud.MakeExternalStorage(ctx, conf2, base.ExternalIODirConfig{}, testSettings, nil, nil, nil, nil) + s2, err := cloud.MakeExternalStorage(ctx, conf2, base.ExternalIODirConfig{}, testSettings, + nil, /* blobClientFactory */ + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.NoError(t, err) reader1, err := s1.ReadFile(context.Background(), "") diff --git a/pkg/cloud/httpsink/http_storage_test.go b/pkg/cloud/httpsink/http_storage_test.go index a90d836618af..6e8823be4b3c 100644 --- a/pkg/cloud/httpsink/http_storage_test.go +++ b/pkg/cloud/httpsink/http_storage_test.go @@ -121,7 +121,12 @@ func TestPutHttp(t *testing.T) { t.Run("singleHost", func(t *testing.T) { srv, files, cleanup := makeServer() defer cleanup() - cloudtestutils.CheckExportStore(t, srv.String(), false, user, nil, nil, testSettings) + cloudtestutils.CheckExportStore(t, srv.String(), false, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) if expected, actual := 14, files(); expected != actual { t.Fatalf("expected %d files to be written to single http store, got %d", expected, actual) } @@ -138,7 +143,12 @@ func TestPutHttp(t *testing.T) { combined := *srv1 combined.Host = strings.Join([]string{srv1.Host, srv2.Host, srv3.Host}, ",") - cloudtestutils.CheckExportStore(t, combined.String(), true, user, nil, nil, testSettings) + cloudtestutils.CheckExportStore(t, combined.String(), true, user, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) if expected, actual := 3, files1(); expected != actual { t.Fatalf("expected %d files written to http host 1, got %d", expected, actual) } @@ -161,8 +171,12 @@ func TestPutHttp(t *testing.T) { if err != nil { t.Fatal(err) } - s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, - testSettings, blobs.TestEmptyBlobClientFactory, nil, nil, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, testSettings, blobs.TestEmptyBlobClientFactory, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) if err != nil { t.Fatal(err) } @@ -315,10 +329,12 @@ func TestCanDisableHttp(t *testing.T) { } testSettings := cluster.MakeTestingClusterSettings() - s, err := cloud.MakeExternalStorage( - context.Background(), - cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_http}, - conf, testSettings, blobs.TestEmptyBlobClientFactory, nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_http}, conf, testSettings, blobs.TestEmptyBlobClientFactory, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.Nil(t, s) require.Error(t, err) } @@ -336,10 +352,12 @@ func TestCanDisableOutbound(t *testing.T) { cloudpb.ExternalStorageProvider_gs, cloudpb.ExternalStorageProvider_nodelocal, } { - s, err := cloud.MakeExternalStorage( - context.Background(), - cloudpb.ExternalStorage{Provider: provider}, - conf, testSettings, blobs.TestEmptyBlobClientFactory, nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), cloudpb.ExternalStorage{Provider: provider}, conf, testSettings, blobs.TestEmptyBlobClientFactory, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.Nil(t, s) require.Error(t, err) } @@ -368,9 +386,12 @@ func TestExternalStorageCanUseHTTPProxy(t *testing.T) { conf, err := cloud.ExternalStorageConfFromURI("http://my-server", username.RootUserName()) require.NoError(t, err) - s, err := cloud.MakeExternalStorage( - context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil, - nil, nil, nil) + s, err := cloud.MakeExternalStorage(context.Background(), conf, base.ExternalIODirConfig{}, testSettings, nil, + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) require.NoError(t, err) stream, err := s.ReadFile(context.Background(), "file") require.NoError(t, err) diff --git a/pkg/cloud/impl_registry.go b/pkg/cloud/impl_registry.go index 1f63eb0def7d..065258c0bd85 100644 --- a/pkg/cloud/impl_registry.go +++ b/pkg/cloud/impl_registry.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -151,6 +152,7 @@ func ExternalStorageFromURI( blobClientFactory blobs.BlobClientFactory, user username.SQLUsername, ie sqlutil.InternalExecutor, + cf *descs.CollectionFactory, kvDB *kv.DB, limiters Limiters, opts ...ExternalStorageOption, @@ -159,7 +161,8 @@ func ExternalStorageFromURI( if err != nil { return nil, err } - return MakeExternalStorage(ctx, conf, externalConfig, settings, blobClientFactory, ie, kvDB, limiters, opts...) + return MakeExternalStorage(ctx, conf, externalConfig, settings, blobClientFactory, + ie, cf, kvDB, limiters, opts...) } // SanitizeExternalStorageURI returns the external storage URI with with some @@ -204,6 +207,7 @@ func MakeExternalStorage( settings *cluster.Settings, blobClientFactory blobs.BlobClientFactory, ie sqlutil.InternalExecutor, + cf *descs.CollectionFactory, kvDB *kv.DB, limiters Limiters, opts ...ExternalStorageOption, @@ -213,6 +217,7 @@ func MakeExternalStorage( Settings: settings, BlobClientFactory: blobClientFactory, InternalExecutor: ie, + CollectionFactory: cf, DB: kvDB, Options: opts, Limiters: limiters, diff --git a/pkg/cloud/nodelocal/nodelocal_storage_test.go b/pkg/cloud/nodelocal/nodelocal_storage_test.go index 254f96f6c37d..4792c601b8e9 100644 --- a/pkg/cloud/nodelocal/nodelocal_storage_test.go +++ b/pkg/cloud/nodelocal/nodelocal_storage_test.go @@ -30,7 +30,11 @@ func TestPutLocal(t *testing.T) { testSettings.ExternalIODir = p dest := MakeLocalStorageURI(p) - cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), nil, nil, testSettings) - cloudtestutils.CheckListFiles(t, "nodelocal://0/listing-test/basepath", - username.RootUserName(), nil, nil, testSettings) + cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), nil, nil, nil, testSettings) + cloudtestutils.CheckListFiles(t, "nodelocal://0/listing-test/basepath", username.RootUserName(), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + testSettings, + ) } diff --git a/pkg/cloud/nullsink/nullsink_storage_test.go b/pkg/cloud/nullsink/nullsink_storage_test.go index a2a68c1d6a3e..04f898ab33d6 100644 --- a/pkg/cloud/nullsink/nullsink_storage_test.go +++ b/pkg/cloud/nullsink/nullsink_storage_test.go @@ -36,7 +36,14 @@ func TestNullSinkReadAndWrite(t *testing.T) { t.Fatal(err) } - s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, nil, nil, nil, nil, nil) + s, err := cloud.MakeExternalStorage(ctx, conf, base.ExternalIODirConfig{}, + nil, /* Cluster Settings */ + nil, /* blobClientFactory */ + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) if err != nil { t.Fatal(err) } diff --git a/pkg/cloud/userfile/BUILD.bazel b/pkg/cloud/userfile/BUILD.bazel index 05ca28916e74..da46209f30b1 100644 --- a/pkg/cloud/userfile/BUILD.bazel +++ b/pkg/cloud/userfile/BUILD.bazel @@ -46,6 +46,7 @@ go_test( "//pkg/security/username", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql/catalog/descs", "//pkg/sql/sem/tree", "//pkg/sql/sqlutil", "//pkg/sql/tests", diff --git a/pkg/cloud/userfile/file_table_storage.go b/pkg/cloud/userfile/file_table_storage.go index b7cf6c42c055..c30b9fe7c06a 100644 --- a/pkg/cloud/userfile/file_table_storage.go +++ b/pkg/cloud/userfile/file_table_storage.go @@ -122,7 +122,7 @@ func makeFileTableStorage( // cfg.User is already a normalized SQL username. user := username.MakeSQLUsernameFromPreNormalizedString(cfg.User) - executor := filetable.MakeInternalFileToTableExecutor(args.InternalExecutor, args.DB) + executor := filetable.MakeInternalFileToTableExecutor(args.InternalExecutor, args.CollectionFactory, args.DB) fileToTableSystem, err := filetable.NewFileToTableSystem(ctx, cfg.QualifiedTableName, executor, user) diff --git a/pkg/cloud/userfile/file_table_storage_test.go b/pkg/cloud/userfile/file_table_storage_test.go index a835ccb875df..940d009a61b0 100644 --- a/pkg/cloud/userfile/file_table_storage_test.go +++ b/pkg/cloud/userfile/file_table_storage_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/cloudtestutils" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/tests" @@ -48,19 +49,21 @@ func TestPutUserFileTable(t *testing.T) { dest := MakeUserFileStorageURI(qualifiedTableName, filename) ie := s.InternalExecutor().(sqlutil.InternalExecutor) - cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), ie, kvDB, testSettings) + cf := s.CollectionFactory().(*descs.CollectionFactory) + cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), ie, cf, kvDB, testSettings) cloudtestutils.CheckListFiles(t, "userfile://defaultdb.public.file_list_table/listing-test/basepath", - username.RootUserName(), ie, kvDB, testSettings) + username.RootUserName(), ie, cf, kvDB, testSettings) t.Run("empty-qualified-table-name", func(t *testing.T) { dest := MakeUserFileStorageURI("", filename) ie := s.InternalExecutor().(sqlutil.InternalExecutor) - cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), ie, kvDB, testSettings) + cf := s.CollectionFactory().(*descs.CollectionFactory) + cloudtestutils.CheckExportStore(t, dest, false, username.RootUserName(), ie, cf, kvDB, testSettings) cloudtestutils.CheckListFilesCanonical(t, "userfile:///listing-test/basepath", "userfile://defaultdb.public.userfiles_root/listing-test/basepath", - username.RootUserName(), ie, kvDB, testSettings) + username.RootUserName(), ie, cf, kvDB, testSettings) }) t.Run("reject-normalized-basename", func(t *testing.T) { @@ -68,8 +71,7 @@ func TestPutUserFileTable(t *testing.T) { userfileURL := url.URL{Scheme: "userfile", Host: qualifiedTableName, Path: ""} store, err := cloud.ExternalStorageFromURI(ctx, userfileURL.String()+"/", - base.ExternalIODirConfig{}, cluster.NoSettings, blobs.TestEmptyBlobClientFactory, - username.RootUserName(), ie, kvDB, nil) + base.ExternalIODirConfig{}, cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.RootUserName(), ie, cf, kvDB, nil) require.NoError(t, err) defer store.Close() @@ -107,6 +109,7 @@ func TestUserScoping(t *testing.T) { dest := MakeUserFileStorageURI(qualifiedTableName, "") ie := s.InternalExecutor().(sqlutil.InternalExecutor) + cf := s.CollectionFactory().(*descs.CollectionFactory) // Create two users and grant them all privileges on defaultdb. user1 := username.MakeSQLUsernameFromPreNormalizedString("foo") @@ -116,13 +119,13 @@ func TestUserScoping(t *testing.T) { // Write file as user1. fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user1, ie, kvDB, nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user1, ie, cf, kvDB, nil) require.NoError(t, err) require.NoError(t, cloud.WriteFile(ctx, fileTableSystem1, filename, bytes.NewReader([]byte("aaa")))) // Attempt to read/write file as user2 and expect to fail. fileTableSystem2, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user2, ie, kvDB, nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user2, ie, cf, kvDB, nil) require.NoError(t, err) _, err = fileTableSystem2.ReadFile(ctx, filename) require.Error(t, err) @@ -130,7 +133,7 @@ func TestUserScoping(t *testing.T) { // Read file as root and expect to succeed. fileTableSystem3, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.RootUserName(), ie, kvDB, nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.RootUserName(), ie, cf, kvDB, nil) require.NoError(t, err) _, err = fileTableSystem3.ReadFile(ctx, filename) require.NoError(t, err) diff --git a/pkg/cloud/userfile/filetable/BUILD.bazel b/pkg/cloud/userfile/filetable/BUILD.bazel index 4e2f82feda0c..5ca59c1ae3d4 100644 --- a/pkg/cloud/userfile/filetable/BUILD.bazel +++ b/pkg/cloud/userfile/filetable/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/cloud", "//pkg/kv", "//pkg/security/username", + "//pkg/sql/catalog/descs", "//pkg/sql/parser", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", diff --git a/pkg/cloud/userfile/filetable/file_table_read_writer.go b/pkg/cloud/userfile/filetable/file_table_read_writer.go index 75b5bbfd1514..994163e315d6 100644 --- a/pkg/cloud/userfile/filetable/file_table_read_writer.go +++ b/pkg/cloud/userfile/filetable/file_table_read_writer.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -60,6 +61,7 @@ type FileToTableSystemExecutor interface { // SQL connection to interact with the database. type InternalFileToTableExecutor struct { ie sqlutil.InternalExecutor + cf *descs.CollectionFactory db *kv.DB } @@ -68,9 +70,9 @@ var _ FileToTableSystemExecutor = &InternalFileToTableExecutor{} // MakeInternalFileToTableExecutor returns an instance of a // InternalFileToTableExecutor. func MakeInternalFileToTableExecutor( - ie sqlutil.InternalExecutor, db *kv.DB, + ie sqlutil.InternalExecutor, cf *descs.CollectionFactory, db *kv.DB, ) *InternalFileToTableExecutor { - return &InternalFileToTableExecutor{ie, db} + return &InternalFileToTableExecutor{ie, cf, db} } // Query implements the FileToTableSystemExecutor interface. diff --git a/pkg/cloud/userfile/filetable/filetabletest/BUILD.bazel b/pkg/cloud/userfile/filetable/filetabletest/BUILD.bazel index dfd5e05f2e18..e99f83fa5def 100644 --- a/pkg/cloud/userfile/filetable/filetabletest/BUILD.bazel +++ b/pkg/cloud/userfile/filetable/filetabletest/BUILD.bazel @@ -17,6 +17,7 @@ go_test( "//pkg/security/username", "//pkg/server", "//pkg/sql", + "//pkg/sql/catalog/descs", "//pkg/sql/tests", "//pkg/testutils", "//pkg/testutils/serverutils", diff --git a/pkg/cloud/userfile/filetable/filetabletest/file_table_read_writer_test.go b/pkg/cloud/userfile/filetable/filetabletest/file_table_read_writer_test.go index 8fab37c5489b..d5ad2777b2b6 100644 --- a/pkg/cloud/userfile/filetable/filetabletest/file_table_read_writer_test.go +++ b/pkg/cloud/userfile/filetable/filetabletest/file_table_read_writer_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -107,7 +108,7 @@ func TestListAndDeleteFiles(t *testing.T) { defer s.Stopper().Stop(ctx) executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. - InternalExecutor), kvDB) + InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, username.RootUserName()) require.NoError(t, err) @@ -158,7 +159,7 @@ func TestReadWriteFile(t *testing.T) { defer s.Stopper().Stop(ctx) executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. - InternalExecutor), kvDB) + InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, username.RootUserName()) require.NoError(t, err) @@ -341,7 +342,7 @@ func TestUserGrants(t *testing.T) { // Operate under non-admin user. executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. - InternalExecutor), kvDB) + InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) johnUser := username.MakeSQLUsernameFromPreNormalizedString("john") fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, johnUser) @@ -425,7 +426,7 @@ func TestDifferentUserDisallowed(t *testing.T) { // Operate under non-admin user john. executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. - InternalExecutor), kvDB) + InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) johnUser := username.MakeSQLUsernameFromPreNormalizedString("john") fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, johnUser) @@ -483,7 +484,7 @@ func TestDifferentRoleDisallowed(t *testing.T) { // Operate under non-admin user john. executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. - InternalExecutor), kvDB) + InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) johnUser := username.MakeSQLUsernameFromPreNormalizedString("john") fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, johnUser) @@ -518,7 +519,7 @@ func TestDatabaseScope(t *testing.T) { defer s.Stopper().Stop(ctx) executor := filetable.MakeInternalFileToTableExecutor(s.InternalExecutor().(*sql. - InternalExecutor), kvDB) + InternalExecutor), s.CollectionFactory().(*descs.CollectionFactory), kvDB) fileTableReadWriter, err := filetable.NewFileToTableSystem(ctx, qualifiedTableName, executor, username.RootUserName()) require.NoError(t, err) diff --git a/pkg/server/external_storage_builder.go b/pkg/server/external_storage_builder.go index 77780f680cf9..d2da4d4da08f 100644 --- a/pkg/server/external_storage_builder.go +++ b/pkg/server/external_storage_builder.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/errors" ) @@ -38,6 +39,7 @@ type externalStorageBuilder struct { blobClientFactory blobs.BlobClientFactory initCalled bool ie *sql.InternalExecutor + cf *descs.CollectionFactory db *kv.DB limiters cloud.Limiters recorder multitenant.TenantSideExternalIORecorder @@ -51,6 +53,7 @@ func (e *externalStorageBuilder) init( nodeDialer *nodedialer.Dialer, testingKnobs base.TestingKnobs, ie *sql.InternalExecutor, + cf *descs.CollectionFactory, db *kv.DB, recorder multitenant.TenantSideExternalIORecorder, ) { @@ -66,6 +69,7 @@ func (e *externalStorageBuilder) init( e.blobClientFactory = blobClientFactory e.initCalled = true e.ie = ie + e.cf = cf e.db = db e.limiters = cloud.MakeLimiters(ctx, &settings.SV) e.recorder = recorder @@ -78,7 +82,7 @@ func (e *externalStorageBuilder) makeExternalStorage( return nil, errors.New("cannot create external storage before init") } return cloud.MakeExternalStorage(ctx, dest, e.conf, e.settings, e.blobClientFactory, e.ie, - e.db, e.limiters, append(e.defaultOptions(), opts...)...) + e.cf, e.db, e.limiters, append(e.defaultOptions(), opts...)...) } func (e *externalStorageBuilder) makeExternalStorageFromURI( @@ -87,7 +91,8 @@ func (e *externalStorageBuilder) makeExternalStorageFromURI( if !e.initCalled { return nil, errors.New("cannot create external storage before init") } - return cloud.ExternalStorageFromURI(ctx, uri, e.conf, e.settings, e.blobClientFactory, user, e.ie, e.db, e.limiters, append(e.defaultOptions(), opts...)...) + return cloud.ExternalStorageFromURI(ctx, uri, e.conf, e.settings, e.blobClientFactory, + user, e.ie, e.cf, e.db, e.limiters, append(e.defaultOptions(), opts...)...) } func (e *externalStorageBuilder) defaultOptions() []cloud.ExternalStorageOption { diff --git a/pkg/server/server.go b/pkg/server/server.go index a25b70ab119f..a45df05d2055 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -64,6 +64,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" _ "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry" // register schedules declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" @@ -487,6 +488,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { // InternalExecutor uses this one instance. internalExecutor := &sql.InternalExecutor{} jobRegistry := &jobs.Registry{} // ditto + collectionFactory := &descs.CollectionFactory{} // Create an ExternalStorageBuilder. This is only usable after Start() where // we initialize all the configuration params. @@ -836,6 +838,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { closedSessionCache: closedSessionCache, flowScheduler: flowScheduler, circularInternalExecutor: internalExecutor, + collectionFactory: collectionFactory, internalExecutorFactory: nil, // will be initialized in server.newSQLServer. circularJobRegistry: jobRegistry, jobAdoptionStopFile: jobAdoptionStopFile, @@ -1083,6 +1086,7 @@ func (s *Server) PreStart(ctx context.Context) error { s.nodeDialer, s.cfg.TestingKnobs, &fileTableInternalExecutor, + s.sqlServer.execCfg.CollectionFactory, s.db, nil, /* TenantExternalIORecorder */ ) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index f8b48364579e..560777c008f0 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -306,6 +306,8 @@ type sqlServerArgs struct { // TODO(tbg): make this less hacky. circularInternalExecutor *sql.InternalExecutor // empty initially + collectionFactory *descs.CollectionFactory + // internalExecutorFactory is to initialize an internal executor. internalExecutorFactory sqlutil.InternalExecutorFactory @@ -988,6 +990,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.registry.AddMetricStruct(m) } *cfg.circularInternalExecutor = sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor) + *cfg.collectionFactory = *collectionFactory cfg.internalExecutorFactory = ieFactory execCfg.InternalExecutor = cfg.circularInternalExecutor stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry( diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index adb9802b286f..e83e31883b9d 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" @@ -500,6 +501,7 @@ func makeTenantSQLServerArgs( circularInternalExecutor := &sql.InternalExecutor{} circularJobRegistry := &jobs.Registry{} + collectionFactory := &descs.CollectionFactory{} // Initialize the protectedts subsystem in multi-tenant clusters. var protectedTSProvider protectedts.Provider @@ -539,6 +541,7 @@ func makeTenantSQLServerArgs( nodeDialer, baseCfg.TestingKnobs, circularInternalExecutor, + collectionFactory, db, costController, ) @@ -607,6 +610,7 @@ func makeTenantSQLServerArgs( sessionRegistry: sessionRegistry, flowScheduler: flowScheduler, circularInternalExecutor: circularInternalExecutor, + collectionFactory: collectionFactory, circularJobRegistry: circularJobRegistry, protectedtsProvider: protectedTSProvider, rangeFeedFactory: rangeFeedFactory, diff --git a/pkg/sql/importer/import_processor_test.go b/pkg/sql/importer/import_processor_test.go index a12967e259b8..41ac6aac9511 100644 --- a/pkg/sql/importer/import_processor_test.go +++ b/pkg/sql/importer/import_processor_test.go @@ -876,7 +876,12 @@ func externalStorageFactory( return nil, err } return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{}, - nil, blobs.TestBlobServiceClient(workdir), nil, nil, nil) + nil, blobs.TestBlobServiceClient(workdir), + nil, /* ie */ + nil, /* cf */ + nil, /* kvDB */ + nil, /* limiters */ + ) } // Helper to create and initialize testSpec. diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 013c27596508..e9345f9f0d52 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -2700,8 +2700,9 @@ func TestImportObjectLevelRBAC(t *testing.T) { writeToUserfile := func(filename, data string) { // Write to userfile storage now that testuser has CREATE privileges. ie := tc.Server(0).InternalExecutor().(*sql.InternalExecutor) + cf := tc.Server(0).CollectionFactory().(*descs.CollectionFactory) fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, - cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.TestUserName(), ie, tc.Server(0).DB(), nil) + cluster.NoSettings, blobs.TestEmptyBlobClientFactory, username.TestUserName(), ie, cf, tc.Server(0).DB(), nil) require.NoError(t, err) require.NoError(t, cloud.WriteFile(ctx, fileTableSystem1, filename, bytes.NewReader([]byte(data)))) } @@ -5848,7 +5849,11 @@ func TestImportPgDumpIgnoredStmts(t *testing.T) { tc.Server(0).ClusterSettings(), blobs.TestEmptyBlobClientFactory, username.RootUserName(), - tc.Server(0).InternalExecutor().(*sql.InternalExecutor), tc.Server(0).DB(), nil) + tc.Server(0).InternalExecutor().(*sql.InternalExecutor), + tc.Server(0).CollectionFactory().(*descs.CollectionFactory), + tc.Server(0).DB(), + nil, + ) require.NoError(t, err) defer store.Close() From 4a2912c4b4556df71294948a9a1659d5ffcf6280 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Wed, 24 Aug 2022 15:17:13 +0000 Subject: [PATCH 2/9] filetable: use `CollectionFactory.TxnWithExecutor()` for DDL statements fixes #76764 Release justification: Low risk, high benefit changes to existing functionality Release note: none --- .../backup-restore/file_table_read_write | 43 ++++++++++++++++ .../filetable/file_table_read_writer.go | 49 ++++++++++--------- 2 files changed, 69 insertions(+), 23 deletions(-) create mode 100644 pkg/ccl/backupccl/testdata/backup-restore/file_table_read_write diff --git a/pkg/ccl/backupccl/testdata/backup-restore/file_table_read_write b/pkg/ccl/backupccl/testdata/backup-restore/file_table_read_write new file mode 100644 index 000000000000..328263642493 --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/file_table_read_write @@ -0,0 +1,43 @@ +subtest backup_file_table + +new-server name=s1 +---- + +exec-sql +CREATE DATABASE to_backup; +---- + +exec-sql +CREATE DATABASE backups; +---- + +exec-sql +BACKUP DATABASE to_backup INTO 'userfile://backups.public.userfiles_$user/data'; +---- + +query-sql +SELECT * FROM backups.crdb_internal.invalid_objects; +---- + +exec-sql +USE backups; +---- + +query-sql +SELECT * FROM pg_catalog.pg_tables where schemaname='public'; +---- +public userfiles_$user_upload_files root true false false false +public userfiles_$user_upload_payload root true false false false + +query-sql +SELECT conname FROM pg_catalog.pg_constraint con +INNER JOIN pg_catalog.pg_class rel ON rel.oid = con.conrelid +INNER JOIN pg_catalog.pg_namespace nsp +ON nsp.oid = connamespace +WHERE rel.relname='userfiles_$user_upload_payload' +ORDER BY conname; +---- +file_id_fk +userfiles_$user_upload_payload_pkey + +subtest end diff --git a/pkg/cloud/userfile/filetable/file_table_read_writer.go b/pkg/cloud/userfile/filetable/file_table_read_writer.go index 994163e315d6..182c6720112f 100644 --- a/pkg/cloud/userfile/filetable/file_table_read_writer.go +++ b/pkg/cloud/userfile/filetable/file_table_read_writer.go @@ -245,25 +245,27 @@ func NewFileToTableSystem( if err != nil { return nil, err } - if err := e.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := e.cf.TxnWithExecutor(ctx, e.db, nil /* SessionData */, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, + ) error { // TODO(adityamaru): Handle scenario where the user has already created // tables with the same names not via the FileToTableSystem // object. Not sure if we want to error out or work around it. - tablesExist, err := f.checkIfFileAndPayloadTableExist(ctx, txn, e.ie) + tablesExist, err := f.checkIfFileAndPayloadTableExist(ctx, txn, ie) if err != nil { return err } if !tablesExist { - if err := f.createFileAndPayloadTables(ctx, txn, e.ie); err != nil { + if err := f.createFileAndPayloadTables(ctx, txn, ie); err != nil { return err } - if err := f.grantCurrentUserTablePrivileges(ctx, txn, e.ie); err != nil { + if err := f.grantCurrentUserTablePrivileges(ctx, txn, ie); err != nil { return err } - if err := f.revokeOtherUserTablePrivileges(ctx, txn, e.ie); err != nil { + if err := f.revokeOtherUserTablePrivileges(ctx, txn, ie); err != nil { return err } } @@ -364,26 +366,27 @@ func DestroyUserFileSystem(ctx context.Context, f *FileToTableSystem) error { return err } - if err := e.db.Txn(ctx, - func(ctx context.Context, txn *kv.Txn) error { - dropPayloadTableQuery := fmt.Sprintf(`DROP TABLE %s`, f.GetFQPayloadTableName()) - _, err := e.ie.ExecEx(ctx, "drop-payload-table", txn, - sessiondata.InternalExecutorOverride{User: f.username}, - dropPayloadTableQuery) - if err != nil { - return errors.Wrap(err, "failed to drop payload table") - } + if err := e.cf.TxnWithExecutor(ctx, e.db, nil, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, + ) error { + dropPayloadTableQuery := fmt.Sprintf(`DROP TABLE %s`, f.GetFQPayloadTableName()) + _, err := ie.ExecEx(ctx, "drop-payload-table", txn, + sessiondata.InternalExecutorOverride{User: f.username}, + dropPayloadTableQuery) + if err != nil { + return errors.Wrap(err, "failed to drop payload table") + } - dropFileTableQuery := fmt.Sprintf(`DROP TABLE %s CASCADE`, f.GetFQFileTableName()) - _, err = e.ie.ExecEx(ctx, "drop-file-table", txn, - sessiondata.InternalExecutorOverride{User: f.username}, - dropFileTableQuery) - if err != nil { - return errors.Wrap(err, "failed to drop file table") - } + dropFileTableQuery := fmt.Sprintf(`DROP TABLE %s CASCADE`, f.GetFQFileTableName()) + _, err = ie.ExecEx(ctx, "drop-file-table", txn, + sessiondata.InternalExecutorOverride{User: f.username}, + dropFileTableQuery) + if err != nil { + return errors.Wrap(err, "failed to drop file table") + } - return nil - }); err != nil { + return nil + }); err != nil { return err } From 152ae04105d7bf16155d48a5ba5103bf06366e7a Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Wed, 24 Aug 2022 15:18:39 +0000 Subject: [PATCH 3/9] sql: migreate authorization_test to `CollectionFactory.TxnWithExecutor()` for DDLs This is part of the project to migrate existing DDL statements running with an internal executor to `descs.CollectionFactory()`. DDLs are only allowed to run with internal executor inited via this function. Release justification: Low risk, high benefit changes to existing functionality Release note: None --- pkg/sql/authorization_test.go | 83 +++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 37 deletions(-) diff --git a/pkg/sql/authorization_test.go b/pkg/sql/authorization_test.go index cae7b84ab9d7..c628a982382f 100644 --- a/pkg/sql/authorization_test.go +++ b/pkg/sql/authorization_test.go @@ -15,8 +15,10 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -30,7 +32,7 @@ func TestCheckAnyPrivilegeForNodeUser(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - s, _, kv := serverutils.StartServer(t, base.TestServerArgs{}) + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) @@ -38,45 +40,52 @@ func TestCheckAnyPrivilegeForNodeUser(t *testing.T) { require.NotNil(t, ts.InternalExecutor()) - ie := ts.InternalExecutor().(sqlutil.InternalExecutor) + cf := ts.CollectionFactory().(*descs.CollectionFactory) - txn := kv.NewTxn(ctx, "get-all-databases") - row, err := ie.QueryRowEx( - ctx, "get-all-databases", txn, sessiondata.NodeUserSessionDataOverride, - "SELECT count(1) FROM crdb_internal.databases", - ) - require.NoError(t, err) - // 3 databases (system, defaultdb, postgres). - require.Equal(t, row.String(), "(3)") + if err := cf.TxnWithExecutor(ctx, s.DB(), nil, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, + ) error { + row, err := ie.QueryRowEx( + ctx, "get-all-databases", txn, sessiondata.NodeUserSessionDataOverride, + "SELECT count(1) FROM crdb_internal.databases", + ) + require.NoError(t, err) + // 3 databases (system, defaultdb, postgres). + require.Equal(t, row.String(), "(3)") - _, err = ie.ExecEx(ctx, "create-database1", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - "CREATE DATABASE test1") - require.NoError(t, err) + _, err = ie.ExecEx(ctx, "create-database1", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + "CREATE DATABASE test1") + require.NoError(t, err) - _, err = ie.ExecEx(ctx, "create-database2", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - "CREATE DATABASE test2") - require.NoError(t, err) + _, err = ie.ExecEx(ctx, "create-database2", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + "CREATE DATABASE test2") + require.NoError(t, err) - // Revoke CONNECT on all non-system databases and ensure that when querying - // with node, we can still see all the databases. - _, err = ie.ExecEx(ctx, "revoke-privileges", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - "REVOKE CONNECT ON DATABASE test1 FROM public") - require.NoError(t, err) - _, err = ie.ExecEx(ctx, "revoke-privileges", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - "REVOKE CONNECT ON DATABASE test2 FROM public") - require.NoError(t, err) - _, err = ie.ExecEx(ctx, "revoke-privileges", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - "REVOKE CONNECT ON DATABASE defaultdb FROM public") - require.NoError(t, err) - _, err = ie.ExecEx(ctx, "revoke-privileges", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - "REVOKE CONNECT ON DATABASE postgres FROM public") - require.NoError(t, err) + // Revoke CONNECT on all non-system databases and ensure that when querying + // with node, we can still see all the databases. + _, err = ie.ExecEx(ctx, "revoke-privileges", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + "REVOKE CONNECT ON DATABASE test1 FROM public") + require.NoError(t, err) + _, err = ie.ExecEx(ctx, "revoke-privileges", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + "REVOKE CONNECT ON DATABASE test2 FROM public") + require.NoError(t, err) + _, err = ie.ExecEx(ctx, "revoke-privileges", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + "REVOKE CONNECT ON DATABASE defaultdb FROM public") + require.NoError(t, err) + _, err = ie.ExecEx(ctx, "revoke-privileges", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + "REVOKE CONNECT ON DATABASE postgres FROM public") + require.NoError(t, err) + + row, err = ie.QueryRowEx( + ctx, "get-all-databases", txn, sessiondata.NodeUserSessionDataOverride, + "SELECT count(1) FROM crdb_internal.databases", + ) + require.NoError(t, err) + // 3 databases (system, defaultdb, postgres, test1, test2). + require.Equal(t, row.String(), "(5)") + return nil + }); err != nil { + t.Fatal(err) + } - row, err = ie.QueryRowEx( - ctx, "get-all-databases", txn, sessiondata.NodeUserSessionDataOverride, - "SELECT count(1) FROM crdb_internal.databases", - ) - require.NoError(t, err) - // 3 databases (system, defaultdb, postgres, test1, test2). - require.Equal(t, row.String(), "(5)") } From d4f2e6471b222952f952b542ee2ffbde96504501 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Wed, 24 Aug 2022 15:21:00 +0000 Subject: [PATCH 4/9] sql: migrate create_external_connection to `planner.WithInternalExecutor()` for DDLs This commit is to migrate the existing DDLs to using `planner.WithInternalExecutor()`. DDLs with internal executors are only allowed if the latter is bounded with txn-realated metadata. Release justification: Low risk, high benefit changes to existing functionality Release note: none --- pkg/sql/create_external_connection.go | 36 +++++++++++++++------------ 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/pkg/sql/create_external_connection.go b/pkg/sql/create_external_connection.go index 56056a841399..16538b2ec257 100644 --- a/pkg/sql/create_external_connection.go +++ b/pkg/sql/create_external_connection.go @@ -16,12 +16,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/errors" ) @@ -112,22 +114,24 @@ func (p *planner) createExternalConnection( ex.SetConnectionType(exConn.ConnectionType()) ex.SetOwner(p.User()) - // Create the External Connection and persist it in the - // `system.external_connections` table. - if err := ex.Create(params.ctx, params.ExecCfg().InternalExecutor, params.p.User(), p.Txn()); err != nil { - return errors.Wrap(err, "failed to create external connection") - } - - // Grant user `ALL` on the newly created External Connection. - grantStatement := fmt.Sprintf(`GRANT ALL ON EXTERNAL CONNECTION "%s" TO %s`, - name, p.User().SQLIdentifier()) - _, err = params.ExecCfg().InternalExecutor.ExecEx(params.ctx, - "grant-on-create-external-connection", p.Txn(), - sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, grantStatement) - if err != nil { - return errors.Wrap(err, "failed to grant on newly created External Connection") - } - return nil + return p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + // Create the External Connection and persist it in the + // `system.external_connections` table. + if err := ex.Create(params.ctx, ie, p.User(), txn); err != nil { + return errors.Wrap(err, "failed to create external connection") + } + + // Grant user `ALL` on the newly created External Connection. + grantStatement := fmt.Sprintf(`GRANT ALL ON EXTERNAL CONNECTION "%s" TO %s`, + name, p.User().SQLIdentifier()) + _, err = ie.ExecEx(params.ctx, + "grant-on-create-external-connection", txn, + sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, grantStatement) + if err != nil { + return errors.Wrap(err, "failed to grant on newly created External Connection") + } + return nil + }) } func (c *createExternalConectionNode) Next(_ runParams) (bool, error) { return false, nil } From 1d5a912c5cec4d05716de0ca006ee3bf4908cd9f Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Wed, 24 Aug 2022 15:24:30 +0000 Subject: [PATCH 5/9] backupccl: migrate job restoration to `CollectionFactory.TxnWithExecutor()` for DDLs This commit is part of the project to migrate DDLs running with internal executor with the correct interface. DDLs are only allowed to run with internal executor that is bound to txn-related metadata. Release justification: Low risk, high benefit changes to existing functionality Release note: none --- pkg/ccl/backupccl/restore_job.go | 5 ++--- pkg/ccl/backupccl/restore_planning.go | 6 ++++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 1f2a8d54f6fe..c4c475860dbf 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -2214,8 +2214,8 @@ func (r *restoreResumer) OnFailOrCancel( logJobCompletion(ctx, restoreJobEventType, r.job.ID(), false, jobErr) execCfg := execCtx.(sql.JobExecContext).ExecCfg() - if err := sql.DescsTxn(ctx, execCfg, func( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + if err := execCfg.CollectionFactory.TxnWithExecutor(ctx, execCfg.DB, p.SessionData(), func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor, ) error { for _, tenant := range details.Tenants { tenant.State = descpb.TenantInfo_DROP @@ -2233,7 +2233,6 @@ func (r *restoreResumer) OnFailOrCancel( if details.DescriptorCoverage == tree.AllDescriptors { // We've dropped defaultdb and postgres in the planning phase, we must // recreate them now if the full cluster restore failed. - ie := p.ExecCfg().InternalExecutor _, err := ie.Exec(ctx, "recreate-defaultdb", txn, "CREATE DATABASE IF NOT EXISTS defaultdb") if err != nil { return err diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 3972e7af05e1..f8c1bb5e18bf 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -58,6 +58,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -705,8 +706,9 @@ func getDatabaseIDAndDesc( // as regular databases, we drop them before restoring them again in the // restore. func dropDefaultUserDBs(ctx context.Context, execCfg *sql.ExecutorConfig) error { - return sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error { - ie := execCfg.InternalExecutor + return execCfg.CollectionFactory.TxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func( + ctx context.Context, txn *kv.Txn, _ *descs.Collection, ie sqlutil.InternalExecutor, + ) error { _, err := ie.Exec(ctx, "drop-defaultdb", txn, "DROP DATABASE IF EXISTS defaultdb") if err != nil { return err From 671fd41f285e169655b5d1b7de2cbd657b2d3d8e Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Wed, 24 Aug 2022 15:30:05 +0000 Subject: [PATCH 6/9] backupccl: refactor `cleanupTempSystemTables()` We stripped `txn` from the parameter list in `cleanupTempSystemTables()`. It was run with not-nil txn by mistake, which is a mis-usage of running internal executor with DDLs. Release justification: bug fix Release note: none --- pkg/ccl/backupccl/restore_job.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index c4c475860dbf..670079ea77cb 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1669,7 +1669,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro // Reload the details as we may have updated the job. details = r.job.Details().(jobspb.RestoreDetails) - if err := r.cleanupTempSystemTables(ctx, nil /* txn */); err != nil { + if err := r.cleanupTempSystemTables(ctx); err != nil { return err } } else if isSystemUserRestore(details) { @@ -1678,7 +1678,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } details = r.job.Details().(jobspb.RestoreDetails) - if err := r.cleanupTempSystemTables(ctx, nil /* txn */); err != nil { + if err := r.cleanupTempSystemTables(ctx); err != nil { return err } } @@ -2251,7 +2251,7 @@ func (r *restoreResumer) OnFailOrCancel( if details.DescriptorCoverage == tree.AllDescriptors { // The temporary system table descriptors should already have been dropped // in `dropDescriptors` but we still need to drop the temporary system db. - if err := execCfg.DB.Txn(ctx, r.cleanupTempSystemTables); err != nil { + if err := r.cleanupTempSystemTables(ctx); err != nil { return err } } @@ -2800,12 +2800,12 @@ func (r *restoreResumer) restoreSystemTables( return nil } -func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context, txn *kv.Txn) error { +func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context) error { executor := r.execCfg.InternalExecutor // Check if the temp system database has already been dropped. This can happen // if the restore job fails after the system database has cleaned up. checkIfDatabaseExists := "SELECT database_name FROM [SHOW DATABASES] WHERE database_name=$1" - if row, err := executor.QueryRow(ctx, "checking-for-temp-system-db" /* opName */, txn, checkIfDatabaseExists, restoreTempSystemDB); err != nil { + if row, err := executor.QueryRow(ctx, "checking-for-temp-system-db" /* opName */, nil /* txn */, checkIfDatabaseExists, restoreTempSystemDB); err != nil { return errors.Wrap(err, "checking for temporary system db") } else if row == nil { // Temporary system DB might already have been dropped by the restore job. @@ -2815,11 +2815,11 @@ func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context, txn *kv.Tx // After restoring the system tables, drop the temporary database holding the // system tables. gcTTLQuery := fmt.Sprintf("ALTER DATABASE %s CONFIGURE ZONE USING gc.ttlseconds=1", restoreTempSystemDB) - if _, err := executor.Exec(ctx, "altering-gc-ttl-temp-system" /* opName */, txn, gcTTLQuery); err != nil { + if _, err := executor.Exec(ctx, "altering-gc-ttl-temp-system" /* opName */, nil /* txn */, gcTTLQuery); err != nil { log.Errorf(ctx, "failed to update the GC TTL of %q: %+v", restoreTempSystemDB, err) } dropTableQuery := fmt.Sprintf("DROP DATABASE %s CASCADE", restoreTempSystemDB) - if _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, txn, dropTableQuery); err != nil { + if _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, nil /* txn */, dropTableQuery); err != nil { return errors.Wrap(err, "dropping temporary system db") } return nil From 128aadd1f5bdf76d504a8117d62468b528490f04 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Thu, 1 Sep 2022 20:57:20 +0000 Subject: [PATCH 7/9] sql: migrate `deleteTempTables` to use `planner.WithInternalExecutor()` This is another DDL statement executed via an internal executor mal-inited. Change it to use the right interface. Release justification: Release note: none. --- pkg/sql/discard.go | 28 +++++++++++++++------------- pkg/sql/temporary_schema.go | 2 -- pkg/sql/temporary_schema_test.go | 1 - 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/pkg/sql/discard.go b/pkg/sql/discard.go index 501642b62ed0..27085f728735 100644 --- a/pkg/sql/discard.go +++ b/pkg/sql/discard.go @@ -13,10 +13,12 @@ package sql import ( "context" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/errors" ) @@ -84,20 +86,20 @@ func (n *discardNode) startExec(params runParams) error { } func deleteTempTables(ctx context.Context, p *planner) error { - codec := p.execCfg.Codec - descCol := p.Descriptors() - allDbDescs, err := descCol.GetAllDatabaseDescriptors(ctx, p.Txn()) - if err != nil { - return err - } - ie := p.execCfg.InternalExecutor - - for _, dbDesc := range allDbDescs { - schemaName := p.TemporarySchemaName() - err = cleanupSchemaObjects(ctx, p.execCfg.Settings, p.Txn(), descCol, codec, ie, dbDesc, schemaName) + return p.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error { + codec := p.execCfg.Codec + descCol := p.Descriptors() + allDbDescs, err := descCol.GetAllDatabaseDescriptors(ctx, p.Txn()) if err != nil { return err } - } - return nil + for _, dbDesc := range allDbDescs { + schemaName := p.TemporarySchemaName() + err = cleanupSchemaObjects(ctx, p.Txn(), descCol, codec, ie, dbDesc, schemaName) + if err != nil { + return err + } + } + return nil + }) } diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index 29655d151b9d..0bf2615bf43e 100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -188,7 +188,6 @@ func cleanupSessionTempObjects( for _, dbDesc := range allDbDescs { if err := cleanupSchemaObjects( ctx, - settings, txn, descsCol, codec, @@ -220,7 +219,6 @@ func cleanupSessionTempObjects( // API or avoid it entirely. func cleanupSchemaObjects( ctx context.Context, - settings *cluster.Settings, txn *kv.Txn, descsCol *descs.Collection, codec keys.SQLCodec, diff --git a/pkg/sql/temporary_schema_test.go b/pkg/sql/temporary_schema_test.go index 6dcc5ff9e1cd..fa45359fc465 100644 --- a/pkg/sql/temporary_schema_test.go +++ b/pkg/sql/temporary_schema_test.go @@ -111,7 +111,6 @@ INSERT INTO perm_table VALUES (DEFAULT, 1); } return cleanupSchemaObjects( ctx, - execCfg.Settings, txn, descsCol, execCfg.Codec, From a91c8d1f4b32d7e9668ad88af69e574e6b5c2780 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Thu, 1 Sep 2022 22:57:08 +0000 Subject: [PATCH 8/9] sql: require txn-related metadata if running DDLs with internal executor with txn When using internal executor to run DDL statements under a not-nil outer txn, we require txn-related metadata (such as descriptor collections) to be passed to the internal executor from the outer caller too. This commit is to add a gate for this use case. Release justification: bug fix Release note: none --- pkg/sql/internal.go | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 8ed85ce214b8..0c4e05ad3af6 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -616,7 +616,7 @@ func (ie *InternalExecutor) queryInternalBuffered( txn *kv.Txn, sessionDataOverride sessiondata.InternalExecutorOverride, stmt string, - // Non-zero limit specifies the limit on the number of rows returned. +// Non-zero limit specifies the limit on the number of rows returned. limit int, qargs ...interface{}, ) ([]tree.Datums, colinfo.ResultColumns, error) { @@ -831,6 +831,10 @@ func (ie *InternalExecutor) execInternal( stmt string, qargs ...interface{}, ) (r *rowsIterator, retErr error) { + if err := ie.checkIfTxnIsConsistent(txn); err != nil { + return nil, err + } + ctx = logtags.AddTag(ctx, "intExec", opName) var sd *sessiondata.SessionData @@ -896,6 +900,9 @@ func (ie *InternalExecutor) execInternal( timeReceived := timeutil.Now() parseStart := timeReceived parsed, err := parser.ParseOne(stmt) + if err := ie.checkIfStmtIsAllowed(parsed.AST, txn); err != nil { + return nil, err + } if err != nil { return nil, err } @@ -1075,6 +1082,29 @@ func (ie *InternalExecutor) commitTxn(ctx context.Context) error { return ex.commitSQLTransactionInternal(ctx) } +// checkIfStmtIsAllowed returns an error if the internal executor is not bound +// with the outer-txn-related info but is used to run DDL statements within an +// outer txn. +// TODO (janexing): this will be deprecate soon since it's not a good idea +// to have `extraTxnState` to store the info from a outer txn. +func (ie *InternalExecutor) checkIfStmtIsAllowed(stmt tree.Statement, txn *kv.Txn) error { + if tree.CanModifySchema(stmt) && txn != nil && ie.extraTxnState == nil { + return errors.New("DDL statement is disallowed if internal " + + "executor is not bound with txn metadata") + } + return nil +} + +// checkIfTxnIsConsistent returns true if the given txn is not nil and is not +// the same txn that is used to construct the internal executor. +func (ie *InternalExecutor) checkIfTxnIsConsistent(txn *kv.Txn) error { + if txn != nil && ie.extraTxnState != nil && ie.extraTxnState.txn != txn { + return errors.New("txn is inconsistent with the one when " + + "constructing the internal executor") + } + return nil +} + // internalClientComm is an implementation of ClientComm used by the // InternalExecutor. Result rows are buffered in memory. type internalClientComm struct { From fb3a0f919093536e5bf65d05baa872d5db5968c5 Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Mon, 19 Sep 2022 19:34:11 +0000 Subject: [PATCH 9/9] sql: add a space to a line to fix the lint It was causing the lint in CI to fail. Release note: None --- pkg/sql/internal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 0c4e05ad3af6..7b076cd04ca0 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -616,7 +616,7 @@ func (ie *InternalExecutor) queryInternalBuffered( txn *kv.Txn, sessionDataOverride sessiondata.InternalExecutorOverride, stmt string, -// Non-zero limit specifies the limit on the number of rows returned. + // Non-zero limit specifies the limit on the number of rows returned. limit int, qargs ...interface{}, ) ([]tree.Datums, colinfo.ResultColumns, error) {