cleanup and adding support for sharding based on metric name #3170

Merged
merged 6 commits into from
Aug 28, 2017
Changes from 3 commits
28 changes: 25 additions & 3 deletions plugins/outputs/kinesis/README.md
@@ -27,7 +27,6 @@ For this output plugin to function correctly the following variables must be con

* region
* streamname
* partitionkey

### region

@@ -44,17 +43,40 @@ The streamname is used by the plugin to ensure that data is sent to the correct
note that the stream *MUST* be pre-configured for this plugin to function correctly. If the stream does not exist the
plugin will result in telegraf exiting with an exit code of 1.

### partitionkey
### partitionkey [DEPRECATED]

This is used to group data within a stream. Currently this plugin only supports a single partitionkey.
Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable
solution to scale out.

### use_random_partitionkey
### use_random_partitionkey [DEPRECATED]

When true, a random UUID is generated and used as the partitionkey when sending data to Kinesis. This allows data to spread evenly across multiple shards in the stream. Because the partitionKey is random, there is no guarantee of ordering when consuming the data off the shards.
If true, the partitionkey option is ignored.

### partition

This is used to group data within a stream. Four methods are currently supported: random, static, tag and measurement.

#### random

This will generate a UUIDv4 for each metric to spread them across shards.
Any guarantee of ordering is lost with this method.

#### static

This uses a static string as a partitionkey.
All metrics will be mapped to the same shard which may limit throughput.

#### tag

This will take the value of the specified tag from each metric as the partitionKey.
If the tag is not found, an empty string will be used.

#### measurement

This will use the measurement's name as the partitionKey.
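
To illustrate why the choice of method affects throughput: Kinesis maps each record to a shard by taking the MD5 hash of its partition key as a 128-bit integer and finding the shard whose hash key range contains that value. The sketch below is a simplified model, assuming a hypothetical stream whose shards divide the hash space evenly (real streams can have uneven ranges after resharding). `shardFor` and the sample keys are illustrative names and are not part of the plugin.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"math/big"
)

// shardFor returns the index of the shard a partition key would land on,
// ASSUMING numShards shards that split the 128-bit MD5 hash space evenly.
// This is an illustrative model, not code from the plugin.
func shardFor(partitionKey string, numShards int64) int64 {
	sum := md5.Sum([]byte(partitionKey))
	hash := new(big.Int).SetBytes(sum[:]) // 128-bit unsigned integer

	// Width of each shard's hash key range: 2^128 / numShards.
	space := new(big.Int).Lsh(big.NewInt(1), 128)
	width := new(big.Int).Div(space, big.NewInt(numShards))

	idx := new(big.Int).Div(hash, width).Int64()
	if idx >= numShards { // remainder at the top of the space
		idx = numShards - 1
	}
	return idx
}

func main() {
	// A static key always maps to the same shard.
	fmt.Println(shardFor("howdy", 4) == shardFor("howdy", 4))

	// Distinct keys (for example measurement names) may spread across shards.
	for _, key := range []string{"cpu", "mem", "disk", "net"} {
		fmt.Printf("%s -> shard %d\n", key, shardFor(key, 4))
	}
}
```

Under this model, the static method pins all traffic to one shard, while the tag and measurement methods spread load only as widely as the number of distinct key values allows.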

### format

The format configuration value has been designated to allow people to change the format of the Point as written to
108 changes: 82 additions & 26 deletions plugins/outputs/kinesis/kinesis.go
@@ -15,23 +15,31 @@ import (
"github.com/influxdata/telegraf/plugins/serializers"
)

type KinesisOutput struct {
Region string `toml:"region"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
Filename string `toml:"shared_credential_file"`
Token string `toml:"token"`

StreamName string `toml:"streamname"`
PartitionKey string `toml:"partitionkey"`
RandomPartitionKey bool `toml:"use_random_partitionkey"`
Debug bool `toml:"debug"`
svc *kinesis.Kinesis

serializer serializers.Serializer
}
type (
KinesisOutput struct {
Region string `toml:"region"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
Filename string `toml:"shared_credential_file"`
Token string `toml:"token"`

StreamName string `toml:"streamname"`
PartitionKey string `toml:"partitionkey"`
RandomPartitionKey bool `toml:"use_random_partitionkey"`
Partition *Partition `toml:"partition"`
Debug bool `toml:"debug"`
svc *kinesis.Kinesis

serializer serializers.Serializer
}

Partition struct {
Method string `toml:"method"`
Key string `toml:"key"`
}
)

var sampleConfig = `
## Amazon REGION of kinesis endpoint.
@@ -54,12 +62,32 @@ var sampleConfig = `

## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
## PartitionKey as used for sharding data.
## DEPRECATED: PartitionKey as used for sharding data.
partitionkey = "PartitionKey"
## If set the partitionKey will be a random UUID on every put.
## DEPRECATED: If set the partitionKey will be a random UUID on every put.
## This allows for scaling across multiple shards in a stream.
## This will cause issues with ordering.
use_random_partitionkey = false
## The partition key can be calculated using one of several methods:
##
## Use a static value for all writes:
# [[outputs.kinesis.partition]]
# method = "static"
# key = "howdy"
#
## Use a random partition key on each write:
# [outputs.kinesis.partition]
# method = "random"
#
## Use the measurement name as the partition key:
# [[outputs.kinesis.partition]]
# method = "measurement"
#
## Use the value of a tag for all writes, if the tag is not set the empty
## string will be used:
# [[outputs.kinesis.partition]]
Review comment from a contributor:

Make these all single bracket options so you can only set one.

# method = "tag"
# key = "host"
Review comment from a contributor:

Use 2 space indent in the sample config.

## Data format to output.
@@ -129,6 +157,9 @@ func (k *KinesisOutput) Connect() error {
log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
os.Exit(1)
}
if k.Partition == nil {
log.Print("E! kinesis : Deprecated partitionkey configuration in use, please consider using outputs.kinesis.partition")
}
return err
}

@@ -159,10 +190,37 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
if err != nil {
log.Printf("E! kinesis: Unable to write to Kinesis : %+v \n", err.Error())
}

Review comment from a contributor:

Remove

}
return time.Since(start)
}

func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string {
if k.Partition != nil {
switch k.Partition.Method {
case "static":
return k.Partition.Key
case "random":
u := uuid.NewV4()
return u.String()
case "measurement":
return metric.Name()
case "tag":
if metric.HasTag(k.Partition.Key) {
return metric.Tags()[k.Partition.Key]
}
log.Printf("E! kinesis : You have configured a Partition using tag %+v which does not exist.", k.Partition.Key)
default:
log.Printf("E! kinesis : You have configured a Partition method of %+v which is not supported", k.Partition.Method)
}
}
if k.RandomPartitionKey {
u := uuid.NewV4()
return u.String()
}
return k.PartitionKey
}
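
The control flow of `getPartitionKey` can be studied in isolation with a reduced model. The following is a minimal sketch, not the plugin code: `metric`, `partition` and `output` are hypothetical stand-ins for `telegraf.Metric`, `Partition` and `KinesisOutput`, and logging and the random branches are left out. It shows that a Go `switch` case that ends without a `return` leaves the switch, so a missing tag ends up at the deprecated `PartitionKey` value.

```go
package main

import "fmt"

// metric is a hypothetical stand-in for telegraf.Metric, reduced to the
// two pieces of data the key selection needs.
type metric struct {
	name string
	tags map[string]string
}

// partition mirrors the Partition struct from the diff.
type partition struct {
	Method string
	Key    string
}

// output carries only the fields that influence key selection.
type output struct {
	PartitionKey string     // deprecated static key
	Partition    *partition // new-style configuration
}

// partitionKey follows the same control flow as getPartitionKey in the diff,
// minus logging and the random branches.
func (o output) partitionKey(m metric) string {
	if o.Partition != nil {
		switch o.Partition.Method {
		case "static":
			return o.Partition.Key
		case "measurement":
			return m.name
		case "tag":
			if v, ok := m.tags[o.Partition.Key]; ok {
				return v
			}
			// No return here: control leaves the switch and reaches
			// the deprecated setting below.
		}
	}
	return o.PartitionKey
}

func main() {
	m := metric{name: "cpu", tags: map[string]string{"host": "web-1"}}

	byTag := output{Partition: &partition{Method: "tag", Key: "host"}}
	fmt.Println(byTag.partitionKey(m)) // web-1

	missing := output{
		PartitionKey: "legacy",
		Partition:    &partition{Method: "tag", Key: "region"},
	}
	fmt.Println(missing.partitionKey(m)) // legacy
}
```

When the deprecated options are unset, the result for a missing tag is the empty string, which matches the README. If an empty key is wanted even when the deprecated options are also configured, one option would be an explicit `return ""` in that branch.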

func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
var sz uint32

@@ -180,11 +238,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
return err
}

partitionKey := k.PartitionKey
if k.RandomPartitionKey {
u := uuid.NewV4()
partitionKey = u.String()
}
partitionKey := k.getPartitionKey(metric)

d := kinesis.PutRecordsRequestEntry{
Data: values,
@@ -202,8 +256,10 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
}

}

writekinesis(k, r)
if sz > 0 {
elapsed := writekinesis(k, r)
log.Printf("D! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
}

return nil
}
77 changes: 77 additions & 0 deletions plugins/outputs/kinesis/kinesis_test.go
@@ -0,0 +1,77 @@
package kinesis

import (
"testing"

"github.com/influxdata/telegraf/testutil"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
)

func TestPartitionKey(t *testing.T) {

assert := assert.New(t)
testPoint := testutil.TestMetric(1)

k := KinesisOutput{
Partition: &Partition{
Method: "static",
Key: "-",
},
}
assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")

k = KinesisOutput{
Partition: &Partition{
Method: "tag",
Key: "tag1",
},
}
assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'")

k = KinesisOutput{
Partition: &Partition{
Method: "tag",
Key: "doesnotexist",
},
}
assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''")

k = KinesisOutput{
Partition: &Partition{
Method: "not supported",
},
}
assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''")

k = KinesisOutput{
Partition: &Partition{
Method: "measurement",
},
}
assert.Equal(testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be value of measurement name")

k = KinesisOutput{
Partition: &Partition{
Method: "random",
},
}
partitionKey := k.getPartitionKey(testPoint)
u, err := uuid.FromString(partitionKey)
assert.Nil(err, "Issue parsing UUID")
assert.Equal(uint(4), u.Version(), "PartitionKey should be UUIDv4")

k = KinesisOutput{
PartitionKey: "-",
}
assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")

k = KinesisOutput{
RandomPartitionKey: true,
}
partitionKey = k.getPartitionKey(testPoint)
u, err = uuid.FromString(partitionKey)
assert.Nil(err, "Issue parsing UUID")
assert.Equal(uint(4), u.Version(), "PartitionKey should be UUIDv4")

}
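
For readers unfamiliar with what the `u.Version()` assertions verify, here is a minimal sketch using only the standard library. It assumes the RFC 4122 layout for random UUIDs; `newV4`, `version` and `format` are hypothetical helpers written for illustration and are not the `satori/go.uuid` API used in the test.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newV4 builds a random (version 4) UUID per RFC 4122 using only the
// standard library. It is a stand-in for uuid.NewV4 from the diff.
func newV4() ([16]byte, error) {
	var u [16]byte
	if _, err := rand.Read(u[:]); err != nil {
		return u, err
	}
	u[6] = (u[6] & 0x0f) | 0x40 // version 4 in the high nibble of byte 6
	u[8] = (u[8] & 0x3f) | 0x80 // RFC 4122 variant in the top bits of byte 8
	return u, nil
}

// version extracts the version number, the value the test compares against 4.
func version(u [16]byte) uint {
	return uint(u[6] >> 4)
}

// format renders the canonical 8-4-4-4-12 hexadecimal form.
func format(u [16]byte) string {
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:16])
}

func main() {
	u, err := newV4()
	if err != nil {
		panic(err)
	}
	fmt.Println(format(u), "version:", version(u))
}
```

Because every bit other than the version and variant fields is random, two consecutive calls yield unrelated partition keys, which is why the random method gives up any ordering guarantee.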