From 6afdf2b590417a4677b5ee4b5e30f6e4f7487427 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Mon, 25 Oct 2021 00:12:33 -0400
Subject: [PATCH 1/3] server/settingswatcher: minor refactor

Extract the rangefeed callback into an onEntry method and hoist the
settings updater into the SettingsWatcher struct, to make room for the
changes in the following commits.

Release note: None
---
 .../settingswatcher/settings_watcher.go      | 140 ++++++++++--------
 1 file changed, 78 insertions(+), 62 deletions(-)

diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go
index 95f84bdd358d..b1bb90434528 100644
--- a/pkg/server/settingswatcher/settings_watcher.go
+++ b/pkg/server/settingswatcher/settings_watcher.go
@@ -39,6 +39,9 @@ type SettingsWatcher struct {
 	f       *rangefeed.Factory
 	stopper *stop.Stopper
 	dec     RowDecoder
+
+	// Running state, accessed underneath the rangefeed callback.
+	updater settings.Updater
 }
 
 // New constructs a new SettingsWatcher.
@@ -59,6 +62,50 @@ func New(
 	}
 }
 
+func (s *SettingsWatcher) onEntry(ctx context.Context, kv *roachpb.RangeFeedValue) {
+	k, val, valType, tombstone, err := s.dec.DecodeRow(roachpb.KeyValue{
+		Key:   kv.Key,
+		Value: kv.Value,
+	})
+	if err != nil {
+		log.Warningf(ctx, "failed to decode settings row %v: %v", kv.Key, err)
+		return
+	}
+	// This event corresponds to a deletion.
+	if tombstone {
+		s, ok := settings.Lookup(k, settings.LookupForLocalAccess)
+		if !ok {
+			log.Warningf(ctx, "failed to find setting %s, skipping update",
+				log.Safe(k))
+			return
+		}
+		ws, ok := s.(settings.NonMaskedSetting)
+		if !ok {
+			log.Fatalf(ctx, "expected writable setting, got %T", s)
+		}
+		val, valType = ws.EncodedDefault(), ws.Typ()
+	}
+
+	// The system tenant (i.e. the KV layer) does not use the SettingsWatcher
+	// to propagate cluster version changes (it uses the BumpClusterVersion
+	// RPC). However, non-system tenants (i.e. SQL pods) (asynchronously) get
+	// word of the new cluster version below.
+	const versionSettingKey = "version"
+	if k == versionSettingKey && !s.codec.ForSystemTenant() {
+		var v clusterversion.ClusterVersion
+		if err := protoutil.Unmarshal([]byte(val), &v); err != nil {
+			log.Warningf(ctx, "failed to set cluster version: %v", err)
+		} else if err := s.settings.Version.SetActiveVersion(ctx, v); err != nil {
+			log.Warningf(ctx, "failed to set cluster version: %v", err)
+		} else {
+			log.Infof(ctx, "set cluster version to: %v", v)
+		}
+	} else if err := s.updater.Set(ctx, k, val, valType); err != nil {
+		log.Warningf(ctx, "failed to set setting %s to %s: %v",
+			log.Safe(k), val, err)
+	}
+
+}
+
 // Start will start the SettingsWatcher. It returns after the initial settings
 // have been retrieved. An error will be returned if the context is canceled or
 // the stopper is stopped prior to the initial data being retrieved.
@@ -68,76 +115,45 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
 		Key:    settingsTablePrefix,
 		EndKey: settingsTablePrefix.PrefixEnd(),
 	}
-	now := s.clock.Now()
-	u := s.settings.MakeUpdater()
-	initialScanDone := make(chan struct{})
-	var initialScanErr error
-	rf, err := s.f.RangeFeed(ctx, "settings", []roachpb.Span{settingsTableSpan}, now, func(
-		ctx context.Context, kv *roachpb.RangeFeedValue,
-	) {
-		k, val, valType, tombstone, err := s.dec.DecodeRow(roachpb.KeyValue{
-			Key:   kv.Key,
-			Value: kv.Value,
-		})
-		if err != nil {
-			log.Warningf(ctx, "failed to decode settings row %v: %v", kv.Key, err)
+	s.updater = s.settings.MakeUpdater()
+
+	var (
+		initialScanDoneCh = make(chan struct{})
+		initialScanFunc   = func(ctx context.Context) {
+			s.updater.ResetRemaining(ctx)
+			close(initialScanDoneCh)
 		}
-		// This event corresponds to a deletion.
- if tombstone { - s, ok := settings.Lookup(k, settings.LookupForLocalAccess) - if !ok { - log.Warningf(ctx, "failed to find setting %s, skipping update", - log.Safe(k)) - return - } - ws, ok := s.(settings.NonMaskedSetting) - if !ok { - log.Fatalf(ctx, "expected writable setting, got %T", s) + initialScanErr error + initialScanErrorFunc = func( + ctx context.Context, err error, + ) (shouldFail bool) { + // TODO(ajwerner): Consider if there are other errors which we want to + // treat as permanent. + if grpcutil.IsAuthError(err) || + // This is a hack around the fact that we do not get properly structured + // errors out of gRPC. See #56208. + strings.Contains(err.Error(), "rpc error: code = Unauthenticated") { + initialScanErr = err + close(initialScanDoneCh) + shouldFail = true } - val, valType = ws.EncodedDefault(), ws.Typ() + return shouldFail } + ) + opts := []rangefeed.Option{ + rangefeed.WithInitialScan(initialScanFunc), + rangefeed.WithOnInitialScanError(initialScanErrorFunc), + } - // The system tenant (i.e. the KV layer) does not use the SettingsWatcher - // to propagate cluster version changes (it uses the BumpClusterVersion - // RPC). However, non-system tenants (i.e. SQL pods) (asynchronously) get - // word of the new cluster version below. - const versionSettingKey = "version" - if k == versionSettingKey && !s.codec.ForSystemTenant() { - var v clusterversion.ClusterVersion - if err := protoutil.Unmarshal([]byte(val), &v); err != nil { - log.Warningf(ctx, "failed to set cluster version: %v", err) - } else if err := s.settings.Version.SetActiveVersion(ctx, v); err != nil { - log.Warningf(ctx, "failed to set cluster version: %v", err) - } else { - log.Infof(ctx, "set cluster version to: %v", v) - } - } else if err := u.Set(ctx, k, val, valType); err != nil { - log.Warningf(ctx, "failed to set setting %s to %s: %v", - log.Safe(k), val, err) - } - }, rangefeed.WithInitialScan(func(ctx context.Context) { - u.ResetRemaining(ctx) - close(initialScanDone) - }), rangefeed.WithOnInitialScanError(func( - ctx context.Context, err error, - ) (shouldFail bool) { - // TODO(ajwerner): Consider if there are other errors which we want to - // treat as permanent. - if grpcutil.IsAuthError(err) || - // This is a hack around the fact that we do not get properly structured - // errors out of gRPC. See #56208. 
-			strings.Contains(err.Error(), "rpc error: code = Unauthenticated") {
-			initialScanErr = err
-			close(initialScanDone)
-			shouldFail = true
-		}
-		return shouldFail
-	}))
+	now := s.clock.Now()
+	rf, err := s.f.RangeFeed(
+		ctx, "settings", []roachpb.Span{settingsTableSpan}, now, s.onEntry, opts...,
+	)
 	if err != nil {
 		return err
 	}
 	select {
-	case <-initialScanDone:
+	case <-initialScanDoneCh:
 		if initialScanErr != nil {
 			return initialScanErr
 		}

From a35cf0ca09bf4c488ed926f23194573c86a3c44f Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Mon, 25 Oct 2021 01:00:02 -0400
Subject: [PATCH 2/3] server/settingswatcher: add support for writing
 checkpoints to storage

When a Storage implementation is provided, the SettingsWatcher now
buffers the rangefeed events it receives and, whenever the rangefeed
frontier advances, writes a compacted snapshot of the settings table to
that Storage. This will be used to keep a local cache of the settings
fresh on disk.

Release note: None
---
 pkg/server/server_sql.go                      |   2 +-
 pkg/server/settingswatcher/BUILD.bazel        |   5 +
 .../settingswatcher/settings_watcher.go       | 146 +++++++++++++++++-
 .../settings_watcher_external_test.go         | 116 ++++++++++++--
 4 files changed, 257 insertions(+), 12 deletions(-)

diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 112608642430..6430a1f3daa7 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -939,7 +939,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	var settingsWatcher *settingswatcher.SettingsWatcher
 	if !codec.ForSystemTenant() {
 		settingsWatcher = settingswatcher.New(
-			cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper,
+			cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper, nil, /* storage */
 		)
 	}
 
diff --git a/pkg/server/settingswatcher/BUILD.bazel b/pkg/server/settingswatcher/BUILD.bazel
index bd33266cf6b2..6f425e589706 100644
--- a/pkg/server/settingswatcher/BUILD.bazel
+++ b/pkg/server/settingswatcher/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
         "//pkg/clusterversion",
         "//pkg/keys",
         "//pkg/kv/kvclient/rangefeed:with-mocks",
+        "//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
         "//pkg/roachpb:with-mocks",
         "//pkg/settings",
         "//pkg/settings/cluster",
@@ -28,6 +29,8 @@ go_library(
         "//pkg/util/log",
         "//pkg/util/protoutil",
         "//pkg/util/stop",
+        "//pkg/util/syncutil",
+        "//pkg/util/syncutil/singleflight",
         "@com_github_cockroachdb_errors//:errors",
     ],
 )
@@ -43,6 +46,7 @@ go_test(
         ":settingswatcher",
         "//pkg/base",
         "//pkg/keys",
+        "//pkg/kv",
         "//pkg/roachpb:with-mocks",
         "//pkg/security",
         "//pkg/security/securitytest",
@@ -56,6 +60,7 @@ go_test(
         "//pkg/testutils/testcluster",
         "//pkg/util/hlc",
         "//pkg/util/leaktest",
+        "//pkg/util/syncutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//require",
     ],
diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go
index b1bb90434528..5e943409fe36 100644
--- a/pkg/server/settingswatcher/settings_watcher.go
+++ b/pkg/server/settingswatcher/settings_watcher.go
@@ -14,11 +14,13 @@
 package settingswatcher
 
 import (
 	"context"
+	"sort"
 	"strings"
 
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -27,6 +29,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
 	"github.com/cockroachdb/errors"
 )
 
@@ -42,6 +46,24 @@ type SettingsWatcher struct {
 
 	// Running state, accessed underneath the rangefeed callback.
 	updater settings.Updater
+
+	storage Storage
+
+	// State used to store settings values to disk.
+	buffer *rangefeedbuffer.Buffer
+	g      singleflight.Group
+	mu     struct {
+		syncutil.Mutex
+		frontierSaved  hlc.Timestamp
+		frontierToSave hlc.Timestamp
+		data           []roachpb.KeyValue
+	}
+}
+
+// Storage is used to store a snapshot of KVs which can then be used to
+// bootstrap settings state.
+type Storage interface {
+	WriteKVs(ctx context.Context, kvs []roachpb.KeyValue) error
 }
 
 // New constructs a new SettingsWatcher.
@@ -51,6 +73,7 @@ func New(
 	settingsToUpdate *cluster.Settings,
 	f *rangefeed.Factory,
 	stopper *stop.Stopper,
+	storage Storage,
 ) *SettingsWatcher {
 	return &SettingsWatcher{
 		clock:   clock,
@@ -59,6 +82,7 @@ func New(
 		f:       f,
 		stopper: stopper,
 		dec:     MakeRowDecoder(codec),
+		storage: storage,
 	}
 }
 
@@ -104,6 +128,15 @@ func (s *SettingsWatcher) onEntry(ctx context.Context, kv *roachpb.RangeFeedValu
 			log.Safe(k), val, err)
 	}
 
+	if s.buffer != nil {
+		if err := s.buffer.Add((*event)(kv)); err != nil {
+			// TODO(ajwerner): Consider a policy other than abandoning storage
+			// entirely when the buffer limit is hit.
+			log.Warningf(ctx, "failed to buffer setting for storage, "+
+				"giving up on storing settings: %v", err)
+			s.buffer = nil
+		}
+	}
 }
 
 // Start will start the SettingsWatcher. It returns after the initial settings
@@ -144,7 +177,18 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
 		rangefeed.WithInitialScan(initialScanFunc),
 		rangefeed.WithOnInitialScanError(initialScanErrorFunc),
 	}
-
+	if s.storage != nil {
+		const aNumberThatBetterBeLargeEnough = 1 << 16
+		s.buffer = rangefeedbuffer.New(aNumberThatBetterBeLargeEnough)
+		opts = append(opts, rangefeed.WithOnFrontierAdvance(func(
+			ctx context.Context, timestamp hlc.Timestamp,
+		) {
+			if !s.setFrontierToSnapshot(timestamp) {
+				return
+			}
+			s.snapshotFrontier(ctx, timestamp)
+		}))
+	}
 	now := s.clock.Now()
 	rf, err := s.f.RangeFeed(
 		ctx, "settings", []roachpb.Span{settingsTableSpan}, now, s.onEntry, opts...,
@@ -167,3 +211,103 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
 		"failed to retrieve initial cluster settings")
 	}
 }
+
+// setFrontierToSnapshot forwards the frontier timestamp for which a snapshot
+// is pending, returning whether it advanced.
+func (s *SettingsWatcher) setFrontierToSnapshot(timestamp hlc.Timestamp) (forwarded bool) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.mu.frontierToSave.Forward(timestamp)
+}
+
+func (s *SettingsWatcher) getFrontierState() (toSnapshot, stored hlc.Timestamp) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.mu.frontierToSave, s.mu.frontierSaved
+}
+
+func (s *SettingsWatcher) snapshotFrontier(ctx context.Context, toSnapshot hlc.Timestamp) {
+	// If this returns an error, it's because we are shutting down. Ignore it.
+	// The singleflight group ensures that at most one write to storage is in
+	// flight at a time; racing frontier advances simply retry until the saved
+	// frontier catches up to their target timestamp.
+	_ = s.stopper.RunAsyncTask(ctx, "snapshot-settings", func(
+		ctx context.Context,
+	) {
+		for {
+			tsInterface, _, err := s.g.Do("", func() (interface{}, error) {
+				toSave, saved := s.getFrontierState()
+				if toSave.Equal(saved) {
+					return nil, nil
+				}
+				newData := s.copyEventsToData(s.buffer.Flush(ctx, toSave))
+				if err := s.storage.WriteKVs(ctx, newData); err != nil {
+					return nil, err
+				}
+				s.mu.Lock()
+				defer s.mu.Unlock()
+				s.mu.frontierSaved = toSave
+				return toSave, nil
+			})
+			if err != nil {
+				return
+			}
+			if ts, ok := tsInterface.(hlc.Timestamp); ok && toSnapshot.LessEq(ts) {
+				return
+			}
+		}
+	})
+}
+
+type event roachpb.RangeFeedValue
+
+func (r *event) Timestamp() hlc.Timestamp {
+	return r.Value.Timestamp
+}
+
+var _ rangefeedbuffer.Event = (*event)(nil)
+
+func (s *SettingsWatcher) copyEventsToData(events []rangefeedbuffer.Event) []roachpb.KeyValue {
+	var kvs []roachpb.KeyValue
+	func() {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		kvs = append(kvs, s.mu.data...)
+	}()
+	for _, ev := range events {
+		ev := ev.(*event)
+		kvs = append(kvs, roachpb.KeyValue{Key: ev.Key, Value: ev.Value})
+	}
+	// Sort the KVs by key, with newer versions of a key first, before
+	// compacting away the older versions.
+	sort.Slice(kvs, func(i, j int) bool {
+		cmp := kvs[i].Key.Compare(kvs[j].Key)
+		switch {
+		case cmp < 0:
+			return true
+		case cmp > 0:
+			return false
+		default:
+			// Sort larger timestamps earlier.
+			return !kvs[i].Value.Timestamp.LessEq(kvs[j].Value.Timestamp)
+		}
+	})
+	// Remove all but the newest entry for each key.
+	truncated := kvs[:0]
+	for _, kv := range kvs {
+		if len(truncated) > 0 &&
+			truncated[len(truncated)-1].Key.Equal(kv.Key) {
+			continue
+		}
+		truncated = append(truncated, kv)
+	}
+	// Remove the tombstones.
+	kvs, truncated = truncated, truncated[:0]
+	for _, kv := range kvs {
+		if !kv.Value.IsPresent() {
+			continue
+		}
+		truncated = append(truncated, kv)
+	}
+	// Copy the retained entries into fresh memory so that the old, larger
+	// backing array can be garbage collected.
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.mu.data = append(make([]roachpb.KeyValue, 0, len(truncated)), truncated...)
+	return s.mu.data
+}
diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go
index 030dda42d5e1..3e12b009a8bc 100644
--- a/pkg/server/settingswatcher/settings_watcher_external_test.go
+++ b/pkg/server/settingswatcher/settings_watcher_external_test.go
@@ -11,11 +11,13 @@
 package settingswatcher_test
 
 import (
+	"bytes"
 	"context"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
 	"github.com/cockroachdb/cockroach/pkg/settings"
@@ -26,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -56,17 +59,39 @@ func TestSettingWatcher(t *testing.T) {
 
 	db := tc.Server(0).DB()
 
-	copySettingsFromSystemToFakeTenant := func() {
+	getSourceClusterRows := func() []kv.KeyValue {
 		rows, err := db.Scan(ctx, systemTable, systemTable.PrefixEnd(), 0 /* maxRows */)
 		require.NoError(t, err)
-		for _, row := range rows {
-			rem, _, err := keys.DecodeTenantPrefix(row.Key)
-			require.NoError(t, err)
-			tenantKey := append(fakeTenantPrefix, rem...)
-			row.Value.ClearChecksum()
-			row.Value.Timestamp = hlc.Timestamp{}
-			require.NoError(t, db.Put(ctx, tenantKey, row.Value))
-		}
+		return rows
+	}
+	copySettingsFromSystemToFakeTenant := func() (numSet int) {
+		rows := getSourceClusterRows()
+		require.NoError(t, db.Txn(ctx, func(
+			ctx context.Context, txn *kv.Txn,
+		) error {
+			if _, err := txn.DelRange(
+				ctx,
+				fakeCodec.TablePrefix(keys.SettingsTableID),
+				fakeCodec.TablePrefix(keys.SettingsTableID).PrefixEnd(),
+				false,
+			); err != nil {
+				return err
+			}
+
+			ba := txn.NewBatch()
+			for _, row := range rows {
+				rem, _, err := keys.DecodeTenantPrefix(row.Key)
+				require.NoError(t, err)
+				var req roachpb.PutRequest
+				req.Key = append(fakeTenantPrefix, rem...)
+				req.Value = *row.Value
+				req.Value.ClearChecksum()
+				req.Value.Timestamp = hlc.Timestamp{}
+				ba.AddRawRequest(&req)
+			}
+			return txn.Run(ctx, ba)
+		}))
+		return len(rows)
 	}
 	checkSettingsValuesMatch := func(a, b *cluster.Settings) error {
 		for _, k := range settings.Keys() {
@@ -78,22 +103,93 @@ func TestSettingWatcher(t *testing.T) {
 		}
 		return nil
 	}
+	checkStoredValuesMatch := func(expected []roachpb.KeyValue) error {
+		got := getSourceClusterRows()
+		if len(got) != len(expected) {
+			return errors.Errorf("expected %d rows, got %d", len(expected), len(got))
+		}
+		for i, kv := range got {
+			rem, _, err := keys.DecodeTenantPrefix(kv.Key)
+			require.NoError(t, err)
+			tenantKey := append(fakeTenantPrefix, rem...)
+			if !tenantKey.Equal(expected[i].Key) {
+				return errors.Errorf("mismatched key %d: %v expected, got %v", i, expected[i].Key, tenantKey)
+			}
+			// Compare only past the checksum, which incorporates the key and
+			// therefore differs between the two keyspaces.
+			const checksumLen = 4
+			if !bytes.Equal(
+				kv.Value.RawBytes[checksumLen:],
+				expected[i].Value.RawBytes[checksumLen:],
+			) {
+				return errors.Errorf("mismatched value %d: %q expected, got %q",
+					i, kv.Value.RawBytes, expected[i].Value.RawBytes)
+			}
+		}
+		return nil
+	}
+	baseNumSet := copySettingsFromSystemToFakeTenant()
 	for k, v := range toSet {
 		tdb.Exec(t, "SET CLUSTER SETTING "+k+" = $1", v[0])
 	}
 	copySettingsFromSystemToFakeTenant()
 	s0 := tc.Server(0)
 	fakeSettings := cluster.MakeTestingClusterSettings()
+	storage := &fakeStorage{}
 	sw := settingswatcher.New(s0.Clock(), fakeCodec, fakeSettings,
 		s0.ExecutorConfig().(sql.ExecutorConfig).RangeFeedFactory,
-		tc.Stopper())
+		tc.Stopper(), storage)
 	require.NoError(t, sw.Start(ctx))
 	require.NoError(t, checkSettingsValuesMatch(s0.ClusterSettings(), fakeSettings))
+
 	for k, v := range toSet {
 		tdb.Exec(t, "SET CLUSTER SETTING "+k+" = $1", v[1])
 	}
+	copySettingsFromSystemToFakeTenant()
 	testutils.SucceedsSoon(t, func() error {
 		return checkSettingsValuesMatch(s0.ClusterSettings(), fakeSettings)
 	})
+	// Shorten the closed timestamp duration as a cheeky way to check the
+	// checkpointing code while also speeding up the test.
+	tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10 ms'")
+	copySettingsFromSystemToFakeTenant()
+	testutils.SucceedsSoon(t, func() error {
+		return checkStoredValuesMatch(storage.getKVs())
+	})
+
+	// Set and unset.
+	for k, v := range toSet {
+		tdb.Exec(t, "SET CLUSTER SETTING "+k+" = $1", v[1])
+	}
+	copySettingsFromSystemToFakeTenant()
+	for k := range toSet {
+		tdb.Exec(t, "SET CLUSTER SETTING "+k+" = DEFAULT")
+	}
+	copySettingsFromSystemToFakeTenant()
+
+	// Make sure that the storage layer gets the right ultimate values.
+	testutils.SucceedsSoon(t, func() error {
+		return checkStoredValuesMatch(storage.getKVs())
+	})
+	// Confirm that we've removed the unset values.
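+	// Settings that were reset to DEFAULT are deleted from system.settings,
+	// so their tombstones should have been compacted out of the stored
+	// snapshot, leaving only the baseline rows plus the closed timestamp
+	// override set above.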
+	expectedSet := baseNumSet + 1 // for the closed_timestamp setting
+	require.Equal(t, expectedSet, len(storage.getKVs()))
+}
+
+type fakeStorage struct {
+	syncutil.Mutex
+	kvs []roachpb.KeyValue
+}
+
+func (f *fakeStorage) WriteKVs(ctx context.Context, kvs []roachpb.KeyValue) error {
+	f.Lock()
+	defer f.Unlock()
+	f.kvs = kvs
+	return nil
+}
+
+func (f *fakeStorage) getKVs() []roachpb.KeyValue {
+	f.Lock()
+	defer f.Unlock()
+	return f.kvs
+}

From 2e8c2c4d37d7ead6cb0826083ace3c3ab4571c26 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Mon, 25 Oct 2021 00:02:54 -0400
Subject: [PATCH 3/3] server: adopt rangefeed-backed settingswatcher for
 settings, remove gossip

This commit removes the code which connected the settings to their
backing table via the gossiped system config. Instead, it
unconditionally enables the rangefeed-backed `settingswatcher`, which
was developed to support tenants. Note that this is well-tested code:
it has been used in multi-tenant SQL pods for about a year now, and all
the existing tests still pass.

Release note: None
---
 pkg/server/BUILD.bazel                        |  4 +-
 pkg/server/server.go                          |  6 +-
 pkg/server/server_sql.go                      | 19 ++--
 pkg/server/settings_cache.go                  | 33 +++++++
 pkg/server/settings_cache_test.go             |  5 +-
 ...ettingsworker_test.go => settings_test.go} |  0
 pkg/server/settingsworker.go                  | 90 -------------------
 7 files changed, 53 insertions(+), 104 deletions(-)
 rename pkg/server/{settingsworker_test.go => settings_test.go} (100%)
 delete mode 100644 pkg/server/settingsworker.go

diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index d0d66862a7ac..87d8f935472a 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -38,7 +38,6 @@ go_library(
         "server_sql.go",
         "server_systemlog_gc.go",
         "settings_cache.go",
-        "settingsworker.go",
         "sql_stats.go",
         "statement_diagnostics_requests.go",
         "statements.go",
@@ -297,7 +296,7 @@ go_test(
         "server_systemlog_gc_test.go",
         "server_test.go",
         "settings_cache_test.go",
-        "settingsworker_test.go",
+        "settings_test.go",
         "statements_test.go",
         "stats_test.go",
         "status_test.go",
@@ -321,6 +320,7 @@ go_test(
         "//pkg/keys",
         "//pkg/kv",
         "//pkg/kv/kvserver",
+        "//pkg/kv/kvserver/closedts",
         "//pkg/kv/kvserver/closedts/ctpb",
         "//pkg/kv/kvserver/kvserverbase",
         "//pkg/kv/kvserver/kvserverpb",
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 2cc1e91e6f24..c7f6c8d3c4aa 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -785,6 +785,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	})
 	registry.AddMetricStruct(kvProber.Metrics())
 
+	settingsWriter := &settingsCacheWriter{eng: engines[0]}
 	sqlServer, err := newSQLServer(ctx, sqlServerArgs{
 		sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
 			nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(sStatus),
@@ -824,6 +825,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		regionsServer:            sStatus,
 		tenantUsageServer:        tenantUsage,
 		monitorAndMetrics:        sqlMonitorAndMetrics,
+		settingsStorage:          settingsWriter,
 	})
 	if err != nil {
 		return nil, err
@@ -1545,7 +1547,9 @@ func (s *Server) PreStart(ctx context.Context) error {
 
-	// Apply any cached initial settings (and start the gossip listener) as early
-	// as possible, to avoid spending time with stale settings.
+	// Apply any cached initial settings as early as possible, to avoid
+	// spending time with stale settings.
-	if err := s.refreshSettings(state.initialSettingsKVs); err != nil {
+	if err := initializeCachedSettings(
+		ctx, keys.SystemSQLCodec, s.st.MakeUpdater(), state.initialSettingsKVs,
+	); err != nil {
 		return errors.Wrap(err, "while initializing settings updater")
 	}
 
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 6430a1f3daa7..af16579f82aa 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -302,6 +302,10 @@ type sqlServerArgs struct {
 
 	// monitorAndMetrics contains the return value of newRootSQLMemoryMonitor.
 	monitorAndMetrics monitorAndMetrics
+
+	// settingsStorage optionally persists settings data to disk so that a
+	// fresh copy of the settings is available at the next startup.
+	settingsStorage settingswatcher.Storage
 }
 
 type monitorAndMetrics struct {
@@ -936,12 +940,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 		reporter.TestingKnobs = &cfg.TestingKnobs.Server.(*TestingKnobs).DiagnosticsTestingKnobs
 	}
 
-	var settingsWatcher *settingswatcher.SettingsWatcher
-	if !codec.ForSystemTenant() {
-		settingsWatcher = settingswatcher.New(
-			cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper, nil, /* storage */
-		)
-	}
+	settingsWatcher := settingswatcher.New(
+		cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper, cfg.settingsStorage,
+	)
 
 	return &SQLServer{
 		ambientCtx: cfg.BaseConfig.AmbientCtx,
@@ -1146,10 +1147,8 @@ func (s *SQLServer) preStart(
 		bootstrapVersion = roachpb.Version{Major: 20, Minor: 1, Internal: 1}
 	}
 
-	if s.settingsWatcher != nil {
-		if err := s.settingsWatcher.Start(ctx); err != nil {
-			return errors.Wrap(err, "initializing settings")
-		}
+	if err := s.settingsWatcher.Start(ctx); err != nil {
+		return errors.Wrap(err, "initializing settings")
 	}
 
 	// Run startup migrations (note: these depend on jobs subsystem running).
diff --git a/pkg/server/settings_cache.go b/pkg/server/settings_cache.go
index 43803bd3d68b..a8035b1a626f 100644
--- a/pkg/server/settings_cache.go
+++ b/pkg/server/settings_cache.go
@@ -15,12 +15,26 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/errors"
 )
 
+type settingsCacheWriter struct {
+	eng storage.Engine
+}
+
+func (s settingsCacheWriter) WriteKVs(ctx context.Context, kvs []roachpb.KeyValue) error {
+	return storeCachedSettingsKVs(ctx, s.eng, kvs)
+}
+
+var _ settingswatcher.Storage = (*settingsCacheWriter)(nil)
+
 // storeCachedSettingsKVs stores or caches the node's settings locally.
 // This helps a restarting node come back up with the settings it had when it died.
 func storeCachedSettingsKVs(ctx context.Context, eng storage.Engine, kvs []roachpb.KeyValue) error {
@@ -64,3 +78,22 @@ func loadCachedSettingsKVs(_ context.Context, eng storage.Engine) ([]roachpb.Key
 	}
 	return settingsKVs, nil
 }
+
+func initializeCachedSettings(
+	ctx context.Context, codec keys.SQLCodec, updater settings.Updater, kvs []roachpb.KeyValue,
+) error {
+	dec := settingswatcher.MakeRowDecoder(codec)
+	for _, kv := range kvs {
+		k, val, valType, _, err := dec.DecodeRow(kv)
+		if err != nil {
+			return errors.Wrap(err, `while decoding settings data
+this likely indicates the settings table structure or encoding has been altered;
+skipping settings updates`)
+		}
+		if err := updater.Set(ctx, k, val, valType); err != nil {
+			log.Warningf(ctx, "setting %q to %q failed: %+v", k, val, err)
+		}
+	}
+	updater.ResetRemaining(ctx)
+	return nil
+}
diff --git a/pkg/server/settings_cache_test.go b/pkg/server/settings_cache_test.go
index 31d00234c9b6..bddd2a13b2fb 100644
--- a/pkg/server/settings_cache_test.go
+++ b/pkg/server/settings_cache_test.go
@@ -14,9 +14,11 @@ import (
 	"context"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -73,9 +75,10 @@ func TestCachedSettingsServerRestart(t *testing.T) {
 			},
 		},
 	}
-	var settingsCache []roachpb.KeyValue
 	testServer, _, _ := serverutils.StartServer(t, serverArgs)
+	closedts.TargetDuration.Override(ctx, &testServer.ClusterSettings().SV, 10*time.Millisecond)
+	closedts.SideTransportCloseInterval.Override(ctx, &testServer.ClusterSettings().SV, 10*time.Millisecond)
 	testutils.SucceedsSoon(t, func() error {
 		store, err := testServer.GetStores().(*kvserver.Stores).GetStore(1)
 		if err != nil {
diff --git a/pkg/server/settingsworker_test.go b/pkg/server/settings_test.go
similarity index 100%
rename from pkg/server/settingsworker_test.go
rename to pkg/server/settings_test.go
diff --git a/pkg/server/settingsworker.go b/pkg/server/settingsworker.go
deleted file mode 100644
index e2d19fbbd021..000000000000
--- a/pkg/server/settingsworker.go
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright 2017 The Cockroach Authors.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.txt.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0, included in the file
-// licenses/APL.txt.
- -package server - -import ( - "bytes" - "context" - - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" - "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" - "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" -) - -func processSystemConfigKVs( - ctx context.Context, kvs []roachpb.KeyValue, u settings.Updater, eng storage.Engine, -) error { - tbl := systemschema.SettingsTable - - codec := keys.TODOSQLCodec - settingsTablePrefix := codec.TablePrefix(uint32(tbl.GetID())) - dec := settingswatcher.MakeRowDecoder(codec) - - var settingsKVs []roachpb.KeyValue - processKV := func(ctx context.Context, kv roachpb.KeyValue, u settings.Updater) error { - if !bytes.HasPrefix(kv.Key, settingsTablePrefix) { - return nil - } - k, v, t, _, err := dec.DecodeRow(kv) - if err != nil { - return err - } - settingsKVs = append(settingsKVs, kv) - - if err := u.Set(ctx, k, v, t); err != nil { - log.Warningf(ctx, "setting %q to %q failed: %+v", k, v, err) - } - return nil - } - for _, kv := range kvs { - if err := processKV(ctx, kv, u); err != nil { - return errors.Wrap(err, `while decoding settings data -this likely indicates the settings table structure or encoding has been altered; -skipping settings updates`) - } - } - u.ResetRemaining(ctx) - return errors.Wrap(storeCachedSettingsKVs(ctx, eng, settingsKVs), "while storing settings kvs") -} - -// RefreshSettings starts a settings-changes listener. -func (s *Server) refreshSettings(initialSettingsKVs []roachpb.KeyValue) error { - ctx := s.AnnotateCtx(context.Background()) - // If we have initial settings KV pairs loaded from the local engines, - // apply them before starting the gossip listener. - if len(initialSettingsKVs) != 0 { - if err := processSystemConfigKVs(ctx, initialSettingsKVs, s.st.MakeUpdater(), s.engines[0]); err != nil { - return errors.Wrap(err, "during processing initial settings") - } - } - // Setup updater that listens for changes in settings. - return s.stopper.RunAsyncTask(ctx, "refresh-settings", func(ctx context.Context) { - gossipUpdateC := s.gossip.RegisterSystemConfigChannel() - // No new settings can be defined beyond this point. - for { - select { - case <-gossipUpdateC: - cfg := s.gossip.GetSystemConfig() - u := s.st.MakeUpdater() - if err := processSystemConfigKVs(ctx, cfg.Values, u, s.engines[0]); err != nil { - log.Warningf(ctx, "error processing config KVs: %+v", err) - } - case <-s.stopper.ShouldQuiesce(): - return - } - } - }) -}
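
The subtle part of the second patch is the key compaction in
copyEventsToData: it merges the previously stored data with newly flushed
rangefeed events, keeps only the newest version of each key, and drops
tombstones so that deleted settings vanish from the stored snapshot. The
following standalone sketch illustrates the same scheme; the kv type and
int64 timestamps are simplified stand-ins for roachpb.KeyValue and
hlc.Timestamp rather than the actual CockroachDB types.

package main

import (
	"fmt"
	"sort"
)

// kv is a simplified stand-in for roachpb.KeyValue: a key, the timestamp
// of its latest known update, and whether that update was a deletion.
type kv struct {
	key       string
	ts        int64
	tombstone bool
}

// compact keeps the newest version of each key and drops tombstones,
// mirroring the sort/dedup/filter passes in copyEventsToData.
func compact(kvs []kv) []kv {
	// Sort by key, with newer timestamps first within a key.
	sort.Slice(kvs, func(i, j int) bool {
		if kvs[i].key != kvs[j].key {
			return kvs[i].key < kvs[j].key
		}
		return kvs[i].ts > kvs[j].ts
	})
	// Keep only the first (newest) entry for each key.
	deduped := kvs[:0]
	for _, e := range kvs {
		if len(deduped) > 0 && deduped[len(deduped)-1].key == e.key {
			continue
		}
		deduped = append(deduped, e)
	}
	// Drop tombstones: a deletion as the newest version means the setting
	// was reset, so it must not appear in the snapshot.
	out := deduped[:0]
	for _, e := range deduped {
		if e.tombstone {
			continue
		}
		out = append(out, e)
	}
	return out
}

func main() {
	events := []kv{
		{key: "a", ts: 1}, {key: "a", ts: 3}, {key: "a", ts: 2},
		{key: "b", ts: 1}, {key: "b", ts: 2, tombstone: true},
	}
	fmt.Println(compact(events)) // prints [{a 3 false}]
}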