Skip to content

Commit

Permalink
fix(kinesis): remove Kinesis aggregation size and count limits (#168)
Browse files Browse the repository at this point in the history
Substation uses the `internal/aggregate` package to buffer events
according to size, count, and duration limits, which shadows the
checks here while adhering to Kinesis limits in a more user-friendly
way. Including the size checks here allowed events between 25 KiB and
1 MiB to be sent as empty events to destinations.
  • Loading branch information
shellcromancer committed May 3, 2024
1 parent 83f71b0 commit 156750b
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 127 deletions.
75 changes: 0 additions & 75 deletions internal/aws/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,10 @@ import (
"github.com/golang/protobuf/proto"
)

// Limits applied when aggregating records into a single KPL-style Kinesis
// record. Both values act as defaults and hard caps (see Aggregate.New).
const (
	// kplMaxBytes ensures that an aggregated Kinesis record will not exceed 25 KB, which is
	// the minimum record size charged by the Kinesis service ("PUT Payload Unit"). Any record
	// smaller than 25 KB will be charged as 25 KB and any record larger than 25 KB will be
	// charged in 25 KB increments. See the Kinesis pricing page for more details:
	// https://aws.amazon.com/kinesis/data-streams/pricing/.
	kplMaxBytes = 1000 * 25
	// kplMaxCount is the maximum number of records that can be aggregated into a single Kinesis
	// record. There is no limit imposed by the Kinesis service on the number of records that can
	// be aggregated into a single Kinesis record, so this value is set to a reasonable upper bound.
	kplMaxCount = 10000
)

// Aggregate produces a KPL-compliant Kinesis record
type Aggregate struct {
	Record       *rec.AggregatedRecord // protobuf message that accumulates the aggregated entries
	Count        int                   // number of records added to the current aggregate
	MaxCount     int                   // maximum records per aggregate; defaulted and capped to kplMaxCount by New
	MaxSize      int                   // maximum aggregate size in bytes; defaulted and capped to kplMaxBytes by New
	PartitionKey string                // partition key applied to the aggregated record — presumably taken from the first added record; confirm in Add
}

Expand All @@ -47,61 +32,10 @@ func (a *Aggregate) New() {
a.Record = &rec.AggregatedRecord{}
a.Count = 0

if a.MaxCount == 0 {
a.MaxCount = kplMaxCount
}
if a.MaxCount > kplMaxCount {
a.MaxCount = kplMaxCount
}

if a.MaxSize == 0 {
a.MaxSize = kplMaxBytes
}
if a.MaxSize > kplMaxBytes {
a.MaxSize = kplMaxBytes
}

a.PartitionKey = ""
a.Record.PartitionKeyTable = make([]string, 0)
}

// varIntSize returns the number of bytes required to encode i as a
// protobuf base-128 varint (7 payload bits per encoded byte). Zero
// still occupies one byte.
func varIntSize(i int) int {
	if i == 0 {
		return 1
	}

	// Count the significant bits of i.
	bits := 0
	for v := i; v > 0; v >>= 1 {
		bits++
	}

	// Each varint byte carries 7 bits; round up.
	return (bits + 6) / 7
}

// calculateRecordSize returns the size in bytes that the aggregated record
// would have after adding data under the given partition key. The field
// sizing mirrors the reference implementation in awslabs/kinesis-aggregation:
// https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L344-L378
func (a *Aggregate) calculateRecordSize(data []byte, partitionKey string) int {
	// Partition key entry: tag byte + length varint + key bytes.
	pkSize := 1 + varIntSize(len(partitionKey)) + len(partitionKey)
	// Partition key index: tag byte + index varint.
	pkiSize := 1 + varIntSize(a.Count)
	// Record payload: tag byte + length varint + data bytes.
	dataSize := 1 + varIntSize(len(data)) + len(data)
	// Inner record wrapper: tag byte + length varint of its contents.
	total := pkSize + pkiSize + dataSize + 1 + varIntSize(pkiSize+dataSize)

	// Add the current aggregated record size plus the 4-byte magic header
	// and the 16-byte MD5 digest.
	return total + a.Record.XXX_Size() + 20
}

// Add inserts a Kinesis record into an aggregated Kinesis record
// https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L382
func (a *Aggregate) Add(data []byte, partitionKey string) bool {
Expand All @@ -115,15 +49,6 @@ func (a *Aggregate) Add(data []byte, partitionKey string) bool {
a.PartitionKey = partitionKey
}

if a.Count > a.MaxCount {
return false
}

newSize := a.calculateRecordSize(data, partitionKey)
if newSize > a.MaxSize {
return false
}

pki := uint64(a.Count)
r := &rec.Record{
PartitionKeyIndex: &pki,
Expand Down
40 changes: 0 additions & 40 deletions internal/aws/kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kinesis

import (
"bytes"
"context"
"testing"

Expand Down Expand Up @@ -129,42 +128,3 @@ func TestGetTags(t *testing.T) {
}
}
}

// TestSize verifies that calculateRecordSize predicts the exact length of
// the serialized aggregate returned by Get across several payload sizes.
func TestSize(t *testing.T) {
	tests := []struct {
		data   []byte
		repeat int
		pk     string
	}{
		{
			data:   []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
			repeat: 1,
			pk:     "8Ex8TUWD3dWUMh6dUKaT",
		},
		{
			data:   []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
			repeat: 58,
			pk:     "8Ex8TUWD3dWUMh6dUKaT",
		},
		{
			data:   []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
			repeat: 235,
			pk:     "8Ex8TUWD3dWUMh6dUKaT",
		},
	}

	agg := Aggregate{}
	agg.New()

	for _, tt := range tests {
		payload := bytes.Repeat(tt.data, tt.repeat)

		// Predict the size before the record is actually added.
		want := agg.calculateRecordSize(payload, tt.pk)
		agg.Add(payload, tt.pk)

		got := agg.Get()
		if want != len(got) {
			t.Errorf("expected %v, got %v", len(got), want)
		}
	}
}
25 changes: 13 additions & 12 deletions transform/send_aws_kinesis_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,16 @@ func (tf *sendAWSKinesisDataStream) send(ctx context.Context, key string) error
partitionKey = uuid.NewString()
}

switch tf.conf.EnableRecordAggregation {
case false:
if _, err := tf.client.PutRecords(ctx, tf.conf.StreamName, partitionKey, data); err != nil {
return err
}
case true:
if _, err := tf.client.PutRecords(ctx, tf.conf.StreamName, partitionKey, tf.aggregateRecords(partitionKey, data)); err != nil {
return err
}
if tf.conf.EnableRecordAggregation {
data = tf.aggregateRecords(partitionKey, data)
}

if len(data) == 0 {
return nil
}

if _, err := tf.client.PutRecords(ctx, tf.conf.StreamName, partitionKey, data); err != nil {
return err
}

return nil
Expand All @@ -194,15 +195,15 @@ func (tf *sendAWSKinesisDataStream) aggregateRecords(partitionKey string, data [
agg := &kinesis.Aggregate{}
agg.New()

for _, b := range data {
if ok := agg.Add(b, partitionKey); ok {
for _, d := range data {
if ok := agg.Add(d, partitionKey); ok {
continue
}

records = append(records, agg.Get())

agg.New()
_ = agg.Add(b, partitionKey)
_ = agg.Add(d, partitionKey)
}

if agg.Count > 0 {
Expand Down

0 comments on commit 156750b

Please sign in to comment.