Skip to content

Commit

Permalink
feat: implement if_seq_no and if_primary_term parameters in bulk inde…
Browse files Browse the repository at this point in the history
…xer (#783) (#784)

Co-authored-by: ben <ben@wese.io>
  • Loading branch information
Anaethelion and benjyiw committed Jan 11, 2024
1 parent 3e540dd commit 3bf6c18
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
11 changes: 11 additions & 0 deletions esutil/bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type BulkIndexerItem struct {
VersionType string
Body io.ReadSeeker
RetryOnConflict *int
IfSeqNo *int64
IfPrimaryTerm *int64
meta bytes.Buffer // Item metadata header
payloadLength int // Item payload total length metadata+newline+body length

Expand Down Expand Up @@ -177,6 +179,15 @@ func (item *BulkIndexerItem) marshallMeta() {
aux = aux[:0]
}

if item.DocumentID != "" && item.IfSeqNo != nil && item.IfPrimaryTerm != nil {
item.meta.WriteRune(',')
item.meta.WriteString(`"if_seq_no":`)
item.meta.WriteString(strconv.FormatInt(*item.IfSeqNo, 10))
item.meta.WriteRune(',')
item.meta.WriteString(`"if_primary_term":`)
item.meta.WriteString(strconv.FormatInt(*item.IfPrimaryTerm, 10))
}

item.meta.WriteRune('}')
item.meta.WriteRune('}')
item.meta.WriteRune('\n')
Expand Down
12 changes: 12 additions & 0 deletions esutil/bulk_indexer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,8 @@ func TestBulkIndexer(t *testing.T) {
})
t.Run("Worker.writeMeta()", func(t *testing.T) {
v := int64(23)
ifSeqNo := int64(45)
ifPrimaryTerm := int64(67)
type args struct {
item BulkIndexerItem
}
Expand Down Expand Up @@ -706,6 +708,16 @@ func TestBulkIndexer(t *testing.T) {
}},
`{"update":{"_id":"1","retry_on_conflict":3}}` + "\n",
},
{
"with if_seq_no and if_primary_term",
args{BulkIndexerItem{
Action: "index",
DocumentID: "1",
IfSeqNo: &ifSeqNo,
IfPrimaryTerm: &ifPrimaryTerm,
}},
`{"index":{"_id":"1","if_seq_no":45,"if_primary_term":67}}` + "\n",
},
}
for _, tt := range tests {
tt := tt
Expand Down

0 comments on commit 3bf6c18

Please sign in to comment.