Skip to content

Commit

Permalink
Exit Bulk command in case channel is empty
Browse files Browse the repository at this point in the history
In case the channel is empty, don't issue any bulk command.
  • Loading branch information
monicasarbu committed May 18, 2015
1 parent ae75cd7 commit 64e6afc
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 6 deletions.
19 changes: 14 additions & 5 deletions outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/elastic/libbeat/common"
"github.com/elastic/libbeat/logp"
)

type BulkMsg struct {
Expand All @@ -19,15 +20,23 @@ type BulkMsg struct {
func (es *Elasticsearch) Bulk(index string, doc_type string,
params map[string]string, body chan interface{}) (*QueryResult, error) {

path, err := MakePath(index, doc_type, "_bulk")
if err != nil {
return nil, err
}

var buf bytes.Buffer
enc := json.NewEncoder(&buf)
for obj := range body {
enc.Encode(obj)
logp.Debug("elasticsearch", "obj %s", obj)
}

if buf.Len() == 0 {
logp.Debug("elasticsearch", "Empty channel. Wait for more data.")
return nil, nil
}

logp.Debug("elasticsearch", "Insert bulk messages: %s\n", buf)

path, err := MakePath(index, doc_type, "_bulk")
if err != nil {
return nil, err
}

url := es.Url + path
Expand Down
26 changes: 26 additions & 0 deletions outputs/elasticsearch/bulkapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,32 @@ func TestBulk(t *testing.T) {
}
}

func TestEmptyBulk(t *testing.T) {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"})
}
if testing.Short() {
t.Skip("Skipping in short mode, because it requires Elasticsearch")
}
es := NewElasticsearch("http://localhost:9200")
index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())

body := make(chan interface{}, 10)
close(body)

params := map[string]string{
"refresh": "true",
}
resp, err := es.Bulk(index, "type1", params, body)
if err != nil {
t.Errorf("Bulk() returned error: %s", err)
}
if resp != nil {
t.Errorf("Unexpected response: %s", resp)
}

}

func TestBulkMoreOperations(t *testing.T) {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"})
Expand Down
2 changes: 1 addition & 1 deletion outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (out *ElasticsearchOutput) SendMessagesGoroutine() {
case msg := <-out.sendingQueue:
index := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, msg.Ts.Year(), msg.Ts.Month(), msg.Ts.Day())
if out.FlushInterval > 0 {
logp.Debug("output_elasticsearch", "Insert bulk messages")
logp.Debug("output_elasticsearch", "Insert bulk messages.")
bulkChannel <- map[string]interface{}{
"index": map[string]interface{}{
"_index": index,
Expand Down

0 comments on commit 64e6afc

Please sign in to comment.