diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index ab8e06ab3a46..a091739d23f0 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1055,6 +1055,7 @@ unreserved_keyword ::= | 'CONTROLJOB' | 'CONVERSION' | 'CONVERT' + | 'COORDINATOR_LOCALITY' | 'COPY' | 'COST' | 'COVERING' @@ -2257,6 +2258,7 @@ backup_options ::= | 'DETACHED' '=' 'FALSE' | 'KMS' '=' string_or_placeholder_opt_list | 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list + | 'COORDINATOR_LOCALITY' '=' string_or_placeholder c_expr ::= d_expr @@ -3410,6 +3412,7 @@ bare_label_keywords ::= 'AS_JSON' | 'ATOMIC' | 'CALLED' + | 'COORDINATOR_LOCALITY' | 'COST' | 'CHECK_FILES' | 'DEBUG_IDS' diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 29ce7fa51929..19ef4484d626 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -136,6 +136,7 @@ go_library( "//pkg/util/metric", "//pkg/util/mon", "//pkg/util/protoutil", + "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/span", "//pkg/util/stop", diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 0a78f6856738..42c8379d3b2f 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -52,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -450,6 +451,11 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { // The span is finished by the registry executing the job. details := b.job.Details().(jobspb.BackupDetails) p := execCtx.(sql.JobExecContext) + + if err := b.maybeRelocateJobExecution(ctx, p, details.CoordinatorLocation); err != nil { + return err + } + kmsEnv := backupencryption.MakeBackupKMSEnv( p.ExecCfg().Settings, &p.ExecCfg().ExternalIODirConfig, @@ -840,6 +846,41 @@ func (b *backupResumer) ReportResults(ctx context.Context, resultsCh chan<- tree } } +func (b *backupResumer) maybeRelocateJobExecution( + ctx context.Context, p sql.JobExecContext, locality roachpb.Locality, +) error { + if locality.NonEmpty() { + current, err := p.DistSQLPlanner().GetSQLInstanceInfo(p.ExecCfg().JobRegistry.ID()) + if err != nil { + return err + } + if ok, missedTier := current.Locality.Matches(locality); !ok { + log.Infof(ctx, + "BACKUP job %d initially adopted on instance %d but it does not match locality filter %s, finding a new coordinator", + b.job.ID(), current.NodeID, missedTier.String(), + ) + + instancesInRegion, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, locality) + if err != nil { + return err + } + rng, _ := randutil.NewPseudoRand() + dest := instancesInRegion[rng.Intn(len(instancesInRegion))] + + var res error + if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + var err error + res, err = p.ExecCfg().JobRegistry.RelocateLease(ctx, txn, b.job.ID(), dest.InstanceID, dest.SessionID) + return err + }); err != nil { + return errors.Wrapf(err, "failed to relocate job coordinator to %d", dest.InstanceID) + } + return res + } + } + return nil +} + func getBackupDetailAndManifest( ctx context.Context, execCfg *sql.ExecutorConfig, diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 268ddd8c9a5f..554f1afbc757 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -519,6 +519,7 @@ func backupTypeCheck( exprutil.Strings{ backupStmt.Subdir, backupStmt.Options.EncryptionPassphrase, + backupStmt.Options.CoordinatorLocality, }, exprutil.StringArrays{ tree.Exprs(backupStmt.To), @@ -600,6 +601,19 @@ func backupPlanHook( } } + var coordinatorLocality roachpb.Locality + if backupStmt.Options.CoordinatorLocality != nil { + s, err := exprEval.String(ctx, backupStmt.Options.CoordinatorLocality) + if err != nil { + return nil, nil, nil, false, err + } + if s != "" { + if err := coordinatorLocality.Set(s); err != nil { + return nil, nil, nil, false, err + } + } + } + encryptionParams := jobspb.BackupEncryptionOptions{ Mode: jobspb.EncryptionMode_None, } @@ -712,6 +726,13 @@ func backupPlanHook( return err } + // Check that a node will currently be able to run this before we create it. + if coordinatorLocality.NonEmpty() { + if _, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, coordinatorLocality); err != nil { + return err + } + } + initialDetails := jobspb.BackupDetails{ Destination: jobspb.BackupDetails_Destination{To: to, IncrementalStorage: incrementalStorage}, EndTime: endTime, @@ -723,6 +744,7 @@ func backupPlanHook( AsOfInterval: asOfInterval, Detached: detached, ApplicationName: p.SessionData().ApplicationName, + CoordinatorLocation: coordinatorLocality, } if backupStmt.CreatedByInfo != nil && backupStmt.CreatedByInfo.Name == jobs.CreatedByScheduledJobs { initialDetails.ScheduleID = backupStmt.CreatedByInfo.ID diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 06d4b1bbfa48..e2c648db2992 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -10754,3 +10754,49 @@ $$; require.NoError(t, err) } + +func localityFromStr(t *testing.T, s string) roachpb.Locality { + var l roachpb.Locality + require.NoError(t, l.Set(s)) + return l +} + +func TestBackupInLocality(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const numAccounts = 1000 + + // Disabled to run within tenant as certain MR features are not available to tenants. + args := base.TestClusterArgs{ServerArgsPerNode: map[int]base.TestServerArgs{ + 0: {Locality: localityFromStr(t, "region=east,dc=1,az=1")}, + 1: {Locality: localityFromStr(t, "region=east,dc=2,az=2")}, + 2: {Locality: localityFromStr(t, "region=west,dc=1,az=1")}, + }} + + cluster, _, _, cleanupFn := backupRestoreTestSetupWithParams(t, 3 /* nodes */, numAccounts, InitManualReplication, args) + defer cleanupFn() + + for i, tc := range []struct { + node int + filter, err string + }{ + {node: 1, filter: "region=east", err: ""}, + {node: 1, filter: "region=east,dc=1", err: ""}, + {node: 1, filter: "region=east,dc=6", err: "no instances found"}, + {node: 1, filter: "region=central", err: "no instances found"}, + {node: 1, filter: "region=east,dc=2", err: "relocated"}, + {node: 1, filter: "region=west,dc=1", err: "relocated"}, + + {node: 2, filter: "region=east", err: ""}, + {node: 2, filter: "region=east,az=2", err: ""}, + {node: 2, filter: "region=east,dc=1", err: "relocated"}, + {node: 2, filter: "region=east,az=1", err: "relocated"}, + + {node: 3, filter: "region=east", err: "relocated"}, + {node: 3, filter: "region=central,dc=1", err: "no instances found"}, + } { + db := sqlutils.MakeSQLRunner(cluster.ServerConn(tc.node - 1)) + db.ExpectErr(t, tc.err, "BACKUP system.users INTO $1 WITH coordinator_locality = $2", fmt.Sprintf("userfile:///tc%d", i), tc.filter) + } +} diff --git a/pkg/jobs/errors.go b/pkg/jobs/errors.go index 6cecafdbce11..bd1a8c33de9c 100644 --- a/pkg/jobs/errors.go +++ b/pkg/jobs/errors.go @@ -48,12 +48,6 @@ func IsPermanentJobError(err error) bool { return errors.Is(err, errJobPermanentSentinel) } -// IsPauseSelfError checks whether the given error is a -// PauseRequestError. -func IsPauseSelfError(err error) bool { - return errors.Is(err, errPauseSelfSentinel) -} - // errPauseSelfSentinel exists so the errors returned from PauseRequestErr can // be marked with it. var errPauseSelfSentinel = errors.New("job requested it be paused") @@ -65,9 +59,19 @@ func MarkPauseRequestError(reason error) error { return errors.Mark(reason, errPauseSelfSentinel) } +// IsPauseSelfError checks whether the given error is a +// PauseRequestError. +func IsPauseSelfError(err error) bool { + return errors.Is(err, errPauseSelfSentinel) +} + // PauseRequestExplained is a prose used to wrap and explain a pause-request error. const PauseRequestExplained = "pausing due to error; use RESUME JOB to try to proceed once the issue is resolved, or CANCEL JOB to rollback" +// errJobLeaseNotHeld is a marker error for returning from a job execution if it +// knows or finds out it no longer has a job lease. +var errJobLeaseNotHeld = errors.New("job lease not held") + // InvalidStatusError is the error returned when the desired operation is // invalid given the job's current status. type InvalidStatusError struct { diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 7982a35d4eb1..c6c544f58d0c 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -305,6 +305,10 @@ message BackupDetails { // ApplicationName is the application name in the session where the backup was // invoked. string application_name = 23; + + roachpb.Locality coordinator_location = 24 [(gogoproto.nullable) = false]; + + // NEXT ID: 25; } message BackupProgress { diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 91ed09d9788c..a4a60dcce12c 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1604,6 +1604,9 @@ func (r *Registry) stepThroughStateMachine( jm.ResumeRetryError.Inc(1) return errors.Errorf("job %d: node liveness error: restarting in background", job.ID()) } + if errors.Is(err, errJobLeaseNotHeld) { + return err + } if errors.Is(err, errPauseSelfSentinel) { if err := r.PauseRequested(ctx, nil, job.ID(), err.Error()); err != nil { @@ -1901,6 +1904,23 @@ func (r *Registry) CheckPausepoint(name string) error { return nil } +func (r *Registry) RelocateLease( + ctx context.Context, + txn isql.Txn, + id jobspb.JobID, + destID base.SQLInstanceID, + destSession sqlliveness.SessionID, +) (sentinel error, failure error) { + if _, err := r.db.Executor().Exec(ctx, "job-relocate-coordinator", txn.KV(), + "UPDATE system.jobs SET claim_instance_id = $2, claim_session_id = $3 WHERE id = $1", + id, destID, destSession.UnsafeBytes(), + ); err != nil { + return nil, errors.Wrapf(err, "failed to relocate job coordinator to %d", destID) + } + + return errors.Mark(errors.Newf("execution of job %d relocated to %d", id, destID), errJobLeaseNotHeld), nil +} + // TestingIsJobIdle returns true if the job is adopted and currently idle. func (r *Registry) TestingIsJobIdle(jobID jobspb.JobID) bool { r.mu.Lock() diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 60514987fe21..79574b5cf9af 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -597,6 +597,23 @@ func (l Locality) String() string { return strings.Join(tiers, ",") } +// NonEmpty returns true if the tiers are non-empty. +func (l Locality) NonEmpty() bool { + return len(l.Tiers) > 0 +} + +// Matches checks if this locality has a tier with a matching value for each +// tier of the passed filter, returning true if so or false if not along with +// the first tier of the filters that did not matched. +func (l Locality) Matches(filter Locality) (bool, Tier) { + for _, t := range filter.Tiers { + if v, ok := l.Find(t.Key); !ok || v != t.Value { + return false, t + } + } + return true, Tier{} +} + // Type returns the underlying type in string form. This is part of pflag's // value interface. func (Locality) Type() string { diff --git a/pkg/roachpb/metadata_test.go b/pkg/roachpb/metadata_test.go index abf764343183..67f691e37511 100644 --- a/pkg/roachpb/metadata_test.go +++ b/pkg/roachpb/metadata_test.go @@ -186,6 +186,41 @@ func TestLocalityConversions(t *testing.T) { } } +func TestLocalityMatches(t *testing.T) { + var empty Locality + var l Locality + require.NoError(t, l.Set("a=b,c=d,e=f")) + for _, tc := range []struct { + filter string + miss string + }{ + {filter: "", miss: ""}, + {filter: "a=b", miss: ""}, + {filter: "a=b,c=d,e=f", miss: ""}, + {filter: "c=d,e=f,a=b", miss: ""}, + {filter: "a=z", miss: "a=z"}, + {filter: "a=b,c=x,e=f", miss: "c=x"}, + {filter: "a=b,x=y", miss: "x=y"}, + } { + t.Run(fmt.Sprintf("%s-miss-%s", tc.filter, tc.miss), func(t *testing.T) { + var filter Locality + if tc.filter != "" { + require.NoError(t, filter.Set(tc.filter)) + } + matches, miss := l.Matches(filter) + if tc.miss == "" { + require.True(t, matches) + } else { + require.False(t, matches) + require.Equal(t, tc.miss, miss.String()) + } + + emptyMatches, _ := empty.Matches(filter) + require.Equal(t, tc.filter == "", emptyMatches) + }) + } +} + func TestDiversityScore(t *testing.T) { // Keys are not considered for score, just the order, so we don't need to // specify them. diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index fd6e754449d1..e724f016c4c9 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -216,6 +216,28 @@ func NewDistSQLPlanner( return dsp } +// GetAllInstancesByLocality lists all instances that match the passed locality +// filters. +func (dsp *DistSQLPlanner) GetAllInstancesByLocality( + ctx context.Context, filter roachpb.Locality, +) ([]sqlinstance.InstanceInfo, error) { + all, err := dsp.sqlAddressResolver.GetAllInstances(ctx) + if err != nil { + return nil, err + } + var pos int + for _, n := range all { + if ok, _ := n.Locality.Matches(filter); ok { + all[pos] = n + pos++ + } + } + if pos == 0 { + return nil, errors.Newf("no instances found matching locality filter %s", filter.String()) + } + return all[:pos], nil +} + // GetSQLInstanceInfo gets a node descriptor by node ID. func (dsp *DistSQLPlanner) GetSQLInstanceInfo( sqlInstanceID base.SQLInstanceID, diff --git a/pkg/sql/opt/exec/execbuilder/scalar.go b/pkg/sql/opt/exec/execbuilder/scalar.go index 3ebbd2df0b6e..1cdbff9092c0 100644 --- a/pkg/sql/opt/exec/execbuilder/scalar.go +++ b/pkg/sql/opt/exec/execbuilder/scalar.go @@ -679,8 +679,7 @@ func (b *Builder) buildExistsSubquery( // TODO(mgartner): This path should never be executed because the // ConvertUncorrelatedExistsToCoalesceSubquery converts all uncorrelated // Exists with Coalesce+Subquery expressions. Remove this and the execution - // support for the Exists mode. Remember to mark - // ConvertUncorrelatedExistsToCoalesceSubquery as an essential rule. + // support for the Exists mode. plan, err := b.buildRelational(exists.Input) if err != nil { return nil, err @@ -1032,5 +1031,5 @@ func (b *Builder) buildRoutinePlanGenerator( } func expectedLazyRoutineError(typ string) error { - return errors.AssertionFailedf("expected %s to be lazily planned as routines", typ) + return errors.AssertionFailedf("expected %s to be lazily planned as a routine", typ) } diff --git a/pkg/sql/opt/xform/optimizer.go b/pkg/sql/opt/xform/optimizer.go index 3ec32f09496d..7774cdc168e0 100644 --- a/pkg/sql/opt/xform/optimizer.go +++ b/pkg/sql/opt/xform/optimizer.go @@ -1078,6 +1078,9 @@ func (o *Optimizer) disableRulesRandom(probability float64) { int(opt.PruneScanCols), // Needed to ensure that the input of a RangeExpr is always an AndExpr. int(opt.SimplifyRange), + // Needed to ensure that all uncorrelated EXISTS subqueries are + // converted to COALESCE+subquery expressions. + int(opt.ConvertUncorrelatedExistsToCoalesceSubquery), ) var disabledRules RuleSet diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 520964173a6b..eb3a743c4f76 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -890,7 +890,7 @@ func (u *sqlSymUnion) showTenantOpts() tree.ShowTenantOptions { %token CLUSTER COALESCE COLLATE COLLATION COLUMN COLUMNS COMMENT COMMENTS COMMIT %token COMMITTED COMPACT COMPLETE COMPLETIONS CONCAT CONCURRENTLY CONFIGURATION CONFIGURATIONS CONFIGURE %token CONFLICT CONNECTION CONNECTIONS CONSTRAINT CONSTRAINTS CONTAINS CONTROLCHANGEFEED CONTROLJOB -%token CONVERSION CONVERT COPY COST COVERING CREATE CREATEDB CREATELOGIN CREATEROLE +%token CONVERSION CONVERT COORDINATOR_LOCALITY COPY COST COVERING CREATE CREATEDB CREATELOGIN CREATEROLE %token CROSS CSV CUBE CURRENT CURRENT_CATALOG CURRENT_DATE CURRENT_SCHEMA %token CURRENT_ROLE CURRENT_TIME CURRENT_TIMESTAMP %token CURRENT_USER CURSOR CYCLE @@ -3241,9 +3241,12 @@ backup_options: } | INCREMENTAL_LOCATION '=' string_or_placeholder_opt_list { - $$.val = &tree.BackupOptions{IncrementalStorage: $3.stringOrPlaceholderOptList()} + $$.val = &tree.BackupOptions{IncrementalStorage: $3.stringOrPlaceholderOptList()} + } +| COORDINATOR_LOCALITY '=' string_or_placeholder + { + $$.val = &tree.BackupOptions{CoordinatorLocality: $3.expr()} } - // %Help: CREATE SCHEDULE FOR BACKUP - backup data periodically // %Category: CCL @@ -15967,6 +15970,7 @@ unreserved_keyword: | CONTROLJOB | CONVERSION | CONVERT +| COORDINATOR_LOCALITY | COPY | COST | COVERING @@ -16356,6 +16360,7 @@ bare_label_keywords: AS_JSON | ATOMIC | CALLED +| COORDINATOR_LOCALITY | COST | CHECK_FILES | DEBUG_IDS diff --git a/pkg/sql/parser/testdata/backup_restore b/pkg/sql/parser/testdata/backup_restore index 01fea821f897..49addbbe1094 100644 --- a/pkg/sql/parser/testdata/backup_restore +++ b/pkg/sql/parser/testdata/backup_restore @@ -324,13 +324,13 @@ BACKUP DATABASE foo TO ($1, $1) INCREMENTAL FROM '_' -- literals removed BACKUP DATABASE _ TO ($1, $2) INCREMENTAL FROM 'baz' -- identifiers removed parse -BACKUP foo TO 'bar' WITH ENCRYPTION_PASSPHRASE = 'secret', revision_history +BACKUP foo TO 'bar' WITH ENCRYPTION_PASSPHRASE = 'secret', revision_history, coordinator_locality = 'a=b' ---- -BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = '*****' -- normalized! -BACKUP TABLE (foo) TO ('bar') WITH revision_history = (true), encryption_passphrase = '*****' -- fully parenthesized -BACKUP TABLE foo TO '_' WITH revision_history = _, encryption_passphrase = '*****' -- literals removed -BACKUP TABLE _ TO 'bar' WITH revision_history = true, encryption_passphrase = '*****' -- identifiers removed -BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = 'secret' -- passwords exposed +BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = '*****', coordinator_locality = 'a=b' -- normalized! +BACKUP TABLE (foo) TO ('bar') WITH revision_history = (true), encryption_passphrase = '*****', coordinator_locality = ('a=b') -- fully parenthesized +BACKUP TABLE foo TO '_' WITH revision_history = _, encryption_passphrase = '*****', coordinator_locality = '_' -- literals removed +BACKUP TABLE _ TO 'bar' WITH revision_history = true, encryption_passphrase = '*****', coordinator_locality = 'a=b' -- identifiers removed +BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = 'secret', coordinator_locality = 'a=b' -- passwords exposed parse BACKUP foo TO 'bar' WITH KMS = ('foo', 'bar'), revision_history @@ -752,12 +752,12 @@ BACKUP TABLE foo TO '_' WITH revision_history = _, detached -- literals removed BACKUP TABLE _ TO 'bar' WITH revision_history = true, detached -- identifiers removed parse -BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached +BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached, coordinator_locality = $2 ---- -BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached -BACKUP TABLE (foo) TO ('bar') WITH revision_history = ($1), detached -- fully parenthesized -BACKUP TABLE foo TO '_' WITH revision_history = $1, detached -- literals removed -BACKUP TABLE _ TO 'bar' WITH revision_history = $1, detached -- identifiers removed +BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached, coordinator_locality = $2 +BACKUP TABLE (foo) TO ('bar') WITH revision_history = ($1), detached, coordinator_locality = ($2) -- fully parenthesized +BACKUP TABLE foo TO '_' WITH revision_history = $1, detached, coordinator_locality = $1 -- literals removed +BACKUP TABLE _ TO 'bar' WITH revision_history = $1, detached, coordinator_locality = $2 -- identifiers removed parse RESTORE TABLE foo FROM 'bar' WITH skip_missing_foreign_keys, skip_missing_sequences, detached diff --git a/pkg/sql/sem/tree/backup.go b/pkg/sql/sem/tree/backup.go index e4e17367dc86..094456fe268a 100644 --- a/pkg/sql/sem/tree/backup.go +++ b/pkg/sql/sem/tree/backup.go @@ -45,6 +45,7 @@ type BackupOptions struct { Detached *DBool EncryptionKMSURI StringOrPlaceholderOptList IncrementalStorage StringOrPlaceholderOptList + CoordinatorLocality Expr } var _ NodeFormatter = &BackupOptions{} @@ -292,6 +293,13 @@ func (o *BackupOptions) Format(ctx *FmtCtx) { ctx.WriteString("incremental_location = ") ctx.FormatNode(&o.IncrementalStorage) } + + if o.CoordinatorLocality != nil { + maybeAddSep() + ctx.WriteString("coordinator_locality = ") + ctx.FormatNode(o.CoordinatorLocality) + } + } // CombineWith merges other backup options into this backup options struct. @@ -331,6 +339,12 @@ func (o *BackupOptions) CombineWith(other *BackupOptions) error { return errors.New("incremental_location option specified multiple times") } + if o.CoordinatorLocality == nil { + o.CoordinatorLocality = other.CoordinatorLocality + } else if other.CoordinatorLocality != nil { + return errors.New("coordinator_locality option specified multiple times") + } + return nil } @@ -341,7 +355,8 @@ func (o BackupOptions) IsDefault() bool { o.Detached == options.Detached && cmp.Equal(o.EncryptionKMSURI, options.EncryptionKMSURI) && o.EncryptionPassphrase == options.EncryptionPassphrase && - cmp.Equal(o.IncrementalStorage, options.IncrementalStorage) + cmp.Equal(o.IncrementalStorage, options.IncrementalStorage) && + o.CoordinatorLocality == options.CoordinatorLocality } // Format implements the NodeFormatter interface.