Skip to content

Commit

Permalink
[exporter/elasticsearch] validate endpoints (open-telemetry#33350)
Browse files Browse the repository at this point in the history
**Description:**

Check that Elasticsearch `endpoints` is a list of valid URLs during
config validation. This ensures that a syntactically invalid endpoint
causes a fatal error during collector startup, rather than leading to a
persistent runtime error.

Previously, setting an endpoint without a scheme would lead to an error
at runtime when attempting a bulk request to Elasticsearch:

```
...
2024-06-04T10:30:23.244+0800    error
elasticsearchexporter/elasticsearch_bulk.go:313 bulk indexer flush error
{"kind": "exporter", "data_type": "traces", "name": "elasticsearch",
"error": "failed to execute the request: unsupported protocol scheme
\"\""}
...
```

Now the collector fails to start up:

```
$ make run
cd ./cmd/otelcontribcol && GO111MODULE=on go run --race . --config ../../local/config.yaml 
Error: invalid configuration: exporters::elasticsearch: invalid endpoint "localhost": invalid scheme "", expected "http" or "https"
2024/06/04 12:23:50 collector server run finished with error: invalid configuration: exporters::elasticsearch: invalid endpoint "localhost": invalid scheme "", expected "http" or "https"
exit status 1
make: *** [Makefile:255: run] Error 1
```

**Link to tracking Issue:**

N/A

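**Testing:** Added unit tests in `config_test.go` covering an unparsable
endpoint, an endpoint without a scheme, and an invalid entry in the
`ELASTICSEARCH_URL` environment variable. Existing tests were updated to
use `http://` endpoints.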

**Documentation:**

N/A
  • Loading branch information
axw authored Jun 6, 2024
1 parent bebbcc9 commit 9ac428d
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 14 deletions.
30 changes: 30 additions & 0 deletions .chloggen/elasticsearchexporter-configvalidate-url.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Check that endpoints are valid URLs during config validation.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33350]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  Check that endpoints are valid URLs during config validation so that
  an invalid endpoint causes a fatal error during startup, rather than
  leading to a persistent runtime error.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
25 changes: 24 additions & 1 deletion exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,16 @@ const defaultElasticsearchEnvName = "ELASTICSEARCH_URL"
// Validate validates the elasticsearch server configuration.
func (cfg *Config) Validate() error {
if len(cfg.Endpoints) == 0 && cfg.CloudID == "" {
if os.Getenv(defaultElasticsearchEnvName) == "" {
v := os.Getenv(defaultElasticsearchEnvName)
if v == "" {
return errConfigNoEndpoint
}
for _, endpoint := range strings.Split(v, ",") {
endpoint = strings.TrimSpace(endpoint)
if err := validateEndpoint(endpoint); err != nil {
return fmt.Errorf("invalid endpoint %q: %w", endpoint, err)
}
}
}

if cfg.CloudID != "" {
Expand All @@ -242,6 +249,9 @@ func (cfg *Config) Validate() error {
if endpoint == "" {
return errConfigEmptyEndpoint
}
if err := validateEndpoint(endpoint); err != nil {
return fmt.Errorf("invalid endpoint %q: %w", endpoint, err)
}
}

if _, ok := mappingModes[cfg.Mapping.Mode]; !ok {
Expand All @@ -251,6 +261,19 @@ func (cfg *Config) Validate() error {
return nil
}

// validateEndpoint checks that endpoint is usable as an Elasticsearch
// server URL.
//
// It returns the error from url.Parse when endpoint cannot be parsed, an
// error when the scheme is anything other than "http" or "https", and an
// error when the URL carries no host. It returns nil otherwise.
func validateEndpoint(endpoint string) error {
	u, err := url.Parse(endpoint)
	if err != nil {
		return err
	}
	switch u.Scheme {
	case "http", "https":
	default:
		return fmt.Errorf(`invalid scheme %q, expected "http" or "https"`, u.Scheme)
	}
	// url.Parse accepts inputs such as "http://" and "http:localhost:9200"
	// (the latter is parsed as an opaque URL); both leave Host empty.
	// Reject them here so they are reported during config validation
	// instead of surfacing later when a request is attempted.
	if u.Host == "" {
		return fmt.Errorf("missing host, expected %s://host[:port]", u.Scheme)
	}
	return nil
}

// Based on "addrFromCloudID" in go-elasticsearch.
func parseCloudID(input string) (*url.URL, error) {
_, after, ok := strings.Cut(input, ":")
Expand Down
32 changes: 26 additions & 6 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ func TestConfig_Validate(t *testing.T) {
}),
err: "endpoints must not include empty entries",
},
"invalid endpoint": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"*:!"}
}),
err: `invalid endpoint "*:!": parse "*:!": first path segment in URL cannot contain colon`,
},
"invalid cloudid": {
config: withDefaultConfig(func(cfg *Config) {
cfg.CloudID = "invalid"
Expand All @@ -219,18 +225,24 @@ func TestConfig_Validate(t *testing.T) {
},
"endpoint and cloudid both set": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"test:9200"}
cfg.Endpoints = []string{"http://test:9200"}
cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY="
}),
err: "only one of endpoints or cloudid may be specified",
},
"invalid mapping mode": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"test:9200"}
cfg.Endpoints = []string{"http://test:9200"}
cfg.Mapping.Mode = "invalid"
}),
err: `unknown mapping mode "invalid"`,
},
"invalid scheme": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"without_scheme"}
}),
err: `invalid endpoint "without_scheme": invalid scheme "", expected "http" or "https"`,
},
}

for name, tt := range tests {
Expand All @@ -242,10 +254,18 @@ func TestConfig_Validate(t *testing.T) {
}

func TestConfig_Validate_Environment(t *testing.T) {
t.Setenv("ELASTICSEARCH_URL", "test:9200")
config := withDefaultConfig()
err := config.Validate()
require.NoError(t, err)
t.Run("valid", func(t *testing.T) {
t.Setenv("ELASTICSEARCH_URL", "http://test:9200")
config := withDefaultConfig()
err := config.Validate()
require.NoError(t, err)
})
t.Run("invalid", func(t *testing.T) {
t.Setenv("ELASTICSEARCH_URL", "http://valid:9200, *:!")
config := withDefaultConfig()
err := config.Validate()
assert.EqualError(t, err, `invalid endpoint "*:!": parse "*:!": first path segment in URL cannot contain colon`)
})
}

func withDefaultConfig(fns ...func(*Config)) *Config {
Expand Down
4 changes: 0 additions & 4 deletions exporter/elasticsearchexporter/elasticsearch_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren
headers.Add(k, v)
}

// TODO: validate settings:
// - try to parse address and validate scheme (address must be a valid URL)
// - check if cloud ID is valid

// maxRetries configures the maximum number of event publishing attempts,
// including the first send and additional retries.

Expand Down
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestCreateDefaultConfig(t *testing.T) {
func TestFactory_CreateLogsExporter(t *testing.T) {
factory := NewFactory()
cfg := withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"test:9200"}
cfg.Endpoints = []string{"http://test:9200"}
})
params := exportertest.NewNopCreateSettings()
exporter, err := factory.CreateLogsExporter(context.Background(), params, cfg)
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestFactory_CreateMetricsExporter_Fail(t *testing.T) {
func TestFactory_CreateTracesExporter(t *testing.T) {
factory := NewFactory()
cfg := withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"test:9200"}
cfg.Endpoints = []string{"http://test:9200"}
})
params := exportertest.NewNopCreateSettings()
exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg)
Expand All @@ -76,7 +76,7 @@ func TestFactory_CreateTracesExporter_Fail(t *testing.T) {
func TestFactory_CreateLogsAndTracesExporterWithDeprecatedIndexOption(t *testing.T) {
factory := NewFactory()
cfg := withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"test:9200"}
cfg.Endpoints = []string{"http://test:9200"}
cfg.Index = "test_index"
})
params := exportertest.NewNopCreateSettings()
Expand Down

0 comments on commit 9ac428d

Please sign in to comment.