From 6bb670cc1905e7994e26d1a115008a9f3aa1535f Mon Sep 17 00:00:00 2001
From: Friedrich Gonzalez
Date: Thu, 11 Apr 2024 23:18:30 +0200
Subject: [PATCH] Add OTLP integration test

Signed-off-by: Friedrich Gonzalez
---
 integration/e2ecortex/client.go | 71 ++++++++++++++++++++++++++++++
 integration/otlp_test.go        | 76 +++++++++++++++++++++++++++++++++
 2 files changed, 147 insertions(+)
 create mode 100644 integration/otlp_test.go

diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go
index b5a991a0714..f8f3fab2a71 100644
--- a/integration/e2ecortex/client.go
+++ b/integration/e2ecortex/client.go
@@ -27,6 +27,9 @@ import (
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
 	yaml "gopkg.in/yaml.v3"
 
 	"github.com/cortexproject/cortex/pkg/ruler"
 )
 
 var ErrNotFound = errors.New("not found")
@@ -121,6 +124,74 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
 	return res, nil
 }
 
+func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
+	var metricName string
+	attributes := make(map[string]any)
+	for _, label := range ts.Labels {
+		if label.Name == model.MetricNameLabel {
+			metricName = label.Value
+		} else {
+			attributes[label.Name] = label.Value
+		}
+	}
+	return metricName, attributes
+}
+
+func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any, samples []prompb.Sample) {
+	newMetric.SetEmptyGauge()
+	for _, sample := range samples {
+		datapoint := newMetric.Gauge().DataPoints().AppendEmpty()
+		datapoint.SetDoubleValue(sample.Value)
+		datapoint.SetTimestamp(pcommon.Timestamp(sample.Timestamp * time.Millisecond.Nanoseconds()))
+		datapoint.Attributes().FromRaw(attributes)
+	}
+}
+
+// convertTimeseriesToMetrics converts Prometheus remote-write time series into OTLP metrics.
+func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics {
+	metrics := pmetric.NewMetrics()
+	for _, ts := range timeseries {
+		metricName, attributes := getNameAndAttributes(ts)
+		newMetric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
+		newMetric.SetName(metricName)
+		// TODO: set description for the new metric
+		// TODO: set unit for the new metric
+		createDatapointsGauge(newMetric, attributes, ts.Samples)
+		// TODO(friedrichg): add support for histograms
+	}
+	return metrics
+}
+
+// OTLP pushes the given series to the OTLP metrics endpoint.
+func (c *Client) OTLP(timeseries []prompb.TimeSeries) (*http.Response, error) {
+
+	data, err := pmetricotlp.NewExportRequestFromMetrics(convertTimeseriesToMetrics(timeseries)).MarshalProto()
+	if err != nil {
+		return nil, err
+	}
+
+	// Create HTTP request
+	req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/otlp/v1/metrics", c.distributorAddress), bytes.NewReader(data))
+	if err != nil {
+		return nil, err
+	}
+
+	req.Header.Set("Content-Type", "application/x-protobuf")
+	req.Header.Set("X-Scope-OrgID", c.orgID)
+
+	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+	defer cancel()
+
+	// Execute HTTP request
+	res, err := c.httpClient.Do(req.WithContext(ctx))
+	if err != nil {
+		return nil, err
+	}
+
+	defer res.Body.Close()
+	return res, nil
+}
+
 // Query runs an instant query.
 func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
 	value, _, err := c.querierClient.Query(context.Background(), query, ts)
diff --git a/integration/otlp_test.go b/integration/otlp_test.go
new file mode 100644
index 00000000000..64c33334f75
--- /dev/null
+++ b/integration/otlp_test.go
@@ -0,0 +1,76 @@
+//go:build requires_docker
+// +build requires_docker
+
+package integration
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/prompb"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/cortexproject/cortex/integration/e2e"
+	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
+	"github.com/cortexproject/cortex/integration/e2ecortex"
+)
+
+func TestOTLP(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Start dependencies.
+	minio := e2edb.NewMinio(9000, bucketName)
+	require.NoError(t, s.StartAndWaitReady(minio))
+
+	// Start Cortex components.
+	require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks.yaml", cortexConfigFile))
+
+	// Start Cortex in single binary mode, reading the config from file and overwriting
+	// the backend config to make it work with Minio.
+	flags := map[string]string{
+		"-blocks-storage.s3.access-key-id":     e2edb.MinioAccessKey,
+		"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
+		"-blocks-storage.s3.bucket-name":       bucketName,
+		"-blocks-storage.s3.endpoint":          fmt.Sprintf("%s-minio-9000:9000", networkName),
+		"-blocks-storage.s3.insecure":          "true",
+	}
+
+	cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex-1", cortexConfigFile, flags, "", 9009, 9095)
+	require.NoError(t, s.StartAndWaitReady(cortex))
+
+	c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
+	require.NoError(t, err)
+
+	// Push some series to Cortex.
+	now := time.Now()
+	series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})
+
+	res, err := c.OTLP(series)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+
+	// Query the series.
+	result, err := c.Query("series_1", now)
+	require.NoError(t, err)
+	require.Equal(t, model.ValVector, result.Type())
+	assert.Equal(t, expectedVector, result.(model.Vector))
+
+	labelValues, err := c.LabelValues("foo", time.Time{}, time.Time{}, nil)
+	require.NoError(t, err)
+	require.Equal(t, model.LabelValues{"bar"}, labelValues)
+
+	labelNames, err := c.LabelNames(time.Time{}, time.Time{})
+	require.NoError(t, err)
+	require.Equal(t, []string{"__name__", "foo"}, labelNames)
+
+	// Check that a range query does not return an error to sanity check the queryrange tripperware.
+	_, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
+	require.NoError(t, err)
+
+	// TODO(friedrichg): test histograms
+}
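
Note (not part of the patch): a minimal sketch of one way the description/unit TODOs in convertTimeseriesToMetrics could later be filled in using the pdata setters. The helper name setMetricMetadata and the example values are illustrative assumptions, not part of this change.

package e2ecortex

import "go.opentelemetry.io/collector/pdata/pmetric"

// setMetricMetadata is a hypothetical helper: it sets the optional description
// and unit on an OTLP metric, which this patch leaves as TODOs.
func setMetricMetadata(m pmetric.Metric, description, unit string) {
	m.SetDescription(description) // free-form text, e.g. taken from Prometheus metadata
	m.SetUnit(unit)               // unit string, e.g. "1" for a dimensionless gauge
}

If adopted, it would be called right after newMetric.SetName(metricName) in convertTimeseriesToMetrics.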