diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go
index 7d819df05f03..e0a1390af6af 100644
--- a/pkg/jobs/job_scheduler.go
+++ b/pkg/jobs/job_scheduler.go
@@ -351,6 +351,30 @@ func (s *jobScheduler) executeSchedules(
 	return err
 }
 
+// An internal safety-valve setting to revert scheduler execution to distributed mode.
+// This setting should be removed once the scheduled job system no longer locks tables
+// for excessive periods of time.
+var schedulerRunsOnSingleNode = settings.RegisterBoolSetting(
+	settings.TenantReadOnly,
+	"jobs.scheduler.single_node_scheduler.enabled",
+	"execute scheduler on a single node in a cluster",
+	false,
+)
+
+func (s *jobScheduler) schedulerEnabledOnThisNode(ctx context.Context) bool {
+	if s.ShouldRunScheduler == nil || !schedulerRunsOnSingleNode.Get(&s.Settings.SV) {
+		return true
+	}
+
+	enabled, err := s.ShouldRunScheduler(ctx, s.DB.Clock().NowAsClockTimestamp())
+	if err != nil {
+		log.Errorf(ctx, "error determining if the scheduler is enabled: %v; will recheck after %s",
+			err, recheckEnabledAfterPeriod)
+		return false
+	}
+	return enabled
+}
+
 func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
 	_ = stopper.RunAsyncTask(ctx, "job-scheduler", func(ctx context.Context) {
 		initialDelay := getInitialScanDelay(s.TestingKnobs)
@@ -361,13 +385,12 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
 		}
 
 		for timer := time.NewTimer(initialDelay); ; timer.Reset(
-			getWaitPeriod(ctx, &s.Settings.SV, jitter, s.TestingKnobs)) {
+			getWaitPeriod(ctx, &s.Settings.SV, s.schedulerEnabledOnThisNode, jitter, s.TestingKnobs)) {
 			select {
 			case <-stopper.ShouldQuiesce():
 				return
 			case <-timer.C:
-				if !schedulerEnabledSetting.Get(&s.Settings.SV) {
-					log.Info(ctx, "scheduled job daemon disabled")
+				if !schedulerEnabledSetting.Get(&s.Settings.SV) || !s.schedulerEnabledOnThisNode(ctx) {
 					continue
 				}
 
@@ -427,7 +450,11 @@ type jitterFn func(duration time.Duration) time.Duration
 
 // Returns duration to wait before scanning system.scheduled_jobs.
 func getWaitPeriod(
-	ctx context.Context, sv *settings.Values, jitter jitterFn, knobs base.ModuleTestingKnobs,
+	ctx context.Context,
+	sv *settings.Values,
+	enabledOnThisNode func(ctx context.Context) bool,
+	jitter jitterFn,
+	knobs base.ModuleTestingKnobs,
 ) time.Duration {
 	if k, ok := knobs.(*TestingKnobs); ok && k.SchedulerDaemonScanDelay != nil {
 		return k.SchedulerDaemonScanDelay()
@@ -437,6 +464,10 @@ func getWaitPeriod(
 		return recheckEnabledAfterPeriod
 	}
 
+	if enabledOnThisNode != nil && !enabledOnThisNode(ctx) {
+		return recheckEnabledAfterPeriod
+	}
+
 	pace := schedulerPaceSetting.Get(sv)
 	if pace < minPacePeriod {
 		if warnIfPaceTooLow.ShouldLog() {
@@ -481,5 +512,9 @@ func StartJobSchedulerDaemon(
 		return
 	}
 
+	if daemonKnobs != nil && daemonKnobs.CaptureJobScheduler != nil {
+		daemonKnobs.CaptureJobScheduler(scheduler)
+	}
+
 	scheduler.runDaemon(ctx, stopper)
 }
diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go
index b68230cb08fb..4a2588f18309 100644
--- a/pkg/jobs/job_scheduler_test.go
+++ b/pkg/jobs/job_scheduler_test.go
@@ -34,6 +34,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
@@ -221,22 +222,23 @@ func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) {
 	sv, cleanup := getScopedSettings()
 	defer cleanup()
 
+	var schedulerEnabled func(context.Context) bool
 	noJitter := func(d time.Duration) time.Duration { return d }
 
 	schedulerEnabledSetting.Override(ctx, sv, false)
 
 	// When disabled, we wait 5 minutes before rechecking.
-	require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, noJitter, nil))
+	require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil))
 
 	schedulerEnabledSetting.Override(ctx, sv, true)
 
 	// When pace is too low, we use something more reasonable.
 	schedulerPaceSetting.Override(ctx, sv, time.Nanosecond)
-	require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, noJitter, nil))
+	require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil))
 
 	// Otherwise, we use the user-specified setting.
 	pace := 42 * time.Second
 	schedulerPaceSetting.Override(ctx, sv, pace)
-	require.EqualValues(t, pace, getWaitPeriod(ctx, sv, noJitter, nil))
+	require.EqualValues(t, pace, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil))
 }
 
 type recordScheduleExecutor struct {
@@ -762,3 +764,65 @@ INSERT INTO defaultdb.foo VALUES(1, 1)
 	updated := h.loadSchedule(t, schedule.ScheduleID())
 	require.Equal(t, "", updated.ScheduleStatus())
 }
+
+func TestSchedulerCanBeRestrictedToSingleNode(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	const numNodes = 3
+	for _, enableSingleNode := range []bool{true, false} {
+		t.Run(fmt.Sprintf("runs-on-single-node=%t", enableSingleNode), func(t *testing.T) {
+			schedulers := struct {
+				syncutil.Mutex
+				schedulers []*jobScheduler
+			}{}
+			knobs := &TestingKnobs{
+				CaptureJobScheduler: func(s interface{}) {
+					schedulers.Lock()
+					defer schedulers.Unlock()
+					schedulers.schedulers = append(schedulers.schedulers, s.(*jobScheduler))
+				},
+			}
+
+			args := base.TestServerArgs{
+				Knobs: base.TestingKnobs{JobsTestingKnobs: knobs},
+			}
+
+			tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: args})
+			defer tc.Stopper().Stop(context.Background())
+
+			testutils.SucceedsSoon(t, func() error {
+				schedulers.Lock()
+				defer schedulers.Unlock()
+				if len(schedulers.schedulers) == numNodes {
+					return nil
+				}
+				return errors.Newf("want %d schedulers, got %d", numNodes, len(schedulers.schedulers))
+			})
+
+			sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+			sqlDB.Exec(t, "SET CLUSTER SETTING jobs.scheduler.single_node_scheduler.enabled=$1", enableSingleNode)
+
+			schedulers.Lock()
+			defer schedulers.Unlock()
+			expectedEnabled := numNodes
+			if enableSingleNode {
+				expectedEnabled = 1
+			}
+
+			testutils.SucceedsSoon(t, func() error {
+				numEnabled := 0
+				for _, s := range schedulers.schedulers {
+					if s.schedulerEnabledOnThisNode(context.Background()) {
+						numEnabled++
+					}
+				}
+				if numEnabled == expectedEnabled {
+					return nil
+				}
+				return errors.Newf("expecting %d enabled, found %d", expectedEnabled, numEnabled)
+			})
+
+		})
+	}
+}
diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go
index 5f863bf7767b..2474b742d7cb 100644
--- a/pkg/jobs/testing_knobs.go
+++ b/pkg/jobs/testing_knobs.go
@@ -38,6 +38,11 @@ type TestingKnobs struct {
 	// may invoke directly, bypassing normal job scheduler daemon logic.
 	TakeOverJobsScheduling func(func(ctx context.Context, maxSchedules int64, txn *kv.Txn) error)
 
+	// CaptureJobScheduler is a callback invoked with a fully constructed job scheduler.
+	// The scheduler is passed in as interface{} because jobScheduler is an unexported type.
+	// This testing knob is useful only for job scheduler tests.
+	CaptureJobScheduler func(scheduler interface{})
+
 	// CaptureJobExecutionConfig is a callback invoked with a job execution config
 	// which will be used when executing job schedules.
 	// The reason this callback exists is due to circular dependency issues that exist
diff --git a/pkg/scheduledjobs/BUILD.bazel b/pkg/scheduledjobs/BUILD.bazel
index 87164e407736..71f3268baea9 100644
--- a/pkg/scheduledjobs/BUILD.bazel
+++ b/pkg/scheduledjobs/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
         "//pkg/security",
         "//pkg/settings/cluster",
         "//pkg/sql/sqlutil",
+        "//pkg/util/hlc",
         "//pkg/util/timeutil",
     ],
 )
diff --git a/pkg/scheduledjobs/env.go b/pkg/scheduledjobs/env.go
index ef57d0632661..231594721bc5 100644
--- a/pkg/scheduledjobs/env.go
+++ b/pkg/scheduledjobs/env.go
@@ -11,6 +11,7 @@
 package scheduledjobs
 
 import (
+	"context"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -19,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )
@@ -53,6 +55,9 @@ type JobExecutionConfig struct {
 	// function that must be called once the caller is done with the planner.
 	// This is the same mechanism used in jobs.Registry.
 	PlanHookMaker func(opName string, tnx *kv.Txn, user security.SQLUsername) (interface{}, func())
+	// ShouldRunScheduler, if set, returns true if the job scheduler should run
+	// schedules. This callback should be re-checked periodically.
+	ShouldRunScheduler func(ctx context.Context, ts hlc.ClockTimestamp) (bool, error)
 }
 
 // production JobSchedulerEnv implementation.
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index f0193dc13ca9..a557eb252eea 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -151,6 +151,8 @@ type SQLServer struct {
 
 	systemConfigWatcher *systemconfigwatcher.Cache
 
+	isMeta1Leaseholder func(context.Context, hlc.ClockTimestamp) (bool, error)
+
 	// pgL is the shared RPC/SQL listener, opened when RPC was initialized.
 	pgL net.Listener
 	// connManager is the connection manager to use to set up additional
@@ -1021,6 +1023,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 		spanconfigSQLWatcher: spanConfig.sqlWatcher,
 		settingsWatcher:      settingsWatcher,
 		systemConfigWatcher:  cfg.systemConfigWatcher,
+		isMeta1Leaseholder:   cfg.isMeta1Leaseholder,
 	}, nil
 }
 
@@ -1237,6 +1240,12 @@ func (s *SQLServer) preStart(
 				sessiondatapb.SessionData{},
 			)
 		},
+		ShouldRunScheduler: func(ctx context.Context, ts hlc.ClockTimestamp) (bool, error) {
+			if s.execCfg.Codec.ForSystemTenant() {
+				return s.isMeta1Leaseholder(ctx, ts)
+			}
+			return true, nil
+		},
 	},
 	scheduledjobs.ProdJobSchedulerEnv,
 )
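
Note on the mechanism above: every node still runs the scheduler daemon. When jobs.scheduler.single_node_scheduler.enabled is set, each tick consults ShouldRunScheduler, which server_sql.go wires to the meta1-leaseholder check for the system tenant (secondary tenants always return true), so only the current meta1 leaseholder actually scans system.scheduled_jobs. The standalone Go sketch below illustrates that gating pattern in isolation; it is not CockroachDB code, and the names daemon, shouldRunHere, and work are hypothetical stand-ins for runDaemon, ShouldRunScheduler, and executeSchedules.

package main

import (
	"context"
	"fmt"
	"time"
)

// recheckAfter plays the role of the diff's recheckEnabledAfterPeriod: how long
// a node that is not currently responsible for the work waits before asking again.
const recheckAfter = 5 * time.Minute

// daemon runs on every node. It re-evaluates shouldRunHere on each tick, so
// responsibility can migrate between nodes (e.g., when a lease moves) without
// any process restarts. On error it fails closed, like schedulerEnabledOnThisNode:
// skipping one scan is cheaper than running scans concurrently on two nodes.
func daemon(
	ctx context.Context,
	shouldRunHere func(context.Context) (bool, error),
	work func(context.Context),
) {
	timer := time.NewTimer(0) // fire immediately on startup
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			if ok, err := shouldRunHere(ctx); err != nil || !ok {
				timer.Reset(recheckAfter)
				continue
			}
			work(ctx)
			timer.Reset(time.Minute) // normal pace between scans
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	daemon(ctx,
		// Pretend this node holds the lease; real code would check lease ownership here.
		func(context.Context) (bool, error) { return true, nil },
		func(context.Context) { fmt.Println("scanning schedules") },
	)
}

The design choice worth noting is that the predicate is consulted in two places in the diff: getWaitPeriod backs off to recheckEnabledAfterPeriod when this node is not responsible, and runDaemon re-checks on each tick, so a node that loses the meta1 lease stops executing schedules within one recheck period.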