diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 116e4f08bc78..5c8117630d80 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -187,6 +187,7 @@ ALL_TESTS = [ "//pkg/server/serverpb:serverpb_test", "//pkg/server/settingswatcher:settingswatcher_test", "//pkg/server/status:status_test", + "//pkg/server/systemconfigwatcher:systemconfigwatcher_test", "//pkg/server/telemetry:telemetry_test", "//pkg/server/tracedumper:tracedumper_test", "//pkg/server:server_test", diff --git a/pkg/ccl/benchccl/rttanalysisccl/testdata/benchmark_expectations b/pkg/ccl/benchccl/rttanalysisccl/testdata/benchmark_expectations index bbb3fb52f4c0..600c80f2a409 100644 --- a/pkg/ccl/benchccl/rttanalysisccl/testdata/benchmark_expectations +++ b/pkg/ccl/benchccl/rttanalysisccl/testdata/benchmark_expectations @@ -2,7 +2,7 @@ exp,benchmark 21,AlterPrimaryRegion/alter_empty_database_alter_primary_region 25-26,AlterPrimaryRegion/alter_empty_database_set_initial_primary_region 21,AlterPrimaryRegion/alter_populated_database_alter_primary_region -27,AlterPrimaryRegion/alter_populated_database_set_initial_primary_region +26,AlterPrimaryRegion/alter_populated_database_set_initial_primary_region 20,AlterRegions/alter_empty_database_add_region 21,AlterRegions/alter_empty_database_drop_region 20,AlterRegions/alter_populated_database_add_region diff --git a/pkg/ccl/logictestccl/testdata/logic_test/multi_region_query_behavior b/pkg/ccl/logictestccl/testdata/logic_test/multi_region_query_behavior index 55fefc390b2d..8d7d34288e0d 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/multi_region_query_behavior +++ b/pkg/ccl/logictestccl/testdata/logic_test/multi_region_query_behavior @@ -1,6 +1,12 @@ # LogicTest: multiregion-9node-3region-3azs # TODO(#69265): enable multiregion-9node-3region-3azs-tenant. +# Set the closed timestamp interval to be short to shorten the amount of time +# we need to wait for the system config to propagate. +statement ok +SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms'; +SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms'; + statement ok CREATE DATABASE multi_region_test_db PRIMARY REGION "ca-central-1" REGIONS "ap-southeast-2", "us-east-1"; USE multi_region_test_db @@ -187,7 +193,7 @@ ALTER TABLE history INJECT STATISTICS '[ # Regression test for #63735. Ensure that we choose locality optimized anti # joins for the foreign key checks. -query T +query T retry EXPLAIN INSERT INTO history (h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id, h_amount, h_date, h_data) diff --git a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior index 5d158dbb0b03..aece04377d3a 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior +++ b/pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior @@ -2,6 +2,12 @@ # TODO(#69265): enable multiregion-9node-3region-3azs-tenant and/or revert # the commit that split these changes out. +# Set the closed timestamp interval to be short to shorten the amount of time +# we need to wait for the system config to propagate. +statement ok +SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms'; +SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms'; + statement ok CREATE DATABASE multi_region_test_db PRIMARY REGION "ca-central-1" REGIONS "ap-southeast-2", "us-east-1" SURVIVE REGION FAILURE; USE multi_region_test_db @@ -20,7 +26,7 @@ CREATE TABLE regional_by_row_table ( ) LOCALITY REGIONAL BY ROW # Do a REGEXP replace of the enums as these may not be static. -query T +query T retry SELECT regexp_replace(info, '@\d+', '@', 'g') FROM [EXPLAIN (OPT, CATALOG) SELECT * FROM regional_by_row_table] ---- diff --git a/pkg/ccl/serverccl/BUILD.bazel b/pkg/ccl/serverccl/BUILD.bazel index 4e1c3c41bff3..9e31c616d29b 100644 --- a/pkg/ccl/serverccl/BUILD.bazel +++ b/pkg/ccl/serverccl/BUILD.bazel @@ -29,6 +29,7 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/server/serverpb", + "//pkg/server/systemconfigwatcher/systemconfigwatchertest", "//pkg/sql", "//pkg/sql/distsql", "//pkg/sql/tests", diff --git a/pkg/ccl/serverccl/server_sql_test.go b/pkg/ccl/serverccl/server_sql_test.go index 331fd3ddb3ff..8de0edbb9295 100644 --- a/pkg/ccl/serverccl/server_sql_test.go +++ b/pkg/ccl/serverccl/server_sql_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/utilccl/licenseccl" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher/systemconfigwatchertest" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -250,3 +251,8 @@ func TestNoInflightTracesVirtualTableOnTenant(t *testing.T) { require.Error(t, err, "cluster_inflight_traces should be unsupported") require.Contains(t, err.Error(), "table crdb_internal.cluster_inflight_traces is not implemented on tenants") } + +func TestSystemConfigWatcherCache(t *testing.T) { + defer leaktest.AfterTest(t)() + systemconfigwatchertest.TestSystemConfigWatcher(t, false /* skipSecondary */) +} diff --git a/pkg/ccl/telemetryccl/testdata/telemetry/multiregion b/pkg/ccl/telemetryccl/testdata/telemetry/multiregion index f1df49a1a9fc..628047bf311d 100644 --- a/pkg/ccl/telemetryccl/testdata/telemetry/multiregion +++ b/pkg/ccl/telemetryccl/testdata/telemetry/multiregion @@ -384,6 +384,14 @@ IMPORT INTO t7 CSV DATA ('nodelocal://0/t7/export*.csv') sql.multiregion.import # Test for locality optimized search counter. + +# Lower the closed timestamp subsystem so system config info is transmitted +# rapidly. +exec +SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms'; +SET CLUSTER SETTING kv.closed_timestamp.target_duration = '5ms'; +---- + feature-allowlist sql.plan.opt.locality-optimized-search ---- @@ -393,6 +401,12 @@ USE survive_region; CREATE TABLE t8 (a INT PRIMARY KEY) LOCALITY REGIONAL BY ROW ---- +# Sleep a large multiple of the closed timestamp target duration to ensure +# that a fresh system config has made its way to the optimizer. +exec +SELECT pg_sleep(.05); +---- + feature-usage SELECT * FROM t8 WHERE a = 1 ---- diff --git a/pkg/kv/kvserver/reports/reporter.go b/pkg/kv/kvserver/reports/reporter.go index 98335afefa13..5c944a175248 100644 --- a/pkg/kv/kvserver/reports/reporter.go +++ b/pkg/kv/kvserver/reports/reporter.go @@ -73,6 +73,7 @@ type Reporter struct { settings *cluster.Settings storePool *kvserver.StorePool executor sqlutil.InternalExecutor + cfgs config.SystemConfigProvider frequencyMu struct { syncutil.Mutex @@ -89,6 +90,7 @@ func NewReporter( st *cluster.Settings, liveness *liveness.NodeLiveness, executor sqlutil.InternalExecutor, + provider config.SystemConfigProvider, ) *Reporter { r := Reporter{ db: db, @@ -97,6 +99,7 @@ func NewReporter( settings: st, liveness: liveness, executor: executor, + cfgs: provider, } r.frequencyMu.changeCh = make(chan struct{}) return &r @@ -279,7 +282,7 @@ func (stats *Reporter) meta1LeaseHolderStore(ctx context.Context) *kvserver.Stor } func (stats *Reporter) updateLatestConfig() { - stats.latestConfig = stats.meta1LeaseHolder.Gossip().GetSystemConfig() + stats.latestConfig = stats.cfgs.GetSystemConfig() } // nodeChecker checks whether a node is to be considered alive or not. diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 8971d8f3e1de..99ef987aa0f1 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -121,6 +121,7 @@ go_library( "//pkg/server/settingswatcher", "//pkg/server/status", "//pkg/server/status/statuspb", + "//pkg/server/systemconfigwatcher", "//pkg/server/telemetry", "//pkg/server/tracedumper", "//pkg/settings", diff --git a/pkg/server/server.go b/pkg/server/server.go index 84e59cb95255..d30e263d516d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/diagnostics" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -618,8 +619,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer) kvserver.RegisterPerStoreServer(grpcServer.Server, node.perReplicaServer) ctpb.RegisterSideTransportServer(grpcServer.Server, ctReceiver) + + systemConfigWatcher := systemconfigwatcher.New( + keys.SystemSQLCodec, clock, rangeFeedFactory, &cfg.DefaultZoneConfig, + ) replicationReporter := reports.NewReporter( - db, node.stores, storePool, st, nodeLiveness, internalExecutor) + db, node.stores, storePool, st, nodeLiveness, internalExecutor, systemConfigWatcher, + ) lateBoundServer := &Server{} // TODO(tbg): give adminServer only what it needs (and avoid circular deps). @@ -695,7 +701,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { runtime: runtimeSampler, rpcContext: rpcContext, nodeDescs: g, - systemConfigProvider: g, + systemConfigWatcher: systemConfigWatcher, spanConfigAccessor: spanConfig.kvAccessor, nodeDialer: nodeDialer, distSender: distSender, diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 5f2c65ebdd28..f235af0895f8 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/blobs/blobspb" "github.com/cockroachdb/cockroach/pkg/cloud" - "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/featureflag" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -48,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/server/tracedumper" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -148,6 +148,8 @@ type SQLServer struct { spanconfigSQLWatcher *spanconfigsqlwatcher.SQLWatcher settingsWatcher *settingswatcher.SettingsWatcher + systemConfigWatcher *systemconfigwatcher.Cache + // pgL is the shared RPC/SQL listener, opened when RPC was initialized. pgL net.Listener // connManager is the connection manager to use to set up additional @@ -228,7 +230,7 @@ type sqlServerArgs struct { nodeDescs kvcoord.NodeDescStore // Used by the executor config. - systemConfigProvider config.SystemConfigProvider + systemConfigWatcher *systemconfigwatcher.Cache // Used by the span config reconciliation job. spanConfigAccessor spanconfig.KVAccessor @@ -502,7 +504,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { hydratedTablesCache := hydratedtables.NewCache(cfg.Settings) cfg.registry.AddMetricStruct(hydratedTablesCache.Metrics()) - gcJobNotifier := gcjobnotifier.New(cfg.Settings, cfg.systemConfigProvider, codec, cfg.stopper) + gcJobNotifier := gcjobnotifier.New(cfg.Settings, cfg.systemConfigWatcher, codec, cfg.stopper) var compactEngineSpanFunc tree.CompactEngineSpanFunc if !codec.ForSystemTenant() { @@ -643,7 +645,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { DB: cfg.db, Gossip: cfg.gossip, NodeLiveness: cfg.nodeLiveness, - SystemConfig: cfg.systemConfigProvider, + SystemConfig: cfg.systemConfigWatcher, MetricsRecorder: cfg.recorder, DistSender: cfg.distSender, RPCContext: cfg.rpcContext, @@ -978,6 +980,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { spanconfigSQLTranslator: spanConfig.sqlTranslator, spanconfigSQLWatcher: spanConfig.sqlWatcher, settingsWatcher: settingsWatcher, + systemConfigWatcher: cfg.systemConfigWatcher, }, nil } @@ -1157,6 +1160,9 @@ func (s *SQLServer) preStart( if err := s.settingsWatcher.Start(ctx); err != nil { return errors.Wrap(err, "initializing settings") } + if err := s.systemConfigWatcher.Start(ctx, s.stopper); err != nil { + return errors.Wrap(err, "initializing settings") + } // Run startup migrations (note: these depend on jobs subsystem running). if err := startupMigrationsMgr.EnsureMigrations(ctx, bootstrapVersion); err != nil { diff --git a/pkg/server/systemconfigwatcher/BUILD.bazel b/pkg/server/systemconfigwatcher/BUILD.bazel new file mode 100644 index 000000000000..5d4f039b4881 --- /dev/null +++ b/pkg/server/systemconfigwatcher/BUILD.bazel @@ -0,0 +1,38 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "systemconfigwatcher", + srcs = ["cache.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher", + visibility = ["//visibility:public"], + deps = [ + "//pkg/config", + "//pkg/config/zonepb", + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/kv/kvclient/rangefeed/rangefeedbuffer", + "//pkg/kv/kvclient/rangefeed/rangefeedcache", + "//pkg/roachpb:with-mocks", + "//pkg/util/hlc", + "//pkg/util/stop", + "//pkg/util/syncutil", + ], +) + +go_test( + name = "systemconfigwatcher_test", + srcs = [ + "cache_test.go", + "main_test.go", + ], + embed = [":systemconfigwatcher"], + deps = [ + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/server/systemconfigwatcher/systemconfigwatchertest", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + ], +) diff --git a/pkg/server/systemconfigwatcher/cache.go b/pkg/server/systemconfigwatcher/cache.go new file mode 100644 index 000000000000..9b11ac7813b6 --- /dev/null +++ b/pkg/server/systemconfigwatcher/cache.go @@ -0,0 +1,154 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package systemconfigwatcher + +import ( + "context" + "sort" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// Cache caches a set of KVs in a set of spans using a rangefeed. The +// cache provides a consistent snapshot when available, but the snapshot +// may be stale. +type Cache struct { + w *rangefeedcache.Watcher + defaultZoneConfig *zonepb.ZoneConfig + mu struct { + syncutil.RWMutex + + cfg *config.SystemConfig + timestamp hlc.Timestamp + + registered []chan<- struct{} + } +} + +// New constructs a new Cache. +func New( + codec keys.SQLCodec, clock *hlc.Clock, f *rangefeed.Factory, defaultZoneConfig *zonepb.ZoneConfig, +) *Cache { + // TODO(ajwerner): Deal with what happens if the system config has more than this + // many rows. + const bufferSize = 1 << 20 // infinite? + const withPrevValue = false + c := Cache{ + defaultZoneConfig: defaultZoneConfig, + } + + // TODO(ajwerner): Consider stripping this down to just watching + // descriptor and zones. + span := roachpb.Span{ + Key: append(codec.TenantPrefix(), keys.SystemConfigSplitKey...), + EndKey: append(codec.TenantPrefix(), keys.SystemConfigTableDataMax...), + } + c.w = rangefeedcache.NewWatcher( + "system-config-cache", clock, f, + bufferSize, + []roachpb.Span{span}, + withPrevValue, + passThroughTranslation, + c.handleUpdate, + nil) + return &c +} + +// Start starts the cache. +func (c *Cache) Start(ctx context.Context, stopper *stop.Stopper) error { + return rangefeedcache.Start(ctx, stopper, c.w, nil /* onError */) +} + +// GetSystemConfig is part of the config.SystemConfigProvider interface. +func (c *Cache) GetSystemConfig() *config.SystemConfig { + c.mu.RLock() + defer c.mu.RUnlock() + return c.mu.cfg +} + +// RegisterSystemConfigChannel is part of the config.SystemConfigProvider +// interface. +func (c *Cache) RegisterSystemConfigChannel() <-chan struct{} { + ch := make(chan struct{}, 1) + c.mu.Lock() + defer c.mu.Unlock() + c.mu.registered = append(c.mu.registered, ch) + return ch +} + +type keyValues []roachpb.KeyValue + +func (k keyValues) Len() int { return len(k) } +func (k keyValues) Swap(i, j int) { k[i], k[j] = k[j], k[i] } +func (k keyValues) Less(i, j int) bool { return k[i].Key.Compare(k[j].Key) < 0 } + +var _ sort.Interface = (keyValues)(nil) + +func (c *Cache) handleUpdate(_ context.Context, update rangefeedcache.Update) { + updateKVs := rangefeedbuffer.EventsToKVs(update.Events, + rangefeedbuffer.RangeFeedValueEventToKV) + var updatedData []roachpb.KeyValue + switch update.Type { + case rangefeedcache.CompleteUpdate: + sort.Sort(keyValues(updateKVs)) + updatedData = updateKVs + case rangefeedcache.IncrementalUpdate: + // Note that handleUpdate is called synchronously, so we can use the + // old snapshot as the basis for the new snapshot without any risk of + // missing anything. + prev := c.GetSystemConfig() + + // If there is nothing interesting, just update the timestamp and + // return without notifying anybody. + if len(updateKVs) == 0 { + c.setUpdatedConfig(prev, update.Timestamp) + return + } + updatedData = rangefeedbuffer.MergeKVs(prev.Values, updateKVs) + } + + updatedCfg := config.NewSystemConfig(c.defaultZoneConfig) + updatedCfg.Values = updatedData + toNotify := c.setUpdatedConfig(updatedCfg, update.Timestamp) + for _, c := range toNotify { + select { + case c <- struct{}{}: + default: + } + } +} + +func (c *Cache) setUpdatedConfig( + updated *config.SystemConfig, ts hlc.Timestamp, +) (toNotify []chan<- struct{}) { + c.mu.Lock() + defer c.mu.Unlock() + c.mu.cfg = updated + c.mu.timestamp = ts + return c.mu.registered +} + +func passThroughTranslation( + ctx context.Context, value *roachpb.RangeFeedValue, +) rangefeedbuffer.Event { + return value +} + +var _ config.SystemConfigProvider = (*Cache)(nil) diff --git a/pkg/server/systemconfigwatcher/cache_test.go b/pkg/server/systemconfigwatcher/cache_test.go new file mode 100644 index 000000000000..3b985aab2be6 --- /dev/null +++ b/pkg/server/systemconfigwatcher/cache_test.go @@ -0,0 +1,23 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package systemconfigwatcher + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher/systemconfigwatchertest" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestCache(t *testing.T) { + defer leaktest.AfterTest(t)() + systemconfigwatchertest.TestSystemConfigWatcher(t, true /* skipSecondary */) +} diff --git a/pkg/server/systemconfigwatcher/main_test.go b/pkg/server/systemconfigwatcher/main_test.go new file mode 100644 index 000000000000..49b2abf09a0f --- /dev/null +++ b/pkg/server/systemconfigwatcher/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package systemconfigwatcher_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/server/systemconfigwatcher/systemconfigwatchertest/BUILD.bazel b/pkg/server/systemconfigwatcher/systemconfigwatchertest/BUILD.bazel new file mode 100644 index 000000000000..cfb571e85099 --- /dev/null +++ b/pkg/server/systemconfigwatcher/systemconfigwatchertest/BUILD.bazel @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "systemconfigwatchertest", + srcs = ["test_system_config_watcher.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher/systemconfigwatchertest", + visibility = ["//visibility:public"], + deps = [ + "//pkg/base", + "//pkg/keys", + "//pkg/roachpb:with-mocks", + "//pkg/sql", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/util/leaktest", + "//pkg/util/log", + "@com_github_cockroachdb_errors//:errors", + "@com_github_kr_pretty//:pretty", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go b/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go new file mode 100644 index 000000000000..0935811c5c5f --- /dev/null +++ b/pkg/server/systemconfigwatcher/systemconfigwatchertest/test_system_config_watcher.go @@ -0,0 +1,115 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package systemconfigwatchertest exists to exercise systemconfigwatcher +// in both ccl and non-ccl configurations. +package systemconfigwatchertest + +import ( + "context" + gosql "database/sql" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestSystemConfigWatcher is a test which exercises the end-to-end integration +// of the systemconfigwatcher. It exists in this subpackage so that it can be +// run to exercise secondary tenants, which are ccl-only. +func TestSystemConfigWatcher(t *testing.T, skipSecondary bool) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(sqlDB) + // Shorten the closed timestamp duration as a cheeky way to check the + // checkpointing code while also speeding up the test. + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10 ms'") + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'") + + t.Run("system", func(t *testing.T) { + runTest(t, s, sqlDB) + }) + if !skipSecondary { + t.Run("secondary", func(t *testing.T) { + tenant, tenantDB := serverutils.StartTenant(t, s, base.TestTenantArgs{ + TenantID: serverutils.TestTenantID(), + }) + runTest(t, tenant, tenantDB) + }) + } +} + +func runTest(t *testing.T, s serverutils.TestTenantInterface, sqlDB *gosql.DB) { + ctx := context.Background() + tdb := sqlutils.MakeSQLRunner(sqlDB) + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + kvDB := execCfg.DB + r := execCfg.SystemConfig + rc := r.RegisterSystemConfigChannel() + clearChan := func() { + select { + case <-rc: + default: + } + } + getSystemConfig := func(t *testing.T) []roachpb.KeyValue { + var ba roachpb.BatchRequest + ba.Add(roachpb.NewScan( + append(execCfg.Codec.TenantPrefix(), keys.SystemConfigSpan.Key...), + append(execCfg.Codec.TenantPrefix(), keys.SystemConfigSpan.EndKey...), + false, // forUpdate + )) + br, pErr := kvDB.NonTransactionalSender().Send(ctx, ba) + require.NoError(t, pErr.GoError()) + return br.Responses[0].GetScan().Rows + } + checkEqual := func(t *testing.T) error { + rs := r.GetSystemConfig() + if rs == nil { + return errors.New("nil config") + } + sc := getSystemConfig(t) + if !assert.Equal(noopT{}, sc, rs.Values) { + return errors.Errorf("mismatch: %v", pretty.Diff(sc, rs.Values)) + } + return nil + } + waitForEqual := func(t *testing.T) { + testutils.SucceedsSoon(t, func() error { + return checkEqual(t) + }) + } + waitForEqual(t) + clearChan() + tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + <-rc + waitForEqual(t) +} + +type noopT struct{} + +func (noopT) Errorf(string, ...interface{}) {} + +var _ assert.TestingT = (*noopT)(nil) diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 502fecd082fa..5505304a5698 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" @@ -35,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/debug" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor" "github.com/cockroachdb/cockroach/pkg/sql" @@ -443,6 +445,10 @@ func makeTenantSQLServerArgs( return sqlServerArgs{}, err } + systemConfigWatcher := systemconfigwatcher.New( + keys.MakeSQLCodec(sqlCfg.TenantID), clock, rangeFeedFactory, &baseCfg.DefaultZoneConfig, + ) + circularInternalExecutor := &sql.InternalExecutor{} circularJobRegistry := &jobs.Registry{} @@ -521,7 +527,7 @@ func makeTenantSQLServerArgs( runtime: runtime, rpcContext: rpcContext, nodeDescs: tenantConnect, - systemConfigProvider: tenantConnect, + systemConfigWatcher: systemConfigWatcher, spanConfigAccessor: tenantConnect, nodeDialer: nodeDialer, distSender: ds,