Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Partitioning to Datadog Sink #88

Merged
merged 1 commit into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/v1.0/sinks/datadog.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ sinks:
metric-translator: ""
metric-prefix: ""
dogstatsd-host: ""
```
api-compress: "true"
```
177 changes: 153 additions & 24 deletions sink/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
package sink

import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/DataDog/datadog-go/v5/statsd"
Expand All @@ -23,6 +29,10 @@ import (

var portRe = regexp.MustCompile(`:\d+$`)

const (
MAX_PAYLOAD_SIZE int = 512000
)

// Datadog sends metrics to Datadog.
type Datadog struct {
monitorId string
Expand All @@ -35,6 +45,11 @@ type Datadog struct {
apiKeyAuth string
appKeyAuth string
resources []datadogV2.MetricResource
compress bool

maxMetricsPerRequest int // Limit the number of metrics we send per request. Only used with the API
maxMetricsPerRequestLock sync.Mutex
maxPayloadSize int

// -- DogStatsD
dogstatsd bool
Expand Down Expand Up @@ -62,9 +77,12 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http
}

d := &Datadog{
monitorId: monitorId,
tags: tagList,
resources: resources,
monitorId: monitorId,
tags: tagList,
resources: resources,
maxMetricsPerRequest: math.MaxInt32, // By default don't limit the number of metrics per request.
compress: true,
maxPayloadSize: MAX_PAYLOAD_SIZE,
}

for k, v := range opts {
Expand Down Expand Up @@ -104,6 +122,9 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http
}
d.prefix = v

case "api-compress":
d.compress = blip.Bool(v)

case "dogstatsd-host":
d.dogstatsdHost = v

Expand Down Expand Up @@ -141,6 +162,7 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http

c := datadog.NewConfiguration()
c.HTTPClient = httpClient
c.Compress = d.compress
metricsApi := datadogV2.NewMetricsApi(datadog.NewAPIClient(c))
d.metricsApi = metricsApi
}
Expand Down Expand Up @@ -171,6 +193,25 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
}
n = 0

// Make a copy of maxMetricsPerRequest in case it gets updated by other threads
localMaxMetricsPerRequest := s.maxMetricsPerRequest
rangeStart := 0
var apiErrors []string

// Setup our context for API calls
ddCtx := context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.apiKeyAuth,
},
"appKeyAuth": {
Key: s.apiKeyAuth,
},
},
)

// Convert each Blip metric value to an SFX data point
for domain := range m.Values { // each domain
metrics := m.Values[domain]
Expand Down Expand Up @@ -280,6 +321,14 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
}

n++

// Check if we have reached the maximum number of metrics per request
if !s.dogstatsd && n%localMaxMetricsPerRequest == 0 {
if err := s.sendApi(ddCtx, dp[rangeStart:n]); err != nil {
apiErrors = append(apiErrors, err.Error())
}
rangeStart = n
}
} // metric
} // domain

Expand All @@ -293,33 +342,113 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
return nil
}

// send metrics via API
ddCtx := context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.apiKeyAuth,
},
"appKeyAuth": {
Key: s.apiKeyAuth,
},
},
)
if n-rangeStart > 0 {
if err := s.sendApi(ddCtx, dp[rangeStart:n]); err != nil {
apiErrors = append(apiErrors, err.Error())
}
}

if len(apiErrors) > 0 {
return fmt.Errorf("%s", strings.Join(apiErrors, "\n"))
}

return nil
}

// Send metrics to the API taking into consideration the number of metrics sent per request.
func (s *Datadog) sendApi(ddCtx context.Context, dp []datadogV2.MetricSeries) error {
localMaxMetricsPerRequest := s.maxMetricsPerRequest

for rangeStart := 0; rangeStart < len(dp); {
// Determine the subetset of metrics to send based on our
// max per request
rangeEnd := rangeStart + localMaxMetricsPerRequest
if rangeEnd > len(dp) {
rangeEnd = len(dp)
}

if _, r, err := s.metricsApi.SubmitMetrics(ddCtx, *datadogV2.NewMetricPayload(dp[rangeStart:rangeEnd]), *datadogV2.NewSubmitMetricsOptionalParameters()); err != nil {
if r.StatusCode == http.StatusRequestEntityTooLarge {
// Is the number of metrics sent already the smallest possible?
if localMaxMetricsPerRequest == 1 {
return fmt.Errorf("Unable to send metrics: %v", err)
}

// The payload was too large, so we need to recalculate it and try with a smaller size
if localMaxMetricsPerRequest, err = s.estimateMaxMetricsPerRequest(dp[rangeStart:rangeEnd], localMaxMetricsPerRequest); err != nil {
return fmt.Errorf("Unable to determine proper number of metrics per request: %v", err)
}

// Retry the metrics with the new payload size
continue
}

blip.Debug("error sending data points to Datadog: %s, Http Response: %s", err, r)
return err
}

rangeStart = rangeEnd
}

payload := datadogV2.MetricPayload{
Series: dp,
// Update the maxMetricsPerRequest for the sink
if localMaxMetricsPerRequest < s.maxMetricsPerRequest {
s.maxMetricsPerRequestLock.Lock()
defer s.maxMetricsPerRequestLock.Unlock()
// Check the value again in case it changed after getting the lock
if localMaxMetricsPerRequest < s.maxMetricsPerRequest {
s.maxMetricsPerRequest = localMaxMetricsPerRequest
}
}

// Send metrics to Datadog. The Datadog client handles everything; we just pass
// it data points.
_, r, err := s.metricsApi.SubmitMetrics(ddCtx, payload, *datadogV2.NewSubmitMetricsOptionalParameters())
return nil
}

// Estimate the number of metrics we can send in a payload based on a sample metric
func (s *Datadog) estimateMaxMetricsPerRequest(metrics []datadogV2.MetricSeries, currentMaxMetricsPerRequest int) (int, error) {
// Estimate the size of a single metric
estMetricSize, err := s.estimateSize(metrics)
if err != nil {
blip.Debug("error sending data points to Datadog: %s", err)
blip.Debug("error sending data points to Datadog: Http Response - %s", r)
return 0, err
}

// Using our estimated metric size determine out how many metrics can fit inside the max payload, but pad it slightly to control for headers, etc.
estMaxMetricsPerRequest := (s.maxPayloadSize - 300) / estMetricSize

if estMaxMetricsPerRequest >= currentMaxMetricsPerRequest {
// If the estimated maximum is greater than what we currently have set as the maximum then
// reduce the current maximum by 10% as a guess for finding a maximnum number of metrics
// to send that will not be rejected by the API.
estMaxMetricsPerRequest = int(float32(currentMaxMetricsPerRequest) * .9)
}

// Ensure we send at least one metric per request
if estMaxMetricsPerRequest <= 0 {
estMaxMetricsPerRequest = 1
}

return estMaxMetricsPerRequest, nil
}

// Estimate the size of a metric payload for use in determining the maximum number of
// metrics per request. We take the total size of the payload and divide by the number
// of metrics.
func (s *Datadog) estimateSize(metrics []datadogV2.MetricSeries) (int, error) {
data, err := json.Marshal(metrics)
if err != nil {
return 0, err
}

size := len(data)

if s.compress {
var b bytes.Buffer
w := zlib.NewWriter(&b)
w.Write(data)
w.Close()
size = len(b.Bytes())
}

return err
return size / len(metrics), nil
}

func (s *Datadog) Name() string {
Expand Down
Loading