Skip to content

Commit

Permalink
Allow delete and index actions with a document ID (elastic#12606)
Browse files Browse the repository at this point in the history
* Add `op_type` meta key for delete and index operations with a document ID.

* Add note on why `obj` can be nil.

* Refactor and extract consts.

* Don't include metadata prefix in key.

* Stop appending `nil` else it ends up in the body.

* Error when trying to delete with no _id.

* Remove incorrect detail in comment.

* Add unit test for new op_type meta key.

* No longer required.

* Return error to caller.

* Fail event if op_type is no string.

* Use consts in error.

* Replace assert with require.

* Fail instead of panic.

* Change missed assert calls to require.

* Simplify GetMetaStringValue.

* Ignore err as key may not exist.
  • Loading branch information
bestpath-gb authored and ycombinator committed May 6, 2020
1 parent b1fdd94 commit 6168a83
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 11 deletions.
13 changes: 13 additions & 0 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ func (e *Event) SetID(id string) {
e.Meta["_id"] = id
}

func (e *Event) GetMetaStringValue(key string) (string, error) {
tmp, err := e.Meta.GetValue(key)
if err != nil {
return "", err
}

if s, ok := tmp.(string); ok {
return s, nil
}

return "", nil
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == "@timestamp" {
return e.Timestamp, nil
Expand Down
4 changes: 4 additions & 0 deletions libbeat/esleg/eslegclient/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type BulkCreateAction struct {
Create BulkMeta `json:"create" struct:"create"`
}

type BulkDeleteAction struct {
Delete BulkMeta `json:"delete" struct:"delete"`
}

type BulkMeta struct {
Index string `json:"_index" struct:"_index"`
DocType string `json:"_type,omitempty" struct:"_type,omitempty"`
Expand Down
37 changes: 26 additions & 11 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,16 @@ type bulkResultStats struct {

const (
defaultEventType = "doc"
opTypeCreate = "create"
opTypeDelete = "delete"
opTypeIndex = "index"
)

// opTypeKey defines the metadata key name for event operation type.
// The key's value can be an empty string, `create`, `index`, or `delete`. If empty, it will assume
// either `create` or `index`. See `createEventBulkMeta`. If in doubt, set explicitly.
const opTypeKey = "op_type"

// NewClient instantiates a new client.
func NewClient(
s ClientSettings,
Expand Down Expand Up @@ -281,7 +289,12 @@ func bulkEncodePublishRequest(
log.Errorf("Failed to encode event meta data: %+v", err)
continue
}
bulkItems = append(bulkItems, meta, event)
if opType, err := event.GetMetaStringValue(opTypeKey); err == nil && opType == opTypeDelete {
// We don't include the event source in a bulk DELETE
bulkItems = append(bulkItems, meta)
} else {
bulkItems = append(bulkItems, meta, event)
}
okEvents = append(okEvents, data[i])
}
return okEvents, bulkItems
Expand Down Expand Up @@ -311,16 +324,8 @@ func createEventBulkMeta(
return nil, err
}

var id string
if m := event.Meta; m != nil {
if tmp := m["_id"]; tmp != nil {
if s, ok := tmp.(string); ok {
id = s
} else {
log.Errorf("Event ID '%v' is no string value", id)
}
}
}
id, _ := event.GetMetaStringValue("_id")
opType, _ := event.GetMetaStringValue(opTypeKey)

meta := eslegclient.BulkMeta{
Index: index,
Expand All @@ -329,7 +334,17 @@ func createEventBulkMeta(
ID: id,
}

if opType == opTypeDelete {
if id != "" {
return eslegclient.BulkDeleteAction{Delete: meta}, nil
} else {
return nil, fmt.Errorf("%s %s requires _id", opTypeKey, opTypeDelete)
}
}
if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
if opType == opTypeIndex {
return eslegclient.BulkIndexAction{Index: meta}, nil
}
return eslegclient.BulkCreateAction{Create: meta}, nil
}
return eslegclient.BulkIndexAction{Index: meta}, nil
Expand Down
62 changes: 62 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,68 @@ func TestBulkEncodeEvents(t *testing.T) {
}
}

func TestBulkEncodeEventsWithOpType(t *testing.T) {
cases := []common.MapStr{
{"_id": "111", "op_type": "index", "message": "test 1", "bulkIndex": 0},
{"_id": "112", "op_type": "", "message": "test 2", "bulkIndex": 2},
{"_id": "", "op_type": "delete", "message": "test 6", "bulkIndex": -1}, // this won't get encoded due to missing _id
{"_id": "", "op_type": "", "message": "test 3", "bulkIndex": 4},
{"_id": "114", "op_type": "delete", "message": "test 4", "bulkIndex": 6},
{"_id": "115", "op_type": "index", "message": "test 5", "bulkIndex": 7},
}

cfg := common.MustNewConfigFrom(common.MapStr{})
info := beat.Info{
IndexPrefix: "test",
Version: version.GetDefaultVersion(),
}

im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig())
require.NoError(t, err)

index, pipeline, err := buildSelectors(im, info, cfg)
require.NoError(t, err)

events := make([]publisher.Event, len(cases))
for i, fields := range cases {
events[i] = publisher.Event{
Content: beat.Event{
Meta: common.MapStr{
"_id": fields["_id"],
"op_type": fields["op_type"],
},
Fields: common.MapStr{
"message": fields["message"],
},
}}
}

encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(version.GetDefaultVersion()), index, pipeline, events)
require.Equal(t, len(events)-1, len(encoded), "all events should have been encoded")
require.Equal(t, 9, len(bulkItems), "incomplete bulk")

for i := 0; i < len(cases); i++ {
bulkEventIndex, _ := cases[i]["bulkIndex"].(int)
if bulkEventIndex == -1 {
continue
}
caseOpType, _ := cases[i]["op_type"].(string)
caseMessage, _ := cases[i]["message"].(string)
switch bulkItems[bulkEventIndex].(type) {
case eslegclient.BulkCreateAction:
validOpTypes := []string{opTypeCreate, ""}
require.Contains(t, validOpTypes, caseOpType, caseMessage)
case eslegclient.BulkIndexAction:
require.Equal(t, opTypeIndex, caseOpType, caseMessage)
case eslegclient.BulkDeleteAction:
require.Equal(t, opTypeDelete, caseOpType, caseMessage)
default:
require.FailNow(t, "unknown type")
}
}

}

func TestClientWithAPIKey(t *testing.T) {
var headers http.Header

Expand Down

0 comments on commit 6168a83

Please sign in to comment.