diff --git a/internal/aws/kinesis/kinesis.go b/internal/aws/kinesis/kinesis.go index b98858d1..14f48412 100644 --- a/internal/aws/kinesis/kinesis.go +++ b/internal/aws/kinesis/kinesis.go @@ -20,10 +20,16 @@ import ( ) const ( - kplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking. - kplDigestSize = 16 // MD5 Message size for protobuf. - kplMaxBytes = 1024 * 1024 - kplMaxCount = 10000 + // kplMaxBytes ensures that an aggregated Kinesis record will not exceed 25 KB, which is + // the minimum record size charged by the Kinesis service ("PUT Payload Unit"). Any record + // smaller than 25 KB will be charged as 25 KB and any record larger than 25 KB will be + // charged in 25 KB increments. See the Kinesis pricing page for more details: + // https://aws.amazon.com/kinesis/data-streams/pricing/. + kplMaxBytes = 1000 * 25 + // kplMaxCount is the maximum number of records that can be aggregated into a single Kinesis + // record. There is no limit imposed by the Kinesis service on the number of records that can + // be aggregated into a single Kinesis record, so this value is set to a reasonable upper bound. + kplMaxCount = 10000 ) // Aggregate produces a KPL-compliant Kinesis record @@ -190,23 +196,6 @@ func (a *API) IsEnabled() bool { return a.Client != nil } -// PutRecord is a convenience wrapper for putting a record into a Kinesis stream. -func (a *API) PutRecord(ctx aws.Context, stream, partitionKey string, data []byte) (*kinesis.PutRecordOutput, error) { - ctx = context.WithoutCancel(ctx) - resp, err := a.Client.PutRecordWithContext( - ctx, - &kinesis.PutRecordInput{ - Data: data, - StreamName: aws.String(stream), - PartitionKey: aws.String(partitionKey), - }) - if err != nil { - return nil, fmt.Errorf("putrecord stream %s partitionkey %s: %v", stream, partitionKey, err) - } - - return resp, nil -} - // PutRecords is a convenience wrapper for putting multiple records into a Kinesis stream. func (a *API) PutRecords(ctx aws.Context, stream, partitionKey string, data [][]byte) (*kinesis.PutRecordsOutput, error) { var records []*kinesis.PutRecordsRequestEntry diff --git a/internal/aws/kinesis/kinesis_test.go b/internal/aws/kinesis/kinesis_test.go index eaded773..d7301add 100644 --- a/internal/aws/kinesis/kinesis_test.go +++ b/internal/aws/kinesis/kinesis_test.go @@ -11,47 +11,6 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" ) -type mockedPutRecord struct { - kinesisiface.KinesisAPI - Resp kinesis.PutRecordOutput -} - -func (m mockedPutRecord) PutRecordWithContext(ctx aws.Context, in *kinesis.PutRecordInput, opts ...request.Option) (*kinesis.PutRecordOutput, error) { - return &m.Resp, nil -} - -func TestPutRecord(t *testing.T) { - tests := []struct { - resp kinesis.PutRecordOutput - expected string - }{ - { - resp: kinesis.PutRecordOutput{ - EncryptionType: aws.String("NONE"), - SequenceNumber: aws.String("ABCDEF"), - ShardId: aws.String("XYZ"), - }, - expected: "ABCDEF", - }, - } - - ctx := context.TODO() - - for _, test := range tests { - a := API{ - mockedPutRecord{Resp: test.resp}, - } - resp, err := a.PutRecord(ctx, "", "", []byte("")) - if err != nil { - t.Fatalf("%v", err) - } - - if *resp.SequenceNumber != test.expected { - t.Errorf("expected %+v, got %s", resp.SequenceNumber, test.expected) - } - } -} - type mockedPutRecords struct { kinesisiface.KinesisAPI Resp kinesis.PutRecordsOutput @@ -185,12 +144,12 @@ func TestSize(t *testing.T) { }, { []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."), - 235, + 58, "8Ex8TUWD3dWUMh6dUKaT", }, { []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."), - 5678, + 235, "8Ex8TUWD3dWUMh6dUKaT", }, } diff --git a/transform/send_aws_kinesis_data_stream.go b/transform/send_aws_kinesis_data_stream.go index 4fec7608..36aa7f53 100644 --- a/transform/send_aws_kinesis_data_stream.go +++ b/transform/send_aws_kinesis_data_stream.go @@ -180,7 +180,7 @@ func (tf *sendAWSKinesisDataStream) send(ctx context.Context, key string) error return err } case true: - if err := tf.sendAggregateRecord(ctx, tf.conf.StreamName, partitionKey, data); err != nil { + if _, err := tf.client.PutRecords(ctx, tf.conf.StreamName, partitionKey, tf.aggregateRecords(partitionKey, data)); err != nil { return err } } @@ -188,7 +188,9 @@ func (tf *sendAWSKinesisDataStream) send(ctx context.Context, key string) error return nil } -func (tf *sendAWSKinesisDataStream) sendAggregateRecord(ctx context.Context, stream, partitionKey string, data [][]byte) error { +func (tf *sendAWSKinesisDataStream) aggregateRecords(partitionKey string, data [][]byte) [][]byte { + var records [][]byte + agg := &kinesis.Aggregate{} agg.New() @@ -197,19 +199,15 @@ func (tf *sendAWSKinesisDataStream) sendAggregateRecord(ctx context.Context, str continue } - if _, err := tf.client.PutRecord(ctx, stream, partitionKey, agg.Get()); err != nil { - return err - } + records = append(records, agg.Get()) agg.New() _ = agg.Add(b, partitionKey) } if agg.Count > 0 { - if _, err := tf.client.PutRecord(ctx, stream, partitionKey, agg.Get()); err != nil { - return err - } + records = append(records, agg.Get()) } - return nil + return records }