Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor configmapprovider.Provider and Retrieved interfaces #4403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions config/configmapprovider/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func TestDefaultMapProvider(t *testing.T) {
mp := NewDefault("testdata/default-config.yaml", nil)
retr, err := mp.Retrieve(context.Background())
retr, err := mp.Retrieve(context.Background(), nil)
require.NoError(t, err)

expectedMap, err := config.NewMapFromBuffer(strings.NewReader(`
Expand All @@ -37,14 +37,16 @@ exporters:
otlp:
endpoint: "localhost:4317"`))
require.NoError(t, err)
assert.Equal(t, expectedMap, retr.Get())
m, err := retr.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedMap, m)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestDefaultMapProvider_AddNewConfig(t *testing.T) {
mp := NewDefault("testdata/default-config.yaml", []string{"processors.batch.timeout=2s"})
cp, err := mp.Retrieve(context.Background())
cp, err := mp.Retrieve(context.Background(), nil)
require.NoError(t, err)

expectedMap, err := config.NewMapFromBuffer(strings.NewReader(`
Expand All @@ -55,16 +57,18 @@ exporters:
otlp:
endpoint: "localhost:4317"`))
require.NoError(t, err)
assert.Equal(t, expectedMap, cp.Get())
m, err := cp.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedMap, m)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestDefaultMapProvider_OverwriteConfig(t *testing.T) {
mp := NewDefault(
"testdata/default-config.yaml",
[]string{"processors.batch.timeout=2s", "exporters.otlp.endpoint=localhost:1234"})
cp, err := mp.Retrieve(context.Background())
cp, err := mp.Retrieve(context.Background(), nil)
require.NoError(t, err)

expectedMap, err := config.NewMapFromBuffer(strings.NewReader(`
Expand All @@ -75,24 +79,26 @@ exporters:
otlp:
endpoint: "localhost:1234"`))
require.NoError(t, err)
assert.Equal(t, expectedMap, cp.Get())
m, err := cp.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedMap, m)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestDefaultMapProvider_InexistentFile(t *testing.T) {
mp := NewDefault("testdata/otelcol-config.yaml", nil)
require.NotNil(t, mp)
_, err := mp.Retrieve(context.Background())
_, err := mp.Retrieve(context.Background(), nil)
require.Error(t, err)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestDefaultMapProvider_EmptyFileName(t *testing.T) {
mp := NewDefault("", nil)
_, err := mp.Retrieve(context.Background())
_, err := mp.Retrieve(context.Background(), nil)
require.Error(t, err)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}
13 changes: 8 additions & 5 deletions config/configmapprovider/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,23 @@ func NewExpand(base Provider) Provider {
}
}

func (emp *expandMapProvider) Retrieve(ctx context.Context) (Retrieved, error) {
retr, err := emp.base.Retrieve(ctx)
func (emp *expandMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
retr, err := emp.base.Retrieve(ctx, onChange)
if err != nil {
return nil, err
}
cfgMap, err := retr.Get(ctx)
if err != nil {
return nil, err
}
cfgMap := retr.Get()
for _, k := range cfgMap.AllKeys() {
cfgMap.Set(k, expandStringValues(cfgMap.Get(k)))
}
return &simpleRetrieved{confMap: cfgMap}, nil
}

func (emp *expandMapProvider) Close(ctx context.Context) error {
return emp.base.Close(ctx)
func (emp *expandMapProvider) Shutdown(ctx context.Context) error {
return emp.base.Shutdown(ctx)
}

func expandStringValues(value interface{}) interface{} {
Expand Down
12 changes: 8 additions & 4 deletions config/configmapprovider/expand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ func TestExpand(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
// Retrieve the config
emp := NewExpand(NewFile(path.Join("testdata", test.name)))
cp, err := emp.Retrieve(context.Background())
cp, err := emp.Retrieve(context.Background(), nil)
require.NoError(t, err, "Unable to get config")

// Test that expanded configs are the same with the simple config with no env vars.
assert.Equal(t, expectedCfgMap.ToStringMap(), cp.Get().ToStringMap())
m, err := cp.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedCfgMap.ToStringMap(), m.ToStringMap())
})
}
}
Expand All @@ -75,7 +77,7 @@ func TestExpand_EscapedEnvVars(t *testing.T) {

// Retrieve the config
emp := NewExpand(NewFile(path.Join("testdata", "expand-escaped-env.yaml")))
cp, err := emp.Retrieve(context.Background())
cp, err := emp.Retrieve(context.Background(), nil)
require.NoError(t, err, "Unable to get config")

expectedMap := map[string]interface{}{
Expand All @@ -95,5 +97,7 @@ func TestExpand_EscapedEnvVars(t *testing.T) {
// escaped $ alone
"recv.7": "$",
}}
assert.Equal(t, expectedMap, cp.Get().ToStringMap())
m, err := cp.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedMap, m.ToStringMap())
}
4 changes: 2 additions & 2 deletions config/configmapprovider/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewFile(fileName string) Provider {
}
}

func (fmp *fileMapProvider) Retrieve(context.Context) (Retrieved, error) {
func (fmp *fileMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (Retrieved, error) {
if fmp.fileName == "" {
return nil, errors.New("config file not specified")
}
Expand All @@ -46,6 +46,6 @@ func (fmp *fileMapProvider) Retrieve(context.Context) (Retrieved, error) {
return &simpleRetrieved{confMap: cp}, nil
}

func (*fileMapProvider) Close(context.Context) error {
func (*fileMapProvider) Shutdown(context.Context) error {
return nil
}
4 changes: 2 additions & 2 deletions config/configmapprovider/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ func NewInMemory(buf io.Reader) Provider {
return &inMemoryMapProvider{buf: buf}
}

func (inp *inMemoryMapProvider) Retrieve(context.Context) (Retrieved, error) {
func (inp *inMemoryMapProvider) Retrieve(_ context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
cfg, err := config.NewMapFromBuffer(inp.buf)
if err != nil {
return nil, err
}
return &simpleRetrieved{confMap: cfg}, nil
}

func (inp *inMemoryMapProvider) Close(context.Context) error {
func (inp *inMemoryMapProvider) Shutdown(context.Context) error {
return nil
}
14 changes: 9 additions & 5 deletions config/configmapprovider/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,28 @@ func NewMerge(ps ...Provider) Provider {
return &mergeMapProvider{providers: ps}
}

func (mp *mergeMapProvider) Retrieve(ctx context.Context) (Retrieved, error) {
func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
retCfgMap := config.NewMap()
for _, p := range mp.providers {
retr, err := p.Retrieve(ctx)
retr, err := p.Retrieve(ctx, onChange)
if err != nil {
return nil, err
}
if err = retCfgMap.Merge(retr.Get()); err != nil {
cfgMap, err := retr.Get(ctx)
if err != nil {
return nil, err
}
if err = retCfgMap.Merge(cfgMap); err != nil {
return nil, err
}
}
return &simpleRetrieved{confMap: retCfgMap}, nil
}

func (mp *mergeMapProvider) Close(ctx context.Context) error {
func (mp *mergeMapProvider) Shutdown(ctx context.Context) error {
var errs error
for _, p := range mp.providers {
errs = multierr.Append(errs, p.Close(ctx))
errs = multierr.Append(errs, p.Shutdown(ctx))
}

return errs
Expand Down
8 changes: 4 additions & 4 deletions config/configmapprovider/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,28 @@ import (
func TestMerge_GetError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background())
cp, err := pl.Retrieve(context.Background(), nil)
assert.Error(t, err)
assert.Nil(t, cp)
}

func TestMerge_CloseError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")})
require.NotNil(t, pl)
assert.Error(t, pl.Close(context.Background()))
assert.Error(t, pl.Shutdown(context.Background()))
}

type errProvider struct {
err error
}

func (epl *errProvider) Retrieve(context.Context) (Retrieved, error) {
func (epl *errProvider) Retrieve(context.Context, func(*ChangeEvent)) (Retrieved, error) {
if epl.err == nil {
return &simpleRetrieved{confMap: config.NewMap()}, nil
}
return nil, epl.err
}

func (epl *errProvider) Close(context.Context) error {
func (epl *errProvider) Shutdown(context.Context) error {
return epl.err
}
4 changes: 2 additions & 2 deletions config/configmapprovider/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewProperties(properties []string) Provider {
}
}

func (pmp *propertiesMapProvider) Retrieve(context.Context) (Retrieved, error) {
func (pmp *propertiesMapProvider) Retrieve(_ context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
if len(pmp.properties) == 0 {
return &simpleRetrieved{confMap: config.NewMap()}, nil
}
Expand Down Expand Up @@ -70,6 +70,6 @@ func (pmp *propertiesMapProvider) Retrieve(context.Context) (Retrieved, error) {
return &simpleRetrieved{confMap: config.NewMapFromStringMap(prop)}, nil
}

func (*propertiesMapProvider) Close(context.Context) error {
func (*propertiesMapProvider) Shutdown(context.Context) error {
return nil
}
14 changes: 8 additions & 6 deletions config/configmapprovider/properties_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,25 @@ func TestPropertiesProvider(t *testing.T) {
}

pmp := NewProperties(setFlagStr)
retr, err := pmp.Retrieve(context.Background())
retr, err := pmp.Retrieve(context.Background(), nil)
require.NoError(t, err)
cfgMap, err := retr.Get(context.Background())
require.NoError(t, err)
cfgMap := retr.Get()
keys := cfgMap.AllKeys()
assert.Len(t, keys, 4)
assert.Equal(t, "2s", cfgMap.Get("processors::batch::timeout"))
assert.Equal(t, "3s", cfgMap.Get("processors::batch/foo::timeout"))
assert.Equal(t, "foo:9200,foo2:9200", cfgMap.Get("exporters::kafka::brokers"))
assert.Equal(t, "localhost:1818", cfgMap.Get("receivers::otlp::protocols::grpc::endpoint"))
require.NoError(t, pmp.Close(context.Background()))
require.NoError(t, pmp.Shutdown(context.Background()))
}

func TestPropertiesProvider_empty(t *testing.T) {
pmp := NewProperties(nil)
retr, err := pmp.Retrieve(context.Background())
retr, err := pmp.Retrieve(context.Background(), nil)
require.NoError(t, err)
cfgMap, err := retr.Get(context.Background())
require.NoError(t, err)
cfgMap := retr.Get()
assert.Equal(t, 0, len(cfgMap.AllKeys()))
require.NoError(t, pmp.Close(context.Background()))
require.NoError(t, pmp.Shutdown(context.Background()))
}
Loading