Skip to content

Commit

Permalink
Merge pull request #19 from vsco/ng-put-item
Browse files Browse the repository at this point in the history
fix for batch write
  • Loading branch information
negator authored Jun 16, 2017
2 parents a2f9c1f + 802ed2b commit 92fb5b8
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 47 deletions.
75 changes: 38 additions & 37 deletions domino.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,13 @@ func (table DynamoTable) PutItem(i interface{}) *putInput {
return &q
}

func (d *putInput) ReturnAllOld() *putInput {
func (d *putInput) ReturnAllOld() *putInput {
(*dynamodb.PutItemInput)(d).SetReturnValues("ALL_OLD")
return d
}
func (d *putInput) ReturnNone() *putInput {
(*dynamodb.PutItemInput)(d).SetReturnValues("NONE")
return d
return d
}
func (d *putInput) SetConditionExpression(c Expression) *putInput {
s, m, _ := c.construct(1, true)
Expand Down Expand Up @@ -606,7 +606,7 @@ func (o *putOutput) Result(item interface{}) (err error) {
/************************************** BatchWriteItem *********************************/
/***************************************************************************************/
type batchWriteInput struct {
batches []dynamodb.BatchWriteItemInput
batches []*dynamodb.BatchWriteItemInput
table DynamoTable
delayedFunctions []func() error
}
Expand All @@ -619,52 +619,53 @@ type batchPutOutput struct {
/*BatchWriteItem represents dynamo batch write item call*/
func (table DynamoTable) BatchWriteItem() *batchWriteInput {
r := batchWriteInput{
batches: []dynamodb.BatchWriteItemInput{},
batches: []*dynamodb.BatchWriteItemInput{},
table: table,
}
return &r
}

func (d *batchWriteInput) writeItems(putOnly bool, items ...interface{}) *batchWriteInput {
if len(items) <= 0 {
return d
}
delayed := func() error {
batches := []dynamodb.BatchWriteItemInput{}
batchCount := len(items)/25 + 1
for i := 1; i <= batchCount; i++ {
batch := dynamodb.BatchWriteItemInput{
RequestItems: make(map[string][]*dynamodb.WriteRequest),
var batch *dynamodb.BatchWriteItemInput

for _, item := range items {
if batch == nil {
batch = &dynamodb.BatchWriteItemInput{
RequestItems: make(map[string][]*dynamodb.WriteRequest),
}
d.batches = append(d.batches, batch)
}
puts := []*dynamodb.WriteRequest{}

for len(items) > 0 && len(puts) < 25 {
item := items[0]
items = items[1:]
dynamoItem, err := dynamodbattribute.MarshalMap(item)
if err != nil {
return err
dynamoItem, err := dynamodbattribute.MarshalMap(item)

if err != nil {
return err
}
var write *dynamodb.WriteRequest
if putOnly {
write = &dynamodb.WriteRequest{
PutRequest: &dynamodb.PutRequest{
Item: dynamoItem,
},
}
var write *dynamodb.WriteRequest
if putOnly {
write = &dynamodb.WriteRequest{
PutRequest: &dynamodb.PutRequest{
Item: dynamoItem,
},
}
} else {
write = &dynamodb.WriteRequest{
DeleteRequest: &dynamodb.DeleteRequest{
Key: dynamoItem,
},
}
} else {
write = &dynamodb.WriteRequest{
DeleteRequest: &dynamodb.DeleteRequest{
Key: dynamoItem,
},
}

puts = append(puts, write)
}
batch.RequestItems[d.table.Name] = append(batch.RequestItems[d.table.Name], write)

batch.RequestItems[d.table.Name] = puts
batches = append(batches, batch)

if len(batch.RequestItems[d.table.Name]) >= 25 {
batch = nil
}
}
d.batches = append(d.batches, batches...)

return nil
}
d.delayedFunctions = append(d.delayedFunctions, delayed)
Expand All @@ -687,7 +688,7 @@ func (d *batchWriteInput) DeleteItems(keys ...KeyValue) *batchWriteInput {
return d
}

func (d *batchWriteInput) Build() (input []dynamodb.BatchWriteItemInput, err error) {
func (d *batchWriteInput) Build() (input []*dynamodb.BatchWriteItemInput, err error) {
for _, function := range d.delayedFunctions {
if err = function(); err != nil {
return
Expand All @@ -714,7 +715,7 @@ func (d *batchWriteInput) ExecuteWith(ctx context.Context, dynamo DynamoDBIFace,
return
}
for _, batch := range batches {
result, err := dynamo.BatchWriteItemWithContext(ctx, &batch, opts...)
result, err := dynamo.BatchWriteItemWithContext(ctx, batch, opts...)
if err != nil {
out.Error = handleAwsErr(err)
return
Expand Down
25 changes: 15 additions & 10 deletions domino_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
// "fmt"

"context"
"fmt"
"net/http"
"strconv"
"sync"
Expand Down Expand Up @@ -159,13 +160,15 @@ func TestBatchPutItem(t *testing.T) {

assert.NoError(t, err)

items := []interface{}{}
for i := 0; i < 100; i++ {
row := User{Email: fmt.Sprintf("%dbob@email.com", i), Password: "password"}
items = append(items, row)
}

q := table.
BatchWriteItem().
PutItems(
User{Email: "bob@email.com", Password: "password"},
User{Email: "joe@email.com", Password: "password"},
User{Email: "alice@email.com", Password: "password"},
).
PutItems(items...).
DeleteItems(
KeyValue{"name@email.com", "password"},
)
Expand All @@ -180,12 +183,14 @@ func TestBatchPutItem(t *testing.T) {
assert.Empty(t, unprocessed)
assert.NoError(t, err)

keys := []KeyValue{}
for i := 0; i < 100; i++ {
key := KeyValue{fmt.Sprintf("%dbob@email.com", i), "password"}
keys = append(keys, key)
}

g := table.
BatchGetItem(
KeyValue{"bob@email.com", "password"},
KeyValue{"joe@email.com", "password"},
KeyValue{"alice@email.com", "password"},
)
BatchGetItem(keys...)

users := []*User{}
nextItem := func() interface{} {
Expand Down

0 comments on commit 92fb5b8

Please sign in to comment.