perf(aws): Reduce Aggregated Kinesis Record Size #147

Merged 3 commits on Mar 19, 2024
31 changes: 10 additions & 21 deletions internal/aws/kinesis/kinesis.go
@@ -20,10 +20,16 @@ import (
)

const (
kplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking.
kplDigestSize = 16 // MD5 Message size for protobuf.
kplMaxBytes = 1024 * 1024
kplMaxCount = 10000
// kplMaxBytes ensures that an aggregated Kinesis record will not exceed 25 KB, which is
// the minimum record size charged by the Kinesis service ("PUT Payload Unit"). Any record
// smaller than 25 KB will be charged as 25 KB and any record larger than 25 KB will be
// charged in 25 KB increments. See the Kinesis pricing page for more details:
// https://aws.amazon.com/kinesis/data-streams/pricing/.
kplMaxBytes = 1000 * 25
// kplMaxCount is the maximum number of records that can be aggregated into a single Kinesis
// record. There is no limit imposed by the Kinesis service on the number of records that can
// be aggregated into a single Kinesis record, so this value is set to a reasonable upper bound.
kplMaxCount = 10000
)

// Aggregate produces a KPL-compliant Kinesis record
@@ -190,23 +196,6 @@ func (a *API) IsEnabled() bool {
return a.Client != nil
}

// PutRecord is a convenience wrapper for putting a record into a Kinesis stream.
func (a *API) PutRecord(ctx aws.Context, stream, partitionKey string, data []byte) (*kinesis.PutRecordOutput, error) {
ctx = context.WithoutCancel(ctx)
resp, err := a.Client.PutRecordWithContext(
ctx,
&kinesis.PutRecordInput{
Data: data,
StreamName: aws.String(stream),
PartitionKey: aws.String(partitionKey),
})
if err != nil {
return nil, fmt.Errorf("putrecord stream %s partitionkey %s: %v", stream, partitionKey, err)
}

return resp, nil
}

// PutRecords is a convenience wrapper for putting multiple records into a Kinesis stream.
func (a *API) PutRecords(ctx aws.Context, stream, partitionKey string, data [][]byte) (*kinesis.PutRecordsOutput, error) {
var records []*kinesis.PutRecordsRequestEntry
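For readers unfamiliar with the pricing model referenced in the new kplMaxBytes comment, here is a minimal sketch (not part of this PR) of how a record size maps to billed PUT payload units. The helper name billedUnits is hypothetical, and the 25 KB unit is assumed to be 25,000 bytes to match the 1000 * 25 constant above.

package main

import "fmt"

// putPayloadUnit assumes the 25 KB "PUT Payload Unit" is 25,000 bytes,
// matching the kplMaxBytes constant (1000 * 25) introduced in this change.
const putPayloadUnit = 1000 * 25

// billedUnits is a hypothetical helper: Kinesis charges each record in 25 KB
// increments, so a record of any size is billed ceil(size / 25 KB) units.
func billedUnits(recordSize int) int {
	if recordSize <= 0 {
		return 0
	}
	return (recordSize + putPayloadUnit - 1) / putPayloadUnit
}

func main() {
	fmt.Println(billedUnits(25_000))    // 1: an aggregate capped at kplMaxBytes fills exactly one unit
	fmt.Println(billedUnits(1024*1024)) // 42: the old 1 MB cap could bill a single record as dozens of units
}

In other words, capping aggregates at roughly 25 KB keeps each aggregated record within the minimum billable record size described in the comment above.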
45 changes: 2 additions & 43 deletions internal/aws/kinesis/kinesis_test.go
@@ -11,47 +11,6 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)

type mockedPutRecord struct {
kinesisiface.KinesisAPI
Resp kinesis.PutRecordOutput
}

func (m mockedPutRecord) PutRecordWithContext(ctx aws.Context, in *kinesis.PutRecordInput, opts ...request.Option) (*kinesis.PutRecordOutput, error) {
return &m.Resp, nil
}

func TestPutRecord(t *testing.T) {
tests := []struct {
resp kinesis.PutRecordOutput
expected string
}{
{
resp: kinesis.PutRecordOutput{
EncryptionType: aws.String("NONE"),
SequenceNumber: aws.String("ABCDEF"),
ShardId: aws.String("XYZ"),
},
expected: "ABCDEF",
},
}

ctx := context.TODO()

for _, test := range tests {
a := API{
mockedPutRecord{Resp: test.resp},
}
resp, err := a.PutRecord(ctx, "", "", []byte(""))
if err != nil {
t.Fatalf("%v", err)
}

if *resp.SequenceNumber != test.expected {
t.Errorf("expected %+v, got %s", resp.SequenceNumber, test.expected)
}
}
}

type mockedPutRecords struct {
kinesisiface.KinesisAPI
Resp kinesis.PutRecordsOutput
@@ -185,12 +144,12 @@ func TestSize(t *testing.T) {
},
{
[]byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
235,
58,
"8Ex8TUWD3dWUMh6dUKaT",
},
{
[]byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
5678,
235,
"8Ex8TUWD3dWUMh6dUKaT",
},
}
16 changes: 7 additions & 9 deletions transform/send_aws_kinesis_data_stream.go
@@ -180,15 +180,17 @@ func (tf *sendAWSKinesisDataStream) send(ctx context.Context, key string) error
return err
}
case true:
if err := tf.sendAggregateRecord(ctx, tf.conf.StreamName, partitionKey, data); err != nil {
if _, err := tf.client.PutRecords(ctx, tf.conf.StreamName, partitionKey, tf.aggregateRecords(partitionKey, data)); err != nil {
return err
}
}

return nil
}

func (tf *sendAWSKinesisDataStream) sendAggregateRecord(ctx context.Context, stream, partitionKey string, data [][]byte) error {
func (tf *sendAWSKinesisDataStream) aggregateRecords(partitionKey string, data [][]byte) [][]byte {
var records [][]byte

agg := &kinesis.Aggregate{}
agg.New()

@@ -197,19 +199,15 @@ func (tf *sendAWSKinesisDataStream) send(ctx context.Context, str
continue
}

if _, err := tf.client.PutRecord(ctx, stream, partitionKey, agg.Get()); err != nil {
return err
}
records = append(records, agg.Get())

agg.New()
_ = agg.Add(b, partitionKey)
}

if agg.Count > 0 {
if _, err := tf.client.PutRecord(ctx, stream, partitionKey, agg.Get()); err != nil {
return err
}
records = append(records, agg.Get())
}

return nil
return records
}
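Taken together, the new aggregateRecords helper and the PutRecords call in send implement an aggregate-then-batch flow. The sketch below restates that flow as a single standalone function for clarity; it assumes the internal kinesis package is in scope and that Aggregate.Add reports whether the data fit in the current aggregate (as the loop above implies). The function name and parameters are illustrative, not part of this PR.

// sendAggregated is an illustrative restatement of the new flow: build KPL
// aggregates up to kplMaxBytes/kplMaxCount, then ship them all in a single
// batched PutRecords call instead of one PutRecord per aggregate.
func sendAggregated(ctx context.Context, client *kinesis.API, stream, partitionKey string, data [][]byte) error {
	var records [][]byte

	agg := &kinesis.Aggregate{}
	agg.New()

	for _, b := range data {
		// Assumed behavior: Add returns false once the aggregate is full.
		if ok := agg.Add(b, partitionKey); ok {
			continue
		}

		// Flush the full aggregate and start a new one with the current item.
		records = append(records, agg.Get())
		agg.New()
		_ = agg.Add(b, partitionKey)
	}

	// Flush the final, partially filled aggregate.
	if agg.Count > 0 {
		records = append(records, agg.Get())
	}

	// One batched call replaces the removed per-aggregate PutRecord calls.
	_, err := client.PutRecords(ctx, stream, partitionKey, records)
	return err
}

One practical difference of the batched approach is that PutRecords can partially fail (the response reports per-entry errors), whereas each PutRecord call either succeeded or failed as a whole.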