From b0cfdc9df67fb582887181cf9bfc26ec4324b00b Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 24 Nov 2022 13:40:38 -0500 Subject: [PATCH] settings,spanconfig: introduce a protobuf setting type For this setting type: - the protoutil.Message is held in memory, - the byte representation is stored in system.settings, and - the json representation is used when accepting input and rendering state (through SHOW CLUSTER SETTING; the raw form is visible when looking directly at system.settings) We also use this setting type to power spanconfig.store.fallback_config_override, which overrides the fallback config used for ranges with no explicit span configs set. Previously we used a hardcoded value -- this makes it a bit more configurable. This is a partial and backportable workaround (read: hack) for #91238 and \#91239. In an internal escalation we observed "orphaned" ranges from dropped tables that were not being referenced by span configs (by virtue of them originating from now-dropped tables/configs). Typically ranges of this sort are short-lived: they're emptied out through GC and merged into LHS neighbors. But if the neighboring ranges are large enough, or load just high enough, the merge does not occur. For such orphaned ranges we were using a hardcoded "fallback config", with a replication factor of three. This made for confusing semantics where if RANGE DEFAULT was configured to have a replication factor of five, our replication reports indicated there were under-replicated ranges. This is partly because replication reports today are powered by zone configs (thus looking at RANGE DEFAULT) -- this will change shortly as part of \#89987 where we'll instead consider span config data. In any case, we were warning users of under-replicated ranges but within KV we were not taking any action to upreplicate them -- KV was respecting the hard-coded fallback config. 
The issues above describe that we should apply each tenant's RANGE DEFAULT config to all such orphaned ranges, which is probably the right fix. This was alluded to in an earlier TODO but is still left for future work. // TODO(irfansharif): We're using a static[1] fallback span config // here, we could instead have this directly track the host tenant's // RANGE DEFAULT config, or go a step further and use the tenant's own // RANGE DEFAULT instead if the key is within the tenant's keyspace. // We'd have to thread that through the KVAccessor interface by // reserving special keys for these default configs. // // [1]: Modulo the private spanconfig.store.fallback_config_override, which // applies globally. So this PR instead takes a shortcut -- it makes the static config configurable through a cluster setting. We can now do the following which alters what fallback config is applied to orphaned ranges, and in our example above, force such ranges to also have a replication factor of five. SET CLUSTER SETTING spanconfig.store.fallback_config_override = ' { "gcPolicy": {"ttlSeconds": 3600}, "numReplicas": 5, "rangeMaxBytes": "536870912", "rangeMinBytes": "134217728" }'; Release note: None --- pkg/kv/kvserver/client_spanconfigs_test.go | 52 +++++ pkg/kv/kvserver/store.go | 5 + pkg/settings/BUILD.bazel | 4 + pkg/settings/protobuf.go | 216 +++++++++++++++++++++ pkg/settings/updater.go | 17 ++ pkg/spanconfig/spanconfigstore/store.go | 41 +++- pkg/sql/set_cluster_setting.go | 14 +- pkg/sql/show_cluster_setting.go | 4 +- 8 files changed, 341 insertions(+), 12 deletions(-) create mode 100644 pkg/settings/protobuf.go diff --git a/pkg/kv/kvserver/client_spanconfigs_test.go b/pkg/kv/kvserver/client_spanconfigs_test.go index d5d28435f1a2..c4ed314a2ba3 100644 --- a/pkg/kv/kvserver/client_spanconfigs_test.go +++ b/pkg/kv/kvserver/client_spanconfigs_test.go @@ -100,6 +100,58 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) { }) } +// TestFallbackSpanConfigOverride ensures that 
+// spanconfig.store.fallback_config_override works as expected. +func TestFallbackSpanConfigOverride(t *testing.T) { + defer leaktest.AfterTest(t)() + + st := cluster.MakeTestingClusterSettings() + spanConfigStore := spanconfigstore.New(roachpb.TestingDefaultSpanConfig(), st, nil) + mockSubscriber := newMockSpanConfigSubscriber(spanConfigStore) + + ctx := context.Background() + args := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + DisableSplitQueue: true, + DisableGCQueue: true, + }, + SpanConfig: &spanconfig.TestingKnobs{ + StoreKVSubscriberOverride: mockSubscriber, + }, + }, + } + s, _, _ := serverutils.StartServer(t, args) + defer s.Stopper().Stop(context.Background()) + + _, err := s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", nil, + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + `SET CLUSTER SETTING spanconfig.store.enabled = true`) + require.NoError(t, err) + + key, err := s.ScratchRange() + require.NoError(t, err) + store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + repl := store.LookupReplica(keys.MustAddr(key)) + span := repl.Desc().RSpan().AsRawSpanWithNoLocals() + + conf := roachpb.SpanConfig{NumReplicas: 5, NumVoters: 3} + spanconfigstore.FallbackConfigOverride.Override(ctx, &st.SV, &conf) + + require.NotNil(t, mockSubscriber.callback) + mockSubscriber.callback(ctx, span) // invoke the callback + testutils.SucceedsSoon(t, func() error { + repl := store.LookupReplica(keys.MustAddr(key)) + gotConfig := repl.SpanConfig() + if !gotConfig.Equal(conf) { + return errors.Newf("expected config=%s, got config=%s", conf.String(), gotConfig.String()) + } + return nil + }) +} + type mockSpanConfigSubscriber struct { callback func(ctx context.Context, config roachpb.Span) spanconfig.Store diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 3b1a9d7e3658..1e950873401a 100644 
--- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2129,6 +2129,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { } } }) + + // We also want to do it when the fallback config setting is changed. + spanconfigstore.FallbackConfigOverride.SetOnChange(&s.ClusterSettings().SV, func(ctx context.Context) { + s.applyAllFromSpanConfigStore(ctx) + }) } if !s.cfg.TestingKnobs.DisableAutomaticLeaseRenewal { diff --git a/pkg/settings/BUILD.bazel b/pkg/settings/BUILD.bazel index 4e5a2d27950e..afb5928c1dfc 100644 --- a/pkg/settings/BUILD.bazel +++ b/pkg/settings/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "float.go", "int.go", "masked.go", + "protobuf.go", "registry.go", "setting.go", "string.go", @@ -29,10 +30,13 @@ go_library( deps = [ "//pkg/util/buildutil", "//pkg/util/humanizeutil", + "//pkg/util/protoutil", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", "@com_github_cockroachdb_redact//interfaces", + "@com_github_gogo_protobuf//jsonpb", + "@com_github_gogo_protobuf//proto", ], ) diff --git a/pkg/settings/protobuf.go b/pkg/settings/protobuf.go new file mode 100644 index 000000000000..46329dbaa4bf --- /dev/null +++ b/pkg/settings/protobuf.go @@ -0,0 +1,216 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package settings + +import ( + "context" + "reflect" + "strings" + + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" + "github.com/gogo/protobuf/jsonpb" + "github.com/gogo/protobuf/proto" +) + +// ProtobufSetting is the interface of a setting variable that will be updated +// automatically when the corresponding cluster-wide setting of type "protobuf" +// is updated. The proto message is held in memory, the byte representation +// stored in system.settings, and JSON presentation used when accepting input +// and rendering (just through SHOW CLUSTER SETTING , the raw form +// is visible when looking directly at system.settings). +type ProtobufSetting struct { + defaultValue protoutil.Message + validateFn func(*Values, protoutil.Message) error + common +} + +var _ internalSetting = &ProtobufSetting{} + +// String returns the string representation of the setting's current value. +func (s *ProtobufSetting) String(sv *Values) string { + p := s.Get(sv) + json, err := s.MarshalToJSON(p) + if err != nil { + panic(errors.Wrapf(err, "marshaling %s: %+v", proto.MessageName(p), p)) + } + return json +} + +// Encoded returns the encoded value of the current value of the setting. +func (s *ProtobufSetting) Encoded(sv *Values) string { + p := s.Get(sv) + return EncodeProtobuf(p) +} + +// EncodedDefault returns the encoded value of the default value of the setting. +func (s *ProtobufSetting) EncodedDefault() string { + return EncodeProtobuf(s.defaultValue) +} + +// DecodeToString decodes and renders an encoded value. +func (s *ProtobufSetting) DecodeToString(encoded string) (string, error) { + message, err := s.DecodeValue(encoded) + if err != nil { + return "", err + } + return s.MarshalToJSON(message) +} + +// Typ returns the short (1 char) string denoting the type of setting. +func (*ProtobufSetting) Typ() string { + return "p" +} + +// DecodeValue decodes the value into a protobuf. 
+func (s *ProtobufSetting) DecodeValue(encoded string) (protoutil.Message, error) { + p, err := newProtoMessage(proto.MessageName(s.defaultValue)) + if err != nil { + return nil, err + } + + if err := protoutil.Unmarshal([]byte(encoded), p); err != nil { + return nil, err + } + return p, nil +} + +// Default returns default value for setting. +func (s *ProtobufSetting) Default() protoutil.Message { + return s.defaultValue +} + +// Get retrieves the protobuf value in the setting. +func (s *ProtobufSetting) Get(sv *Values) protoutil.Message { + loaded := sv.getGeneric(s.slot) + if loaded == nil { + return s.defaultValue + } + return loaded.(protoutil.Message) +} + +// Validate that a value conforms with the validation function. +func (s *ProtobufSetting) Validate(sv *Values, p protoutil.Message) error { + if s.validateFn == nil { + return nil // nothing to do + } + return s.validateFn(sv, p) +} + +// Override sets the setting to the given value, assuming it passes validation. +func (s *ProtobufSetting) Override(ctx context.Context, sv *Values, p protoutil.Message) { + _ = s.set(ctx, sv, p) +} + +func (s *ProtobufSetting) set(ctx context.Context, sv *Values, p protoutil.Message) error { + if err := s.Validate(sv, p); err != nil { + return err + } + if s.Get(sv) != p { + sv.setGeneric(ctx, s.slot, p) + } + return nil +} + +func (s *ProtobufSetting) setToDefault(ctx context.Context, sv *Values) { + if err := s.set(ctx, sv, s.defaultValue); err != nil { + panic(err) + } +} + +// WithPublic sets public visibility and can be chained. +func (s *ProtobufSetting) WithPublic() *ProtobufSetting { + s.SetVisibility(Public) + return s +} + +// MarshalToJSON returns a JSON representation of the protobuf. +func (s *ProtobufSetting) MarshalToJSON(p protoutil.Message) (string, error) { + jsonEncoder := jsonpb.Marshaler{EmitDefaults: false} + return jsonEncoder.MarshalToString(p) +} + +// UnmarshalFromJSON unmarshals a protobuf from a json representation. 
+func (s *ProtobufSetting) UnmarshalFromJSON(jsonEncoded string) (protoutil.Message, error) { + p, err := newProtoMessage(proto.MessageName(s.defaultValue)) + if err != nil { + return nil, err + } + + json := &jsonpb.Unmarshaler{} + if err := json.Unmarshal(strings.NewReader(jsonEncoded), p); err != nil { + return nil, errors.Wrapf(err, "unmarshaling json to %s", proto.MessageName(p)) + } + return p, nil +} + +// RegisterProtobufSetting defines a new setting with type protobuf. +func RegisterProtobufSetting( + class Class, key, desc string, defaultValue protoutil.Message, +) *ProtobufSetting { + return RegisterValidatedProtobufSetting(class, key, desc, defaultValue, nil) +} + +// RegisterValidatedProtobufSetting defines a new setting with type protobuf +// with a validation function. +func RegisterValidatedProtobufSetting( + class Class, + key, desc string, + defaultValue protoutil.Message, + validateFn func(*Values, protoutil.Message) error, +) *ProtobufSetting { + if validateFn != nil { + if err := validateFn(nil, defaultValue); err != nil { + panic(errors.Wrap(err, "invalid default")) + } + } + setting := &ProtobufSetting{ + defaultValue: defaultValue, + validateFn: validateFn, + } + + // By default, all protobuf settings are considered to contain PII and are + // thus non-reportable (to exclude them from telemetry reports). + setting.SetReportable(false) + register(class, key, desc, setting) + return setting +} + +// Defeat the unused linter. +var _ = (*ProtobufSetting).Default +var _ = (*ProtobufSetting).WithPublic + +// newProtoMessage creates a new protocol message object, given its fully +// qualified name. +func newProtoMessage(name string) (protoutil.Message, error) { + // Get the reflected type of the protocol message. + rt := proto.MessageType(name) + if rt == nil { + return nil, errors.Newf("unknown proto message type %s", name) + } + + // If the message is known, we should get the pointer to our message. 
+ if rt.Kind() != reflect.Ptr { + return nil, errors.AssertionFailedf( + "expected ptr to message, got %s instead", rt.Kind().String()) + } + + // Construct message of appropriate type, through reflection. + rv := reflect.New(rt.Elem()) + msg, ok := rv.Interface().(protoutil.Message) + if !ok { + // Just to be safe. + return nil, errors.AssertionFailedf( + "unexpected proto type for %s; expected protoutil.Message, got %T", + name, rv.Interface()) + } + return msg, nil +} diff --git a/pkg/settings/updater.go b/pkg/settings/updater.go index 8c1e1d87f952..7850247ee1ba 100644 --- a/pkg/settings/updater.go +++ b/pkg/settings/updater.go @@ -15,7 +15,9 @@ import ( "strconv" "time" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" + "github.com/gogo/protobuf/proto" ) // EncodeDuration encodes a duration in the format of EncodedValue.Value. @@ -38,6 +40,15 @@ func EncodeFloat(f float64) string { return strconv.FormatFloat(f, 'G', -1, 64) } +// EncodeProtobuf encodes a protobuf in the format of EncodedValue.Value. 
+func EncodeProtobuf(p protoutil.Message) string { + data := make([]byte, p.Size()) + if _, err := p.MarshalTo(data); err != nil { + panic(errors.Wrapf(err, "encoding %s: %+v", proto.MessageName(p), p)) + } + return string(data) +} + type updater struct { sv *Values m map[string]struct{} @@ -91,6 +102,12 @@ func (u updater) Set(ctx context.Context, key string, value EncodedValue) error switch setting := d.(type) { case *StringSetting: return setting.set(ctx, u.sv, value.Value) + case *ProtobufSetting: + p, err := setting.DecodeValue(value.Value) + if err != nil { + return err + } + return setting.set(ctx, u.sv, p) case *BoolSetting: b, err := setting.DecodeValue(value.Value) if err != nil { diff --git a/pkg/spanconfig/spanconfigstore/store.go b/pkg/spanconfig/spanconfigstore/store.go index 6beff543e175..1523f6129ce8 100644 --- a/pkg/spanconfig/spanconfigstore/store.go +++ b/pkg/spanconfig/spanconfigstore/store.go @@ -28,7 +28,8 @@ import ( // using the gossip backed system config span to instead using the span configs // infrastructure. It has no effect if COCKROACH_DISABLE_SPAN_CONFIGS // is set. -// TODO(richardjcai): We can likely remove this. +// +// TODO(irfansharif): We should remove this. var EnabledSetting = settings.RegisterBoolSetting( settings.SystemOnly, "spanconfig.store.enabled", @@ -36,6 +37,15 @@ var EnabledSetting = settings.RegisterBoolSetting( true, ) +// FallbackConfigOverride is a hidden cluster setting to override the fallback +// config used for ranges with no explicit span configs set. +var FallbackConfigOverride = settings.RegisterProtobufSetting( + settings.SystemOnly, + "spanconfig.store.fallback_config_override", + "override the fallback used for ranges with no explicit span configs set", + &roachpb.SpanConfig{}, +) + // Store is an in-memory data structure to store, retrieve, and incrementally // update the span configuration state. 
Internally, it makes use of an interval // btree based spanConfigStore to store non-overlapping span configurations that @@ -47,17 +57,23 @@ type Store struct { systemSpanConfigStore *systemSpanConfigStore } - // TODO(irfansharif): We're using a static fall back span config here, we - // could instead have this track the host tenant's RANGE DEFAULT config, or - // go a step further and use the tenant's own RANGE DEFAULT instead if the - // key is within the tenant's keyspace. We'd have to thread that through the - // KVAccessor interface by reserving special keys for these default configs. - settings *cluster.Settings + // fallback is the span config we'll fall back on in the absence of // something more specific. + // + // TODO(irfansharif): We're using a static[1] fallback span config here, we + // could instead have this directly track the host tenant's RANGE DEFAULT + // config, or go a step further and use the tenant's own RANGE DEFAULT + // instead if the key is within the tenant's keyspace. We'd have to thread + // that through the KVAccessor interface by reserving special keys for these + // default configs. + // + // [1]: Modulo the private spanconfig.store.fallback_config_override, which + // applies globally. fallback roachpb.SpanConfig - knobs *spanconfig.TestingKnobs + + knobs *spanconfig.TestingKnobs } var _ spanconfig.Store = &Store{} @@ -113,11 +129,18 @@ func (s *Store) getSpanConfigForKeyRLocked( ) (roachpb.SpanConfig, error) { conf, found := s.mu.spanConfigStore.getSpanConfigForKey(ctx, key) if !found { - conf = s.fallback + conf = s.getFallbackConfig() } return s.mu.systemSpanConfigStore.combine(key, conf) } +func (s *Store) getFallbackConfig() roachpb.SpanConfig { + if conf := FallbackConfigOverride.Get(&s.settings.SV).(*roachpb.SpanConfig); !conf.IsEmpty() { + return *conf + } + return s.fallback +} + // Apply is part of the spanconfig.StoreWriter interface. 
func (s *Store) Apply( ctx context.Context, dryrun bool, updates ...spanconfig.Update, diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index 1815b908d129..5623cd09c4b5 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -178,7 +178,7 @@ func (p *planner) getAndValidateTypedClusterSetting( var dummyHelper tree.IndexedVarHelper switch setting.(type) { - case *settings.StringSetting, *settings.VersionSetting, *settings.ByteSizeSetting: + case *settings.StringSetting, *settings.VersionSetting, *settings.ByteSizeSetting, *settings.ProtobufSetting: requiredType = types.String case *settings.BoolSetting: requiredType = types.Bool @@ -662,6 +662,18 @@ func toSettingString( return string(*s), nil } return "", errors.Errorf("cannot use %s %T value for string setting", d.ResolvedType(), d) + case *settings.ProtobufSetting: + if s, ok := d.(*tree.DString); ok { + msg, err := setting.UnmarshalFromJSON(string(*s)) + if err != nil { + return "", err + } + if err := setting.Validate(&st.SV, msg); err != nil { + return "", err + } + return settings.EncodeProtobuf(msg), nil + } + return "", errors.Errorf("cannot use %s %T value for protobuf setting", d.ResolvedType(), d) case *settings.VersionSetting: if s, ok := d.(*tree.DString); ok { newRawVal, err := clusterversion.EncodingFromVersionStr(string(*s)) diff --git a/pkg/sql/show_cluster_setting.go b/pkg/sql/show_cluster_setting.go index 466b0e40e961..7b1bf6ecfff8 100644 --- a/pkg/sql/show_cluster_setting.go +++ b/pkg/sql/show_cluster_setting.go @@ -155,7 +155,7 @@ func getShowClusterSettingPlanColumns( switch val.(type) { case *settings.IntSetting: dType = types.Int - case *settings.StringSetting, *settings.ByteSizeSetting, *settings.VersionSetting, *settings.EnumSetting: + case *settings.StringSetting, *settings.ByteSizeSetting, *settings.VersionSetting, *settings.EnumSetting, *settings.ProtobufSetting: dType = types.String case *settings.BoolSetting: dType = types.Bool 
@@ -197,7 +197,7 @@ func planShowClusterSetting( } d = tree.NewDInt(tree.DInt(v)) case *settings.StringSetting, *settings.EnumSetting, - *settings.ByteSizeSetting, *settings.VersionSetting: + *settings.ByteSizeSetting, *settings.VersionSetting, *settings.ProtobufSetting: v, err := val.DecodeToString(encoded) if err != nil { return nil, err