Merge #95791 #96802
95791: backup: allow restricting backup coordination by region r=dt a=dt

The coordinator of a backup job needs to access the 'default' locality
to read and write metadata including the backup checkpoint files and
manifest. Prior to this change, this meant that every node in the
cluster needed to be able to access this default locality storage
location, since any node could become the coordinator.

This change introduces a new locality filter option that can be
specified when the backup job is created. If set, this option causes
any node that attempts to execute the backup job to check whether it
meets the locality requirement and, if not, to move execution of the
job to a node that does meet it, or to fail if no such node can be
found.

A locality requirement is specified as any number of key=value pairs,
each of which a node must match to be eligible to execute the job. For
example, a job run with BACKUP ... WITH coordinator_locality =
'region=east,cloud=azure' would require a node to have both
'region=east' and 'cloud=azure'. The order of the pairs is not
significant; a node is eligible as long as each specified filter is met.
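
As a concrete sketch of the new syntax (the database name, destination
URI, and locality values below are hypothetical and not taken from this
change):

```sql
-- Only nodes whose locality includes both region=east and cloud=azure
-- are eligible to coordinate this backup job.
BACKUP DATABASE defaultdb INTO 'userfile:///backups'
  WITH coordinator_locality = 'region=east,cloud=azure';
```

If no live SQL instance currently matches the filter, the statement
fails at planning time with a "no instances found matching locality
filter" error rather than creating a job that cannot be coordinated.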

Jobs typically begin executing directly on the node on which they are
created, so if that node has matching localities it will execute the
job; otherwise it will relocate the job to a node that does.

A relocated execution may take some time to resume on the new node,
similar to the delay seen when a paused job is resumed, typically
between a few seconds and a minute.

Note that this only restricts the *coordination* of the backup job --
reading the row data from individual ranges and exporting that data to
the destination storage location or locations is still performed by many
nodes, typically the leaseholders for each range.

Release note (enterprise change): coordination of BACKUP jobs and thus
writing of BACKUP metadata can be restricted to nodes within designated
localities using the new 'coordinator_locality' option.

Epic: [CRDB-9547](https://cockroachlabs.atlassian.net/browse/CRDB-9547).

96802: opt: mark ConvertUncorrelatedExistsToCoalesceSubquery as essential r=mgartner a=mgartner

This rule converts uncorrelated `EXISTS` subqueries into
`COALESCE`+subquery expressions. It is essential because execbuilder
cannot plan lazily-executed, uncorrelated `EXISTS` subqueries.
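
As a rough illustration (the tables `a` and `b` and the rewritten query
below are illustrative only, not the optimizer's literal output), a
query such as

```sql
-- Uncorrelated EXISTS: the subquery references no columns from the
-- outer query, so it can be evaluated once as a standalone subquery.
SELECT * FROM a WHERE EXISTS (SELECT * FROM b WHERE b.x > 10);
```

is conceptually rewritten by the rule to

```sql
-- A scalar subquery returning true if any row matches, NULL otherwise;
-- COALESCE maps the NULL (no rows) case to false.
SELECT * FROM a
WHERE COALESCE((SELECT true FROM b WHERE b.x > 10 LIMIT 1), false);
```

The rewritten form is a scalar subquery that execbuilder can always
plan, which is why the rule is added to the set of rules that are never
randomly disabled in tests (see the optimizer.go change below).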

Fixes #95546

Release note: None


Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
3 people committed Feb 8, 2023
3 parents 7f62dc7 + 91d2a76 + 9f319dd commit 6ca0b8c
Showing 16 changed files with 261 additions and 24 deletions.
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -1055,6 +1055,7 @@ unreserved_keyword ::=
| 'CONTROLJOB'
| 'CONVERSION'
| 'CONVERT'
| 'COORDINATOR_LOCALITY'
| 'COPY'
| 'COST'
| 'COVERING'
@@ -2257,6 +2258,7 @@ backup_options ::=
| 'DETACHED' '=' 'FALSE'
| 'KMS' '=' string_or_placeholder_opt_list
| 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list
| 'COORDINATOR_LOCALITY' '=' string_or_placeholder

c_expr ::=
d_expr
@@ -3410,6 +3412,7 @@ bare_label_keywords ::=
'AS_JSON'
| 'ATOMIC'
| 'CALLED'
| 'COORDINATOR_LOCALITY'
| 'COST'
| 'CHECK_FILES'
| 'DEBUG_IDS'
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -136,6 +136,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/stop",
41 changes: 41 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
@@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -450,6 +451,11 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// The span is finished by the registry executing the job.
details := b.job.Details().(jobspb.BackupDetails)
p := execCtx.(sql.JobExecContext)

if err := b.maybeRelocateJobExecution(ctx, p, details.CoordinatorLocation); err != nil {
return err
}

kmsEnv := backupencryption.MakeBackupKMSEnv(
p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig,
@@ -840,6 +846,41 @@ func (b *backupResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
}
}

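// maybeRelocateJobExecution checks whether the node currently executing this
// backup job satisfies the job's coordinator locality filter, if one was set.
// If it does not, a random instance matching the filter is chosen, the job's
// claim is moved to that instance, and a sentinel error is returned so that
// this node stops executing the job.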
func (b *backupResumer) maybeRelocateJobExecution(
ctx context.Context, p sql.JobExecContext, locality roachpb.Locality,
) error {
if locality.NonEmpty() {
current, err := p.DistSQLPlanner().GetSQLInstanceInfo(p.ExecCfg().JobRegistry.ID())
if err != nil {
return err
}
if ok, missedTier := current.Locality.Matches(locality); !ok {
log.Infof(ctx,
"BACKUP job %d initially adopted on instance %d but it does not match locality filter %s, finding a new coordinator",
b.job.ID(), current.NodeID, missedTier.String(),
)

instancesInRegion, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, locality)
if err != nil {
return err
}
rng, _ := randutil.NewPseudoRand()
dest := instancesInRegion[rng.Intn(len(instancesInRegion))]

var res error
if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
var err error
res, err = p.ExecCfg().JobRegistry.RelocateLease(ctx, txn, b.job.ID(), dest.InstanceID, dest.SessionID)
return err
}); err != nil {
return errors.Wrapf(err, "failed to relocate job coordinator to %d", dest.InstanceID)
}
return res
}
}
return nil
}

func getBackupDetailAndManifest(
ctx context.Context,
execCfg *sql.ExecutorConfig,
22 changes: 22 additions & 0 deletions pkg/ccl/backupccl/backup_planning.go
@@ -519,6 +519,7 @@ func backupTypeCheck(
exprutil.Strings{
backupStmt.Subdir,
backupStmt.Options.EncryptionPassphrase,
backupStmt.Options.CoordinatorLocality,
},
exprutil.StringArrays{
tree.Exprs(backupStmt.To),
@@ -600,6 +601,19 @@ func backupPlanHook(
}
}

var coordinatorLocality roachpb.Locality
if backupStmt.Options.CoordinatorLocality != nil {
s, err := exprEval.String(ctx, backupStmt.Options.CoordinatorLocality)
if err != nil {
return nil, nil, nil, false, err
}
if s != "" {
if err := coordinatorLocality.Set(s); err != nil {
return nil, nil, nil, false, err
}
}
}

encryptionParams := jobspb.BackupEncryptionOptions{
Mode: jobspb.EncryptionMode_None,
}
@@ -712,6 +726,13 @@
return err
}

// Check that a node will currently be able to run this before we create it.
if coordinatorLocality.NonEmpty() {
if _, err := p.DistSQLPlanner().GetAllInstancesByLocality(ctx, coordinatorLocality); err != nil {
return err
}
}

initialDetails := jobspb.BackupDetails{
Destination: jobspb.BackupDetails_Destination{To: to, IncrementalStorage: incrementalStorage},
EndTime: endTime,
@@ -723,6 +744,7 @@
AsOfInterval: asOfInterval,
Detached: detached,
ApplicationName: p.SessionData().ApplicationName,
CoordinatorLocation: coordinatorLocality,
}
if backupStmt.CreatedByInfo != nil && backupStmt.CreatedByInfo.Name == jobs.CreatedByScheduledJobs {
initialDetails.ScheduleID = backupStmt.CreatedByInfo.ID
46 changes: 46 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -10754,3 +10754,49 @@ $$;
require.NoError(t, err)

}

func localityFromStr(t *testing.T, s string) roachpb.Locality {
var l roachpb.Locality
require.NoError(t, l.Set(s))
return l
}

func TestBackupInLocality(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 1000

// Disabled from running within a tenant as certain MR features are not available to tenants.
args := base.TestClusterArgs{ServerArgsPerNode: map[int]base.TestServerArgs{
0: {Locality: localityFromStr(t, "region=east,dc=1,az=1")},
1: {Locality: localityFromStr(t, "region=east,dc=2,az=2")},
2: {Locality: localityFromStr(t, "region=west,dc=1,az=1")},
}}

cluster, _, _, cleanupFn := backupRestoreTestSetupWithParams(t, 3 /* nodes */, numAccounts, InitManualReplication, args)
defer cleanupFn()

for i, tc := range []struct {
node int
filter, err string
}{
{node: 1, filter: "region=east", err: ""},
{node: 1, filter: "region=east,dc=1", err: ""},
{node: 1, filter: "region=east,dc=6", err: "no instances found"},
{node: 1, filter: "region=central", err: "no instances found"},
{node: 1, filter: "region=east,dc=2", err: "relocated"},
{node: 1, filter: "region=west,dc=1", err: "relocated"},

{node: 2, filter: "region=east", err: ""},
{node: 2, filter: "region=east,az=2", err: ""},
{node: 2, filter: "region=east,dc=1", err: "relocated"},
{node: 2, filter: "region=east,az=1", err: "relocated"},

{node: 3, filter: "region=east", err: "relocated"},
{node: 3, filter: "region=central,dc=1", err: "no instances found"},
} {
db := sqlutils.MakeSQLRunner(cluster.ServerConn(tc.node - 1))
db.ExpectErr(t, tc.err, "BACKUP system.users INTO $1 WITH coordinator_locality = $2", fmt.Sprintf("userfile:///tc%d", i), tc.filter)
}
}
16 changes: 10 additions & 6 deletions pkg/jobs/errors.go
@@ -48,12 +48,6 @@ func IsPermanentJobError(err error) bool {
return errors.Is(err, errJobPermanentSentinel)
}

// IsPauseSelfError checks whether the given error is a
// PauseRequestError.
func IsPauseSelfError(err error) bool {
return errors.Is(err, errPauseSelfSentinel)
}

// errPauseSelfSentinel exists so the errors returned from PauseRequestErr can
// be marked with it.
var errPauseSelfSentinel = errors.New("job requested it be paused")
@@ -65,9 +59,19 @@ func MarkPauseRequestError(reason error) error {
return errors.Mark(reason, errPauseSelfSentinel)
}

// IsPauseSelfError checks whether the given error is a
// PauseRequestError.
func IsPauseSelfError(err error) bool {
return errors.Is(err, errPauseSelfSentinel)
}

// PauseRequestExplained is a prose used to wrap and explain a pause-request error.
const PauseRequestExplained = "pausing due to error; use RESUME JOB to try to proceed once the issue is resolved, or CANCEL JOB to rollback"

// errJobLeaseNotHeld is a marker error for returning from a job execution if it
// knows or finds out it no longer has a job lease.
var errJobLeaseNotHeld = errors.New("job lease not held")

// InvalidStatusError is the error returned when the desired operation is
// invalid given the job's current status.
type InvalidStatusError struct {
4 changes: 4 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
@@ -305,6 +305,10 @@ message BackupDetails {
// ApplicationName is the application name in the session where the backup was
// invoked.
string application_name = 23;

roachpb.Locality coordinator_location = 24 [(gogoproto.nullable) = false];

// NEXT ID: 25;
}

message BackupProgress {
20 changes: 20 additions & 0 deletions pkg/jobs/registry.go
@@ -1604,6 +1604,9 @@ func (r *Registry) stepThroughStateMachine(
jm.ResumeRetryError.Inc(1)
return errors.Errorf("job %d: node liveness error: restarting in background", job.ID())
}
if errors.Is(err, errJobLeaseNotHeld) {
return err
}

if errors.Is(err, errPauseSelfSentinel) {
if err := r.PauseRequested(ctx, nil, job.ID(), err.Error()); err != nil {
@@ -1901,6 +1904,23 @@ func (r *Registry) CheckPausepoint(name string) error {
return nil
}

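// RelocateLease reassigns the claim on the given job to the destination
// instance and session so that the destination resumes the job. On success,
// the first return value is a sentinel error (marked as errJobLeaseNotHeld)
// that the caller should return from its Resume to stop executing the job
// locally; the second return value reports a failure to move the claim.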
func (r *Registry) RelocateLease(
ctx context.Context,
txn isql.Txn,
id jobspb.JobID,
destID base.SQLInstanceID,
destSession sqlliveness.SessionID,
) (sentinel error, failure error) {
if _, err := r.db.Executor().Exec(ctx, "job-relocate-coordinator", txn.KV(),
"UPDATE system.jobs SET claim_instance_id = $2, claim_session_id = $3 WHERE id = $1",
id, destID, destSession.UnsafeBytes(),
); err != nil {
return nil, errors.Wrapf(err, "failed to relocate job coordinator to %d", destID)
}

return errors.Mark(errors.Newf("execution of job %d relocated to %d", id, destID), errJobLeaseNotHeld), nil
}

// TestingIsJobIdle returns true if the job is adopted and currently idle.
func (r *Registry) TestingIsJobIdle(jobID jobspb.JobID) bool {
r.mu.Lock()
17 changes: 17 additions & 0 deletions pkg/roachpb/metadata.go
@@ -597,6 +597,23 @@ func (l Locality) String() string {
return strings.Join(tiers, ",")
}

// NonEmpty returns true if the tiers are non-empty.
func (l Locality) NonEmpty() bool {
return len(l.Tiers) > 0
}

// Matches checks whether this locality has a tier with a matching value for
// each tier of the passed filter. It returns true if so; otherwise it
// returns false along with the first filter tier that did not match.
func (l Locality) Matches(filter Locality) (bool, Tier) {
for _, t := range filter.Tiers {
if v, ok := l.Find(t.Key); !ok || v != t.Value {
return false, t
}
}
return true, Tier{}
}

// Type returns the underlying type in string form. This is part of pflag's
// value interface.
func (Locality) Type() string {
35 changes: 35 additions & 0 deletions pkg/roachpb/metadata_test.go
@@ -186,6 +186,41 @@ func TestLocalityConversions(t *testing.T) {
}
}

func TestLocalityMatches(t *testing.T) {
var empty Locality
var l Locality
require.NoError(t, l.Set("a=b,c=d,e=f"))
for _, tc := range []struct {
filter string
miss string
}{
{filter: "", miss: ""},
{filter: "a=b", miss: ""},
{filter: "a=b,c=d,e=f", miss: ""},
{filter: "c=d,e=f,a=b", miss: ""},
{filter: "a=z", miss: "a=z"},
{filter: "a=b,c=x,e=f", miss: "c=x"},
{filter: "a=b,x=y", miss: "x=y"},
} {
t.Run(fmt.Sprintf("%s-miss-%s", tc.filter, tc.miss), func(t *testing.T) {
var filter Locality
if tc.filter != "" {
require.NoError(t, filter.Set(tc.filter))
}
matches, miss := l.Matches(filter)
if tc.miss == "" {
require.True(t, matches)
} else {
require.False(t, matches)
require.Equal(t, tc.miss, miss.String())
}

emptyMatches, _ := empty.Matches(filter)
require.Equal(t, tc.filter == "", emptyMatches)
})
}
}

func TestDiversityScore(t *testing.T) {
// Keys are not considered for score, just the order, so we don't need to
// specify them.
22 changes: 22 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -216,6 +216,28 @@ func NewDistSQLPlanner(
return dsp
}

// GetAllInstancesByLocality lists all instances that match the passed locality
// filters.
func (dsp *DistSQLPlanner) GetAllInstancesByLocality(
ctx context.Context, filter roachpb.Locality,
) ([]sqlinstance.InstanceInfo, error) {
all, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
if err != nil {
return nil, err
}
var pos int
for _, n := range all {
if ok, _ := n.Locality.Matches(filter); ok {
all[pos] = n
pos++
}
}
if pos == 0 {
return nil, errors.Newf("no instances found matching locality filter %s", filter.String())
}
return all[:pos], nil
}

// GetSQLInstanceInfo gets a node descriptor by node ID.
func (dsp *DistSQLPlanner) GetSQLInstanceInfo(
sqlInstanceID base.SQLInstanceID,
5 changes: 2 additions & 3 deletions pkg/sql/opt/exec/execbuilder/scalar.go
@@ -679,8 +679,7 @@ func (b *Builder) buildExistsSubquery(
// TODO(mgartner): This path should never be executed because the
// ConvertUncorrelatedExistsToCoalesceSubquery converts all uncorrelated
// Exists into Coalesce+Subquery expressions. Remove this and the execution
// support for the Exists mode. Remember to mark
// ConvertUncorrelatedExistsToCoalesceSubquery as an essential rule.
// support for the Exists mode.
plan, err := b.buildRelational(exists.Input)
if err != nil {
return nil, err
@@ -1032,5 +1031,5 @@ func (b *Builder) buildRoutinePlanGenerator(
}

func expectedLazyRoutineError(typ string) error {
return errors.AssertionFailedf("expected %s to be lazily planned as routines", typ)
return errors.AssertionFailedf("expected %s to be lazily planned as a routine", typ)
}
3 changes: 3 additions & 0 deletions pkg/sql/opt/xform/optimizer.go
@@ -1078,6 +1078,9 @@ func (o *Optimizer) disableRulesRandom(probability float64) {
int(opt.PruneScanCols),
// Needed to ensure that the input of a RangeExpr is always an AndExpr.
int(opt.SimplifyRange),
// Needed to ensure that all uncorrelated EXISTS subqueries are
// converted to COALESCE+subquery expressions.
int(opt.ConvertUncorrelatedExistsToCoalesceSubquery),
)

var disabledRules RuleSet