Skip to content

Commit

Permalink
Safe per tenant overrides loading (#4421)
Browse files Browse the repository at this point in the history
* safe per tenant overrides loading

* exports submap for OverwriteMarshalingStringMap
  • Loading branch information
owen-d authored and Danny Kopping committed Oct 7, 2021
1 parent 9b0f61d commit 36ba602
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 22 deletions.
12 changes: 7 additions & 5 deletions pkg/ruler/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ func newFakeLimits() fakeLimits {
RulerRemoteWriteDisabled: true,
},
additionalHeadersRWTenant: {
RulerRemoteWriteHeaders: map[string]string{
user.OrgIDHeaderName: "overridden",
strings.ToLower(user.OrgIDHeaderName): "overridden-lower",
strings.ToUpper(user.OrgIDHeaderName): "overridden-upper",
"Additional": "Header",
RulerRemoteWriteHeaders: validation.OverwriteMarshalingStringMap{
M: map[string]string{
user.OrgIDHeaderName: "overridden",
strings.ToLower(user.OrgIDHeaderName): "overridden-lower",
strings.ToUpper(user.OrgIDHeaderName): "overridden-upper",
"Additional": "Header",
},
},
},
customRelabelsTenant: {
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/flagext/bytesize.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ func (bs *ByteSize) UnmarshalYAML(unmarshal func(interface{}) error) error {
return bs.Set(str)
}

// MarshalYAML implements yaml.Marshaller.
// Use a string representation for consistency
func (bs *ByteSize) MarshalYAML() (interface{}, error) {
return bs.String(), nil
}

// UnmarshalJSON implements json.Unmarsal interface to work with JSON.
func (bs *ByteSize) UnmarshalJSON(val []byte) error {
var str string
Expand All @@ -56,3 +62,8 @@ func (bs *ByteSize) UnmarshalJSON(val []byte) error {

return bs.Set(str)
}

// Use a string representation for consistency
func (bs *ByteSize) MarshalJSON() ([]byte, error) {
return json.Marshal(bs.String())
}
88 changes: 71 additions & 17 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package validation

import (
"encoding/json"
"flag"
"fmt"
"strconv"
"time"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"golang.org/x/time/rate"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/ruler/util"
Expand Down Expand Up @@ -81,23 +84,23 @@ type Limits struct {

// this field is the inversion of the general remote_write.enabled because the zero value of a boolean is false,
// and if it were ruler_remote_write_enabled, it would be impossible to know if the value was explicitly set or default
RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"`
RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"`
RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"`
RulerRemoteWriteHeaders map[string]string `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"`
RulerRemoteWriteRelabelConfigs []*util.RelabelConfig `yaml:"ruler_remote_write_relabel_configs" json:"ruler_remote_write_relabel_configs"`
RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"`
RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"`
RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"`
RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"`
RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"`
RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"`
RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"`
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`
RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"`
RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"`
RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"`
RulerRemoteWriteHeaders OverwriteMarshalingStringMap `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"`
RulerRemoteWriteRelabelConfigs []*util.RelabelConfig `yaml:"ruler_remote_write_relabel_configs,omitempty" json:"ruler_remote_write_relabel_configs,omitempty"`
RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"`
RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"`
RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"`
RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"`
RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"`
RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"`
RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"`
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`

// Global and per tenant retention
RetentionPeriod model.Duration `yaml:"retention_period" json:"retention_period"`
StreamRetention []StreamRetention `yaml:"retention_stream" json:"retention_stream"`
StreamRetention []StreamRetention `yaml:"retention_stream,omitempty" json:"retention_stream,omitempty"`

// Config for overrides, convenient if it goes here.
PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"`
Expand Down Expand Up @@ -179,12 +182,18 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {
// We want to set c to the defaults and then overwrite it with the input.
// To make unmarshal fill the plain data struct rather than calling UnmarshalYAML
// again, we have to hide it using a type indirection. See prometheus/config.
type plain Limits

// During startup we wont have a default value so we don't want to overwrite them
if defaultLimits != nil {
*l = *defaultLimits
b, err := yaml.Marshal(defaultLimits)
if err != nil {
return errors.Wrap(err, "cloning limits (marshaling)")
}
if err := yaml.Unmarshal(b, (*plain)(l)); err != nil {
return errors.Wrap(err, "cloning limits (unmarshaling)")
}
}
type plain Limits
return unmarshal((*plain)(l))
}

Expand Down Expand Up @@ -434,7 +443,7 @@ func (o *Overrides) RulerRemoteWriteTimeout(userID string) time.Duration {

// RulerRemoteWriteHeaders returns the headers to use in a remote-write for a given user.
func (o *Overrides) RulerRemoteWriteHeaders(userID string) map[string]string {
return o.getOverridesForUser(userID).RulerRemoteWriteHeaders
return o.getOverridesForUser(userID).RulerRemoteWriteHeaders.Map()
}

// RulerRemoteWriteRelabelConfigs returns the write relabel configs to use in a remote-write for a given user.
Expand Down Expand Up @@ -522,3 +531,48 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits {
}
return o.defaultLimits
}

// OverwriteMarshalingStringMap will overwrite the src map when unmarshaling
// as opposed to merging.
type OverwriteMarshalingStringMap struct {
M map[string]string
}

func (sm *OverwriteMarshalingStringMap) Map() map[string]string {
return sm.M
}

// MarshalJSON explicitly uses the the type receiver and not pointer receiver
// or it won't be called
func (sm OverwriteMarshalingStringMap) MarshalJSON() ([]byte, error) {
return json.Marshal(sm.M)
}

func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error {
var def map[string]string
if err := json.Unmarshal(val, &def); err != nil {
return err
}
sm.M = def

return nil

}

// MarshalYAML explicitly uses the the type receiver and not pointer receiver
// or it won't be called
func (sm OverwriteMarshalingStringMap) MarshalYAML() (interface{}, error) {
return sm.M, nil
}

func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{}) error) error {
var def map[string]string

err := unmarshal(&def)
if err != nil {
return err
}
sm.M = def

return nil
}
134 changes: 134 additions & 0 deletions pkg/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"encoding/json"
"reflect"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -109,3 +111,135 @@ per_tenant_override_period: 230s

assert.Equal(t, limitsYAML, limitsJSON)
}

func TestOverwriteMarshalingStringMapJSON(t *testing.T) {
m := OverwriteMarshalingStringMap{
M: map[string]string{"foo": "bar"},
}

require.Nil(t, json.Unmarshal([]byte(`{"bazz": "buzz"}`), &m))
require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map())
out, err := json.Marshal(m)
require.Nil(t, err)
var back OverwriteMarshalingStringMap
require.Nil(t, json.Unmarshal(out, &back))
require.Equal(t, m, back)
}

func TestOverwriteMarshalingStringMapYAML(t *testing.T) {
m := OverwriteMarshalingStringMap{
M: map[string]string{"foo": "bar"},
}

require.Nil(t, yaml.Unmarshal([]byte(`{"bazz": "buzz"}`), &m))
require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map())
out, err := yaml.Marshal(m)
require.Nil(t, err)
var back OverwriteMarshalingStringMap
require.Nil(t, yaml.Unmarshal(out, &back))
require.Equal(t, m, back)
}

func TestLimitsDoesNotMutate(t *testing.T) {
initialDefault := defaultLimits
defer func() {
defaultLimits = initialDefault
}()

// Set new defaults with non-nil values for non-scalar types
newDefaults := Limits{
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{a="b"}`,
},
},
}
SetDefaultLimitsForYAMLUnmarshalling(newDefaults)

for _, tc := range []struct {
desc string
yaml string
exp Limits
}{
{
desc: "map",
yaml: `
ruler_remote_write_headers:
foo: "bar"
`,
exp: Limits{
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"foo": "bar"}},

// Rest from new defaults
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{a="b"}`,
},
},
},
},
{
desc: "empty map overrides defaults",
yaml: `
ruler_remote_write_headers:
`,
exp: Limits{

// Rest from new defaults
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{a="b"}`,
},
},
},
},
{
desc: "slice",
yaml: `
retention_stream:
- period: '24h'
selector: '{foo="bar"}'
`,
exp: Limits{
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{foo="bar"}`,
},
},

// Rest from new defaults
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
},
},
{
desc: "scalar field",
yaml: `
reject_old_samples: true
`,
exp: Limits{
RejectOldSamples: true,

// Rest from new defaults
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
StreamRetention: []StreamRetention{
{
Period: model.Duration(24 * time.Hour),
Selector: `{a="b"}`,
},
},
},
},
} {

t.Run(tc.desc, func(t *testing.T) {
var out Limits
require.Nil(t, yaml.UnmarshalStrict([]byte(tc.yaml), &out))
require.Equal(t, tc.exp, out)
})
}
}

0 comments on commit 36ba602

Please sign in to comment.