Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: *: add restriction to running DDL with internal executors #88287

Merged
merged 9 commits into from
Sep 20, 2022
7 changes: 6 additions & 1 deletion pkg/ccl/backupccl/backup_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -86,7 +87,11 @@ func checkMetadata(
tc.Servers[0].ClusterSettings(),
blobs.TestEmptyBlobClientFactory,
username.RootUserName(),
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil)
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor),
tc.Servers[0].CollectionFactory().(*descs.CollectionFactory),
tc.Servers[0].DB(),
nil, /* limiters */
)
if err != nil {
t.Fatal(err)
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,11 @@ func TestBackupRestoreAppend(t *testing.T) {
tc.Servers[0].ClusterSettings(),
blobs.TestEmptyBlobClientFactory,
username.RootUserName(),
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil)
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor),
tc.Servers[0].CollectionFactory().(*descs.CollectionFactory),
tc.Servers[0].DB(),
nil, /* limiters */
)
require.NoError(t, err)
defer store.Close()
var files []string
Expand Down Expand Up @@ -8021,7 +8025,12 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) {
base.ExternalIODirConfig{},
st,
blobs.TestBlobServiceClient(dir),
username.RootUserName(), nil, nil, nil)
username.RootUserName(),
nil, /* ie */
nil, /* cf */
nil, /* kvDB */
nil, /* limiters */
)
require.NoError(t, err)

m := mon.NewMonitor("test-monitor", mon.MemoryResource, nil, nil, 0, 0, st)
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,12 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
DB: kvDB,
ExternalStorage: func(ctx context.Context, dest cloudpb.ExternalStorage, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{},
s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil, opts...)
s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir),
nil, /* ie */
nil, /* cf */
nil, /* kvDB */
nil, /* limiters */
opts...)
},
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
Expand Down
19 changes: 9 additions & 10 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1669,7 +1669,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
// Reload the details as we may have updated the job.
details = r.job.Details().(jobspb.RestoreDetails)

if err := r.cleanupTempSystemTables(ctx, nil /* txn */); err != nil {
if err := r.cleanupTempSystemTables(ctx); err != nil {
return err
}
} else if isSystemUserRestore(details) {
Expand All @@ -1678,7 +1678,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
}
details = r.job.Details().(jobspb.RestoreDetails)

if err := r.cleanupTempSystemTables(ctx, nil /* txn */); err != nil {
if err := r.cleanupTempSystemTables(ctx); err != nil {
return err
}
}
Expand Down Expand Up @@ -2214,8 +2214,8 @@ func (r *restoreResumer) OnFailOrCancel(
logJobCompletion(ctx, restoreJobEventType, r.job.ID(), false, jobErr)

execCfg := execCtx.(sql.JobExecContext).ExecCfg()
if err := sql.DescsTxn(ctx, execCfg, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
if err := execCfg.CollectionFactory.TxnWithExecutor(ctx, execCfg.DB, p.SessionData(), func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor,
) error {
for _, tenant := range details.Tenants {
tenant.State = descpb.TenantInfo_DROP
Expand All @@ -2233,7 +2233,6 @@ func (r *restoreResumer) OnFailOrCancel(
if details.DescriptorCoverage == tree.AllDescriptors {
// We've dropped defaultdb and postgres in the planning phase, we must
// recreate them now if the full cluster restore failed.
ie := p.ExecCfg().InternalExecutor
_, err := ie.Exec(ctx, "recreate-defaultdb", txn, "CREATE DATABASE IF NOT EXISTS defaultdb")
if err != nil {
return err
Expand All @@ -2252,7 +2251,7 @@ func (r *restoreResumer) OnFailOrCancel(
if details.DescriptorCoverage == tree.AllDescriptors {
// The temporary system table descriptors should already have been dropped
// in `dropDescriptors` but we still need to drop the temporary system db.
if err := execCfg.DB.Txn(ctx, r.cleanupTempSystemTables); err != nil {
if err := r.cleanupTempSystemTables(ctx); err != nil {
return err
}
}
Expand Down Expand Up @@ -2801,12 +2800,12 @@ func (r *restoreResumer) restoreSystemTables(
return nil
}

func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context, txn *kv.Txn) error {
func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context) error {
executor := r.execCfg.InternalExecutor
// Check if the temp system database has already been dropped. This can happen
// if the restore job fails after the system database has cleaned up.
checkIfDatabaseExists := "SELECT database_name FROM [SHOW DATABASES] WHERE database_name=$1"
if row, err := executor.QueryRow(ctx, "checking-for-temp-system-db" /* opName */, txn, checkIfDatabaseExists, restoreTempSystemDB); err != nil {
if row, err := executor.QueryRow(ctx, "checking-for-temp-system-db" /* opName */, nil /* txn */, checkIfDatabaseExists, restoreTempSystemDB); err != nil {
return errors.Wrap(err, "checking for temporary system db")
} else if row == nil {
// Temporary system DB might already have been dropped by the restore job.
Expand All @@ -2816,11 +2815,11 @@ func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context, txn *kv.Tx
// After restoring the system tables, drop the temporary database holding the
// system tables.
gcTTLQuery := fmt.Sprintf("ALTER DATABASE %s CONFIGURE ZONE USING gc.ttlseconds=1", restoreTempSystemDB)
if _, err := executor.Exec(ctx, "altering-gc-ttl-temp-system" /* opName */, txn, gcTTLQuery); err != nil {
if _, err := executor.Exec(ctx, "altering-gc-ttl-temp-system" /* opName */, nil /* txn */, gcTTLQuery); err != nil {
log.Errorf(ctx, "failed to update the GC TTL of %q: %+v", restoreTempSystemDB, err)
}
dropTableQuery := fmt.Sprintf("DROP DATABASE %s CASCADE", restoreTempSystemDB)
if _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, txn, dropTableQuery); err != nil {
if _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, nil /* txn */, dropTableQuery); err != nil {
return errors.Wrap(err, "dropping temporary system db")
}
return nil
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -705,8 +706,9 @@ func getDatabaseIDAndDesc(
// as regular databases, we drop them before restoring them again in the
// restore.
func dropDefaultUserDBs(ctx context.Context, execCfg *sql.ExecutorConfig) error {
return sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
ie := execCfg.InternalExecutor
return execCfg.CollectionFactory.TxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func(
ctx context.Context, txn *kv.Txn, _ *descs.Collection, ie sqlutil.InternalExecutor,
) error {
_, err := ie.Exec(ctx, "drop-defaultdb", txn, "DROP DATABASE IF EXISTS defaultdb")
if err != nil {
return err
Expand Down
43 changes: 43 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/file_table_read_write
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
subtest backup_file_table

new-server name=s1
----

exec-sql
CREATE DATABASE to_backup;
----

exec-sql
CREATE DATABASE backups;
----

exec-sql
BACKUP DATABASE to_backup INTO 'userfile://backups.public.userfiles_$user/data';
----

query-sql
SELECT * FROM backups.crdb_internal.invalid_objects;
----

exec-sql
USE backups;
----

query-sql
SELECT * FROM pg_catalog.pg_tables where schemaname='public';
----
public userfiles_$user_upload_files root <nil> true false false false
public userfiles_$user_upload_payload root <nil> true false false false

query-sql
SELECT conname FROM pg_catalog.pg_constraint con
INNER JOIN pg_catalog.pg_class rel ON rel.oid = con.conrelid
INNER JOIN pg_catalog.pg_namespace nsp
ON nsp.oid = connamespace
WHERE rel.relname='userfiles_$user_upload_payload'
ORDER BY conname;
----
file_id_fk
userfiles_$user_upload_payload_pkey

subtest end
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,13 @@ func TestCloudStorageSink(t *testing.T) {
externalStorageFromURI := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage,
error) {
return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings,
clientFactory, user, nil, nil, nil, opts...)
clientFactory,
user,
nil, /* ie */
nil, /* cf */
nil, /* kvDB */
nil, /* limiters */
opts...)
}

user := username.RootUserName()
Expand Down
15 changes: 13 additions & 2 deletions pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,12 @@ func externalStorageFromURIFactory(
defaultSettings := &cluster.Settings{}
defaultSettings.SV.Init(ctx, nil /* opaque */)
return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{},
defaultSettings, newBlobFactory, user, nil /*Internal Executor*/, nil /*kvDB*/, nil, opts...)
defaultSettings, newBlobFactory, user,
nil, /* ie */
nil, /* cf */
nil, /* kvDB */
nil, /* limiters */
opts...)
}

func getManifestFromURI(ctx context.Context, path string) (backuppb.BackupManifest, error) {
Expand Down Expand Up @@ -588,7 +593,13 @@ func makeIters(
var err error
clusterSettings := cluster.MakeClusterSettings()
dirStorage[i], err = cloud.MakeExternalStorage(ctx, file.Dir, base.ExternalIODirConfig{},
clusterSettings, newBlobFactory, nil /*internal executor*/, nil /*kvDB*/, nil)
clusterSettings,
newBlobFactory,
nil, /* ie */
nil, /* cf */
nil, /* kvDB */
nil, /* limiters */
)
if err != nil {
return nil, nil, errors.Wrapf(err, "making external storage")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/storageccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_test(
"//pkg/security/username",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/storageccl/external_sst_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
Expand Down Expand Up @@ -122,7 +123,11 @@ func TestNewExternalSSTReader(t *testing.T) {
clusterSettings,
blobs.TestBlobServiceClient(tempDir),
username.RootUserName(),
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), tc.Servers[0].DB(), nil)
tc.Servers[0].InternalExecutor().(*sql.InternalExecutor),
tc.Servers[0].CollectionFactory().(*descs.CollectionFactory),
tc.Servers[0].DB(),
nil, /* limiters */
)
require.NoError(t, err)
fileStores[i].Store = store

Expand Down
15 changes: 12 additions & 3 deletions pkg/ccl/workloadccl/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,18 @@ func GetStorage(ctx context.Context, cfg FixtureConfig) (cloud.ExternalStorage,
return nil, errors.AssertionFailedf("unsupported external storage provider; valid providers are gs, s3, and azure")
}

s, err := cloud.ExternalStorageFromURI(ctx, cfg.ObjectPathToURI(),
base.ExternalIODirConfig{}, clustersettings.MakeClusterSettings(),
nil, username.SQLUsername{}, nil, nil, nil)
s, err := cloud.ExternalStorageFromURI(
ctx,
cfg.ObjectPathToURI(),
base.ExternalIODirConfig{},
clustersettings.MakeClusterSettings(),
nil, /* blobClientFactory */
username.SQLUsername{},
nil, /* ie */
nil, /* cf */
nil, /* kvDB */
nil, /* limiters */
)
if err != nil {
return nil, errors.Wrap(err, storageError)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//pkg/security/username",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog/descs",
"//pkg/sql/sqlutil",
"//pkg/util/ctxgroup",
"//pkg/util/ioctx",
Expand Down
Loading