From 85891ab6ee22c58152e92c664aaa0df94550e3ec Mon Sep 17 00:00:00 2001 From: negator Date: Fri, 16 Jun 2017 12:25:05 -0700 Subject: [PATCH 1/2] fix for batch write --- domino.go | 76 ++++++++++++++++++++++++++------------------------ domino_test.go | 25 ++++++++++------- 2 files changed, 54 insertions(+), 47 deletions(-) diff --git a/domino.go b/domino.go index a2cdb30..01de5f6 100644 --- a/domino.go +++ b/domino.go @@ -555,13 +555,13 @@ func (table DynamoTable) PutItem(i interface{}) *putInput { return &q } -func (d *putInput) ReturnAllOld() *putInput { +func (d *putInput) ReturnAllOld() *putInput { (*dynamodb.PutItemInput)(d).SetReturnValues("ALL_OLD") return d } func (d *putInput) ReturnNone() *putInput { (*dynamodb.PutItemInput)(d).SetReturnValues("NONE") - return d + return d } func (d *putInput) SetConditionExpression(c Expression) *putInput { s, m, _ := c.construct(1, true) @@ -606,7 +606,7 @@ func (o *putOutput) Result(item interface{}) (err error) { /************************************** BatchWriteItem *********************************/ /***************************************************************************************/ type batchWriteInput struct { - batches []dynamodb.BatchWriteItemInput + batches []*dynamodb.BatchWriteItemInput table DynamoTable delayedFunctions []func() error } @@ -619,52 +619,54 @@ type batchPutOutput struct { /*BatchWriteItem represents dynamo batch write item call*/ func (table DynamoTable) BatchWriteItem() *batchWriteInput { r := batchWriteInput{ - batches: []dynamodb.BatchWriteItemInput{}, + batches: []*dynamodb.BatchWriteItemInput{}, table: table, } return &r } func (d *batchWriteInput) writeItems(putOnly bool, items ...interface{}) *batchWriteInput { + if len(items) <= 0 { + return d + } delayed := func() error { - batches := []dynamodb.BatchWriteItemInput{} - batchCount := len(items)/25 + 1 - for i := 1; i <= batchCount; i++ { - batch := dynamodb.BatchWriteItemInput{ - RequestItems: make(map[string][]*dynamodb.WriteRequest), + // batchCount := math.Ceil(float64(len(items)) / 25.0) + var batch *dynamodb.BatchWriteItemInput + + for _, item := range items { + if batch == nil { + batch = &dynamodb.BatchWriteItemInput{ + RequestItems: make(map[string][]*dynamodb.WriteRequest), + } + d.batches = append(d.batches, batch) } - puts := []*dynamodb.WriteRequest{} - for len(items) > 0 && len(puts) < 25 { - item := items[0] - items = items[1:] - dynamoItem, err := dynamodbattribute.MarshalMap(item) - if err != nil { - return err + dynamoItem, err := dynamodbattribute.MarshalMap(item) + + if err != nil { + return err + } + var write *dynamodb.WriteRequest + if putOnly { + write = &dynamodb.WriteRequest{ + PutRequest: &dynamodb.PutRequest{ + Item: dynamoItem, + }, } - var write *dynamodb.WriteRequest - if putOnly { - write = &dynamodb.WriteRequest{ - PutRequest: &dynamodb.PutRequest{ - Item: dynamoItem, - }, - } - } else { - write = &dynamodb.WriteRequest{ - DeleteRequest: &dynamodb.DeleteRequest{ - Key: dynamoItem, - }, - } + } else { + write = &dynamodb.WriteRequest{ + DeleteRequest: &dynamodb.DeleteRequest{ + Key: dynamoItem, + }, } - - puts = append(puts, write) } + batch.RequestItems[d.table.Name] = append(batch.RequestItems[d.table.Name], write) - batch.RequestItems[d.table.Name] = puts - batches = append(batches, batch) - + if len(batch.RequestItems[d.table.Name]) >= 25 { + batch = nil + } } - d.batches = append(d.batches, batches...) + return nil } d.delayedFunctions = append(d.delayedFunctions, delayed) @@ -687,7 +689,7 @@ func (d *batchWriteInput) DeleteItems(keys ...KeyValue) *batchWriteInput { return d } -func (d *batchWriteInput) Build() (input []dynamodb.BatchWriteItemInput, err error) { +func (d *batchWriteInput) Build() (input []*dynamodb.BatchWriteItemInput, err error) { for _, function := range d.delayedFunctions { if err = function(); err != nil { return @@ -714,7 +716,7 @@ func (d *batchWriteInput) ExecuteWith(ctx context.Context, dynamo DynamoDBIFace, return } for _, batch := range batches { - result, err := dynamo.BatchWriteItemWithContext(ctx, &batch, opts...) + result, err := dynamo.BatchWriteItemWithContext(ctx, batch, opts...) if err != nil { out.Error = handleAwsErr(err) return diff --git a/domino_test.go b/domino_test.go index 7d3e5f4..38758bb 100644 --- a/domino_test.go +++ b/domino_test.go @@ -4,6 +4,7 @@ import ( // "fmt" "context" + "fmt" "net/http" "strconv" "sync" @@ -159,13 +160,15 @@ func TestBatchPutItem(t *testing.T) { assert.NoError(t, err) + items := []interface{}{} + for i := 0; i < 100; i++ { + row := User{Email: fmt.Sprintf("%dbob@email.com", i), Password: "password"} + items = append(items, row) + } + q := table. BatchWriteItem(). - PutItems( - User{Email: "bob@email.com", Password: "password"}, - User{Email: "joe@email.com", Password: "password"}, - User{Email: "alice@email.com", Password: "password"}, - ). + PutItems(items...). DeleteItems( KeyValue{"name@email.com", "password"}, ) @@ -180,12 +183,14 @@ func TestBatchPutItem(t *testing.T) { assert.Empty(t, unprocessed) assert.NoError(t, err) + keys := []KeyValue{} + for i := 0; i < 100; i++ { + key := KeyValue{fmt.Sprintf("%dbob@email.com", i), "password"} + keys = append(keys, key) + } + g := table. - BatchGetItem( - KeyValue{"bob@email.com", "password"}, - KeyValue{"joe@email.com", "password"}, - KeyValue{"alice@email.com", "password"}, - ) + BatchGetItem(keys...) users := []*User{} nextItem := func() interface{} { From 802ed2bf160345ff2fcbebea1367e81de57ec293 Mon Sep 17 00:00:00 2001 From: negator Date: Fri, 16 Jun 2017 12:26:19 -0700 Subject: [PATCH 2/2] fix for batch write --- domino.go | 1 - 1 file changed, 1 deletion(-) diff --git a/domino.go b/domino.go index 01de5f6..5f4236d 100644 --- a/domino.go +++ b/domino.go @@ -630,7 +630,6 @@ func (d *batchWriteInput) writeItems(putOnly bool, items ...interface{}) *batchW return d } delayed := func() error { - // batchCount := math.Ceil(float64(len(items)) / 25.0) var batch *dynamodb.BatchWriteItemInput for _, item := range items {