From 8b6d456b1c2319642913499ab34ac8a18dae7ae5 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 6 Oct 2021 16:24:13 -0400 Subject: [PATCH 1/2] safe per tenant overrides loading --- pkg/util/flagext/bytesize.go | 11 +++ pkg/validation/limits.go | 88 +++++++++++++++++----- pkg/validation/limits_test.go | 134 ++++++++++++++++++++++++++++++++++ 3 files changed, 216 insertions(+), 17 deletions(-) diff --git a/pkg/util/flagext/bytesize.go b/pkg/util/flagext/bytesize.go index 0d59a6aba271..4195fd875db7 100644 --- a/pkg/util/flagext/bytesize.go +++ b/pkg/util/flagext/bytesize.go @@ -46,6 +46,12 @@ func (bs *ByteSize) UnmarshalYAML(unmarshal func(interface{}) error) error { return bs.Set(str) } +// MarshalYAML implements yaml.Marshaller. +// Use a string representation for consistency +func (bs *ByteSize) MarshalYAML() (interface{}, error) { + return bs.String(), nil +} + // UnmarshalJSON implements json.Unmarsal interface to work with JSON. func (bs *ByteSize) UnmarshalJSON(val []byte) error { var str string @@ -56,3 +62,8 @@ func (bs *ByteSize) UnmarshalJSON(val []byte) error { return bs.Set(str) } + +// Use a string representation for consistency +func (bs *ByteSize) MarshalJSON() ([]byte, error) { + return json.Marshal(bs.String()) +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index dca5271fda85..21be6abca979 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -1,14 +1,17 @@ package validation import ( + "encoding/json" "flag" "fmt" "strconv" "time" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "golang.org/x/time/rate" + "gopkg.in/yaml.v2" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/ruler/util" @@ -81,23 +84,23 @@ type Limits struct { // this field is the inversion of the general remote_write.enabled because the zero value of a boolean is false, // and if it were ruler_remote_write_enabled, it would be impossible to know if the value was explicitly set or default - RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"` - RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"` - RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"` - RulerRemoteWriteHeaders map[string]string `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"` - RulerRemoteWriteRelabelConfigs []*util.RelabelConfig `yaml:"ruler_remote_write_relabel_configs" json:"ruler_remote_write_relabel_configs"` - RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"` - RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"` - RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"` - RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"` - RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"` - RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"` - RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"` - RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"` + RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"` + RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"` + RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"` + RulerRemoteWriteHeaders OverwriteMarshalingStringMap `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"` + RulerRemoteWriteRelabelConfigs []*util.RelabelConfig `yaml:"ruler_remote_write_relabel_configs,omitempty" json:"ruler_remote_write_relabel_configs,omitempty"` + RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"` + RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"` + RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"` + RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"` + RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"` + RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"` + RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"` + RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"` // Global and per tenant retention RetentionPeriod model.Duration `yaml:"retention_period" json:"retention_period"` - StreamRetention []StreamRetention `yaml:"retention_stream" json:"retention_stream"` + StreamRetention []StreamRetention `yaml:"retention_stream,omitempty" json:"retention_stream,omitempty"` // Config for overrides, convenient if it goes here. PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"` @@ -179,12 +182,18 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { // We want to set c to the defaults and then overwrite it with the input. // To make unmarshal fill the plain data struct rather than calling UnmarshalYAML // again, we have to hide it using a type indirection. See prometheus/config. + type plain Limits // During startup we wont have a default value so we don't want to overwrite them if defaultLimits != nil { - *l = *defaultLimits + b, err := yaml.Marshal(defaultLimits) + if err != nil { + return errors.Wrap(err, "cloning limits (marshaling)") + } + if err := yaml.Unmarshal(b, (*plain)(l)); err != nil { + return errors.Wrap(err, "cloning limits (unmarshaling)") + } } - type plain Limits return unmarshal((*plain)(l)) } @@ -434,7 +443,7 @@ func (o *Overrides) RulerRemoteWriteTimeout(userID string) time.Duration { // RulerRemoteWriteHeaders returns the headers to use in a remote-write for a given user. func (o *Overrides) RulerRemoteWriteHeaders(userID string) map[string]string { - return o.getOverridesForUser(userID).RulerRemoteWriteHeaders + return o.getOverridesForUser(userID).RulerRemoteWriteHeaders.Map() } // RulerRemoteWriteRelabelConfigs returns the write relabel configs to use in a remote-write for a given user. @@ -522,3 +531,48 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits { } return o.defaultLimits } + +// OverwriteMarshalingStringMap will overwrite the src map when unmarshaling +// as opposed to merging. +type OverwriteMarshalingStringMap struct { + m map[string]string +} + +func (sm *OverwriteMarshalingStringMap) Map() map[string]string { + return sm.m +} + +// MarshalJSON explicitly uses the the type receiver and not pointer receiver +// or it won't be called +func (sm OverwriteMarshalingStringMap) MarshalJSON() ([]byte, error) { + return json.Marshal(sm.m) +} + +func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error { + var def map[string]string + if err := json.Unmarshal(val, &def); err != nil { + return err + } + sm.m = def + + return nil + +} + +// MarshalYAML explicitly uses the the type receiver and not pointer receiver +// or it won't be called +func (sm OverwriteMarshalingStringMap) MarshalYAML() (interface{}, error) { + return sm.m, nil +} + +func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{}) error) error { + var def map[string]string + + err := unmarshal(&def) + if err != nil { + return err + } + sm.m = def + + return nil +} diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 727f86b92413..789bca0aee4d 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -4,7 +4,9 @@ import ( "encoding/json" "reflect" "testing" + "time" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" @@ -109,3 +111,135 @@ per_tenant_override_period: 230s assert.Equal(t, limitsYAML, limitsJSON) } + +func TestOverwriteMarshalingStringMapJSON(t *testing.T) { + m := OverwriteMarshalingStringMap{ + m: map[string]string{"foo": "bar"}, + } + + require.Nil(t, json.Unmarshal([]byte(`{"bazz": "buzz"}`), &m)) + require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map()) + out, err := json.Marshal(m) + require.Nil(t, err) + var back OverwriteMarshalingStringMap + require.Nil(t, json.Unmarshal(out, &back)) + require.Equal(t, m, back) +} + +func TestOverwriteMarshalingStringMapYAML(t *testing.T) { + m := OverwriteMarshalingStringMap{ + m: map[string]string{"foo": "bar"}, + } + + require.Nil(t, yaml.Unmarshal([]byte(`{"bazz": "buzz"}`), &m)) + require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map()) + out, err := yaml.Marshal(m) + require.Nil(t, err) + var back OverwriteMarshalingStringMap + require.Nil(t, yaml.Unmarshal(out, &back)) + require.Equal(t, m, back) +} + +func TestLimitsDoesNotMutate(t *testing.T) { + initialDefault := defaultLimits + defer func() { + defaultLimits = initialDefault + }() + + // Set new defaults with non-nil values for non-scalar types + newDefaults := Limits{ + RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, + StreamRetention: []StreamRetention{ + { + Period: model.Duration(24 * time.Hour), + Selector: `{a="b"}`, + }, + }, + } + SetDefaultLimitsForYAMLUnmarshalling(newDefaults) + + for _, tc := range []struct { + desc string + yaml string + exp Limits + }{ + { + desc: "map", + yaml: ` +ruler_remote_write_headers: + foo: "bar" +`, + exp: Limits{ + RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"foo": "bar"}}, + + // Rest from new defaults + StreamRetention: []StreamRetention{ + { + Period: model.Duration(24 * time.Hour), + Selector: `{a="b"}`, + }, + }, + }, + }, + { + desc: "empty map overrides defaults", + yaml: ` +ruler_remote_write_headers: +`, + exp: Limits{ + + // Rest from new defaults + StreamRetention: []StreamRetention{ + { + Period: model.Duration(24 * time.Hour), + Selector: `{a="b"}`, + }, + }, + }, + }, + { + desc: "slice", + yaml: ` +retention_stream: + - period: '24h' + selector: '{foo="bar"}' +`, + exp: Limits{ + StreamRetention: []StreamRetention{ + { + Period: model.Duration(24 * time.Hour), + Selector: `{foo="bar"}`, + }, + }, + + // Rest from new defaults + RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, + }, + }, + { + desc: "scalar field", + yaml: ` +reject_old_samples: true +`, + exp: Limits{ + RejectOldSamples: true, + + // Rest from new defaults + RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, + StreamRetention: []StreamRetention{ + { + Period: model.Duration(24 * time.Hour), + Selector: `{a="b"}`, + }, + }, + }, + }, + } { + + t.Run(tc.desc, func(t *testing.T) { + var out Limits + require.Nil(t, yaml.UnmarshalStrict([]byte(tc.yaml), &out)) + require.Equal(t, tc.exp, out) + }) + } +} From da66a18148284678ab3c4fd988317a753982bdd8 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 6 Oct 2021 16:50:42 -0400 Subject: [PATCH 2/2] exports submap for OverwriteMarshalingStringMap --- pkg/ruler/registry_test.go | 12 +++++++----- pkg/validation/limits.go | 12 ++++++------ pkg/validation/limits_test.go | 4 ++-- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/pkg/ruler/registry_test.go b/pkg/ruler/registry_test.go index 4c8cb116a4e2..e180b7ae9c51 100644 --- a/pkg/ruler/registry_test.go +++ b/pkg/ruler/registry_test.go @@ -43,11 +43,13 @@ func newFakeLimits() fakeLimits { RulerRemoteWriteDisabled: true, }, additionalHeadersRWTenant: { - RulerRemoteWriteHeaders: map[string]string{ - user.OrgIDHeaderName: "overridden", - strings.ToLower(user.OrgIDHeaderName): "overridden-lower", - strings.ToUpper(user.OrgIDHeaderName): "overridden-upper", - "Additional": "Header", + RulerRemoteWriteHeaders: validation.OverwriteMarshalingStringMap{ + M: map[string]string{ + user.OrgIDHeaderName: "overridden", + strings.ToLower(user.OrgIDHeaderName): "overridden-lower", + strings.ToUpper(user.OrgIDHeaderName): "overridden-upper", + "Additional": "Header", + }, }, }, customRelabelsTenant: { diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 21be6abca979..8b3397ab1819 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -535,17 +535,17 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits { // OverwriteMarshalingStringMap will overwrite the src map when unmarshaling // as opposed to merging. type OverwriteMarshalingStringMap struct { - m map[string]string + M map[string]string } func (sm *OverwriteMarshalingStringMap) Map() map[string]string { - return sm.m + return sm.M } // MarshalJSON explicitly uses the the type receiver and not pointer receiver // or it won't be called func (sm OverwriteMarshalingStringMap) MarshalJSON() ([]byte, error) { - return json.Marshal(sm.m) + return json.Marshal(sm.M) } func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error { @@ -553,7 +553,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error { if err := json.Unmarshal(val, &def); err != nil { return err } - sm.m = def + sm.M = def return nil @@ -562,7 +562,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error { // MarshalYAML explicitly uses the the type receiver and not pointer receiver // or it won't be called func (sm OverwriteMarshalingStringMap) MarshalYAML() (interface{}, error) { - return sm.m, nil + return sm.M, nil } func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -572,7 +572,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{} if err != nil { return err } - sm.m = def + sm.M = def return nil } diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 789bca0aee4d..507f5ea71bff 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -114,7 +114,7 @@ per_tenant_override_period: 230s func TestOverwriteMarshalingStringMapJSON(t *testing.T) { m := OverwriteMarshalingStringMap{ - m: map[string]string{"foo": "bar"}, + M: map[string]string{"foo": "bar"}, } require.Nil(t, json.Unmarshal([]byte(`{"bazz": "buzz"}`), &m)) @@ -128,7 +128,7 @@ func TestOverwriteMarshalingStringMapJSON(t *testing.T) { func TestOverwriteMarshalingStringMapYAML(t *testing.T) { m := OverwriteMarshalingStringMap{ - m: map[string]string{"foo": "bar"}, + M: map[string]string{"foo": "bar"}, } require.Nil(t, yaml.Unmarshal([]byte(`{"bazz": "buzz"}`), &m))