Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prometheus Remote Write Exporter supporting Cortex - factory and config #1544

Merged
merged 22 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Prometheus Remote Write Exporter

This Exporter sends metrics data in Prometheus TimeSeries format to Cortex or any Prometheus [remote write compatible backend](https://prometheus.io/docs/operating/integrations/).

Non-cumulative monotonic, histogram, and summary OTLP metrics are dropped by this exporter.

The following settings are required:

- `endpoint`: protocol:host:port to which the exporter is going to send traces or metrics, using
the HTTP/HTTPS protocol.

huyan0 marked this conversation as resolved.
Show resolved Hide resolved
The following settings can be optionally configured:
- `namespace`: prefix attached to each exported metric name.

- `headers`: additional headers attached to each HTTP request. `X-Prometheus-Remote-Write-Version` cannot be set by users
and is attached to each request.
- `insecure` (default = false): whether to enable client transport security for
the exporter's connection.
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should
only be used if `insecure` is set to true.
- `cert_file`: path to the TLS cert to use for TLS required connections. Should
only be used if `insecure` is set to true.
- `key_file`: path to the TLS key to use for TLS required connections. Should
only be used if `insecure` is set to true.
- `timeout` (default = 5s): How long to wait until the connection is close.
- `read_buffer_size` (default = 0): ReadBufferSize for HTTP client.
- `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client.

Example:

```yaml
exporters:
prometheusremotewrite:
endpoint: "http://some.url:9411/api/prom/push"
```
The full list of settings exposed for this exporter are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).

_Here is a link to the overall project [design](https://github.com/open-telemetry/opentelemetry-collector/pull/1464)_
39 changes: 39 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheusremotewriteexporter

import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for Remote Write exporter.
type Config struct {
// squash ensures fields are correctly decoded in embedded struct.
configmodels.ExporterSettings `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// prefix attached to each exported metric name
// See: https://prometheus.io/docs/practices/naming/#metric-names
Namespace string `mapstructure:"namespace"`
huyan0 marked this conversation as resolved.
Show resolved Hide resolved

// Optional headers configuration for authorization and security/extra metadata
Headers map[string]string `mapstructure:"headers"`

HTTPClientSettings confighttp.HTTPClientSettings `mapstructure:",squash"`
}
90 changes: 90 additions & 0 deletions exporter/prometheusremotewriteexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheusremotewriteexporter

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// TestLoadConfig checks whether yaml configuration can be loaded correctly
func Test_loadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := NewFactory()
factories.Exporters[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

// From the default configurations -- checks if a correct exporter is instantiated
e0 := cfg.Exporters["prometheusremotewrite"]
assert.Equal(t, e0, factory.CreateDefaultConfig())

// checks if the correct Config struct can be instantiated from testdata/config.yaml
e1 := cfg.Exporters["prometheusremotewrite/2"]
assert.Equal(t, e1,
&Config{
ExporterSettings: configmodels.ExporterSettings{
NameVal: "prometheusremotewrite/2",
TypeVal: "prometheusremotewrite",
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
Namespace: "test-space",

Headers: map[string]string{
"prometheus-remote-write-version": "0.1.0",
"tenant-id": "234"},

HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: "localhost:8888",
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "/var/lib/mycert.pem", //This is subject to change, but currently I have no idea what else to put here lol
},
Insecure: false,
},
ReadBufferSize: 0,

WriteBufferSize: 512 * 1024,

Timeout: 5 * time.Second,
},
})
}
75 changes: 75 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Note: implementation for this class is in a separate PR
package prometheusremotewriteexporter

import (
"context"
"net/http"
"net/url"
"sync"

"github.com/pkg/errors"

"go.opentelemetry.io/collector/consumer/pdata"
)

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint
type prwExporter struct {
namespace string
endpointURL *url.URL
client *http.Client
headers map[string]string
wg *sync.WaitGroup
closeChan chan struct{}
}

// newPrwExporter initializes a new prwExporter instance and sets fields accordingly.
// client parameter cannot be nil.
func newPrwExporter(namespace string, endpoint string, client *http.Client, headers map[string]string) (*prwExporter, error) {

if client == nil {
return nil, errors.Errorf("http client cannot be nil")
}

endpointURL, err := url.ParseRequestURI(endpoint)
if err != nil {
return nil, errors.Errorf("invalid endpoint")
}

return &prwExporter{
namespace: namespace,
endpointURL: endpointURL,
client: client,
headers: headers,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
}, nil
}

// shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
// to finish before returning
func (prwe *prwExporter) shutdown(context.Context) error {
close(prwe.closeChan)
prwe.wg.Wait()
return nil
}

// pushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of
// TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
// exports the map.
func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
return 0, nil
}
114 changes: 114 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Note: implementation for this class is in a separate PR
package prometheusremotewriteexporter

import (
"context"
"net/http"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Test_newPrwExporter checks that a new exporter instance with non-nil fields is initialized
func Test_newPrwExporter(t *testing.T) {
config := &Config{
ExporterSettings: configmodels.ExporterSettings{},
TimeoutSettings: exporterhelper.TimeoutSettings{},
QueueSettings: exporterhelper.QueueSettings{},
RetrySettings: exporterhelper.RetrySettings{},
Namespace: "",
HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: ""},
}
tests := []struct {
name string
config *Config
namespace string
endpoint string
client *http.Client
returnError bool
}{
{
"invalid_URL",
config,
"test",
"invalid URL",
http.DefaultClient,
true,
},
{
"nil_client",
config,
"test",
"http://some.url:9411/api/prom/push",
nil,
true,
},
{
"success_case",
config,
"test",
"http://some.url:9411/api/prom/push",
http.DefaultClient,
false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
prwe, err := newPrwExporter(tt.namespace, tt.endpoint, tt.client, testHeaders)
if tt.returnError {
assert.Error(t, err)
return
}
require.NotNil(t, prwe)
assert.NotNil(t, prwe.namespace)
assert.NotNil(t, prwe.endpointURL)
assert.NotNil(t, prwe.client)
assert.NotNil(t, prwe.closeChan)
assert.NotNil(t, prwe.wg)
})
}
}

// Test_shutdown checks after shutdown is called, incoming calls to pushMetrics return error.
func Test_shutdown(t *testing.T) {
prwe := &prwExporter{
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
}
err := prwe.shutdown(context.Background())
assert.NoError(t, err)

}

// Test_pushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
// expected
func Test_pushMetrics(t *testing.T) {
prwe := &prwExporter{
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
}
_, err := prwe.pushMetrics(context.Background(), pdata.Metrics{})
assert.NoError(t, err)
}
Loading