diff --git a/internal/aws/kinesis/kinesis.go b/internal/aws/kinesis/kinesis.go index 744dfef5..9c2531af 100644 --- a/internal/aws/kinesis/kinesis.go +++ b/internal/aws/kinesis/kinesis.go @@ -23,6 +23,7 @@ import ( type Aggregate struct { Record *rec.AggregatedRecord Count int + Size int PartitionKey string } @@ -31,11 +32,49 @@ type Aggregate struct { func (a *Aggregate) New() { a.Record = &rec.AggregatedRecord{} a.Count = 0 + a.Size = 0 a.PartitionKey = "" a.Record.PartitionKeyTable = make([]string, 0) } +func varIntSize(i int) int { + if i == 0 { + return 1 + } + + var needed int + for i > 0 { + needed++ + i >>= 1 + } + + bytes := needed / 7 + if needed%7 > 0 { + bytes++ + } + + return bytes +} + +func (a *Aggregate) calculateRecordSize(data []byte, partitionKey string) int { + var recordSize int + // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L344-L349 + pkSize := 1 + varIntSize(len(partitionKey)) + len(partitionKey) + recordSize += pkSize + // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L362-L364 + pkiSize := 1 + varIntSize(a.Count) + recordSize += pkiSize + // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L371-L374 + dataSize := 1 + varIntSize(len(data)) + len(data) + recordSize += dataSize + // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L376-L378 + recordSize = recordSize + 1 + varIntSize(pkiSize+dataSize) + + // input record size + current aggregated record size + 4 byte magic header + 16 byte MD5 digest + return recordSize + a.Record.XXX_Size() + 20 +} + // Add inserts a Kinesis record into an aggregated Kinesis record // https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L382 func (a *Aggregate) Add(data []byte, partitionKey string) bool { @@ -49,15 +88,25 @@ func (a *Aggregate) Add(data []byte, partitionKey string) bool { a.PartitionKey = partitionKey } + // Verify the record size won't exceed the 1 MB limit of the Kinesis service. + // https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html + if a.calculateRecordSize(data, partitionKey) > 1024*1024 { + return false + } + pki := uint64(a.Count) r := &rec.Record{ PartitionKeyIndex: &pki, Data: data, } + // Append the data to the aggregated record. a.Record.Records = append(a.Record.Records, r) a.Record.PartitionKeyTable = append(a.Record.PartitionKeyTable, partitionKey) + + // Update the record count and size. This is not used in the aggregated record. a.Count++ + a.Size += a.calculateRecordSize(data, partitionKey) return true } diff --git a/transform/send_aws_kinesis_data_stream.go b/transform/send_aws_kinesis_data_stream.go index d66d5b37..532ed0a1 100644 --- a/transform/send_aws_kinesis_data_stream.go +++ b/transform/send_aws_kinesis_data_stream.go @@ -203,6 +203,8 @@ func (tf *sendAWSKinesisDataStream) aggregateRecords(partitionKey string, data [ records = append(records, agg.Get()) agg.New() + + // This silently drops any data that is between ~0.9999 MB and 1 MB. _ = agg.Add(d, partitionKey) }