diff --git a/transform/send_aws_kinesis_data_stream.go b/transform/send_aws_kinesis_data_stream.go index 532ed0a1..e07e617c 100644 --- a/transform/send_aws_kinesis_data_stream.go +++ b/transform/send_aws_kinesis_data_stream.go @@ -192,19 +192,18 @@ func (tf *sendAWSKinesisDataStream) send(ctx context.Context, key string) error func (tf *sendAWSKinesisDataStream) aggregateRecords(partitionKey string, data [][]byte) [][]byte { var records [][]byte + // Aggregation silently drops any data that is between ~0.9999 MB and 1 MB. agg := &kinesis.Aggregate{} agg.New() for _, d := range data { if ok := agg.Add(d, partitionKey); ok { continue + } else if agg.Count > 0 { + records = append(records, agg.Get()) } - records = append(records, agg.Get()) - agg.New() - - // This silently drops any data that is between ~0.9999 MB and 1 MB. _ = agg.Add(d, partitionKey) }