jobs: Execute scheduled jobs on a single node in the cluster.
Execute the scheduled jobs daemon on a single node -- namely, the holder
of the meta1 range lease.

Prior to this change, the scheduling daemon ran on every node, periodically
polling the scheduled jobs table with a `FOR UPDATE` clause. Unfortunately,
the job planning phase (in particular, backup planning) could take a
significant amount of time. In that situation, the entire scheduled jobs
table would remain locked, making it impossible to introspect the state of
schedules (or jobs) via `SHOW SCHEDULES` or similar statements.
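
For illustration, a minimal, generic sketch of that polling pattern follows,
written against database/sql rather than CockroachDB's internal executor; the
query text, columns, and helper names are simplified assumptions, not the
actual scheduler code.

package scratch

import (
	"context"
	"database/sql"
)

// pollDueSchedules mimics the pre-change behavior: every node ran a loop like
// this, and the FOR UPDATE row locks stayed held while the (possibly slow)
// planning callback executed, blocking introspection such as SHOW SCHEDULES.
func pollDueSchedules(ctx context.Context, db *sql.DB, plan func(scheduleID int64) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	rows, err := tx.QueryContext(ctx,
		`SELECT schedule_id FROM system.scheduled_jobs WHERE next_run < now() FOR UPDATE`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return err
		}
		// Expensive planning (e.g. backup planning) runs while the locks taken
		// above are still held -- this is the contention being addressed.
		if err := plan(id); err != nil {
			return err
		}
	}
	if err := rows.Err(); err != nil {
		return err
	}
	return tx.Commit()
}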

Furthermore, simply dropping the `FOR UPDATE` clause is not ideal either,
because expensive backup planning would then run on almost every node, with
all but one node doing work that makes no progress.

Single-node mode is enabled by default, but the prior behavior can be
restored via the `jobs.scheduler.single_node_scheduler.enabled` setting.
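
As a usage sketch only (the connection string and driver choice below are
assumptions, not part of this change), the setting can be flipped from any
SQL client:

package main

import (
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib" // PostgreSQL-compatible driver
)

func main() {
	db, err := sql.Open("pgx", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Restore the pre-change behavior of running the scheduler on every node.
	if _, err := db.Exec(
		"SET CLUSTER SETTING jobs.scheduler.single_node_scheduler.enabled = false"); err != nil {
		log.Fatal(err)
	}
}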

Release Notes: the scheduled jobs scheduler now runs on a single node by
default in order to reduce contention on the scheduled jobs table.
Yevgeniy Miretskiy committed Feb 18, 2022
1 parent 4996177 commit 0e2461e
Showing 6 changed files with 126 additions and 7 deletions.
43 changes: 39 additions & 4 deletions pkg/jobs/job_scheduler.go
@@ -351,6 +351,30 @@ func (s *jobScheduler) executeSchedules(
return err
}

// An internal safety-valve setting to revert scheduler execution to distributed mode.
// This setting should be removed once the scheduled jobs system no longer locks tables
// for excessive periods of time.
var schedulerRunsOnSingleNode = settings.RegisterBoolSetting(
settings.TenantReadOnly,
"jobs.scheduler.single_node_scheduler.enabled",
"execute scheduler on a single node in a cluster",
false,
)

func (s *jobScheduler) schedulerEnabledOnThisNode(ctx context.Context) bool {
if s.ShouldRunScheduler == nil || !schedulerRunsOnSingleNode.Get(&s.Settings.SV) {
return true
}

enabled, err := s.ShouldRunScheduler(ctx, s.DB.Clock().NowAsClockTimestamp())
if err != nil {
log.Errorf(ctx, "error determining if the scheduler is enabled: %v; will recheck after %s",
err, recheckEnabledAfterPeriod)
return false
}
return enabled
}

func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
_ = stopper.RunAsyncTask(ctx, "job-scheduler", func(ctx context.Context) {
initialDelay := getInitialScanDelay(s.TestingKnobs)
@@ -361,13 +385,12 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
}

for timer := time.NewTimer(initialDelay); ; timer.Reset(
getWaitPeriod(ctx, &s.Settings.SV, jitter, s.TestingKnobs)) {
getWaitPeriod(ctx, &s.Settings.SV, s.schedulerEnabledOnThisNode, jitter, s.TestingKnobs)) {
select {
case <-stopper.ShouldQuiesce():
return
case <-timer.C:
if !schedulerEnabledSetting.Get(&s.Settings.SV) {
log.Info(ctx, "scheduled job daemon disabled")
if !schedulerEnabledSetting.Get(&s.Settings.SV) || !s.schedulerEnabledOnThisNode(ctx) {
continue
}

@@ -427,7 +450,11 @@ type jitterFn func(duration time.Duration) time.Duration

// Returns duration to wait before scanning system.scheduled_jobs.
func getWaitPeriod(
ctx context.Context, sv *settings.Values, jitter jitterFn, knobs base.ModuleTestingKnobs,
ctx context.Context,
sv *settings.Values,
enabledOnThisNode func(ctx context.Context) bool,
jitter jitterFn,
knobs base.ModuleTestingKnobs,
) time.Duration {
if k, ok := knobs.(*TestingKnobs); ok && k.SchedulerDaemonScanDelay != nil {
return k.SchedulerDaemonScanDelay()
@@ -437,6 +464,10 @@ func getWaitPeriod(
return recheckEnabledAfterPeriod
}

if enabledOnThisNode != nil && !enabledOnThisNode(ctx) {
return recheckEnabledAfterPeriod
}

pace := schedulerPaceSetting.Get(sv)
if pace < minPacePeriod {
if warnIfPaceTooLow.ShouldLog() {
@@ -481,5 +512,9 @@ func StartJobSchedulerDaemon(
return
}

if daemonKnobs != nil && daemonKnobs.CaptureJobScheduler != nil {
daemonKnobs.CaptureJobScheduler(scheduler)
}

scheduler.runDaemon(ctx, stopper)
}
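
Taken together, the hunks above boil down to the following pattern; this is a
self-contained sketch with assumed names (runSchedulerDaemon, isDesignatedNode,
scanSchedules), not the actual implementation:

package scratch

import (
	"context"
	"log"
	"time"
)

// runSchedulerDaemon keeps ticking on every node, but a tick only scans the
// schedules table when this node is currently the designated one (in the real
// code, the meta1 range leaseholder). Otherwise it waits recheckAfter, which
// mirrors recheckEnabledAfterPeriod above.
func runSchedulerDaemon(
	ctx context.Context,
	isDesignatedNode func(context.Context) (bool, error),
	scanSchedules func(context.Context) error,
	pace, recheckAfter time.Duration,
) {
	timer := time.NewTimer(pace)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
		}

		ok, err := isDesignatedNode(ctx)
		if err != nil || !ok {
			timer.Reset(recheckAfter)
			continue
		}
		if err := scanSchedules(ctx); err != nil {
			log.Printf("error executing schedules: %v", err)
		}
		timer.Reset(pace)
	}
}

The real daemon additionally honors the scheduler-enabled cluster setting and
testing knobs, as shown in getWaitPeriod above.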
70 changes: 67 additions & 3 deletions pkg/jobs/job_scheduler_test.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
@@ -221,22 +222,23 @@ func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) {
sv, cleanup := getScopedSettings()
defer cleanup()

var schedulerEnabled func(context.Context) bool
noJitter := func(d time.Duration) time.Duration { return d }

schedulerEnabledSetting.Override(ctx, sv, false)

// When disabled, we wait 5 minutes before rechecking.
require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, noJitter, nil))
require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil))
schedulerEnabledSetting.Override(ctx, sv, true)

// When pace is too low, we use something more reasonable.
schedulerPaceSetting.Override(ctx, sv, time.Nanosecond)
require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, noJitter, nil))
require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil))

// Otherwise, we use user specified setting.
pace := 42 * time.Second
schedulerPaceSetting.Override(ctx, sv, pace)
require.EqualValues(t, pace, getWaitPeriod(ctx, sv, noJitter, nil))
require.EqualValues(t, pace, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil))
}

type recordScheduleExecutor struct {
@@ -762,3 +764,65 @@ INSERT INTO defaultdb.foo VALUES(1, 1)
updated := h.loadSchedule(t, schedule.ScheduleID())
require.Equal(t, "", updated.ScheduleStatus())
}

func TestSchedulerCanBeRestrictedToSingleNode(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numNodes = 3
for _, enableSingleNode := range []bool{true, false} {
t.Run(fmt.Sprintf("runs-on-single-node=%t", enableSingleNode), func(t *testing.T) {
schedulers := struct {
syncutil.Mutex
schedulers []*jobScheduler
}{}
knobs := &TestingKnobs{
CaptureJobScheduler: func(s interface{}) {
schedulers.Lock()
defer schedulers.Unlock()
schedulers.schedulers = append(schedulers.schedulers, s.(*jobScheduler))
},
}

args := base.TestServerArgs{
Knobs: base.TestingKnobs{JobsTestingKnobs: knobs},
}

tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: args})
defer tc.Stopper().Stop(context.Background())

testutils.SucceedsSoon(t, func() error {
schedulers.Lock()
defer schedulers.Unlock()
if len(schedulers.schedulers) == numNodes {
return nil
}
return errors.Newf("want %d schedules, got %d", numNodes, len(schedulers.schedulers))
})

sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.scheduler.single_node_scheduler.enabled=$1", enableSingleNode)

schedulers.Lock()
defer schedulers.Unlock()
expectedEnabled := numNodes
if enableSingleNode {
expectedEnabled = 1
}

testutils.SucceedsSoon(t, func() error {
numEnabled := 0
for _, s := range schedulers.schedulers {
if s.schedulerEnabledOnThisNode(context.Background()) {
numEnabled++
}
}
if numEnabled == expectedEnabled {
return nil
}
return errors.Newf("expecting %d enabled, found %d", expectedEnabled, numEnabled)
})

})
}
}
5 changes: 5 additions & 0 deletions pkg/jobs/testing_knobs.go
@@ -38,6 +38,11 @@ type TestingKnobs struct {
// may invoke directly, bypassing normal job scheduler daemon logic.
TakeOverJobsScheduling func(func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error)

// CaptureJobScheduler is a function which will be passed a fully constructed job scheduler.
// The scheduler is passed in as interface{} because jobScheduler is an unexported type.
// This testing knob is useful only for job scheduler tests.
CaptureJobScheduler func(scheduler interface{})

// CaptureJobExecutionConfig is a callback invoked with a job execution config
// which will be used when executing job schedules.
// The reason this callback exists is due to a circular dependency issues that exists
1 change: 1 addition & 0 deletions pkg/scheduledjobs/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
"//pkg/security",
"//pkg/settings/cluster",
"//pkg/sql/sqlutil",
"//pkg/util/hlc",
"//pkg/util/timeutil",
],
)
5 changes: 5 additions & 0 deletions pkg/scheduledjobs/env.go
@@ -11,6 +11,7 @@
package scheduledjobs

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

@@ -53,6 +55,9 @@ type JobExecutionConfig struct {
// function that must be called once the caller is done with the planner.
// This is the same mechanism used in jobs.Registry.
PlanHookMaker func(opName string, tnx *kv.Txn, user security.SQLUsername) (interface{}, func())
// ShouldRunScheduler, if set, returns true if the job scheduler should run
// schedules. This callback should be re-checked periodically.
ShouldRunScheduler func(ctx context.Context, ts hlc.ClockTimestamp) (bool, error)
}

// production JobSchedulerEnv implementation.
9 changes: 9 additions & 0 deletions pkg/server/server_sql.go
@@ -151,6 +151,8 @@ type SQLServer struct {

systemConfigWatcher *systemconfigwatcher.Cache

isMeta1Leaseholder func(context.Context, hlc.ClockTimestamp) (bool, error)

// pgL is the shared RPC/SQL listener, opened when RPC was initialized.
pgL net.Listener
// connManager is the connection manager to use to set up additional
@@ -1021,6 +1023,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
spanconfigSQLWatcher: spanConfig.sqlWatcher,
settingsWatcher: settingsWatcher,
systemConfigWatcher: cfg.systemConfigWatcher,
isMeta1Leaseholder: cfg.isMeta1Leaseholder,
}, nil
}

@@ -1237,6 +1240,12 @@ func (s *SQLServer) preStart(
sessiondatapb.SessionData{},
)
},
ShouldRunScheduler: func(ctx context.Context, ts hlc.ClockTimestamp) (bool, error) {
if s.execCfg.Codec.ForSystemTenant() {
return s.isMeta1Leaseholder(ctx, ts)
}
return true, nil
},
},
scheduledjobs.ProdJobSchedulerEnv,
)
