From e69b2bd20bdd0fa49d53bc98c98d52cd36a9a0dd Mon Sep 17 00:00:00 2001 From: Ian Oberst Date: Tue, 3 Jan 2023 11:13:24 -0800 Subject: [PATCH] Adding Partitioning to Datadog Sink Added options for compression and partitioning to the Datadog sink when the API method is used. When receiving an error related to the payload size being too large the sink will attempt to calculate a possible size that will satisfy the size requirements and re-send the metrics. In the event that it exhausts options and cannot send the metrics the attempt will fail. Added the option to turn on compression for the API, which allows much larger payloads by default. --- docs/v1.0/sinks/datadog.md | 7 + sink/datadog.go | 212 +++++++++++++++++++++---- sink/datadog_test.go | 311 +++++++++++++++++++++++++++++++++++++ test/mock/http.go | 15 ++ 4 files changed, 517 insertions(+), 28 deletions(-) create mode 100644 sink/datadog_test.go create mode 100644 test/mock/http.go diff --git a/docs/v1.0/sinks/datadog.md b/docs/v1.0/sinks/datadog.md index b995549..1542de4 100644 --- a/docs/v1.0/sinks/datadog.md +++ b/docs/v1.0/sinks/datadog.md @@ -32,4 +32,11 @@ sinks: metric-translator: "" metric-prefix: "" dogstatsd-host: "" + api-use-compression: "false" + api-partition-size: "" + api-auto-partition: "true" ``` + +## API Considerations + +When using the API directly the sink may run into issues when sending large payloads, as the Datadog API [restricts the payload to a maximum of 500Kib](https://docs.datadoghq.com/api/latest/metrics/). The DogStatsD option avoids this as it always sends metrics individually. By default the API option will attempt to automatically adjust the size of the payloads sent when it encounters an error related to a payload being too large. You can also explicitly set the number of metrics to send at once using `api-partition-size` and setting `api-auto-partition` to `false`. You can optionally turn on compression for the API, in which case Datadog will accept any payloads whose uncomressed size is less than 5Mib. diff --git a/sink/datadog.go b/sink/datadog.go index ad113a3..f8fce75 100644 --- a/sink/datadog.go +++ b/sink/datadog.go @@ -3,12 +3,18 @@ package sink import ( + "bytes" + "compress/zlib" "context" + "encoding/json" "fmt" "io/ioutil" + "math" "net/http" "regexp" "strconv" + "strings" + "sync" "time" "github.com/DataDog/datadog-go/v5/statsd" @@ -23,6 +29,10 @@ import ( var portRe = regexp.MustCompile(`:\d+$`) +const ( + MAX_PAYLOAD_SIZE int = 512000 +) + // Datadog sends metrics to Datadog. type Datadog struct { monitorId string @@ -31,10 +41,14 @@ type Datadog struct { prefix string // datadog.metric-prefix // -- Api - metricsApi *datadogV2.MetricsApi - apiKeyAuth string - appKeyAuth string - resources []datadogV2.MetricResource + metricsApi *datadogV2.MetricsApi + apiKeyAuth string + appKeyAuth string + resources []datadogV2.MetricResource + compress bool + partitionSize int + autoPartition bool + partitionLock sync.Mutex // -- DogStatsD dogstatsd bool @@ -62,9 +76,11 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http } d := &Datadog{ - monitorId: monitorId, - tags: tagList, - resources: resources, + monitorId: monitorId, + tags: tagList, + resources: resources, + partitionSize: math.MaxInt32, + autoPartition: true, } for k, v := range opts { @@ -104,9 +120,23 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http } d.prefix = v + case "api-use-compression": + d.compress = strings.ToLower(v) == "true" + case "dogstatsd-host": d.dogstatsdHost = v + case "api-partition-size": + partitionSize, err := strconv.Atoi(v) + if err != nil || partitionSize <= 0 { + return nil, fmt.Errorf("datadog sink partition-size must be a positive integer") + } + + d.partitionSize = partitionSize + + case "api-auto-partition": + d.autoPartition = strings.ToLower(v) == "true" + default: return nil, fmt.Errorf("invalid option: %s", k) } @@ -141,6 +171,7 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http c := datadog.NewConfiguration() c.HTTPClient = httpClient + c.Compress = d.compress metricsApi := datadogV2.NewMetricsApi(datadog.NewAPIClient(c)) d.metricsApi = metricsApi } @@ -148,6 +179,61 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http return d, nil } +func (s *Datadog) sendApi(ddCtx context.Context, dp []datadogV2.MetricSeries) error { + // Send metrics to Datadog. The Datadog client handles everything; we just pass + // it data points. + payload := datadogV2.MetricPayload{ + Series: dp, + } + _, r, err := s.metricsApi.SubmitMetrics(ddCtx, payload, *datadogV2.NewSubmitMetricsOptionalParameters()) + // Check to see if the request was too large + if r.StatusCode == http.StatusRequestEntityTooLarge { + oldPartitionSize := s.partitionSize + + // Sample and try and partition this to fit + if pErr := s.updatePartitionSize(dp[0], len(dp)); pErr != nil { + return fmt.Errorf("Unable to compute paritions properly send metrics: %s: %s", pErr, err) + } + + localPartitionSize := s.partitionSize + + if (localPartitionSize == len(dp) && oldPartitionSize == localPartitionSize) || oldPartitionSize == localPartitionSize { + // The partition size already matches the size of the data we have, so we can't retry any more + return fmt.Errorf("Unable to send metrics due to size: %s", err) + } + + // Repartition and send again + var pErr error + rangeStart := 0 + var i int + for i = localPartitionSize; i < len(dp); i += localPartitionSize { + if err = s.sendApi(ddCtx, dp[rangeStart:i]); err != nil { + pErr = fmt.Errorf("%s\n%s", pErr, err) + } + rangeStart = i + } + + if i-rangeStart > 0 { + if i > len(dp) { + i = len(dp) + } + + if err = s.sendApi(ddCtx, dp[rangeStart:i]); err != nil { + pErr = fmt.Errorf("%s\n%s", pErr, err) + } + } + + return pErr + } + + if err != nil { + blip.Debug("error sending data points to Datadog: %s", err) + blip.Debug("error sending data points to Datadog: Http Response - %s", r) + } + + return err +} + func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error { status.Monitor(s.monitorId, s.Name(), "sending metrics") @@ -171,6 +257,25 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error { } n = 0 + // Make a copy of the partition size we are using for API calls + localPartitionSize := s.partitionSize + rangeStart := 0 + var apiErrors []string + + // Setup our context for API calls + ddCtx := context.WithValue( + ctx, + datadog.ContextAPIKeys, + map[string]datadog.APIKey{ + "apiKeyAuth": { + Key: s.apiKeyAuth, + }, + "appKeyAuth": { + Key: s.apiKeyAuth, + }, + }, + ) + // Convert each Blip metric value to an SFX data point for domain := range m.Values { // each domain metrics := m.Values[domain] @@ -280,6 +385,14 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error { } n++ + + // Check if we have filled the current partition quantity + if !s.dogstatsd && n%localPartitionSize == 0 { + if err := s.sendApi(ddCtx, dp[rangeStart:n]); err != nil { + apiErrors = append(apiErrors, err.Error()) + } + rangeStart = n + } } // metric } // domain @@ -293,33 +406,76 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error { return nil } - // send metrics via API - ddCtx := context.WithValue( - ctx, - datadog.ContextAPIKeys, - map[string]datadog.APIKey{ - "apiKeyAuth": { - Key: s.apiKeyAuth, - }, - "appKeyAuth": { - Key: s.apiKeyAuth, - }, - }, - ) + if n-rangeStart > 0 { + if err := s.sendApi(ddCtx, dp[rangeStart:n]); err != nil { + apiErrors = append(apiErrors, err.Error()) + } + } - payload := datadogV2.MetricPayload{ - Series: dp, + if len(apiErrors) > 0 { + return fmt.Errorf("%s", strings.Join(apiErrors, "\n")) } - // Send metrics to Datadog. The Datadog client handles everything; we just pass - // it data points. - _, r, err := s.metricsApi.SubmitMetrics(ddCtx, payload, *datadogV2.NewSubmitMetricsOptionalParameters()) + return nil +} + +// Update our estimated partition +func (s *Datadog) updatePartitionSize(metric datadogV2.MetricSeries, currentMessageLength int) error { + if !s.autoPartition { + return nil + } + + size, err := s.estimateSize(metric) if err != nil { - blip.Debug("error sending data points to Datadog: %s", err) - blip.Debug("error sending data points to Datadog: Http Response - %s", r) + return err } - return err + // Figure out how many metrics can fit inside the max payload, but pad it slightly to control for headers, etc. + estPartitionSize := (MAX_PAYLOAD_SIZE - 1000) / size + + s.partitionLock.Lock() + defer s.partitionLock.Unlock() + + if estPartitionSize >= s.partitionSize { + // The new partition size is greater than what we have now, so let's reduce it by a small factor to make sure we can send metrics properly + estPartitionSize = int(float32(s.partitionSize) * .9) + } + + if estPartitionSize > currentMessageLength { + // If we needed to recalculate the parition size but it is larger than the current + // collection of messages then we need to reduce this more. + estPartitionSize = currentMessageLength - 1 + } + + // Ensure a minimum partition size + if estPartitionSize <= 0 { + estPartitionSize = 1 + } + + s.partitionSize = estPartitionSize + blip.Debug("Updating Datadog API parittion size to %d", estPartitionSize) + + return nil +} + +// Estiamte the size of a metric payload for use in partitioning +func (s *Datadog) estimateSize(metric datadogV2.MetricSeries) (int, error) { + data, err := json.Marshal(metric) + if err != nil { + return 0, err + } + + size := len(data) + + if s.compress { + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(data) + w.Close() + size = len(b.Bytes()) + } + + return size, nil } func (s *Datadog) Name() string { diff --git a/sink/datadog_test.go b/sink/datadog_test.go new file mode 100644 index 0000000..01ebcd9 --- /dev/null +++ b/sink/datadog_test.go @@ -0,0 +1,311 @@ +package sink + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + "time" + + "github.com/DataDog/datadog-api-client-go/v2/api/datadogV2" + "github.com/cashapp/blip" + "github.com/cashapp/blip/test/mock" + "github.com/go-test/deep" +) + +func defaultOps() map[string]string { + return map[string]string{ + "api-key-auth": "testkey", + "app-key-auth": "testkey", + } +} + +func okHttpClient() *http.Client { + return &http.Client{ + Transport: &mock.Transport{ + RoundTripFunc: func(r *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }, + }, + } +} + +func getBlipMetrics() *blip.Metrics { + return &blip.Metrics{ + Begin: time.Now().Add(-1 * time.Hour), + End: time.Now(), + MonitorId: "testmonitor", + Plan: "testplan", + Level: "testlevel", + State: "teststate", + Values: map[string][]blip.MetricValue{ + "testdomain": { + { + Name: "testmetric1", + Value: 1.0, + Type: blip.GAUGE, + }, + { + Name: "testmetric2", + Value: 1.0, + Type: blip.GAUGE, + }, + { + Name: "testmetric3", + Value: 1.0, + Type: blip.GAUGE, + }, + { + Name: "testmetric4", + Value: 1.0, + Type: blip.GAUGE, + }, + { + Name: "testmetric5", + Value: 1.0, + Type: blip.GAUGE, + }, + }, + }, + } +} + +func TestDatadogSink(t *testing.T) { + ddSink, err := NewDatadog("testmonitor", defaultOps(), map[string]string{}, okHttpClient()) + + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } + + err = ddSink.Send(context.Background(), getBlipMetrics()) + + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } +} + +func TestDatadogPartitionSize(t *testing.T) { + callCount := 0 + + httpClient := &http.Client{ + Transport: &mock.Transport{ + RoundTripFunc: func(r *http.Request) (*http.Response, error) { + callCount++ + + if callCount == 1 { + return &http.Response{ + StatusCode: http.StatusRequestEntityTooLarge, + }, nil + } else { + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + } + }, + }, + } + + ddSink, err := NewDatadog("testmonitor", defaultOps(), map[string]string{}, httpClient) + + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } + + err = ddSink.Send(context.Background(), getBlipMetrics()) + + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } + + if ddSink.partitionSize != 4 { + t.Fatalf("Expected a partition size of 4 but got %d", ddSink.partitionSize) + } + + if callCount != 3 { + t.Fatalf("Expected 3 calls but got %d", callCount) + } +} + +func TestDatadogPartitionSizeMultipleFail(t *testing.T) { + callCount := 0 + var collectedMetrics []string + + httpClient := &http.Client{ + Transport: &mock.Transport{ + RoundTripFunc: func(r *http.Request) (*http.Response, error) { + callCount++ + + if callCount <= 2 { + return &http.Response{ + StatusCode: http.StatusRequestEntityTooLarge, + }, nil + } else { + var payload datadogV2.MetricPayload + body, _ := r.GetBody() + defer body.Close() + data, _ := ioutil.ReadAll(body) + json.Unmarshal(data, &payload) + + for _, metric := range payload.Series { + collectedMetrics = append(collectedMetrics, metric.Metric) + } + + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + } + }, + }, + } + + ddSink, err := NewDatadog("testmonitor", defaultOps(), map[string]string{}, httpClient) + + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } + + err = ddSink.Send(context.Background(), getBlipMetrics()) + + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } + + if ddSink.partitionSize != 3 { + t.Fatalf("Expected a partition size of 3 but got %d", ddSink.partitionSize) + } + + if callCount != 5 { + t.Fatalf("Expected 5 calls but got %d", callCount) + } + + expectedMetrics := []string{ + "testdomain.testmetric1", + "testdomain.testmetric2", + "testdomain.testmetric3", + "testdomain.testmetric4", + "testdomain.testmetric5", + } + + if diff := deep.Equal(expectedMetrics, collectedMetrics); diff != nil { + t.Fatal(diff) + } +} + +func TestDatadogSinkFixedPartition(t *testing.T) { + ops := defaultOps() + ops["api-partition-size"] = "2" + ops["api-auto-partition"] = "false" + + callCount := 0 + var collectedMetrics []string + + httpClient := &http.Client{ + Transport: &mock.Transport{ + RoundTripFunc: func(r *http.Request) (*http.Response, error) { + callCount++ + + var payload datadogV2.MetricPayload + body, _ := r.GetBody() + defer body.Close() + data, _ := ioutil.ReadAll(body) + json.Unmarshal(data, &payload) + + for _, metric := range payload.Series { + collectedMetrics = append(collectedMetrics, metric.Metric) + } + + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }, + }, + } + + ddSink, err := NewDatadog("testmonitor", ops, map[string]string{}, httpClient) + + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } + + err = ddSink.Send(context.Background(), getBlipMetrics()) + + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } + + if ddSink.partitionSize != 2 { + t.Fatalf("Expected a partition size of 2 but got %d", ddSink.partitionSize) + } + + if callCount != 3 { + t.Fatalf("Expected 3 calls but got %d", callCount) + } + + expectedMetrics := []string{ + "testdomain.testmetric1", + "testdomain.testmetric2", + "testdomain.testmetric3", + "testdomain.testmetric4", + "testdomain.testmetric5", + } + + if diff := deep.Equal(expectedMetrics, collectedMetrics); diff != nil { + t.Fatal(diff) + } +} + +func TestDatadogFixedPartitionSizeTooLarge(t *testing.T) { + ops := defaultOps() + ops["api-partition-size"] = "2" + ops["api-auto-partition"] = "false" + + callCount := 0 + + httpClient := &http.Client{ + Transport: &mock.Transport{ + RoundTripFunc: func(r *http.Request) (*http.Response, error) { + callCount++ + + if callCount < 10 { + return &http.Response{ + StatusCode: http.StatusRequestEntityTooLarge, + }, nil + } else { + return &http.Response{ + StatusCode: http.StatusBadRequest, + }, nil + } + }, + }, + } + + ddSink, err := NewDatadog("testmonitor", ops, map[string]string{}, httpClient) + + if err != nil { + t.Fatalf("Expected no error but got %v", err) + } + + err = ddSink.Send(context.Background(), getBlipMetrics()) + + if err == nil { + t.Fatalf("Expected an error but none.") + } + + expectedErr := fmt.Errorf("Unable to send metrics due to size: \nUnable to send metrics due to size: \nUnable to send metrics due to size: ") + if strings.Compare(err.Error(), expectedErr.Error()) != 0 { + t.Errorf("Expected 'Unable to send metrics due to size: ' but got '%v'", err) + } + + if ddSink.partitionSize != 2 { + t.Fatalf("Expected a partition size of 2 but got %d", ddSink.partitionSize) + } + + if callCount != 3 { + t.Fatalf("Expected 3 call but got %d", callCount) + } +} diff --git a/test/mock/http.go b/test/mock/http.go new file mode 100644 index 0000000..c1d4f18 --- /dev/null +++ b/test/mock/http.go @@ -0,0 +1,15 @@ +package mock + +import "net/http" + +// A mock of http.Transport. +type Transport struct { + RoundTripFunc func(*http.Request) (*http.Response, error) +} + +func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { + if t.RoundTripFunc != nil { + return t.RoundTripFunc(req) + } + return nil, nil +}