backup: allow restricting backup coordination by region
The coordinator of a backup job needs access to the 'default' locality
storage location to read and write metadata, including the backup
checkpoint files and manifest. Prior to this change, this meant that
every node in the cluster needed to be able to access that default
locality storage location, since any node could become the coordinator.

This change introduces a new locality filter option that can be
specified when the backup job is created. If set, the option causes any
node that attempts to execute the backup job to check whether it meets
the locality requirements and, if it does not, to move execution of the
job to a node that does meet them, or to fail if no such node can be
found.

A locality requirement is specified as any number of key=value pairs,
each of which a node must match to be eligible to execute the job. For
example, a job run with BACKUP ... WITH coordinator_locality =
'region=east,cloud=azure' would require a node to have both 'region=east'
and 'cloud=azure'. The order of the pairs is not significant; what
matters is that each specified filter is met.

Jobs typically begin executing directly on the node on which they are
created, so if that node has matching localities it will execute the
job itself; if it does not, it will relocate the job to a node that does.

Relocated executions may take some time to resume on the node to which
they were relocated, similar to the delay seen when a paused job is
resumed, typically between a few seconds and a minute.

Note that this only restricts the *coordination* of the backup job --
reading the row data from individual ranges and exporting that data to
the destination storage location or locations is still performed by many
nodes, typically the leaseholders for each range.

Release note (enterprise change): coordination of BACKUP jobs and thus
writing of BACKUP metadata can be restricted to nodes within designated
localities using the new 'coordinator_locality' option.

Epic: CRDB-9547.
dt committed Feb 7, 2023
1 parent a8db047 commit 91d2a76
Showing 10 changed files with 174 additions and 15 deletions.
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -1055,6 +1055,7 @@ unreserved_keyword ::=
| 'CONTROLJOB'
| 'CONVERSION'
| 'CONVERT'
| 'COORDINATOR_LOCALITY'
| 'COPY'
| 'COST'
| 'COVERING'
@@ -2256,6 +2257,7 @@ backup_options ::=
| 'DETACHED' '=' 'FALSE'
| 'KMS' '=' string_or_placeholder_opt_list
| 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list
| 'COORDINATOR_LOCALITY' '=' string_or_placeholder

c_expr ::=
d_expr
@@ -3409,6 +3411,7 @@ bare_label_keywords ::=
'AS_JSON'
| 'ATOMIC'
| 'CALLED'
| 'COORDINATOR_LOCALITY'
| 'COST'
| 'CHECK_FILES'
| 'DEBUG_IDS'
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -136,6 +136,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/stop",
41 changes: 41 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
@@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -450,6 +451,11 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// The span is finished by the registry executing the job.
details := b.job.Details().(jobspb.BackupDetails)
p := execCtx.(sql.JobExecContext)

if err := b.maybeRelocateJobExecution(ctx, p, details.CoordinatorLocation); err != nil {
return err
}

kmsEnv := backupencryption.MakeBackupKMSEnv(
p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig,
@@ -842,6 +848,41 @@ func (b *backupResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
}
}

// maybeRelocateJobExecution checks whether the node currently running this
// job satisfies the job's coordinator locality filter, if one was set, and if
// it does not, relocates the job's lease to a randomly chosen matching SQL
// instance so that an eligible node resumes the job instead.
func (b *backupResumer) maybeRelocateJobExecution(
ctx context.Context, p sql.JobExecContext, locality roachpb.Locality,
) error {
if locality.NonEmpty() {
current, err := p.DistSQLPlanner().GetSQLInstanceInfo(p.ExecCfg().JobRegistry.ID())
if err != nil {
return err
}
if ok, missedTier := current.Locality.Matches(locality); !ok {
log.Infof(ctx,
"BACKUP job %d initially adopted on instance %d but it does not match locality filter %s, finding a new coordinator",
b.job.ID(), current.NodeID, missedTier.String(),
)

instancesInRegion, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, locality)
if err != nil {
return err
}
rng, _ := randutil.NewPseudoRand()
dest := instancesInRegion[rng.Intn(len(instancesInRegion))]

var res error
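// Note (editorial, not in the original diff): res is the error value that
// RelocateLease asks the resumer to return from Resume, presumably a sentinel
// telling the registry that this node intentionally gave up the lease rather
// than that the job failed, so the chosen instance can adopt the job.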
if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
var err error
res, err = p.ExecCfg().JobRegistry.RelocateLease(ctx, txn, b.job.ID(), dest.InstanceID, dest.SessionID)
return err
}); err != nil {
return errors.Wrapf(err, "failed to relocate job coordinator to %d", dest.InstanceID)
}
return res
}
}
return nil
}

func getBackupDetailAndManifest(
ctx context.Context,
execCfg *sql.ExecutorConfig,
22 changes: 22 additions & 0 deletions pkg/ccl/backupccl/backup_planning.go
@@ -519,6 +519,7 @@ func backupTypeCheck(
exprutil.Strings{
backupStmt.Subdir,
backupStmt.Options.EncryptionPassphrase,
backupStmt.Options.CoordinatorLocality,
},
exprutil.StringArrays{
tree.Exprs(backupStmt.To),
@@ -600,6 +601,19 @@ func backupPlanHook(
}
}

var coordinatorLocality roachpb.Locality
if backupStmt.Options.CoordinatorLocality != nil {
s, err := exprEval.String(ctx, backupStmt.Options.CoordinatorLocality)
if err != nil {
return nil, nil, nil, false, err
}
if s != "" {
if err := coordinatorLocality.Set(s); err != nil {
return nil, nil, nil, false, err
}
}
}

encryptionParams := jobspb.BackupEncryptionOptions{
Mode: jobspb.EncryptionMode_None,
}
@@ -712,6 +726,13 @@
return err
}

// Check that a node will currently be able to run this before we create it.
if coordinatorLocality.NonEmpty() {
if _, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, coordinatorLocality); err != nil {
return err
}
}

initialDetails := jobspb.BackupDetails{
Destination: jobspb.BackupDetails_Destination{To: to, IncrementalStorage: incrementalStorage},
EndTime: endTime,
@@ -723,6 +744,7 @@
AsOfInterval: asOfInterval,
Detached: detached,
ApplicationName: p.SessionData().ApplicationName,
CoordinatorLocation: coordinatorLocality,
}
if backupStmt.CreatedByInfo != nil && backupStmt.CreatedByInfo.Name == jobs.CreatedByScheduledJobs {
initialDetails.ScheduleID = backupStmt.CreatedByInfo.ID
46 changes: 46 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -10732,3 +10732,49 @@ $$;
require.NoError(t, err)

}

func localityFromStr(t *testing.T, s string) roachpb.Locality {
var l roachpb.Locality
require.NoError(t, l.Set(s))
return l
}

func TestBackupInLocality(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 1000

// Running this within a tenant is disabled, as certain multi-region (MR) features are not available to tenants.
args := base.TestClusterArgs{ServerArgsPerNode: map[int]base.TestServerArgs{
0: {Locality: localityFromStr(t, "region=east,dc=1,az=1")},
1: {Locality: localityFromStr(t, "region=east,dc=2,az=2")},
2: {Locality: localityFromStr(t, "region=west,dc=1,az=1")},
}}

cluster, _, _, cleanupFn := backupRestoreTestSetupWithParams(t, 3 /* nodes */, numAccounts, InitManualReplication, args)
defer cleanupFn()

for i, tc := range []struct {
node int
filter, err string
}{
{node: 1, filter: "region=east", err: ""},
{node: 1, filter: "region=east,dc=1", err: ""},
{node: 1, filter: "region=east,dc=6", err: "no instances found"},
{node: 1, filter: "region=central", err: "no instances found"},
{node: 1, filter: "region=east,dc=2", err: "relocated"},
{node: 1, filter: "region=west,dc=1", err: "relocated"},

{node: 2, filter: "region=east", err: ""},
{node: 2, filter: "region=east,az=2", err: ""},
{node: 2, filter: "region=east,dc=1", err: "relocated"},
{node: 2, filter: "region=east,az=1", err: "relocated"},

{node: 3, filter: "region=east", err: "relocated"},
{node: 3, filter: "region=central,dc=1", err: "no instances found"},
} {
db := sqlutils.MakeSQLRunner(cluster.ServerConn(tc.node - 1))
db.ExpectErr(t, tc.err, "BACKUP system.users INTO $1 WITH coordinator_locality = $2", fmt.Sprintf("userfile:///tc%d", i), tc.filter)
}
}
4 changes: 4 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
@@ -305,6 +305,10 @@ message BackupDetails {
// ApplicationName is the application name in the session where the backup was
// invoked.
string application_name = 23;

roachpb.Locality coordinator_location = 24 [(gogoproto.nullable) = false];

// NEXT ID: 25;
}

message BackupProgress {
22 changes: 22 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -216,6 +216,28 @@ func NewDistSQLPlanner(
return dsp
}

// GetAllInstancesByLocality lists all instances that match the passed locality
// filters.
func (dsp *DistSQLPlanner) GetAllInstancesByLocality(
ctx context.Context, filter roachpb.Locality,
) ([]sqlinstance.InstanceInfo, error) {
all, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
if err != nil {
return nil, err
}
var pos int
for _, n := range all {
if ok, _ := n.Locality.Matches(filter); ok {
all[pos] = n
pos++
}
}
if pos == 0 {
return nil, errors.Newf("no instances found matching locality filter %s", filter.String())
}
return all[:pos], nil
}

// GetSQLInstanceInfo gets a node descriptor by node ID.
func (dsp *DistSQLPlanner) GetSQLInstanceInfo(
sqlInstanceID base.SQLInstanceID,
11 changes: 8 additions & 3 deletions pkg/sql/parser/sql.y
@@ -890,7 +890,7 @@ func (u *sqlSymUnion) showTenantOpts() tree.ShowTenantOptions {
%token <str> CLUSTER COALESCE COLLATE COLLATION COLUMN COLUMNS COMMENT COMMENTS COMMIT
%token <str> COMMITTED COMPACT COMPLETE COMPLETIONS CONCAT CONCURRENTLY CONFIGURATION CONFIGURATIONS CONFIGURE
%token <str> CONFLICT CONNECTION CONNECTIONS CONSTRAINT CONSTRAINTS CONTAINS CONTROLCHANGEFEED CONTROLJOB
%token <str> CONVERSION CONVERT COPY COST COVERING CREATE CREATEDB CREATELOGIN CREATEROLE
%token <str> CONVERSION CONVERT COORDINATOR_LOCALITY COPY COST COVERING CREATE CREATEDB CREATELOGIN CREATEROLE
%token <str> CROSS CSV CUBE CURRENT CURRENT_CATALOG CURRENT_DATE CURRENT_SCHEMA
%token <str> CURRENT_ROLE CURRENT_TIME CURRENT_TIMESTAMP
%token <str> CURRENT_USER CURSOR CYCLE
@@ -3241,9 +3241,12 @@ backup_options:
}
| INCREMENTAL_LOCATION '=' string_or_placeholder_opt_list
{
$$.val = &tree.BackupOptions{IncrementalStorage: $3.stringOrPlaceholderOptList()}
$$.val = &tree.BackupOptions{IncrementalStorage: $3.stringOrPlaceholderOptList()}
}
| COORDINATOR_LOCALITY '=' string_or_placeholder
{
$$.val = &tree.BackupOptions{CoordinatorLocality: $3.expr()}
}


// %Help: CREATE SCHEDULE FOR BACKUP - backup data periodically
// %Category: CCL
@@ -15967,6 +15970,7 @@ unreserved_keyword:
| CONTROLJOB
| CONVERSION
| CONVERT
| COORDINATOR_LOCALITY
| COPY
| COST
| COVERING
@@ -16356,6 +16360,7 @@ bare_label_keywords:
AS_JSON
| ATOMIC
| CALLED
| COORDINATOR_LOCALITY
| COST
| CHECK_FILES
| DEBUG_IDS
22 changes: 11 additions & 11 deletions pkg/sql/parser/testdata/backup_restore
@@ -324,13 +324,13 @@ BACKUP DATABASE foo TO ($1, $1) INCREMENTAL FROM '_' -- literals removed
BACKUP DATABASE _ TO ($1, $2) INCREMENTAL FROM 'baz' -- identifiers removed

parse
BACKUP foo TO 'bar' WITH ENCRYPTION_PASSPHRASE = 'secret', revision_history
BACKUP foo TO 'bar' WITH ENCRYPTION_PASSPHRASE = 'secret', revision_history, coordinator_locality = 'a=b'
----
BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = '*****' -- normalized!
BACKUP TABLE (foo) TO ('bar') WITH revision_history = (true), encryption_passphrase = '*****' -- fully parenthesized
BACKUP TABLE foo TO '_' WITH revision_history = _, encryption_passphrase = '*****' -- literals removed
BACKUP TABLE _ TO 'bar' WITH revision_history = true, encryption_passphrase = '*****' -- identifiers removed
BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = 'secret' -- passwords exposed
BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = '*****', coordinator_locality = 'a=b' -- normalized!
BACKUP TABLE (foo) TO ('bar') WITH revision_history = (true), encryption_passphrase = '*****', coordinator_locality = ('a=b') -- fully parenthesized
BACKUP TABLE foo TO '_' WITH revision_history = _, encryption_passphrase = '*****', coordinator_locality = '_' -- literals removed
BACKUP TABLE _ TO 'bar' WITH revision_history = true, encryption_passphrase = '*****', coordinator_locality = 'a=b' -- identifiers removed
BACKUP TABLE foo TO 'bar' WITH revision_history = true, encryption_passphrase = 'secret', coordinator_locality = 'a=b' -- passwords exposed

parse
BACKUP foo TO 'bar' WITH KMS = ('foo', 'bar'), revision_history
@@ -752,12 +752,12 @@ BACKUP TABLE foo TO '_' WITH revision_history = _, detached -- literals removed
BACKUP TABLE _ TO 'bar' WITH revision_history = true, detached -- identifiers removed

parse
BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached
BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached, coordinator_locality = $2
----
BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached
BACKUP TABLE (foo) TO ('bar') WITH revision_history = ($1), detached -- fully parenthesized
BACKUP TABLE foo TO '_' WITH revision_history = $1, detached -- literals removed
BACKUP TABLE _ TO 'bar' WITH revision_history = $1, detached -- identifiers removed
BACKUP TABLE foo TO 'bar' WITH revision_history = $1, detached, coordinator_locality = $2
BACKUP TABLE (foo) TO ('bar') WITH revision_history = ($1), detached, coordinator_locality = ($2) -- fully parenthesized
BACKUP TABLE foo TO '_' WITH revision_history = $1, detached, coordinator_locality = $1 -- literals removed
BACKUP TABLE _ TO 'bar' WITH revision_history = $1, detached, coordinator_locality = $2 -- identifiers removed

parse
RESTORE TABLE foo FROM 'bar' WITH skip_missing_foreign_keys, skip_missing_sequences, detached
17 changes: 16 additions & 1 deletion pkg/sql/sem/tree/backup.go
@@ -45,6 +45,7 @@ type BackupOptions struct {
Detached *DBool
EncryptionKMSURI StringOrPlaceholderOptList
IncrementalStorage StringOrPlaceholderOptList
CoordinatorLocality Expr
}

var _ NodeFormatter = &BackupOptions{}
@@ -292,6 +293,13 @@ func (o *BackupOptions) Format(ctx *FmtCtx) {
ctx.WriteString("incremental_location = ")
ctx.FormatNode(&o.IncrementalStorage)
}

if o.CoordinatorLocality != nil {
maybeAddSep()
ctx.WriteString("coordinator_locality = ")
ctx.FormatNode(o.CoordinatorLocality)
}

}

// CombineWith merges other backup options into this backup options struct.
@@ -331,6 +339,12 @@ func (o *BackupOptions) CombineWith(other *BackupOptions) error {
return errors.New("incremental_location option specified multiple times")
}

if o.CoordinatorLocality == nil {
o.CoordinatorLocality = other.CoordinatorLocality
} else if other.CoordinatorLocality != nil {
return errors.New("coordinator_locality option specified multiple times")
}

return nil
}

@@ -341,7 +355,8 @@ func (o BackupOptions) IsDefault() bool {
o.Detached == options.Detached &&
cmp.Equal(o.EncryptionKMSURI, options.EncryptionKMSURI) &&
o.EncryptionPassphrase == options.EncryptionPassphrase &&
cmp.Equal(o.IncrementalStorage, options.IncrementalStorage)
cmp.Equal(o.IncrementalStorage, options.IncrementalStorage) &&
o.CoordinatorLocality == options.CoordinatorLocality
}

// Format implements the NodeFormatter interface.
