diff --git a/internal/aws/kinesis/kinesis.go b/internal/aws/kinesis/kinesis.go index 14f48412..744dfef5 100644 --- a/internal/aws/kinesis/kinesis.go +++ b/internal/aws/kinesis/kinesis.go @@ -19,25 +19,10 @@ import ( "github.com/golang/protobuf/proto" ) -const ( - // kplMaxBytes ensures that an aggregated Kinesis record will not exceed 25 KB, which is - // the minimum record size charged by the Kinesis service ("PUT Payload Unit"). Any record - // smaller than 25 KB will be charged as 25 KB and any record larger than 25 KB will be - // charged in 25 KB increments. See the Kinesis pricing page for more details: - // https://aws.amazon.com/kinesis/data-streams/pricing/. - kplMaxBytes = 1000 * 25 - // kplMaxCount is the maximum number of records that can be aggregated into a single Kinesis - // record. There is no limit imposed by the Kinesis service on the number of records that can - // be aggregated into a single Kinesis record, so this value is set to a reasonable upper bound. - kplMaxCount = 10000 -) - // Aggregate produces a KPL-compliant Kinesis record type Aggregate struct { Record *rec.AggregatedRecord Count int - MaxCount int - MaxSize int PartitionKey string } @@ -47,61 +32,10 @@ func (a *Aggregate) New() { a.Record = &rec.AggregatedRecord{} a.Count = 0 - if a.MaxCount == 0 { - a.MaxCount = kplMaxCount - } - if a.MaxCount > kplMaxCount { - a.MaxCount = kplMaxCount - } - - if a.MaxSize == 0 { - a.MaxSize = kplMaxBytes - } - if a.MaxSize > kplMaxBytes { - a.MaxSize = kplMaxBytes - } - a.PartitionKey = "" a.Record.PartitionKeyTable = make([]string, 0) } -func varIntSize(i int) int { - if i == 0 { - return 1 - } - - var needed int - for i > 0 { - needed++ - i >>= 1 - } - - bytes := needed / 7 - if needed%7 > 0 { - bytes++ - } - - return bytes -} - -func (a *Aggregate) calculateRecordSize(data []byte, partitionKey string) int { - var recordSize int - // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L344-L349 - pkSize := 1 + varIntSize(len(partitionKey)) + len(partitionKey) - recordSize += pkSize - // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L362-L364 - pkiSize := 1 + varIntSize(a.Count) - recordSize += pkiSize - // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L371-L374 - dataSize := 1 + varIntSize(len(data)) + len(data) - recordSize += dataSize - // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L376-L378 - recordSize = recordSize + 1 + varIntSize(pkiSize+dataSize) - - // input record size + current aggregated record size + 4 byte magic header + 16 byte MD5 digest - return recordSize + a.Record.XXX_Size() + 20 -} - // Add inserts a Kinesis record into an aggregated Kinesis record // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L382 func (a *Aggregate) Add(data []byte, partitionKey string) bool { @@ -115,15 +49,6 @@ func (a *Aggregate) Add(data []byte, partitionKey string) bool { a.PartitionKey = partitionKey } - if a.Count > a.MaxCount { - return false - } - - newSize := a.calculateRecordSize(data, partitionKey) - if newSize > a.MaxSize { - return false - } - pki := uint64(a.Count) r := &rec.Record{ PartitionKeyIndex: &pki, diff --git a/internal/aws/kinesis/kinesis_test.go b/internal/aws/kinesis/kinesis_test.go index d7301add..8119b7a0 100644 --- a/internal/aws/kinesis/kinesis_test.go +++ b/internal/aws/kinesis/kinesis_test.go @@ -1,7 +1,6 @@ package kinesis import ( - "bytes" "context" "testing" @@ -129,42 +128,3 @@ func TestGetTags(t *testing.T) { } } } - -// tests that the calculated record size matches the size of returned data -func TestSize(t *testing.T) { - tests := []struct { - data []byte - repeat int - pk string - }{ - { - []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."), - 1, - "8Ex8TUWD3dWUMh6dUKaT", - }, - { - []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."), - 58, - "8Ex8TUWD3dWUMh6dUKaT", - }, - { - []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."), - 235, - "8Ex8TUWD3dWUMh6dUKaT", - }, - } - - rec := Aggregate{} - rec.New() - - for _, test := range tests { - b := bytes.Repeat(test.data, test.repeat) - check := rec.calculateRecordSize(b, test.pk) - rec.Add(b, test.pk) - - data := rec.Get() - if check != len(data) { - t.Errorf("expected %v, got %v", len(data), check) - } - } -} diff --git a/transform/send_aws_kinesis_data_stream.go b/transform/send_aws_kinesis_data_stream.go index 36aa7f53..d66d5b37 100644 --- a/transform/send_aws_kinesis_data_stream.go +++ b/transform/send_aws_kinesis_data_stream.go @@ -174,15 +174,16 @@ func (tf *sendAWSKinesisDataStream) send(ctx context.Context, key string) error partitionKey = uuid.NewString() } - switch tf.conf.EnableRecordAggregation { - case false: - if _, err := tf.client.PutRecords(ctx, tf.conf.StreamName, partitionKey, data); err != nil { - return err - } - case true: - if _, err := tf.client.PutRecords(ctx, tf.conf.StreamName, partitionKey, tf.aggregateRecords(partitionKey, data)); err != nil { - return err - } + if tf.conf.EnableRecordAggregation { + data = tf.aggregateRecords(partitionKey, data) + } + + if len(data) == 0 { + return nil + } + + if _, err := tf.client.PutRecords(ctx, tf.conf.StreamName, partitionKey, data); err != nil { + return err } return nil @@ -194,15 +195,15 @@ func (tf *sendAWSKinesisDataStream) aggregateRecords(partitionKey string, data [ agg := &kinesis.Aggregate{} agg.New() - for _, b := range data { - if ok := agg.Add(b, partitionKey); ok { + for _, d := range data { + if ok := agg.Add(d, partitionKey); ok { continue } records = append(records, agg.Get()) agg.New() - _ = agg.Add(b, partitionKey) + _ = agg.Add(d, partitionKey) } if agg.Count > 0 {