Skip to content

Commit

Permalink
Add support for sharding based on metric name (#3170)
Browse files Browse the repository at this point in the history
  • Loading branch information
nevins-b authored and danielnelson committed Aug 28, 2017
1 parent 914a813 commit 77c6089
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 29 deletions.
28 changes: 25 additions & 3 deletions plugins/outputs/kinesis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ For this output plugin to function correctly the following variables must be con

* region
* streamname
* partitionkey

### region

Expand All @@ -44,17 +43,40 @@ The streamname is used by the plugin to ensure that data is sent to the correct
note that the stream *MUST* be pre-configured for this plugin to function correctly. If the stream does not exist the
plugin will result in telegraf exiting with an exit code of 1.

### partitionkey
### partitionkey [DEPRECATED]

This is used to group data within a stream. Currently this plugin only supports a single partitionkey.
Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable
solution to scale out.

### use_random_partitionkey
### use_random_partitionkey [DEPRECATED]

When true a random UUID will be generated and used as the partitionkey when sending data to Kinesis. This allows data to be spread evenly across multiple shards in the stream. Because a random partitionKey is used, there is no guarantee of ordering when consuming the data off the shards.
If true then the partitionkey option will be ignored.

### partition

This is used to group data within a stream. Currently four methods are supported: random, static, tag, or measurement.

#### random

This will generate a UUIDv4 for each metric to spread them across shards.
Any guarantee of ordering is lost with this method.

#### static

This uses a static string as a partitionkey.
All metrics will be mapped to the same shard which may limit throughput.

#### tag

This will take the value of the specified tag from each metric as the partitionKey.
If the tag is not found an empty string will be used.

#### measurement

This will use the measurement's name as the partitionKey.

### format

The format configuration value has been designated to allow people to change the format of the Point as written to
Expand Down
107 changes: 81 additions & 26 deletions plugins/outputs/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,31 @@ import (
"github.com/influxdata/telegraf/plugins/serializers"
)

// KinesisOutput holds the configuration for the Kinesis output plugin.
// Exported fields are populated from the plugin's TOML configuration via
// their `toml` tags.
type KinesisOutput struct {
// Region is the Amazon region of the Kinesis endpoint.
Region string `toml:"region"`
// The following six fields look like AWS credential settings; how they are
// consumed is not visible in this part of the file — confirm in Connect.
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
Filename string `toml:"shared_credential_file"`
Token string `toml:"token"`

// StreamName is the Kinesis stream to write to; it must already exist.
StreamName string `toml:"streamname"`
// PartitionKey is the single partition key applied to every record unless
// RandomPartitionKey is set.
PartitionKey string `toml:"partitionkey"`
// RandomPartitionKey, when true, replaces PartitionKey with a freshly
// generated UUID for each record.
RandomPartitionKey bool `toml:"use_random_partitionkey"`
// Debug is read from the "debug" option; its effect is not visible here.
Debug bool `toml:"debug"`
// svc is the Kinesis service client.
svc *kinesis.Kinesis

// serializer is presumably used to encode metrics before they are sent —
// it is assigned outside this view.
serializer serializers.Serializer
}
// Partition selects how the Kinesis partition key is derived for each metric.
// It is populated from the [outputs.kinesis.partition] TOML table.
type Partition struct {
	// Method names the strategy: "static", "random", "measurement" or "tag".
	Method string `toml:"method"`
	// Key is the literal partition key for the "static" method, or the name
	// of the tag to read for the "tag" method.
	Key string `toml:"key"`
}

// KinesisOutput holds the configuration for the Kinesis output plugin.
// Exported fields are populated from the plugin's TOML configuration via
// their `toml` tags.
type KinesisOutput struct {
	// Region is the Amazon region of the Kinesis endpoint.
	Region string `toml:"region"`
	// The following six fields look like AWS credential settings; how they
	// are consumed is not visible in this part of the file.
	AccessKey string `toml:"access_key"`
	SecretKey string `toml:"secret_key"`
	RoleARN   string `toml:"role_arn"`
	Profile   string `toml:"profile"`
	Filename  string `toml:"shared_credential_file"`
	Token     string `toml:"token"`

	// StreamName is the Kinesis stream to write to; it must already exist.
	StreamName string `toml:"streamname"`
	// PartitionKey is the legacy static partition key.
	//
	// Deprecated: configure Partition instead.
	PartitionKey string `toml:"partitionkey"`
	// RandomPartitionKey is the legacy switch for a random UUID key per record.
	//
	// Deprecated: configure Partition instead.
	RandomPartitionKey bool `toml:"use_random_partitionkey"`
	// Partition, when non-nil, takes precedence over the two deprecated
	// partition key options above.
	Partition *Partition `toml:"partition"`
	// Debug is read from the "debug" option; its effect is not visible here.
	Debug bool `toml:"debug"`
	// svc is the Kinesis service client.
	svc *kinesis.Kinesis

	// serializer is presumably used to encode metrics before they are sent;
	// it is assigned outside this view.
	serializer serializers.Serializer
}

var sampleConfig = `
## Amazon REGION of kinesis endpoint.
Expand All @@ -54,12 +62,32 @@ var sampleConfig = `
## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
## PartitionKey as used for sharding data.
## DEPRECATED: PartitionKey as used for sharding data.
partitionkey = "PartitionKey"
## If set the paritionKey will be a random UUID on every put.
## DEPRECATED: If set the paritionKey will be a random UUID on every put.
## This allows for scaling across multiple shards in a stream.
## This will cause issues with ordering.
use_random_partitionkey = false
## The partition key can be calculated using one of several methods:
##
## Use a static value for all writes:
# [outputs.kinesis.partition]
# method = "static"
# key = "howdy"
#
## Use a random partition key on each write:
# [outputs.kinesis.partition]
# method = "random"
#
## Use the measurement name as the partition key:
# [outputs.kinesis.partition]
# method = "measurement"
#
## Use the value of a tag for all writes, if the tag is not set the empty
## string will be used:
# [outputs.kinesis.partition]
# method = "tag"
# key = "host"
## Data format to output.
Expand Down Expand Up @@ -129,6 +157,9 @@ func (k *KinesisOutput) Connect() error {
log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
os.Exit(1)
}
if k.Partition == nil {
log.Print("E! kinesis : Deprecated paritionkey configuration in use, please consider using outputs.kinesis.partition")
}
return err
}

Expand Down Expand Up @@ -163,6 +194,32 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
return time.Since(start)
}

// getPartitionKey returns the Kinesis partition key to use for metric.
//
// When a Partition is configured its Method decides the key:
//   - "static":      the configured Key, verbatim
//   - "random":      a new UUIDv4 string for every call
//   - "measurement": the metric's name
//   - "tag":         the value of the tag named by Key
//
// If the tag is absent, or the method is not one of the above, an error is
// logged and the function falls back to the deprecated options, exactly as
// when no Partition is configured: a random UUID if RandomPartitionKey is
// set, otherwise PartitionKey (the empty string when unset).
func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string {
	if part := k.Partition; part != nil {
		switch part.Method {
		case "static":
			return part.Key
		case "random":
			id := uuid.NewV4()
			return id.String()
		case "measurement":
			return metric.Name()
		case "tag":
			if !metric.HasTag(part.Key) {
				log.Printf("E! kinesis : You have configured a Partition using tag %+v which does not exist.", part.Key)
				// Leave the switch and fall back to the deprecated settings.
				break
			}
			return metric.Tags()[part.Key]
		default:
			log.Printf("E! kinesis : You have configured a Partition method of %+v which is not supported", part.Method)
		}
	}

	// Deprecated configuration path (also the fallback for the cases above).
	if !k.RandomPartitionKey {
		return k.PartitionKey
	}
	id := uuid.NewV4()
	return id.String()
}

func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
var sz uint32

Expand All @@ -180,11 +237,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
return err
}

partitionKey := k.PartitionKey
if k.RandomPartitionKey {
u := uuid.NewV4()
partitionKey = u.String()
}
partitionKey := k.getPartitionKey(metric)

d := kinesis.PutRecordsRequestEntry{
Data: values,
Expand All @@ -202,8 +255,10 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
}

}

writekinesis(k, r)
if sz > 0 {
elapsed := writekinesis(k, r)
log.Printf("E! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
}

return nil
}
Expand Down
77 changes: 77 additions & 0 deletions plugins/outputs/kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package kinesis

import (
"testing"

"github.com/influxdata/telegraf/testutil"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
)

// TestPartitionKey checks getPartitionKey for every supported partition
// method, for the missing-tag and unsupported-method fallbacks, and for the
// deprecated PartitionKey / RandomPartitionKey options.
func TestPartitionKey(t *testing.T) {
	check := assert.New(t)
	point := testutil.TestMetric(1)

	cases := []struct {
		output   KinesisOutput
		want     string // expected key; ignored when wantUUID is set
		wantUUID bool   // expect a parseable version-4 UUID instead of want
		msg      string
	}{
		{
			output: KinesisOutput{Partition: &Partition{Method: "static", Key: "-"}},
			want:   "-",
			msg:    "PartitionKey should be '-'",
		},
		{
			output: KinesisOutput{Partition: &Partition{Method: "tag", Key: "tag1"}},
			want:   point.Tags()["tag1"],
			msg:    "PartitionKey should be value of 'tag1'",
		},
		{
			output: KinesisOutput{Partition: &Partition{Method: "tag", Key: "doesnotexist"}},
			want:   "",
			msg:    "PartitionKey should be value of ''",
		},
		{
			output: KinesisOutput{Partition: &Partition{Method: "not supported"}},
			want:   "",
			msg:    "PartitionKey should be value of ''",
		},
		{
			output: KinesisOutput{Partition: &Partition{Method: "measurement"}},
			want:   point.Name(),
			msg:    "PartitionKey should be value of measurement name",
		},
		{
			output:   KinesisOutput{Partition: &Partition{Method: "random"}},
			wantUUID: true,
		},
		{
			// Deprecated static key, no Partition configured.
			output: KinesisOutput{PartitionKey: "-"},
			want:   "-",
			msg:    "PartitionKey should be '-'",
		},
		{
			// Deprecated random key, no Partition configured.
			output:   KinesisOutput{RandomPartitionKey: true},
			wantUUID: true,
		},
	}

	for i := range cases {
		tc := &cases[i]
		got := tc.output.getPartitionKey(point)

		if !tc.wantUUID {
			check.Equal(tc.want, got, tc.msg)
			continue
		}

		parsed, err := uuid.FromString(got)
		check.Nil(err, "Issue parsing UUID")
		check.Equal(uint(4), parsed.Version(), "PartitionKey should be UUIDv4")
	}
}

0 comments on commit 77c6089

Please sign in to comment.