Skip to content

Commit

Permalink
Refactor and extract consts.
Browse files Browse the repository at this point in the history
  • Loading branch information
bestpath-gb committed Apr 16, 2020
1 parent 3134353 commit b385a8f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 22 deletions.
15 changes: 15 additions & 0 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

// FlagField fields used to keep information or errors when events are parsed.
Expand Down Expand Up @@ -54,6 +55,20 @@ func (e *Event) SetID(id string) {
e.Meta["_id"] = id
}

func (e *Event) GetMetaStringValue(key string) (string, error) {
var val string
if tmp, err := e.GetValue("@metadata." + key); err == nil {
if s, ok := tmp.(string); ok {
return s, nil
} else {
logp.Err("Event[%s] '%v' is no string value", key, tmp)
return val, nil
}
} else {
return val, err
}
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == "@timestamp" {
return e.Timestamp, nil
Expand Down
39 changes: 17 additions & 22 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,16 @@ type bulkResultStats struct {

const (
defaultEventType = "doc"
opTypeCreate = "create"
opTypeDelete = "delete"
opTypeIndex = "index"
)

// opTypeKey defines the metadata key name for event operation type.
// The key's value can be an empty string, `create`, `index`, or `delete`. If empty, the event will be `create`d if the ID
// is known, and `index`ed otherwise.
const opTypeKey = "@metadata.op_type"

// NewClient instantiates a new client.
func NewClient(
s ClientSettings,
Expand Down Expand Up @@ -251,20 +259,6 @@ func (client *Client) publishEvents(
return nil, nil
}

func eventMetaValue(event *beat.Event, key string) string {
var val string
if m := event.Meta; m != nil {
if tmp := m[key]; tmp != nil {
if s, ok := tmp.(string); ok {
val = s
} else {
logp.Err("Event[%s] '%v' is no string value", key, val)
}
}
}
return val
}

// bulkEncodePublishRequest encodes all bulk requests and returns slice of events
// successfully added to the list of bulk items and the list of bulk items.
func bulkEncodePublishRequest(
Expand All @@ -284,8 +278,7 @@ func bulkEncodePublishRequest(
log.Errorf("Failed to encode event meta data: %+v", err)
continue
}
opType := eventMetaValue(event, "op_type")
if opType == "delete" {
if opType, err := event.GetMetaStringValue(opTypeKey); err == nil && opType == opTypeDelete {
// We don't include the event source in a bulk DELETE
bulkItems = append(bulkItems, meta, nil)
} else {
Expand Down Expand Up @@ -320,8 +313,8 @@ func createEventBulkMeta(
return nil, err
}

id := eventMetaValue(event, "id")
opType := eventMetaValue(event, "op_type")
id, _ := event.GetMetaStringValue("_id")
opType, _ := event.GetMetaStringValue(opTypeKey)

meta := eslegclient.BulkMeta{
Index: index,
Expand All @@ -330,12 +323,14 @@ func createEventBulkMeta(
ID: id,
}

if id != "" && opType == opTypeDelete {
return eslegclient.BulkDeleteAction{Delete: meta}, nil
}
if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
if opType == "" || opType == "create" {
return eslegclient.BulkCreateAction{Create: meta}, nil
} else if opType == "delete" {
return eslegclient.BulkDeleteAction{Delete: meta}, nil
if opType == opTypeIndex {
return eslegclient.BulkIndexAction{Index: meta}, nil
}
return eslegclient.BulkCreateAction{Create: meta}, nil
}
return eslegclient.BulkIndexAction{Index: meta}, nil
}
Expand Down

0 comments on commit b385a8f

Please sign in to comment.