From 202c03010605012f00b01c45ff90b1a0afd46c01 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 30 Jan 2023 17:24:22 -0800 Subject: [PATCH] Add support for concatenating multiple embedded uris, or uris with other string parts (#7055) Signed-off-by: Bogdan Drutu --- .chloggen/supportconcat.yaml | 11 ++ confmap/provider_test.go | 5 + confmap/resolver.go | 88 ++++++++++++---- confmap/resolver_test.go | 196 +++++++++++++++++++++++++++++++---- 4 files changed, 257 insertions(+), 43 deletions(-) create mode 100755 .chloggen/supportconcat.yaml diff --git a/.chloggen/supportconcat.yaml b/.chloggen/supportconcat.yaml new file mode 100755 index 00000000000..cad9e32b4a4 --- /dev/null +++ b/.chloggen/supportconcat.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: confmap + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support to resolve embedded uris inside a string, concatenate results. + +# One or more tracking issues or pull requests related to the change +issues: [6932] diff --git a/confmap/provider_test.go b/confmap/provider_test.go index 85871350d93..89aea968c69 100644 --- a/confmap/provider_test.go +++ b/confmap/provider_test.go @@ -41,3 +41,8 @@ func TestNewRetrievedWithOptions(t *testing.T) { assert.Equal(t, New(), retMap) assert.Equal(t, want, ret.Close(context.Background())) } + +func TestNewRetrievedUnsupportedType(t *testing.T) { + _, err := NewRetrieved(errors.New("my error")) + require.Error(t, err) +} diff --git a/confmap/resolver.go b/confmap/resolver.go index fe3358f2ff6..cb4360c561e 100644 --- a/confmap/resolver.go +++ b/confmap/resolver.go @@ -18,7 +18,9 @@ import ( "context" "errors" "fmt" + "reflect" "regexp" + "strconv" "strings" "go.uber.org/multierr" @@ -26,15 +28,21 @@ import ( "go.opentelemetry.io/collector/featuregate" ) +// schemePattern defines the regexp pattern for scheme names. +// Scheme name consist of a sequence of characters beginning with a letter and followed by any +// combination of letters, digits, plus ("+"), period ("."), or hyphen ("-"). +const schemePattern = `[A-Za-z][A-Za-z0-9+.-]+` + var ( // follows drive-letter specification: // https://datatracker.ietf.org/doc/html/draft-kerwin-file-scheme-07.html#section-2.2 driverLetterRegexp = regexp.MustCompile("^[A-z]:") - // Scheme name consist of a sequence of characters beginning with a letter and followed by any - // combination of letters, digits, plus ("+"), period ("."), or hyphen ("-"). // Need to match new line as well in the OpaqueValue, so setting the "s" flag. See https://pkg.go.dev/regexp/syntax. - locationRegexp = regexp.MustCompile(`(?s:^(?P[A-Za-z][A-Za-z0-9+.-]+):(?P.*)$)`) + uriRegexp = regexp.MustCompile(`(?s:^(?P` + schemePattern + `):(?P.*)$)`) + + // embeddedURI matches "embedded" provider uris into a string value. + embeddedURI = regexp.MustCompile(`\${` + schemePattern + `:.*?}`) errTooManyRecursiveExpansions = errors.New("too many recursive expansions") ) @@ -43,7 +51,7 @@ var ( var expandEnabledGauge = featuregate.GlobalRegistry().MustRegister( "confmap.expandEnabled", featuregate.StageBeta, - featuregate.WithRegisterDescription("controls whether expending embedded external config providers URIs")) + featuregate.WithRegisterDescription("controls whether expanding embedded external config providers URIs")) // Resolver resolves a configuration as a Conf. type Resolver struct { @@ -168,6 +176,7 @@ func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) { } retMap = NewFromStringMap(cfgMap) } + // Apply the converters in the given order. for _, confConv := range mr.converters { if err := confConv.Convert(ctx, retMap); err != nil { @@ -234,26 +243,44 @@ func (mr *Resolver) expandValueRecursively(ctx context.Context, value any) (any, func (mr *Resolver) expandValue(ctx context.Context, value any) (any, bool, error) { switch v := value.(type) { case string: - // If it doesn't have the format "${scheme:opaque}" no need to expand. - if !strings.HasPrefix(v, "${") || !strings.HasSuffix(v, "}") || !strings.Contains(v, ":") { - return value, false, nil - } - lURI, err := newLocation(v[2 : len(v)-1]) - if err != nil { - // Cannot return error, since a case like "${HOST}:${PORT}" is invalid location, - // but is supported in the legacy implementation. + // If no embedded "uris" no need to expand. embeddedURI regexp matches uriRegexp as well. + if !embeddedURI.MatchString(v) { return value, false, nil } - if strings.Contains(lURI.opaqueValue, "$") { - return nil, false, fmt.Errorf("the uri %q contains unsupported characters ('$')", lURI.asString()) - } - ret, err := mr.retrieveValue(ctx, lURI) - if err != nil { - return nil, false, err + + // If the value is a single URI, then the return value can be anything. + // This is the case `foo: ${file:some_extra_config.yml}`. + if embeddedURI.FindString(v) == v { + return mr.expandStringURI(ctx, v) } - mr.closers = append(mr.closers, ret.Close) - val, err := ret.AsRaw() - return val, true, err + + // If the URI is embedded into the string, return value must be a string, and we have to concatenate all strings. + var nerr error + var nchanged bool + nv := embeddedURI.ReplaceAllStringFunc(v, func(s string) string { + ret, changed, err := mr.expandStringURI(ctx, s) + nchanged = nchanged || changed + nerr = multierr.Append(nerr, err) + if err != nil { + return "" + } + // This list must be kept in sync with checkRawConfType. + val := reflect.ValueOf(ret) + switch val.Kind() { + case reflect.String: + return val.String() + case reflect.Int, reflect.Int32, reflect.Int64: + return strconv.FormatInt(val.Int(), 10) + case reflect.Float32, reflect.Float64: + return strconv.FormatFloat(val.Float(), 'f', -1, 64) + case reflect.Bool: + return strconv.FormatBool(val.Bool()) + default: + nerr = multierr.Append(nerr, fmt.Errorf("expanding %v, expected string value type, got %T", s, ret)) + return v + } + }) + return nv, nchanged, nerr case []any: nslice := make([]any, 0, len(v)) nchanged := false @@ -282,6 +309,23 @@ func (mr *Resolver) expandValue(ctx context.Context, value any) (any, bool, erro return value, false, nil } +func (mr *Resolver) expandStringURI(ctx context.Context, uri string) (any, bool, error) { + lURI, err := newLocation(uri[2 : len(uri)-1]) + if err != nil { + return nil, false, err + } + if strings.Contains(lURI.opaqueValue, "$") { + return nil, false, fmt.Errorf("the uri %q contains unsupported characters ('$')", lURI.asString()) + } + ret, err := mr.retrieveValue(ctx, lURI) + if err != nil { + return nil, false, err + } + mr.closers = append(mr.closers, ret.Close) + val, err := ret.AsRaw() + return val, true, err +} + type location struct { scheme string opaqueValue string @@ -292,7 +336,7 @@ func (c location) asString() string { } func newLocation(uri string) (location, error) { - submatches := locationRegexp.FindStringSubmatch(uri) + submatches := uriRegexp.FindStringSubmatch(uri) if len(submatches) != 3 { return location{}, fmt.Errorf("invalid uri: %q", uri) } diff --git a/confmap/resolver_test.go b/confmap/resolver_test.go index c37d448e907..5e9b0502a78 100644 --- a/confmap/resolver_test.go +++ b/confmap/resolver_test.go @@ -423,6 +423,161 @@ func TestResolverExpandMapAndSliceValues(t *testing.T) { assert.Equal(t, expectedMap, cfgMap.ToStringMap()) } +func TestResolverExpandStringValues(t *testing.T) { + tests := []struct { + name string + input string + output any + }{ + { + name: "test_no_match_old", + input: "${HOST}:${PORT}", + output: "${HOST}:${PORT}", + }, + { + name: "test_no_match_old_no_brackets", + input: "${HOST}:$PORT", + output: "${HOST}:$PORT", + }, + { + name: "test_match_value", + input: "${env:COMPLEX_VALUE}", + output: []any{"localhost:3042"}, + }, + { + name: "test_match_embedded", + input: "${env:HOST}:3043", + output: "localhost:3043", + }, + { + name: "test_match_embedded_multi", + input: "${env:HOST}:${env:PORT}", + output: "localhost:3044", + }, + { + name: "test_match_embedded_concat", + input: "https://${env:HOST}:3045", + output: "https://localhost:3045", + }, + { + name: "test_match_embedded_new_and_old", + input: "${env:HOST}:${PORT}", + output: "localhost:${PORT}", + }, + { + name: "test_match_int", + input: "test_${env:INT}", + output: "test_1", + }, + { + name: "test_match_int32", + input: "test_${env:INT32}", + output: "test_32", + }, + { + name: "test_match_int64", + input: "test_${env:INT64}", + output: "test_64", + }, + { + name: "test_match_float32", + input: "test_${env:FLOAT32}", + output: "test_3.25", + }, + { + name: "test_match_float64", + input: "test_${env:FLOAT64}", + output: "test_6.4", + }, + { + name: "test_match_bool", + input: "test_${env:BOOL}", + output: "test_true", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) { + return NewRetrieved(map[string]any{tt.name: tt.input}) + }) + + testProvider := newFakeProvider("env", func(_ context.Context, uri string, _ WatcherFunc) (*Retrieved, error) { + switch uri { + case "env:COMPLEX_VALUE": + return NewRetrieved([]any{"localhost:3042"}) + case "env:HOST": + return NewRetrieved("localhost") + case "env:PORT": + return NewRetrieved(3044) + case "env:INT": + return NewRetrieved(1) + case "env:INT32": + return NewRetrieved(32) + case "env:INT64": + return NewRetrieved(64) + case "env:FLOAT32": + return NewRetrieved(float32(3.25)) + case "env:FLOAT64": + return NewRetrieved(float64(6.4)) + case "env:BOOL": + return NewRetrieved(true) + } + return nil, errors.New("impossible") + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + + cfgMap, err := resolver.Resolve(context.Background()) + require.NoError(t, err) + assert.Equal(t, map[string]any{tt.name: tt.output}, cfgMap.ToStringMap()) + }) + } +} + +func TestResolverExpandReturnError(t *testing.T) { + tests := []struct { + name string + input any + }{ + { + name: "string_value", + input: "${test:VALUE}", + }, + { + name: "slice_value", + input: []any{"${test:VALUE}"}, + }, + { + name: "map_value", + input: map[string]any{"test": "${test:VALUE}"}, + }, + { + name: "string_embedded_value", + input: "https://${test:HOST}:3045", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) { + return NewRetrieved(map[string]any{tt.name: tt.input}) + }) + + myErr := errors.New(tt.name) + testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) { + return nil, myErr + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + + _, err = resolver.Resolve(context.Background()) + assert.ErrorIs(t, err, myErr) + }) + } +} func TestResolverInfiniteExpand(t *testing.T) { const receiverValue = "${test:VALUE}" provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) { @@ -440,71 +595,70 @@ func TestResolverInfiniteExpand(t *testing.T) { assert.ErrorIs(t, err, errTooManyRecursiveExpansions) } -func TestResolverExpandSliceValueError(t *testing.T) { +func TestResolverExpandInvalidScheme(t *testing.T) { provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) { - return NewRetrieved(map[string]any{"test": []any{"${test:VALUE}"}}) + return NewRetrieved(map[string]any{"test": "${g_c_s:VALUE}"}) }) - testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) { - return NewRetrieved(errors.New("invalid value")) + testProvider := newFakeProvider("g_c_s", func(context.Context, string, WatcherFunc) (*Retrieved, error) { + panic("must not be called") }) resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) require.NoError(t, err) _, err = resolver.Resolve(context.Background()) - assert.EqualError(t, err, "unsupported type=*errors.errorString for retrieved config") + // TODO: Investigate how to return an error like `invalid uri: "g_c_s:VALUE"` and keep the legacy behavior + // for cases like "${HOST}:${PORT}" + assert.NoError(t, err) } -func TestResolverExpandMapValueError(t *testing.T) { +func TestResolverExpandInvalidOpaqueValue(t *testing.T) { provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) { - return NewRetrieved(map[string]any{"test": []any{map[string]any{"test": "${test:VALUE}"}}}) + return NewRetrieved(map[string]any{"test": []any{map[string]any{"test": "${test:$VALUE}"}}}) }) testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) { - return NewRetrieved(errors.New("invalid value")) + panic("must not be called") }) resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) require.NoError(t, err) _, err = resolver.Resolve(context.Background()) - assert.EqualError(t, err, "unsupported type=*errors.errorString for retrieved config") + assert.EqualError(t, err, `the uri "test:$VALUE" contains unsupported characters ('$')`) } -func TestResolverExpandInvalidScheme(t *testing.T) { - const receiverValue = "${g_c_s:VALUE}" +func TestResolverExpandUnsupportedScheme(t *testing.T) { provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) { - return NewRetrieved(map[string]any{"test": receiverValue}) + return NewRetrieved(map[string]any{"test": "${unsupported:VALUE}"}) }) - testProvider := newFakeProvider("g_c_s", func(context.Context, string, WatcherFunc) (*Retrieved, error) { - return NewRetrieved(receiverValue) + testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) { + panic("must not be called") }) resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) require.NoError(t, err) _, err = resolver.Resolve(context.Background()) - // TODO: Investigate how to return an error like `invalid uri: "g_c_s:VALUE"` and keep the legacy behavior - // for cases like "${HOST}:${PORT}" - assert.NoError(t, err) + assert.EqualError(t, err, `scheme "unsupported" is not supported for uri "unsupported:VALUE"`) } -func TestResolverExpandInvalidOpaqueValue(t *testing.T) { +func TestResolverExpandStringValueInvalidReturnValue(t *testing.T) { provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) { - return NewRetrieved(map[string]any{"test": []any{map[string]any{"test": "${test:$VALUE}"}}}) + return NewRetrieved(map[string]any{"test": "localhost:${test:PORT}"}) }) testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) { - return NewRetrieved(errors.New("invalid value")) + return NewRetrieved([]any{1243}) }) resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) require.NoError(t, err) _, err = resolver.Resolve(context.Background()) - assert.EqualError(t, err, `the uri "test:$VALUE" contains unsupported characters ('$')`) + assert.EqualError(t, err, `expanding ${test:PORT}, expected string value type, got []interface {}`) } func makeMapProvidersMap(providers ...Provider) map[string]Provider {