perf(aws): Reduce Aggregated Kinesis Record Size (#147)
* refactor(aws): Kinesis Max Bytes, PutRecord

* perf(transform): Send Kinesis PutRecords

* docs(aws): Kinesis KPL Max
jshlbrd committed Mar 19, 2024
1 parent 20bfc72 commit a0ef232
Showing 3 changed files with 19 additions and 73 deletions.
31 changes: 10 additions & 21 deletions internal/aws/kinesis/kinesis.go
@@ -20,10 +20,16 @@ import (
)

const (
kplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking.
kplDigestSize = 16 // MD5 Message size for protobuf.
kplMaxBytes = 1024 * 1024
kplMaxCount = 10000
// kplMaxBytes ensures that an aggregated Kinesis record will not exceed 25 KB, which is
// the minimum record size charged by the Kinesis service ("PUT Payload Unit"). Any record
// smaller than 25 KB will be charged as 25 KB and any record larger than 25 KB will be
// charged in 25 KB increments. See the Kinesis pricing page for more details:
// https://aws.amazon.com/kinesis/data-streams/pricing/.
kplMaxBytes = 1000 * 25
// kplMaxCount is the maximum number of records that can be aggregated into a single Kinesis
// record. There is no limit imposed by the Kinesis service on the number of records that can
// be aggregated into a single Kinesis record, so this value is set to a reasonable upper bound.
kplMaxCount = 10000
)

// Aggregate produces a KPL-compliant Kinesis record
@@ -190,23 +196,6 @@ func (a *API) IsEnabled() bool {
return a.Client != nil
}

// PutRecord is a convenience wrapper for putting a record into a Kinesis stream.
func (a *API) PutRecord(ctx aws.Context, stream, partitionKey string, data []byte) (*kinesis.PutRecordOutput, error) {
ctx = context.WithoutCancel(ctx)
resp, err := a.Client.PutRecordWithContext(
ctx,
&kinesis.PutRecordInput{
Data: data,
StreamName: aws.String(stream),
PartitionKey: aws.String(partitionKey),
})
if err != nil {
return nil, fmt.Errorf("putrecord stream %s partitionkey %s: %v", stream, partitionKey, err)
}

return resp, nil
}

// PutRecords is a convenience wrapper for putting multiple records into a Kinesis stream.
func (a *API) PutRecords(ctx aws.Context, stream, partitionKey string, data [][]byte) (*kinesis.PutRecordsOutput, error) {
var records []*kinesis.PutRecordsRequestEntry
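The lower kplMaxBytes follows directly from the pricing note added above: Kinesis bills every record in 25 KB "PUT Payload Unit" increments, so an aggregate capped at 25,000 bytes always costs exactly one unit, while the old 1 MiB cap produced records billed as dozens of units. A minimal, self-contained sketch of that arithmetic (the helper and constant names below are illustrative, not part of this package):

package main

import "fmt"

// putPayloadUnitBytes mirrors the new kplMaxBytes value (1000 * 25): Kinesis
// bills every record in 25 KB increments, so anything at or under this size is
// charged as a single PUT Payload Unit.
const putPayloadUnitBytes = 1000 * 25

// payloadUnits returns how many PUT Payload Units a record of n bytes is
// billed as (ceiling division by 25 KB).
func payloadUnits(n int) int {
	if n <= 0 {
		return 0
	}
	return (n + putPayloadUnitBytes - 1) / putPayloadUnitBytes
}

func main() {
	fmt.Println(payloadUnits(512))         // 1: small records are still billed as one full unit
	fmt.Println(payloadUnits(25_000))      // 1: exactly the new aggregate cap
	fmt.Println(payloadUnits(1024 * 1024)) // 42: an aggregate at the old 1 MiB cap spanned many units
}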
45 changes: 2 additions & 43 deletions internal/aws/kinesis/kinesis_test.go
@@ -11,47 +11,6 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)

type mockedPutRecord struct {
kinesisiface.KinesisAPI
Resp kinesis.PutRecordOutput
}

func (m mockedPutRecord) PutRecordWithContext(ctx aws.Context, in *kinesis.PutRecordInput, opts ...request.Option) (*kinesis.PutRecordOutput, error) {
return &m.Resp, nil
}

func TestPutRecord(t *testing.T) {
tests := []struct {
resp kinesis.PutRecordOutput
expected string
}{
{
resp: kinesis.PutRecordOutput{
EncryptionType: aws.String("NONE"),
SequenceNumber: aws.String("ABCDEF"),
ShardId: aws.String("XYZ"),
},
expected: "ABCDEF",
},
}

ctx := context.TODO()

for _, test := range tests {
a := API{
mockedPutRecord{Resp: test.resp},
}
resp, err := a.PutRecord(ctx, "", "", []byte(""))
if err != nil {
t.Fatalf("%v", err)
}

if *resp.SequenceNumber != test.expected {
t.Errorf("expected %+v, got %s", resp.SequenceNumber, test.expected)
}
}
}

type mockedPutRecords struct {
kinesisiface.KinesisAPI
Resp kinesis.PutRecordsOutput
@@ -185,12 +144,12 @@ func TestSize(t *testing.T) {
},
{
[]byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
235,
58,
"8Ex8TUWD3dWUMh6dUKaT",
},
{
[]byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
5678,
235,
"8Ex8TUWD3dWUMh6dUKaT",
},
}
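Since TestPutRecord is deleted and callers now exercise only the PutRecords path, equivalent coverage could be regained through the mockedPutRecords type that already lives in this file. A rough sketch, assuming mockedPutRecords stubs PutRecordsWithContext the same way mockedPutRecord stubbed PutRecordWithContext (the response values and assertion below are illustrative, not part of the commit):

func TestPutRecordsSketch(t *testing.T) {
	resp := kinesis.PutRecordsOutput{
		FailedRecordCount: aws.Int64(0),
	}

	a := API{
		mockedPutRecords{Resp: resp},
	}

	out, err := a.PutRecords(context.TODO(), "stream", "partitionKey", [][]byte{[]byte("data")})
	if err != nil {
		t.Fatalf("%v", err)
	}

	if *out.FailedRecordCount != 0 {
		t.Errorf("expected 0 failed records, got %d", *out.FailedRecordCount)
	}
}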
16 changes: 7 additions & 9 deletions transform/send_aws_kinesis_data_stream.go
@@ -180,15 +180,17 @@ func (tf *sendAWSKinesisDataStream) send(ctx context.Context, key string) error
return err
}
case true:
if err := tf.sendAggregateRecord(ctx, tf.conf.StreamName, partitionKey, data); err != nil {
if _, err := tf.client.PutRecords(ctx, tf.conf.StreamName, partitionKey, tf.aggregateRecords(partitionKey, data)); err != nil {
return err
}
}

return nil
}

func (tf *sendAWSKinesisDataStream) sendAggregateRecord(ctx context.Context, stream, partitionKey string, data [][]byte) error {
func (tf *sendAWSKinesisDataStream) aggregateRecords(partitionKey string, data [][]byte) [][]byte {
var records [][]byte

agg := &kinesis.Aggregate{}
agg.New()

@@ -197,19 +199,15 @@ func (tf *sendAWSKinesisDataStream) sendAggregateRecord(ctx context.Context, str
continue
}

if _, err := tf.client.PutRecord(ctx, stream, partitionKey, agg.Get()); err != nil {
return err
}
records = append(records, agg.Get())

agg.New()
_ = agg.Add(b, partitionKey)
}

if agg.Count > 0 {
if _, err := tf.client.PutRecord(ctx, stream, partitionKey, agg.Get()); err != nil {
return err
}
records = append(records, agg.Get())
}

return nil
return records
}
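Put together, the transform now packs events locally and ships them in one API call instead of calling PutRecord once per aggregate. A condensed, standalone sketch of that flow (it assumes the same imports as this file, and that Aggregate.Add reports whether the record fit, as the surrounding loop suggests; the function name and error handling are illustrative):

func sendAggregated(ctx context.Context, client kinesis.API, stream, partitionKey string, data [][]byte) error {
	var records [][]byte

	// Each aggregate is capped at kplMaxBytes (25,000 bytes), so every entry
	// sent below is billed as a single PUT Payload Unit.
	agg := &kinesis.Aggregate{}
	agg.New()

	for _, b := range data {
		// Assumed behavior: Add returns true when b fit into the current aggregate.
		if ok := agg.Add(b, partitionKey); ok {
			continue
		}

		// The aggregate is full: keep it and start a new one that holds b.
		records = append(records, agg.Get())
		agg.New()
		_ = agg.Add(b, partitionKey)
	}

	if agg.Count > 0 {
		records = append(records, agg.Get())
	}

	// One PutRecords request replaces the per-aggregate PutRecord calls.
	_, err := client.PutRecords(ctx, stream, partitionKey, records)
	return err
}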
