diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 60f06901f702..0a76ee9da209 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -133,7 +133,8 @@ type TestServerArgs struct { // If set, a TraceDir is initialized at the provided path. TraceDir string - // If set, the span configs infrastructure will be enabled. + // If set, the span configs infrastructure will be enabled. This is + // equivalent to setting COCKROACH_EXPERIMENTAL_SPAN_CONFIGS. EnableSpanConfigs bool } diff --git a/pkg/cli/testdata/doctor/test_examine_cluster b/pkg/cli/testdata/doctor/test_examine_cluster index 1c2777f765b6..ee4652c7bbf7 100644 --- a/pkg/cli/testdata/doctor/test_examine_cluster +++ b/pkg/cli/testdata/doctor/test_examine_cluster @@ -3,5 +3,5 @@ debug doctor examine cluster debug doctor examine cluster Examining 41 descriptors and 42 namespace entries... ParentID 50, ParentSchemaID 29: relation "foo" (53): expected matching namespace entry, found none -Examining 4 jobs... +Examining 3 jobs... ERROR: validation failed diff --git a/pkg/clusterversion/clusterversion.go b/pkg/clusterversion/clusterversion.go index 56b614524884..3f7f27870289 100644 --- a/pkg/clusterversion/clusterversion.go +++ b/pkg/clusterversion/clusterversion.go @@ -131,6 +131,12 @@ type Handle interface { // node restarts when initializing the cluster version, as seen by this // node. SetActiveVersion(context.Context, ClusterVersion) error + + // SetOnChange installs a callback that's invoked when the active cluster + // version changes. The callback should avoid doing long-running or blocking + // work; it's called on the same goroutine handling all cluster setting + // updates. + SetOnChange(fn func(context.Context)) } // handleImpl is a concrete implementation of Handle. It mostly relegates to the @@ -205,6 +211,11 @@ func (v *handleImpl) SetActiveVersion(ctx context.Context, cv ClusterVersion) er return nil } +// SetOnChange implements the Handle interface. 
+func (v *handleImpl) SetOnChange(fn func(ctx context.Context)) { + version.SetOnChange(v.sv, fn) +} + // IsActive implements the Handle interface. func (v *handleImpl) IsActive(ctx context.Context, key Key) bool { return version.isActive(ctx, v.sv, key) diff --git a/pkg/server/config.go b/pkg/server/config.go index 9fefc0e47a67..ad3d81669fa8 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -149,6 +149,11 @@ type BaseConfig struct { // instantiate stores. StorageEngine enginepb.EngineType + // Enables the use of the (experimental) span configs infrastructure. + // + // Environment Variable: COCKROACH_EXPERIMENTAL_SPAN_CONFIGS + SpanConfigsEnabled bool + // TestingKnobs is used for internal test controls only. TestingKnobs base.TestingKnobs } @@ -222,11 +227,6 @@ type KVConfig struct { // The following values can only be set via environment variables and are // for testing only. They are not meant to be set by the end user. - // Enables the use of the (experimental) span configs infrastructure. - // - // Environment Variable: COCKROACH_EXPERIMENTAL_SPAN_CONFIGS - SpanConfigsEnabled bool - // Enables linearizable behavior of operations on this node by making sure // that no commit timestamp is reported back to the client until all other // node clocks have necessarily passed it. 
diff --git a/pkg/server/node.go b/pkg/server/node.go index bb44118ee370..59ac2182aefb 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1447,10 +1447,6 @@ func (emptyMetricStruct) MetricStruct() {} func (n *Node) GetSpanConfigs( ctx context.Context, req *roachpb.GetSpanConfigsRequest, ) (*roachpb.GetSpanConfigsResponse, error) { - if !n.storeCfg.SpanConfigsEnabled { - return nil, errors.New("use of span configs disabled") - } - entries, err := n.spanConfigAccessor.GetSpanConfigEntriesFor(ctx, req.Spans) if err != nil { return nil, err @@ -1463,9 +1459,6 @@ func (n *Node) GetSpanConfigs( func (n *Node) UpdateSpanConfigs( ctx context.Context, req *roachpb.UpdateSpanConfigsRequest, ) (*roachpb.UpdateSpanConfigsResponse, error) { - if !n.storeCfg.SpanConfigsEnabled { - return nil, errors.New("use of span configs disabled") - } // TODO(irfansharif): We want to protect ourselves from tenants creating // outlandishly large string buffers here and OOM-ing the host cluster. Is // the maximum protobuf message size enough of a safeguard? 
diff --git a/pkg/server/server.go b/pkg/server/server.go index 572868ccfde9..1ded84bde987 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -636,6 +636,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { db, internalExecutor, cfg.Settings, systemschema.SpanConfigurationsTableName.FQString(), ) + } else { + spanConfigAccessor = spanconfigkvaccessor.DisabledAccessor{} } if storeTestingKnobs := cfg.TestingKnobs.Store; storeTestingKnobs != nil { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 6b747ff05b08..75112378da1c 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -829,20 +829,22 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { execCfg.MigrationTestingKnobs = knobs } - // Instantiate a span config manager; it exposes a hook to idempotently - // create the span config reconciliation job and captures all relevant job - // dependencies. - spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs) - spanConfigMgr := spanconfigmanager.New( - cfg.db, - jobRegistry, - cfg.circularInternalExecutor, - cfg.stopper, - cfg.Settings, - cfg.spanConfigAccessor, - spanConfigKnobs, - ) - execCfg.SpanConfigReconciliationJobDeps = spanConfigMgr + var spanConfigMgr *spanconfigmanager.Manager + if !codec.ForSystemTenant() || cfg.SpanConfigsEnabled { + // Instantiate a span config manager. If we're the host tenant we'll + // only do it if COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set. 
+ spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs) + spanConfigMgr = spanconfigmanager.New( + cfg.db, + jobRegistry, + cfg.circularInternalExecutor, + cfg.stopper, + cfg.Settings, + cfg.spanConfigAccessor, + spanConfigKnobs, + ) + execCfg.SpanConfigReconciliationJobDeps = spanConfigMgr + } temporaryObjectCleaner := sql.NewTemporaryObjectCleaner( cfg.Settings, @@ -1043,8 +1045,10 @@ func (s *SQLServer) preStart( return err } - if err := s.spanconfigMgr.Start(ctx); err != nil { - return err + if s.spanconfigMgr != nil { + if err := s.spanconfigMgr.Start(ctx); err != nil { + return err + } } var bootstrapVersion roachpb.Version diff --git a/pkg/settings/version.go b/pkg/settings/version.go index 0a0c48d3ddf5..8daa437776bc 100644 --- a/pkg/settings/version.go +++ b/pkg/settings/version.go @@ -176,15 +176,3 @@ func TestingRegisterVersionSetting(key, desc string, impl VersionSettingImpl) *V register(key, desc, &setting) return &setting } - -// SetOnChange is part of the Setting interface, and is discouraged for use in -// VersionSetting (we're implementing it here to not fall back on the embedded -// `common` type definition). -// -// NB: VersionSetting is unique in more ways than one, and we might want to move -// it out of the settings package before long (see TODO on the type itself). In -// our current usage we don't rely on attaching pre-change triggers, so let's -// not add it needlessly. 
-func (v *VersionSetting) SetOnChange(sv *Values, fn func(ctx context.Context)) { - panic("unimplemented") -} diff --git a/pkg/spanconfig/spanconfigkvaccessor/BUILD.bazel b/pkg/spanconfig/spanconfigkvaccessor/BUILD.bazel index 1794734ec43a..179abd8d5633 100644 --- a/pkg/spanconfig/spanconfigkvaccessor/BUILD.bazel +++ b/pkg/spanconfig/spanconfigkvaccessor/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "spanconfigkvaccessor", - srcs = ["kvaccessor.go"], + srcs = [ + "disabled.go", + "kvaccessor.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor", visibility = ["//visibility:public"], deps = [ diff --git a/pkg/spanconfig/spanconfigkvaccessor/disabled.go b/pkg/spanconfig/spanconfigkvaccessor/disabled.go new file mode 100644 index 000000000000..7ae39c4286b7 --- /dev/null +++ b/pkg/spanconfig/spanconfigkvaccessor/disabled.go @@ -0,0 +1,39 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvaccessor + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/errors" +) + +// DisabledAccessor provides an implementation of the KVAccessor interface that +// simply errors out with a disabled error. +type DisabledAccessor struct{} + +// GetSpanConfigEntriesFor is part of the KVAccessor interface. 
+func (n DisabledAccessor) GetSpanConfigEntriesFor( + context.Context, []roachpb.Span, +) ([]roachpb.SpanConfigEntry, error) { + return nil, errors.New("span configs disabled") +} + +// UpdateSpanConfigEntries is part of the KVAccessor interface. +func (n DisabledAccessor) UpdateSpanConfigEntries( + context.Context, []roachpb.Span, []roachpb.SpanConfigEntry, +) error { + return errors.New("span configs disabled") +} + +var _ spanconfig.KVAccessor = &DisabledAccessor{} diff --git a/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go b/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go index c340083de24c..edb2f22a2589 100644 --- a/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go +++ b/pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go @@ -52,19 +52,22 @@ func New( } } -// enabledSetting is a hidden cluster setting that gates usage of the -// KVAccessor. It has no effect unless COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is -// also set. +// enabledSetting gates usage of the KVAccessor. It has no effect unless +// COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is also set. var enabledSetting = settings.RegisterBoolSetting( "spanconfig.experimental_kvaccessor.enabled", "enable the use of the kv accessor", false).WithSystemOnly() +// errDisabled is returned if the setting gating usage of the KVAccessor is +// disabled. +var errDisabled = errors.New("span config kv accessor disabled") + // GetSpanConfigEntriesFor is part of the KVAccessor interface. 
func (k *KVAccessor) GetSpanConfigEntriesFor( ctx context.Context, spans []roachpb.Span, ) (resp []roachpb.SpanConfigEntry, retErr error) { if !enabledSetting.Get(&k.settings.SV) { - return nil, errors.New("use of span configs disabled") + return nil, errDisabled } if len(spans) == 0 { @@ -116,7 +119,7 @@ func (k *KVAccessor) UpdateSpanConfigEntries( ctx context.Context, toDelete []roachpb.Span, toUpsert []roachpb.SpanConfigEntry, ) error { if !enabledSetting.Get(&k.settings.SV) { - return errors.New("use of span configs disabled") + return errDisabled } if err := validateUpdateArgs(toDelete, toUpsert); err != nil { diff --git a/pkg/spanconfig/spanconfigmanager/BUILD.bazel b/pkg/spanconfig/spanconfigmanager/BUILD.bazel index 8f5131b63899..c7eec6981a82 100644 --- a/pkg/spanconfig/spanconfigmanager/BUILD.bazel +++ b/pkg/spanconfig/spanconfigmanager/BUILD.bazel @@ -17,8 +17,6 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/spanconfig", - "//pkg/sql/sem/tree", - "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", "//pkg/util/log", "//pkg/util/stop", @@ -35,14 +33,18 @@ go_test( deps = [ ":spanconfigmanager", "//pkg/base", + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings/cluster", "//pkg/spanconfig", "//pkg/sql", + "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/protoutil", diff --git a/pkg/spanconfig/spanconfigmanager/manager.go b/pkg/spanconfig/spanconfigmanager/manager.go index 97306884a608..20d4a222931b 100644 --- a/pkg/spanconfig/spanconfigmanager/manager.go +++ b/pkg/spanconfig/spanconfigmanager/manager.go @@ -12,7 +12,6 @@ package spanconfigmanager import ( "context" - "fmt" "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -23,31 +22,35 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" 
"github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) -// checkAndStartReconciliationJobInterval is a cluster setting to control how -// often the existence of the automatic span config reconciliation job will be -// checked. If the check concludes that the job doesn't exist it will be started. -var checkAndStartReconciliationJobInterval = settings.RegisterDurationSetting( - "spanconfig.reconciliation_job.check_interval", +// checkReconciliationJobInterval is a cluster setting to control how often we +// check if the span config reconciliation job exists. If it's not found, it +// will be started. It has no effect unless +// spanconfig.experimental_reconciliation_job.enabled is set. For host +// tenants, COCKROACH_EXPERIMENTAL_SPAN_CONFIGS needs to be additionally set. +var checkReconciliationJobInterval = settings.RegisterDurationSetting( + "spanconfig.experimental_reconciliation_job.check_interval", "the frequency at which to check if the span config reconciliation job exists (and to start it if not)", 10*time.Minute, settings.NonNegativeDuration, ) -// Manager is the coordinator of the span config subsystem. It is responsible -// for the following tasks: +// jobEnabledSetting gates the activation of the span config reconciliation job. // -// 1. Ensuring that one (and only one) span config reconciliation job exists for -// every tenant. -// 2. Encapsulating all dependencies required by the span config reconciliation -// job to perform its task. +// For the host tenant it has no effect unless +// COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is also set. 
+var jobEnabledSetting = settings.RegisterBoolSetting( + "spanconfig.experimental_reconciliation_job.enabled", + "enable the use of the kv accessor", false) + +// Manager is the coordinator of the span config subsystem. It ensures that +// there's only one span config reconciliation job for every tenant. It also +// captures all relevant dependencies for the job. type Manager struct { db *kv.DB jr *jobs.Registry @@ -86,10 +89,8 @@ func New( } // Start creates a background task that starts the auto span config -// reconciliation job. The background task also periodically (as dictated by the -// cluster setting) checks to ensure that the job exists. We don't expect this -// to happen, but if the periodic check indicates that the job doesn't exist, it -// will be started again. +// reconciliation job. It also periodically ensures that the job exists, +// recreating it if it doesn't. func (m *Manager) Start(ctx context.Context) error { return m.stopper.RunAsyncTask(ctx, "span-config-mgr", func(ctx context.Context) { m.run(ctx) @@ -97,39 +98,69 @@ func (m *Manager) Start(ctx context.Context) error { } func (m *Manager) run(ctx context.Context) { - reconciliationIntervalChanged := make(chan struct{}, 1) - checkAndStartReconciliationJobInterval.SetOnChange( - &m.settings.SV, func(ctx context.Context) { - select { - case reconciliationIntervalChanged <- struct{}{}: - default: - } - }) - - lastChecked := time.Time{} + jobCheckCh := make(chan struct{}, 1) + triggerJobCheck := func() { + select { + case jobCheckCh <- struct{}{}: + default: + } + } + + // We have a few conditions that should trigger a job check: + // - when the setting to enable/disable the reconciliation job is toggled; + // - when the setting controlling the reconciliation job check interval is + // changed; + // - when the cluster version is changed; if we don't it's possible to have + // started a tenant pod with a conservative view of the cluster version, + // skip starting the reconciliation job, 
learning about the cluster + // version shortly, and only checking the job after an interval has + // passed. + jobEnabledSetting.SetOnChange(&m.settings.SV, func(ctx context.Context) { + triggerJobCheck() + }) + checkReconciliationJobInterval.SetOnChange(&m.settings.SV, func(ctx context.Context) { + triggerJobCheck() + }) + m.settings.Version.SetOnChange(func(ctx context.Context) { + triggerJobCheck() + }) + + checkJob := func() { + if fn := m.knobs.ManagerCheckJobInterceptor; fn != nil { + fn() + } + + if !jobEnabledSetting.Get(&m.settings.SV) || + !m.settings.Version.IsActive(ctx, clusterversion.AutoSpanConfigReconciliationJob) { + return + } + + started, err := m.createAndStartJobIfNoneExists(ctx) + if err != nil { + log.Errorf(ctx, "error starting auto span config reconciliation job: %v", err) + } + if started { + log.Infof(ctx, "started auto span config reconciliation job") + } + } + + // Periodically check if the span config reconciliation job exists and start + // it if it doesn't. timer := timeutil.NewTimer() defer timer.Stop() - // Periodically check the span config reconciliation job exists and start it - // if for some reason it does not. 
+ + triggerJobCheck() for { - timer.Reset(timeutil.Until( - lastChecked.Add(checkAndStartReconciliationJobInterval.Get(&m.settings.SV)), - )) + timer.Reset(checkReconciliationJobInterval.Get(&m.settings.SV)) select { + case <-jobCheckCh: + timer.Read = true + + checkJob() case <-timer.C: timer.Read = true - if m.settings.Version.IsActive(ctx, clusterversion.AutoSpanConfigReconciliationJob) { - started, err := m.createAndStartJobIfNoneExists(ctx) - if err != nil { - log.Errorf(ctx, "error starting auto span config reconciliation job: %v", err) - } - if started { - log.Infof(ctx, "started auto span config reconciliation job") - } - } - lastChecked = timeutil.Now() - case <-reconciliationIntervalChanged: - // loop back around + + checkJob() case <-m.stopper.ShouldQuiesce(): return case <-ctx.Done(): @@ -140,7 +171,7 @@ func (m *Manager) run(ctx context.Context) { // createAndStartJobIfNoneExists creates span config reconciliation job iff it // hasn't been created already and notifies the jobs registry to adopt it. -// Returns a boolean indicating if the job was created. +// Returns a boolean indicating if the job was created. func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, error) { if m.knobs.ManagerDisableJobCreation { return false, nil @@ -156,8 +187,11 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro var job *jobs.Job if err := m.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - // TODO(arul): Switch this to use jobs.RunningJobExists once #68434 lands. 
- exists, err := m.checkIfReconciliationJobExists(ctx, txn) + exists, err := jobs.RunningJobExists(ctx, jobspb.InvalidJobID, m.ie, txn, + func(payload *jobspb.Payload) bool { + return payload.Type() == jobspb.TypeAutoSpanConfigReconciliation + }, + ) if err != nil { return err } @@ -187,24 +221,3 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro err := m.jr.NotifyToAdoptJobs(ctx) return true, err } - -// checkIfReconciliationJobExists checks if an span config reconciliation job -// already exists. -func (m *Manager) checkIfReconciliationJobExists( - ctx context.Context, txn *kv.Txn, -) (exists bool, _ error) { - stmt := fmt.Sprintf(` -SELECT EXISTS( - SELECT job_id - FROM [SHOW AUTOMATIC JOBS] - WHERE job_type = '%s' - AND status IN %s - ); -`, jobspb.TypeAutoSpanConfigReconciliation.String(), jobs.NonTerminalStatusTupleString) - row, err := m.ie.QueryRowEx(ctx, "check-if-reconciliation-job-already-exists", txn, - sessiondata.InternalExecutorOverride{User: security.RootUserName()}, stmt) - if err != nil { - return false, err - } - return bool(*row[0].(*tree.DBool)), nil -} diff --git a/pkg/spanconfig/spanconfigmanager/manager_test.go b/pkg/spanconfig/spanconfigmanager/manager_test.go index 4c684c4b356a..7849796c6709 100644 --- a/pkg/spanconfig/spanconfigmanager/manager_test.go +++ b/pkg/spanconfig/spanconfigmanager/manager_test.go @@ -12,15 +12,21 @@ package spanconfigmanager_test import ( "context" + "sync/atomic" "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager" "github.com/cockroachdb/cockroach/pkg/sql" + 
"github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -179,3 +185,73 @@ func TestManagerStartsJobIfFailed(t *testing.T) { require.NoError(t, err) require.True(t, started) } + +func TestManagerCheckJobConditions(t *testing.T) { + defer leaktest.AfterTest(t)() + + spanConfigJobVersion := clusterversion.ByKey(clusterversion.AutoSpanConfigReconciliationJob) + preSpanConfigJobVersion := clusterversion.ByKey(clusterversion.AutoSpanConfigReconciliationJob - 1) + settings := cluster.MakeTestingClusterSettingsWithVersions( + spanConfigJobVersion, preSpanConfigJobVersion, false, /* initializeVersion */ + ) + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: settings, + EnableSpanConfigs: true, + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, // disable the automatic job creation + }, + Server: &server.TestingKnobs{ + BinaryVersionOverride: preSpanConfigJobVersion, + DisableAutomaticVersionUpgrade: 1, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + tdb.Exec(t, `SET CLUSTER SETTING spanconfig.experimental_reconciliation_job.enabled = false;`) + + var interceptCount int32 + checkInterceptCountGreaterThan := func(min int32) int32 { + var currentCount int32 + testutils.SucceedsSoon(t, func() error { + if currentCount = atomic.LoadInt32(&interceptCount); !(currentCount > min) { + return errors.Errorf("expected intercept count(=%d) > min(=%d)", + currentCount, min) + } + return nil + }) + return currentCount + } + manager := spanconfigmanager.New( + ts.DB(), + ts.JobRegistry().(*jobs.Registry), + ts.InternalExecutor().(*sql.InternalExecutor), 
+ ts.Stopper(), + ts.ClusterSettings(), + ts.SpanConfigAccessor().(spanconfig.KVAccessor), + &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, + ManagerCheckJobInterceptor: func() { + atomic.AddInt32(&interceptCount, 1) + }, + }, + ) + var currentCount int32 + require.NoError(t, manager.Start(ctx)) + currentCount = checkInterceptCountGreaterThan(currentCount) // wait for an initial check + + tdb.Exec(t, `SET CLUSTER SETTING spanconfig.experimental_reconciliation_job.enabled = true;`) + currentCount = checkInterceptCountGreaterThan(currentCount) // the job enablement setting triggers a check + + tdb.Exec(t, `SET CLUSTER SETTING spanconfig.experimental_reconciliation_job.check_interval = '25m'`) + currentCount = checkInterceptCountGreaterThan(currentCount) // the job check interval setting triggers a check + + tdb.Exec(t, `SET CLUSTER SETTING version = $1`, spanConfigJobVersion.String()) + _ = checkInterceptCountGreaterThan(currentCount) // the cluster version setting triggers a check +} diff --git a/pkg/spanconfig/testing_knobs.go b/pkg/spanconfig/testing_knobs.go index 04c2884b61f0..bf498332d19e 100644 --- a/pkg/spanconfig/testing_knobs.go +++ b/pkg/spanconfig/testing_knobs.go @@ -18,9 +18,15 @@ type TestingKnobs struct { // ManagerDisableJobCreation disables creating the auto span config // reconciliation job. ManagerDisableJobCreation bool + + // ManagerCheckJobInterceptor, if set, is invoked when checking to see if + // the reconciliation job exists. + ManagerCheckJobInterceptor func() + // ManagerCreatedJobInterceptor expects a *jobs.Job to be passed into it. It // takes an interface here to resolve a circular dependency. ManagerCreatedJobInterceptor func(interface{}) + // ManagerAfterCheckedReconciliationJobExistsInterceptor is run after the // manager has checked if the auto span config reconciliation job exists or // not. 
diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index a04bb3849c9b..12b804fbf8ee 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -460,6 +460,8 @@ type testClusterConfig struct { // If true, a sql tenant server will be started and pointed at a node in the // cluster. Connections on behalf of the logic test will go to that tenant. useTenant bool + // If true, the cluster's servers are started with EnableSpanConfigs set. + useSpanConfigs bool // isCCLConfig should be true for any config that can only be run with a CCL // binary. isCCLConfig bool @@ -725,6 +727,11 @@ var logicTestConfigs = []testClusterConfig{ localities: multiregion9node3region3azsLocalities, overrideVectorize: "off", }, + { + name: "experimental-span-configs", + numNodes: 1, + useSpanConfigs: true, + }, } // An index in the above slice. @@ -1386,10 +1393,13 @@ func (t *logicTest) newCluster(serverArgs TestServerArgs) { ReplicationMode: base.ReplicationManual, } + cfg := t.cfg + if cfg.useSpanConfigs { + params.ServerArgs.EnableSpanConfigs = true + } distSQLKnobs := &execinfra.TestingKnobs{ MetadataTestLevel: execinfra.Off, } - cfg := t.cfg if cfg.sqlExecUseDisk { distSQLKnobs.ForceDiskSpill = true } diff --git a/pkg/sql/logictest/testdata/logic_test/auto_span_config_reconciliation_job b/pkg/sql/logictest/testdata/logic_test/auto_span_config_reconciliation_job index 84955839b37c..2b7b1b5d527e 100644 --- a/pkg/sql/logictest/testdata/logic_test/auto_span_config_reconciliation_job +++ b/pkg/sql/logictest/testdata/logic_test/auto_span_config_reconciliation_job @@ -1,3 +1,8 @@ +# LogicTest: experimental-span-configs + +statement ok +SET CLUSTER SETTING spanconfig.experimental_reconciliation_job.enabled = true; + # Ensure there's a single auto span config reconciliation job in the system, # and that it's running. 
query ITT colnames diff --git a/pkg/sql/logictest/testdata/logic_test/jobs b/pkg/sql/logictest/testdata/logic_test/jobs index b1c35a19b2d4..36eea8b6b0b2 100644 --- a/pkg/sql/logictest/testdata/logic_test/jobs +++ b/pkg/sql/logictest/testdata/logic_test/jobs @@ -24,11 +24,10 @@ SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root query TTT SELECT job_type, description, user_name FROM crdb_internal.jobs WHERE user_name = 'root' ---- -AUTO SPAN CONFIG RECONCILIATION reconciling span configurations root -SCHEMA CHANGE updating version for users table root -SCHEMA CHANGE updating version for role options table root -SCHEMA CHANGE updating privileges for database 52 root -SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root +SCHEMA CHANGE updating version for users table root +SCHEMA CHANGE updating version for role options table root +SCHEMA CHANGE updating privileges for database 52 root +SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root user testuser @@ -74,12 +73,11 @@ SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser query TTT SELECT job_type, description, user_name FROM crdb_internal.jobs WHERE user_name IN ('root', 'testuser') ---- -AUTO SPAN CONFIG RECONCILIATION reconciling span configurations root -SCHEMA CHANGE updating version for users table root -SCHEMA CHANGE updating version for role options table root -SCHEMA CHANGE updating privileges for database 52 root -SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root -SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +SCHEMA CHANGE updating version for users table root +SCHEMA CHANGE updating version for role options table root +SCHEMA CHANGE updating privileges for database 52 root +SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root +SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser statement ok CREATE USER testuser2