Skip to content

Commit

Permalink
Adding Metric Partitioning to Datadog Sink
Browse files Browse the repository at this point in the history
Added request partitioning to the Datadog sink when the API method is used. When receiving an error related to the payload size being too large the sink will attempt to calculate a possible size that will satisfy the size requirements and re-send the metrics. In the event that it exhausts options and cannot send the metrics the attempt will fail.

Added am option manage compression for the API (on by default).
  • Loading branch information
Areson committed Jan 9, 2023
1 parent 8b48a85 commit a99ec0d
Show file tree
Hide file tree
Showing 5 changed files with 479 additions and 25 deletions.
3 changes: 2 additions & 1 deletion docs/v1.0/sinks/datadog.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ sinks:
metric-translator: ""
metric-prefix: ""
dogstatsd-host: ""
```
api-compress: "true"
```
177 changes: 153 additions & 24 deletions sink/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
package sink

import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/DataDog/datadog-go/v5/statsd"
Expand All @@ -23,6 +29,10 @@ import (

var portRe = regexp.MustCompile(`:\d+$`)

const (
MAX_PAYLOAD_SIZE int = 512000
)

// Datadog sends metrics to Datadog.
type Datadog struct {
monitorId string
Expand All @@ -35,6 +45,11 @@ type Datadog struct {
apiKeyAuth string
appKeyAuth string
resources []datadogV2.MetricResource
compress bool

maxMetricsPerRequest int // Limit the number of metrics we send per request. Only used with the API
maxMetricsPerRequestLock sync.Mutex
maxPayloadSize int

// -- DogStatsD
dogstatsd bool
Expand Down Expand Up @@ -62,9 +77,12 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http
}

d := &Datadog{
monitorId: monitorId,
tags: tagList,
resources: resources,
monitorId: monitorId,
tags: tagList,
resources: resources,
maxMetricsPerRequest: math.MaxInt32, // By default don't limit the number of metrics per request.
compress: true,
maxPayloadSize: MAX_PAYLOAD_SIZE,
}

for k, v := range opts {
Expand Down Expand Up @@ -104,6 +122,9 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http
}
d.prefix = v

case "api-compress":
d.compress = blip.Bool(v)

case "dogstatsd-host":
d.dogstatsdHost = v

Expand Down Expand Up @@ -141,6 +162,7 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http

c := datadog.NewConfiguration()
c.HTTPClient = httpClient
c.Compress = d.compress
metricsApi := datadogV2.NewMetricsApi(datadog.NewAPIClient(c))
d.metricsApi = metricsApi
}
Expand Down Expand Up @@ -171,6 +193,25 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
}
n = 0

// Make a copy of maxMetricsPerRequest in case it gets updated by other threads
localMaxMetricsPerRequest := s.maxMetricsPerRequest
rangeStart := 0
var apiErrors []string

// Setup our context for API calls
ddCtx := context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.apiKeyAuth,
},
"appKeyAuth": {
Key: s.apiKeyAuth,
},
},
)

// Convert each Blip metric value to an SFX data point
for domain := range m.Values { // each domain
metrics := m.Values[domain]
Expand Down Expand Up @@ -280,6 +321,14 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
}

n++

// Check if we have reached the maximum number of metrics per request
if !s.dogstatsd && n%localMaxMetricsPerRequest == 0 {
if err := s.sendApi(ddCtx, dp[rangeStart:n]); err != nil {
apiErrors = append(apiErrors, err.Error())
}
rangeStart = n
}
} // metric
} // domain

Expand All @@ -293,33 +342,113 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
return nil
}

// send metrics via API
ddCtx := context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.apiKeyAuth,
},
"appKeyAuth": {
Key: s.apiKeyAuth,
},
},
)
if n-rangeStart > 0 {
if err := s.sendApi(ddCtx, dp[rangeStart:n]); err != nil {
apiErrors = append(apiErrors, err.Error())
}
}

if len(apiErrors) > 0 {
return fmt.Errorf("%s", strings.Join(apiErrors, "\n"))
}

return nil
}

// Send metrics to the API taking into consideration the number of metrics sent per request.
func (s *Datadog) sendApi(ddCtx context.Context, dp []datadogV2.MetricSeries) error {
localMaxMetricsPerRequest := s.maxMetricsPerRequest

for rangeStart := 0; rangeStart < len(dp); {
// Determine the subetset of metrics to send based on our
// max per request
rangeEnd := rangeStart + localMaxMetricsPerRequest
if rangeEnd > len(dp) {
rangeEnd = len(dp)
}

if _, r, err := s.metricsApi.SubmitMetrics(ddCtx, *datadogV2.NewMetricPayload(dp[rangeStart:rangeEnd]), *datadogV2.NewSubmitMetricsOptionalParameters()); err != nil {
if r.StatusCode == http.StatusRequestEntityTooLarge {
// Is the number of metrics sent already the smallest possible?
if localMaxMetricsPerRequest == 1 {
return fmt.Errorf("Unable to send metrics: %v", err)
}

// The payload was too large, so we need to recalculate it and try with a smaller size
if localMaxMetricsPerRequest, err = s.estimateMaxMetricsPerRequest(dp[rangeStart:rangeEnd], localMaxMetricsPerRequest); err != nil {
return fmt.Errorf("Unable to determine proper number of metrics per request: %v", err)
}

// Retry the metrics with the new payload size
continue
}

blip.Debug("error sending data points to Datadog: %s, Http Response: %s", err, r)
return err
}

rangeStart = rangeEnd
}

payload := datadogV2.MetricPayload{
Series: dp,
// Update the maxMetricsPerRequest for the sink
if localMaxMetricsPerRequest < s.maxMetricsPerRequest {
s.maxMetricsPerRequestLock.Lock()
defer s.maxMetricsPerRequestLock.Unlock()
// Check the value again in case it changed after getting the lock
if localMaxMetricsPerRequest < s.maxMetricsPerRequest {
s.maxMetricsPerRequest = localMaxMetricsPerRequest
}
}

// Send metrics to Datadog. The Datadog client handles everything; we just pass
// it data points.
_, r, err := s.metricsApi.SubmitMetrics(ddCtx, payload, *datadogV2.NewSubmitMetricsOptionalParameters())
return nil
}

// Estimate the number of metrics we can send in a payload based on a sample metric
func (s *Datadog) estimateMaxMetricsPerRequest(metrics []datadogV2.MetricSeries, currentMaxMetricsPerRequest int) (int, error) {
// Estimate the size of a single metric
estMetricSize, err := s.estimateSize(metrics)
if err != nil {
blip.Debug("error sending data points to Datadog: %s", err)
blip.Debug("error sending data points to Datadog: Http Response - %s", r)
return 0, err
}

// Using our estimated metric size determine out how many metrics can fit inside the max payload, but pad it slightly to control for headers, etc.
estMaxMetricsPerRequest := (s.maxPayloadSize - 300) / estMetricSize

if estMaxMetricsPerRequest >= currentMaxMetricsPerRequest {
// If the estimated maximum is greater than what we currently have set as the maximum then
// reduce the current maximum by 10% as a guess for finding a maximnum number of metrics
// to send that will not be rejected by the API.
estMaxMetricsPerRequest = int(float32(currentMaxMetricsPerRequest) * .9)
}

// Ensure we send at least one metric per request
if estMaxMetricsPerRequest <= 0 {
estMaxMetricsPerRequest = 1
}

return estMaxMetricsPerRequest, nil
}

// Estimate the size of a metric payload for use in determining the maximum number of
// metrics per request. We take the total size of the payload and divide by the number
// of metrics.
func (s *Datadog) estimateSize(metrics []datadogV2.MetricSeries) (int, error) {
data, err := json.Marshal(metrics)
if err != nil {
return 0, err
}

size := len(data)

if s.compress {
var b bytes.Buffer
w := zlib.NewWriter(&b)
w.Write(data)
w.Close()
size = len(b.Bytes())
}

return err
return size / len(metrics), nil
}

func (s *Datadog) Name() string {
Expand Down
Loading

0 comments on commit a99ec0d

Please sign in to comment.