From 64a5ae91eeaa3e47f73df2672d8826c2bd4da91b Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 24 Jan 2023 20:09:52 +0000 Subject: [PATCH 1/4] roachpb: add Locality matching helper Release note: none. Epic: none. --- pkg/roachpb/metadata.go | 17 +++++++++++++++++ pkg/roachpb/metadata_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 60514987fe21..79574b5cf9af 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -597,6 +597,23 @@ func (l Locality) String() string { return strings.Join(tiers, ",") } +// NonEmpty returns true if the tiers are non-empty. +func (l Locality) NonEmpty() bool { + return len(l.Tiers) > 0 +} + +// Matches checks if this locality has a tier with a matching value for each +// tier of the passed filter, returning true if so or false if not along with +// the first tier of the filters that did not matched. +func (l Locality) Matches(filter Locality) (bool, Tier) { + for _, t := range filter.Tiers { + if v, ok := l.Find(t.Key); !ok || v != t.Value { + return false, t + } + } + return true, Tier{} +} + // Type returns the underlying type in string form. This is part of pflag's // value interface. func (Locality) Type() string { diff --git a/pkg/roachpb/metadata_test.go b/pkg/roachpb/metadata_test.go index abf764343183..67f691e37511 100644 --- a/pkg/roachpb/metadata_test.go +++ b/pkg/roachpb/metadata_test.go @@ -186,6 +186,41 @@ func TestLocalityConversions(t *testing.T) { } } +func TestLocalityMatches(t *testing.T) { + var empty Locality + var l Locality + require.NoError(t, l.Set("a=b,c=d,e=f")) + for _, tc := range []struct { + filter string + miss string + }{ + {filter: "", miss: ""}, + {filter: "a=b", miss: ""}, + {filter: "a=b,c=d,e=f", miss: ""}, + {filter: "c=d,e=f,a=b", miss: ""}, + {filter: "a=z", miss: "a=z"}, + {filter: "a=b,c=x,e=f", miss: "c=x"}, + {filter: "a=b,x=y", miss: "x=y"}, + } { + t.Run(fmt.Sprintf("%s-miss-%s", tc.filter, tc.miss), func(t *testing.T) { + var filter Locality + if tc.filter != "" { + require.NoError(t, filter.Set(tc.filter)) + } + matches, miss := l.Matches(filter) + if tc.miss == "" { + require.True(t, matches) + } else { + require.False(t, matches) + require.Equal(t, tc.miss, miss.String()) + } + + emptyMatches, _ := empty.Matches(filter) + require.Equal(t, tc.filter == "", emptyMatches) + }) + } +} + func TestDiversityScore(t *testing.T) { // Keys are not considered for score, just the order, so we don't need to // specify them. From a8db047fe21de32a9325795c5f0e39657cc51aca Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 24 Jan 2023 20:13:33 +0000 Subject: [PATCH 2/4] jobs: add API to move execution lease Release note: none. Epic: none. --- pkg/jobs/errors.go | 16 ++++++++++------ pkg/jobs/registry.go | 20 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/pkg/jobs/errors.go b/pkg/jobs/errors.go index 6cecafdbce11..bd1a8c33de9c 100644 --- a/pkg/jobs/errors.go +++ b/pkg/jobs/errors.go @@ -48,12 +48,6 @@ func IsPermanentJobError(err error) bool { return errors.Is(err, errJobPermanentSentinel) } -// IsPauseSelfError checks whether the given error is a -// PauseRequestError. -func IsPauseSelfError(err error) bool { - return errors.Is(err, errPauseSelfSentinel) -} - // errPauseSelfSentinel exists so the errors returned from PauseRequestErr can // be marked with it. var errPauseSelfSentinel = errors.New("job requested it be paused") @@ -65,9 +59,19 @@ func MarkPauseRequestError(reason error) error { return errors.Mark(reason, errPauseSelfSentinel) } +// IsPauseSelfError checks whether the given error is a +// PauseRequestError. +func IsPauseSelfError(err error) bool { + return errors.Is(err, errPauseSelfSentinel) +} + // PauseRequestExplained is a prose used to wrap and explain a pause-request error. const PauseRequestExplained = "pausing due to error; use RESUME JOB to try to proceed once the issue is resolved, or CANCEL JOB to rollback" +// errJobLeaseNotHeld is a marker error for returning from a job execution if it +// knows or finds out it no longer has a job lease. +var errJobLeaseNotHeld = errors.New("job lease not held") + // InvalidStatusError is the error returned when the desired operation is // invalid given the job's current status. type InvalidStatusError struct { diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 2f54afd45587..5934f7667520 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1523,6 +1523,9 @@ func (r *Registry) stepThroughStateMachine( jm.ResumeRetryError.Inc(1) return errors.Errorf("job %d: node liveness error: restarting in background", job.ID()) } + if errors.Is(err, errJobLeaseNotHeld) { + return err + } if errors.Is(err, errPauseSelfSentinel) { if err := r.PauseRequested(ctx, nil, job.ID(), err.Error()); err != nil { @@ -1820,6 +1823,23 @@ func (r *Registry) CheckPausepoint(name string) error { return nil } +func (r *Registry) RelocateLease( + ctx context.Context, + txn isql.Txn, + id jobspb.JobID, + destID base.SQLInstanceID, + destSession sqlliveness.SessionID, +) (sentinel error, failure error) { + if _, err := r.db.Executor().Exec(ctx, "job-relocate-coordinator", txn.KV(), + "UPDATE system.jobs SET claim_instance_id = $2, claim_session_id = $3 WHERE id = $1", + id, destID, destSession.UnsafeBytes(), + ); err != nil { + return nil, errors.Wrapf(err, "failed to relocate job coordinator to %d", destID) + } + + return errors.Mark(errors.Newf("execution of job %d relocated to %d", id, destID), errJobLeaseNotHeld), nil +} + // TestingIsJobIdle returns true if the job is adopted and currently idle. func (r *Registry) TestingIsJobIdle(jobID jobspb.JobID) bool { r.mu.Lock() From 91d2a76561d15bfc9b65d4743f4085fb6c937a8c Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 24 Jan 2023 21:23:40 +0000 Subject: [PATCH 3/4] backup: allow restricting backup coordination by region The coordinator of a backup job needs to access the 'default' locality to read and write metadata including the backup checkpoint files and manifest. Prior to this change, this meant that every node in the cluster needed to be able to access this default locality storage location, since any node could become the coordinator. This change introduces a new locality filter option that can be specified when the backup job is created. If set, this new option will cause any node which attempts to execute the backup job to check if it meets the locality requirements and, if not, move the execution of the job to a node which does meet them, or fail if not such node can be found. A locality requirement is specified as any number of key=value pairs, each of which a node must match to be eligible to execute the job, for example, a job run with BACKUP ... WITH coordinator_locality = 'region=east,cloud=azure' would require a node have both 'region=east' and 'cloud=azure', however the order is not significant, only that each specified filter is met. Jobs typically begin executing directly on the node on which they are created so if that node has matching localities, it will execute the job, or relocate it if it does not. Relocated executions may take some amount of time to resume on the new node to which they were relocated, similar to the delay seen when a paused job is resumed, typically between a few seconds to a minute. Note that this only restricts the *coordination* of the backup job -- reading the row data from individual ranges and exporting that data to the destination storage location or locations is still performed by many nodes, typically the leaseholders for each range. Release note (enterprise change): coordination of BACKUP jobs and thus writing of BACKUP metadata can be restricted to nodes within designated localities using the new 'coordinator_locality' option. Epic: CRDB-9547. --- docs/generated/sql/bnf/stmt_block.bnf | 3 ++ pkg/ccl/backupccl/BUILD.bazel | 1 + pkg/ccl/backupccl/backup_job.go | 41 +++++++++++++++++++++++ pkg/ccl/backupccl/backup_planning.go | 22 ++++++++++++ pkg/ccl/backupccl/backup_test.go | 46 ++++++++++++++++++++++++++ pkg/jobs/jobspb/jobs.proto | 4 +++ pkg/sql/distsql_physical_planner.go | 22 ++++++++++++ pkg/sql/parser/sql.y | 11 ++++-- pkg/sql/parser/testdata/backup_restore | 22 ++++++------ pkg/sql/sem/tree/backup.go | 17 +++++++++- 10 files changed, 174 insertions(+), 15 deletions(-) diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 55e94ecb0024..3ef12e919eb6 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1055,6 +1055,7 @@ unreserved_keyword ::= | 'CONTROLJOB' | 'CONVERSION' | 'CONVERT' + | 'COORDINATOR_LOCALITY' | 'COPY' | 'COST' | 'COVERING' @@ -2256,6 +2257,7 @@ backup_options ::= | 'DETACHED' '=' 'FALSE' | 'KMS' '=' string_or_placeholder_opt_list | 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list + | 'COORDINATOR_LOCALITY' '=' string_or_placeholder c_expr ::= d_expr @@ -3409,6 +3411,7 @@ bare_label_keywords ::= 'AS_JSON' | 'ATOMIC' | 'CALLED' + | 'COORDINATOR_LOCALITY' | 'COST' | 'CHECK_FILES' | 'DEBUG_IDS' diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index db1839ece26d..6bfc0a2ac334 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -136,6 +136,7 @@ go_library( "//pkg/util/metric", "//pkg/util/mon", "//pkg/util/protoutil", + "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/span", "//pkg/util/stop", diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 5acebaaeb35a..0ef57c475139 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -52,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -450,6 +451,11 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { // The span is finished by the registry executing the job. details := b.job.Details().(jobspb.BackupDetails) p := execCtx.(sql.JobExecContext) + + if err := b.maybeRelocateJobExecution(ctx, p, details.CoordinatorLocation); err != nil { + return err + } + kmsEnv := backupencryption.MakeBackupKMSEnv( p.ExecCfg().Settings, &p.ExecCfg().ExternalIODirConfig, @@ -842,6 +848,41 @@ func (b *backupResumer) ReportResults(ctx context.Context, resultsCh chan<- tree } } +func (b *backupResumer) maybeRelocateJobExecution( + ctx context.Context, p sql.JobExecContext, locality roachpb.Locality, +) error { + if locality.NonEmpty() { + current, err := p.DistSQLPlanner().GetSQLInstanceInfo(p.ExecCfg().JobRegistry.ID()) + if err != nil { + return err + } + if ok, missedTier := current.Locality.Matches(locality); !ok { + log.Infof(ctx, + "BACKUP job %d initially adopted on instance %d but it does not match locality filter %s, finding a new coordinator", + b.job.ID(), current.NodeID, missedTier.String(), + ) + + instancesInRegion, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, locality) + if err != nil { + return err + } + rng, _ := randutil.NewPseudoRand() + dest := instancesInRegion[rng.Intn(len(instancesInRegion))] + + var res error + if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + var err error + res, err = p.ExecCfg().JobRegistry.RelocateLease(ctx, txn, b.job.ID(), dest.InstanceID, dest.SessionID) + return err + }); err != nil { + return errors.Wrapf(err, "failed to relocate job coordinator to %d", dest.InstanceID) + } + return res + } + } + return nil +} + func getBackupDetailAndManifest( ctx context.Context, execCfg *sql.ExecutorConfig, diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 268ddd8c9a5f..554f1afbc757 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -519,6 +519,7 @@ func backupTypeCheck( exprutil.Strings{ backupStmt.Subdir, backupStmt.Options.EncryptionPassphrase, + backupStmt.Options.CoordinatorLocality, }, exprutil.StringArrays{ tree.Exprs(backupStmt.To), @@ -600,6 +601,19 @@ func backupPlanHook( } } + var coordinatorLocality roachpb.Locality + if backupStmt.Options.CoordinatorLocality != nil { + s, err := exprEval.String(ctx, backupStmt.Options.CoordinatorLocality) + if err != nil { + return nil, nil, nil, false, err + } + if s != "" { + if err := coordinatorLocality.Set(s); err != nil { + return nil, nil, nil, false, err + } + } + } + encryptionParams := jobspb.BackupEncryptionOptions{ Mode: jobspb.EncryptionMode_None, } @@ -712,6 +726,13 @@ func backupPlanHook( return err } + // Check that a node will currently be able to run this before we create it. + if coordinatorLocality.NonEmpty() { + if _, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, coordinatorLocality); err != nil { + return err + } + } + initialDetails := jobspb.BackupDetails{ Destination: jobspb.BackupDetails_Destination{To: to, IncrementalStorage: incrementalStorage}, EndTime: endTime, @@ -723,6 +744,7 @@ func backupPlanHook( AsOfInterval: asOfInterval, Detached: detached, ApplicationName: p.SessionData().ApplicationName, + CoordinatorLocation: coordinatorLocality, } if backupStmt.CreatedByInfo != nil && backupStmt.CreatedByInfo.Name == jobs.CreatedByScheduledJobs { initialDetails.ScheduleID = backupStmt.CreatedByInfo.ID diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index b4386a05d0bb..585e6a90f15f 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -10732,3 +10732,49 @@ $$; require.NoError(t, err) } + +func localityFromStr(t *testing.T, s string) roachpb.Locality { + var l roachpb.Locality + require.NoError(t, l.Set(s)) + return l +} + +func TestBackupInLocality(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const numAccounts = 1000 + + // Disabled to run within tenant as certain MR features are not available to tenants. + args := base.TestClusterArgs{ServerArgsPerNode: map[int]base.TestServerArgs{ + 0: {Locality: localityFromStr(t, "region=east,dc=1,az=1")}, + 1: {Locality: localityFromStr(t, "region=east,dc=2,az=2")}, + 2: {Locality: localityFromStr(t, "region=west,dc=1,az=1")}, + }} + + cluster, _, _, cleanupFn := backupRestoreTestSetupWithParams(t, 3 /* nodes */, numAccounts, InitManualReplication, args) + defer cleanupFn() + + for i, tc := range []struct { + node int + filter, err string + }{ + {node: 1, filter: "region=east", err: ""}, + {node: 1, filter: "region=east,dc=1", err: ""}, + {node: 1, filter: "region=east,dc=6", err: "no instances found"}, + {node: 1, filter: "region=central", err: "no instances found"}, + {node: 1, filter: "region=east,dc=2", err: "relocated"}, + {node: 1, filter: "region=west,dc=1", err: "relocated"}, + + {node: 2, filter: "region=east", err: ""}, + {node: 2, filter: "region=east,az=2", err: ""}, + {node: 2, filter: "region=east,dc=1", err: "relocated"}, + {node: 2, filter: "region=east,az=1", err: "relocated"}, + + {node: 3, filter: "region=east", err: "relocated"}, + {node: 3, filter: "region=central,dc=1", err: "no instances found"}, + } { + db := sqlutils.MakeSQLRunner(cluster.ServerConn(tc.node - 1)) + db.ExpectErr(t, tc.err, "BACKUP system.users INTO $1 WITH coordinator_locality = $2", fmt.Sprintf("userfile:///tc%d", i), tc.filter) + } +} diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 2b2503812488..c5ebc03f17bb 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -305,6 +305,10 @@ message BackupDetails { // ApplicationName is the application name in the session where the backup was // invoked. string application_name = 23; + + roachpb.Locality coordinator_location = 24 [(gogoproto.nullable) = false]; + + // NEXT ID: 25; } message BackupProgress { diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index fd6e754449d1..e724f016c4c9 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -216,6 +216,28 @@ func NewDistSQLPlanner( return dsp } +// GetAllInstancesByLocality lists all instances that match the passed locality +// filters. +func (dsp *DistSQLPlanner) GetAllInstancesByLocality( + ctx context.Context, filter roachpb.Locality, +) ([]sqlinstance.InstanceInfo, error) { + all, err := dsp.sqlAddressResolver.GetAllInstances(ctx) + if err != nil { + return nil, err + } + var pos int + for _, n := range all { + if ok, _ := n.Locality.Matches(filter); ok { + all[pos] = n + pos++ + } + } + if pos == 0 { + return nil, errors.Newf("no instances found matching locality filter %s", filter.String()) + } + return all[:pos], nil +} + // GetSQLInstanceInfo gets a node descriptor by node ID. func (dsp *DistSQLPlanner) GetSQLInstanceInfo( sqlInstanceID base.SQLInstanceID, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index ca9b733e201d..098a239a6f66 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -890,7 +890,7 @@ func (u *sqlSymUnion) showTenantOpts() tree.ShowTenantOptions { %token CLUSTER COALESCE COLLATE COLLATION COLUMN COLUMNS COMMENT COMMENTS COMMIT %token COMMITTED COMPACT COMPLETE COMPLETIONS CONCAT CONCURRENTLY CONFIGURATION CONFIGURATIONS CONFIGURE %token CONFLICT CONNECTION CONNECTIONS CONSTRAINT CONSTRAINTS CONTAINS CONTROLCHANGEFEED CONTROLJOB -%token CONVERSION CONVERT COPY COST COVERING CREATE CREATEDB CREATELOGIN CREATEROLE +%token CONVERSION CONVERT COORDINATOR_LOCALITY COPY COST COVERING CREATE CREATEDB CREATELOGIN CREATEROLE %token CROSS CSV CUBE CURRENT CURRENT_CATALOG CURRENT_DATE CURRENT_SCHEMA %token CURRENT_ROLE CURRENT_TIME CURRENT_TIMESTAMP %token CURRENT_USER CURSOR CYCLE @@ -3241,9 +3241,12 @@ backup_options: } | INCREMENTAL_LOCATION '=' string_or_placeholder_opt_list { - $$.val = &tree.BackupOptions{IncrementalStorage: $3.stringOrPlaceholderOptList()} + $$.val = &tree.BackupOptions{IncrementalStorage: $3.stringOrPlaceholderOptList()} + } +| COORDINATOR_LOCALITY '=' string_or_placeholder + { + $$.val = &tree.BackupOptions{CoordinatorLocality: $3.expr()} } - // %Help: CREATE SCHEDULE FOR BACKUP - backup data periodically // %Category: CCL @@ -15967,6 +15970,7 @@ unreserved_keyword: | CONTROLJOB | CONVERSION | CONVERT +| COORDINATOR_LOCALITY | COPY | COST | COVERING @@ -16356,6 +16360,7 @@ bare_label_keywords: AS_JSON | ATOMIC | CALLED +| COORDINATOR_LOCALITY | COST | CHECK_FILES | DEBUG_IDS diff --git a/pkg/sql/parser/testdata/backup_restore b/pkg/sql/parser/testdata/backup_restore index 01fea821f897..49addbbe1094 100644 --- a/pkg/sql/parser/testdata/backup_restore +++ b/pkg/sql/parser/testdata/backup_restore @@ -324,13 +324,13 @@ BACKUP DATABASE foo TO ($1, $1) INCREMENTAL FROM '_' -- literals removed BACKUP DATABASE _ TO ($1, $2) INCREMENTAL FROM 'baz' -- identifiers removed parse -BACKUP foo TO 'bar' WITH ENCRYPTION_PASSPHRASE = 'secret', revision_history +BACKUP foo TO 'bar' WITH ENCRYPTION_PASSPHRASE = 'secret', revision_history, coordinator_locality = 'a=b' ---- -BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = '*****' -- normalized! -BACKUP TABLE (foo) TO ('bar') WITH revision_history = (true), encryption_passphrase = '*****' -- fully parenthesized -BACKUP TABLE foo TO '_' WITH revision_history = _, encryption_passphrase = '*****' -- literals removed -BACKUP TABLE _ TO 'bar' WITH revision_history = true, encryption_passphrase = '*****' -- identifiers removed -BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = 'secret' -- passwords exposed +BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = '*****', coordinator_locality = 'a=b' -- normalized! +BACKUP TABLE (foo) TO ('bar') WITH revision_history = (true), encryption_passphrase = '*****', coordinator_locality = ('a=b') -- fully parenthesized +BACKUP TABLE foo TO '_' WITH revision_history = _, encryption_passphrase = '*****', coordinator_locality = '_' -- literals removed +BACKUP TABLE _ TO 'bar' WITH revision_history = true, encryption_passphrase = '*****', coordinator_locality = 'a=b' -- identifiers removed +BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = 'secret', coordinator_locality = 'a=b' -- passwords exposed parse BACKUP foo TO 'bar' WITH KMS = ('foo', 'bar'), revision_history @@ -752,12 +752,12 @@ BACKUP TABLE foo TO '_' WITH revision_history = _, detached -- literals removed BACKUP TABLE _ TO 'bar' WITH revision_history = true, detached -- identifiers removed parse -BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached +BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached, coordinator_locality = $2 ---- -BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached -BACKUP TABLE (foo) TO ('bar') WITH revision_history = ($1), detached -- fully parenthesized -BACKUP TABLE foo TO '_' WITH revision_history = $1, detached -- literals removed -BACKUP TABLE _ TO 'bar' WITH revision_history = $1, detached -- identifiers removed +BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached, coordinator_locality = $2 +BACKUP TABLE (foo) TO ('bar') WITH revision_history = ($1), detached, coordinator_locality = ($2) -- fully parenthesized +BACKUP TABLE foo TO '_' WITH revision_history = $1, detached, coordinator_locality = $1 -- literals removed +BACKUP TABLE _ TO 'bar' WITH revision_history = $1, detached, coordinator_locality = $2 -- identifiers removed parse RESTORE TABLE foo FROM 'bar' WITH skip_missing_foreign_keys, skip_missing_sequences, detached diff --git a/pkg/sql/sem/tree/backup.go b/pkg/sql/sem/tree/backup.go index e4e17367dc86..094456fe268a 100644 --- a/pkg/sql/sem/tree/backup.go +++ b/pkg/sql/sem/tree/backup.go @@ -45,6 +45,7 @@ type BackupOptions struct { Detached *DBool EncryptionKMSURI StringOrPlaceholderOptList IncrementalStorage StringOrPlaceholderOptList + CoordinatorLocality Expr } var _ NodeFormatter = &BackupOptions{} @@ -292,6 +293,13 @@ func (o *BackupOptions) Format(ctx *FmtCtx) { ctx.WriteString("incremental_location = ") ctx.FormatNode(&o.IncrementalStorage) } + + if o.CoordinatorLocality != nil { + maybeAddSep() + ctx.WriteString("coordinator_locality = ") + ctx.FormatNode(o.CoordinatorLocality) + } + } // CombineWith merges other backup options into this backup options struct. @@ -331,6 +339,12 @@ func (o *BackupOptions) CombineWith(other *BackupOptions) error { return errors.New("incremental_location option specified multiple times") } + if o.CoordinatorLocality == nil { + o.CoordinatorLocality = other.CoordinatorLocality + } else if other.CoordinatorLocality != nil { + return errors.New("coordinator_locality option specified multiple times") + } + return nil } @@ -341,7 +355,8 @@ func (o BackupOptions) IsDefault() bool { o.Detached == options.Detached && cmp.Equal(o.EncryptionKMSURI, options.EncryptionKMSURI) && o.EncryptionPassphrase == options.EncryptionPassphrase && - cmp.Equal(o.IncrementalStorage, options.IncrementalStorage) + cmp.Equal(o.IncrementalStorage, options.IncrementalStorage) && + o.CoordinatorLocality == options.CoordinatorLocality } // Format implements the NodeFormatter interface. From 9f319ddb59560905206ca861d05b786fbf6fa437 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Wed, 8 Feb 2023 09:05:42 -0800 Subject: [PATCH 4/4] opt: mark ConvertUncorrelatedExistsToCoalesceSubquery as essential This rule converts uncorrelated `EXISTS` subqueries into `COALESCE`+subquery expressions. It is essential because execbuilder cannot plan lazily-executed, uncorrelated `EXISTS` subqueries. Fixes #95546 Release note: None --- pkg/sql/opt/exec/execbuilder/scalar.go | 5 ++--- pkg/sql/opt/xform/optimizer.go | 3 +++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/sql/opt/exec/execbuilder/scalar.go b/pkg/sql/opt/exec/execbuilder/scalar.go index 3ebbd2df0b6e..1cdbff9092c0 100644 --- a/pkg/sql/opt/exec/execbuilder/scalar.go +++ b/pkg/sql/opt/exec/execbuilder/scalar.go @@ -679,8 +679,7 @@ func (b *Builder) buildExistsSubquery( // TODO(mgartner): This path should never be executed because the // ConvertUncorrelatedExistsToCoalesceSubquery converts all uncorrelated // Exists with Coalesce+Subquery expressions. Remove this and the execution - // support for the Exists mode. Remember to mark - // ConvertUncorrelatedExistsToCoalesceSubquery as an essential rule. + // support for the Exists mode. plan, err := b.buildRelational(exists.Input) if err != nil { return nil, err @@ -1032,5 +1031,5 @@ func (b *Builder) buildRoutinePlanGenerator( } func expectedLazyRoutineError(typ string) error { - return errors.AssertionFailedf("expected %s to be lazily planned as routines", typ) + return errors.AssertionFailedf("expected %s to be lazily planned as a routine", typ) } diff --git a/pkg/sql/opt/xform/optimizer.go b/pkg/sql/opt/xform/optimizer.go index 3ec32f09496d..7774cdc168e0 100644 --- a/pkg/sql/opt/xform/optimizer.go +++ b/pkg/sql/opt/xform/optimizer.go @@ -1078,6 +1078,9 @@ func (o *Optimizer) disableRulesRandom(probability float64) { int(opt.PruneScanCols), // Needed to ensure that the input of a RangeExpr is always an AndExpr. int(opt.SimplifyRange), + // Needed to ensure that all uncorrelated EXISTS subqueries are + // converted to COALESCE+subquery expressions. + int(opt.ConvertUncorrelatedExistsToCoalesceSubquery), ) var disabledRules RuleSet