Adding Partitioning to Datadog Sink
Added options for compression and partitioning to the Datadog sink when the API method is used. When the sink receives an error indicating the payload is too large, it attempts to calculate a partition size that satisfies the size limit and re-sends the metrics. If it exhausts its options and still cannot send the metrics, the attempt fails.

Added the option to turn on compression for the API, which lets Datadog accept much larger payloads.
Areson committed Jan 3, 2023
1 parent 8334942 commit e69b2bd
Showing 4 changed files with 517 additions and 28 deletions.
7 changes: 7 additions & 0 deletions docs/v1.0/sinks/datadog.md
@@ -32,4 +32,11 @@ sinks:
metric-translator: ""
metric-prefix: ""
dogstatsd-host: ""
api-use-compression: "false"
api-partition-size: ""
api-auto-partition: "true"
```
## API Considerations
When using the API directly, the sink may run into issues when sending large payloads, as the Datadog API [restricts the payload to a maximum of 500KiB](https://docs.datadoghq.com/api/latest/metrics/). The DogStatsD option avoids this because it always sends metrics individually. By default, the API option attempts to automatically adjust the size of the payloads it sends when it encounters an error indicating a payload is too large. You can also explicitly set the number of metrics to send at once by using `api-partition-size` and setting `api-auto-partition` to `false`. You can optionally turn on compression for the API, in which case Datadog will accept any payload whose uncompressed size is less than 5MiB.
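For example, to disable auto-partitioning and always send metrics in fixed batches of 500 with compression enabled (an illustrative sketch; the batch size is arbitrary and the other sink options are elided):

```yaml
sinks:
  datadog:
    api-use-compression: "true"
    api-partition-size: "500"
    api-auto-partition: "false"
```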
212 changes: 184 additions & 28 deletions sink/datadog.go
@@ -3,12 +3,18 @@
package sink

import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/DataDog/datadog-go/v5/statsd"
@@ -23,6 +29,10 @@ import (

var portRe = regexp.MustCompile(`:\d+$`)

const (
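// Datadog's metrics API rejects payloads larger than 500KiB (512000 bytes); see https://docs.datadoghq.com/api/latest/metrics/.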
MAX_PAYLOAD_SIZE int = 512000
)

// Datadog sends metrics to Datadog.
type Datadog struct {
monitorId string
@@ -31,10 +41,14 @@ type Datadog struct {
prefix string // datadog.metric-prefix

// -- Api
metricsApi *datadogV2.MetricsApi
apiKeyAuth string
appKeyAuth string
resources []datadogV2.MetricResource
compress bool
partitionSize int
autoPartition bool
partitionLock sync.Mutex

// -- DogStatsD
dogstatsd bool
@@ -62,9 +76,11 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http
}

d := &Datadog{
monitorId: monitorId,
tags: tagList,
resources: resources,
partitionSize: math.MaxInt32,
autoPartition: true,
}

for k, v := range opts {
@@ -104,9 +120,23 @@
}
d.prefix = v

case "api-use-compression":
d.compress = strings.ToLower(v) == "true"

case "dogstatsd-host":
d.dogstatsdHost = v

case "api-partition-size":
partitionSize, err := strconv.Atoi(v)
if err != nil || partitionSize <= 0 {
return nil, fmt.Errorf("datadog sink partition-size must be a positive integer")
}

d.partitionSize = partitionSize

case "api-auto-partition":
d.autoPartition = strings.ToLower(v) == "true"

default:
return nil, fmt.Errorf("invalid option: %s", k)
}
@@ -141,13 +171,69 @@ func NewDatadog(monitorId string, opts, tags map[string]string, httpClient *http

c := datadog.NewConfiguration()
c.HTTPClient = httpClient
c.Compress = d.compress
metricsApi := datadogV2.NewMetricsApi(datadog.NewAPIClient(c))
d.metricsApi = metricsApi
}

return d, nil
}

func (s *Datadog) sendApi(ddCtx context.Context, dp []datadogV2.MetricSeries) error {
// Send metrics to Datadog. The Datadog client handles everything; we just pass
// it data points.
payload := datadogV2.MetricPayload{
Series: dp,
}
_, r, err := s.metricsApi.SubmitMetrics(ddCtx, payload, *datadogV2.NewSubmitMetricsOptionalParameters())
// Check to see if the request was too large
if r != nil && r.StatusCode == http.StatusRequestEntityTooLarge {
oldPartitionSize := s.partitionSize

// Sample a metric and try to compute a partition size that fits
if pErr := s.updatePartitionSize(dp[0], len(dp)); pErr != nil {
return fmt.Errorf("unable to compute partitions to send metrics: %s: %s", pErr, err)
}

localPartitionSize := s.partitionSize

if oldPartitionSize == localPartitionSize {
// The partition size didn't change, so retrying would fail the same way
return fmt.Errorf("unable to send metrics due to size: %s", err)
}

// Repartition and send again
var pErr error
rangeStart := 0
var i int
for i = localPartitionSize; i < len(dp); i += localPartitionSize {
if err = s.sendApi(ddCtx, dp[rangeStart:i]); err != nil {
if pErr == nil {
pErr = err
} else {
pErr = fmt.Errorf("%s\n%s", pErr, err)
}
}
rangeStart = i
}

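// Send whatever remains after the last full partition, clamping the end of the range to the slice length.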
if i-rangeStart > 0 {
if i > len(dp) {
i = len(dp)
}

if err = s.sendApi(ddCtx, dp[rangeStart:i]); err != nil {
if pErr == nil {
pErr = err
} else {
pErr = fmt.Errorf("%s\n%s", pErr, err)
}
}
}

return pErr
}

if err != nil {
blip.Debug("error sending data points to Datadog: %s", err)
if r != nil {
blip.Debug("error sending data points to Datadog: HTTP response - %s", r.Status)
}
}

return err
}

func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
status.Monitor(s.monitorId, s.Name(), "sending metrics")

@@ -171,6 +257,25 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
}
n = 0

// Make a copy of the partition size we are using for API calls
localPartitionSize := s.partitionSize
rangeStart := 0
var apiErrors []string

// Set up our context for API calls
ddCtx := context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.apiKeyAuth,
},
"appKeyAuth": {
Key: s.appKeyAuth,
},
},
)

// Convert each Blip metric value to a Datadog data point
for domain := range m.Values { // each domain
metrics := m.Values[domain]
@@ -280,6 +385,14 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
}

n++

// Check if we have filled the current partition quantity
if !s.dogstatsd && n%localPartitionSize == 0 {
if err := s.sendApi(ddCtx, dp[rangeStart:n]); err != nil {
apiErrors = append(apiErrors, err.Error())
}
rangeStart = n
}
} // metric
} // domain

@@ -293,33 +406,76 @@ func (s *Datadog) Send(ctx context.Context, m *blip.Metrics) error {
return nil
}

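// Flush any metrics remaining after the last full partition.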
if n-rangeStart > 0 {
if err := s.sendApi(ddCtx, dp[rangeStart:n]); err != nil {
apiErrors = append(apiErrors, err.Error())
}
}

if len(apiErrors) > 0 {
return fmt.Errorf("%s", strings.Join(apiErrors, "\n"))
}

return nil
}

// Update our estimated partition size using a sampled metric
func (s *Datadog) updatePartitionSize(metric datadogV2.MetricSeries, currentMessageLength int) error {
if !s.autoPartition {
return nil
}

size, err := s.estimateSize(metric)
if err != nil {
blip.Debug("error sending data points to Datadog: %s", err)
blip.Debug("error sending data points to Datadog: Http Response - %s", r)
return err
}

return err
// Figure out how many metrics can fit inside the max payload, but pad it slightly to control for headers, etc.
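// For example, a sampled metric that marshals to ~600 bytes gives (512000 - 1000) / 600 ≈ 851 metrics per partition (the 600-byte figure is illustrative).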
estPartitionSize := (MAX_PAYLOAD_SIZE - 1000) / size

s.partitionLock.Lock()
defer s.partitionLock.Unlock()

if estPartitionSize >= s.partitionSize {
// The estimate is at least as large as the current partition size, which already failed, so shrink the current size by a small factor instead
estPartitionSize = int(float32(s.partitionSize) * .9)
}

if estPartitionSize > currentMessageLength {
// If the recalculated partition size is larger than the current
// collection of messages, then we need to reduce it further.
estPartitionSize = currentMessageLength - 1
}

// Ensure a minimum partition size
if estPartitionSize <= 0 {
estPartitionSize = 1
}

s.partitionSize = estPartitionSize
blip.Debug("Updating Datadog API parittion size to %d", estPartitionSize)

return nil
}

// Estimate the size of a metric payload for use in partitioning
func (s *Datadog) estimateSize(metric datadogV2.MetricSeries) (int, error) {
data, err := json.Marshal(metric)
if err != nil {
return 0, err
}

size := len(data)

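// When compression is enabled, the API measures the compressed payload, so estimate the compressed size of the sample metric instead.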
if s.compress {
var b bytes.Buffer
w := zlib.NewWriter(&b)
if _, err := w.Write(data); err != nil {
return 0, err
}
if err := w.Close(); err != nil {
return 0, err
}
size = b.Len()
}

return size, nil
}

func (s *Datadog) Name() string {
