Skip to content

Commit

Permalink
Add support for the bulk API on indexing (elastic#10)
Browse files Browse the repository at this point in the history
This implementation uses a channel to pass the bulk messages. The caller
needs to write the operations to the channel, close the channel and
call the Bulk method.
  • Loading branch information
monicasarbu committed May 15, 2015
1 parent 978034c commit ae75cd7
Show file tree
Hide file tree
Showing 8 changed files with 468 additions and 70 deletions.
52 changes: 35 additions & 17 deletions outputs/elasticsearch/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io/ioutil"
"net/http"
"net/url"

"github.com/elastic/libbeat/logp"
)

const (
Expand Down Expand Up @@ -86,14 +88,33 @@ func MakePath(index string, doc_type string, id string) (string, error) {
}
} else {
if len(id) > 0 {
path = fmt.Sprintf("/%s/%s", index, id)
if len(index) > 0 {
path = fmt.Sprintf("/%s/%s", index, id)
} else {
path = fmt.Sprintf("/%s", id)
}
} else {
path = fmt.Sprintf("/%s", index)
}
}
return path, nil
}

// ReadQueryResult reads the whole body of resp, closes it, and decodes the
// JSON payload into a QueryResult. It returns a nil result together with the
// error when either reading or decoding fails.
func ReadQueryResult(resp http.Response) (*QueryResult, error) {
	defer resp.Body.Close()

	raw, readErr := ioutil.ReadAll(resp.Body)
	if readErr != nil {
		return nil, readErr
	}

	decoded := new(QueryResult)
	if decodeErr := json.Unmarshal(raw, decoded); decodeErr != nil {
		return nil, decodeErr
	}
	return decoded, nil
}

// Create an HTTP request to Elasticsearch
func (es *Elasticsearch) Request(method string, url string,
params map[string]string, body interface{}) (*http.Response, error) {
Expand Down Expand Up @@ -148,6 +169,7 @@ func (es *Elasticsearch) Index(index string, doc_type string, id string,
} else {
method = "PUT"
}
logp.Debug("output_elasticsearch", "method=%s path=%s", method, path)
resp, err := es.Request(method, path, params, body)
if err != nil {
return nil, err
Expand Down Expand Up @@ -177,17 +199,23 @@ func (es *Elasticsearch) Refresh(index string) (*QueryResult, error) {
return nil, err
}

defer resp.Body.Close()
obj, err := ioutil.ReadAll(resp.Body)
return ReadQueryResult(*resp)
}

// CreateIndex instantiates an index by issuing a PUT request, without
// parameters or body, to the path that MakePath builds for the index name
// alone. It returns the response decoded by ReadQueryResult, or the error
// from building the path, sending the request, or decoding the response.
func (es *Elasticsearch) CreateIndex(index string) (*QueryResult, error) {
	path, err := MakePath(index, "", "")
	if err != nil {
		return nil, err
	}

	resp, err := es.Request("PUT", path, nil, nil)
	if err != nil {
		return nil, err
	}

	// ReadQueryResult takes care of closing the response body.
	return ReadQueryResult(*resp)
}

// Deletes a typed JSON document from a specific index based on its id.
Expand All @@ -204,17 +232,7 @@ func (es *Elasticsearch) Delete(index string, doc_type string, id string, params
return nil, err
}

defer resp.Body.Close()
obj, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var result QueryResult
err = json.Unmarshal(obj, &result)
if err != nil {
return nil, err
}
return &result, err
return ReadQueryResult(*resp)
}

// A search request can be executed purely using a URI by providing request parameters.
Expand Down
18 changes: 17 additions & 1 deletion outputs/elasticsearch/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ func TestMakePath(t *testing.T) {
if path != "/twitter/_refresh" {
t.Errorf("Wrong path created: %s", path)
}

path, err = MakePath("", "", "_bulk")
if err != nil {
t.Errorf("Fail to create path: %s", err)
}
if path != "/_bulk" {
t.Errorf("Wrong path created: %s", path)
}
path, err = MakePath("twitter", "", "")
if err != nil {
t.Errorf("Fail to create path: %s", err)
}
if path != "/twitter" {
t.Errorf("Wrong path created: %s", path)
}

}

func TestIndex(t *testing.T) {
Expand All @@ -56,7 +72,7 @@ func TestIndex(t *testing.T) {
}

if testing.Short() {
t.Skip("Skipping topology tests in short mode, because they require Elasticsearch")
t.Skip("Skipping in short mode, because it requires Elasticsearch")
}

es := NewElasticsearch("http://localhost:9200")
Expand Down
63 changes: 63 additions & 0 deletions outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package elasticsearch

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/elastic/libbeat/common"
)

// BulkMsg pairs a timestamp with an event.
// NOTE(review): nothing in the visible part of this package references this
// type; presumably it is the unit queued for bulk indexing — confirm against
// the callers.
type BulkMsg struct {
// Ts is the timestamp carried with the event.
Ts time.Time
// Event is the event payload as a generic map.
Event common.MapStr
}

// Bulk sends the operations read from body to the Elasticsearch _bulk
// endpoint in a single POST request.
//
// Every value received from body is JSON-encoded on its own line, in the
// order received. The channel is read until it is closed, so the caller must
// close it once all operations have been written. index and doc_type are
// passed to MakePath together with "_bulk" to build the request path, and
// params, when non-empty, are appended as the URL query string.
//
// It returns the decoded response. An error is returned when an operation
// cannot be encoded (nothing is sent in that case), when the request cannot
// be built or sent, when the response body cannot be read or decoded, or
// when the HTTP status is above 299. For a status above 299 the decoded
// response accompanies the error if the body could be parsed.
func (es *Elasticsearch) Bulk(index string, doc_type string,
	params map[string]string, body chan interface{}) (*QueryResult, error) {

	path, err := MakePath(index, doc_type, "_bulk")
	if err != nil {
		return nil, err
	}

	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	var encErr error
	for obj := range body {
		if encErr != nil {
			// Keep draining after a failure so that a producer still
			// writing to the channel is not left blocked forever.
			continue
		}
		encErr = enc.Encode(obj)
	}
	if encErr != nil {
		// A partially encoded payload would be misinterpreted by the
		// server, so refuse to send it.
		return nil, fmt.Errorf("encoding bulk operation: %v", encErr)
	}

	url := es.Url + path
	if len(params) > 0 {
		url = url + "?" + UrlEncode(params)
	}

	req, err := http.NewRequest("POST", url, &buf)
	if err != nil {
		return nil, err
	}

	resp, err := es.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	var result QueryResult
	decodeErr := json.Unmarshal(raw, &result)

	if resp.StatusCode > 299 {
		// Report the HTTP failure even when the error body is not valid
		// JSON; a decode error here would only hide the real cause.
		statusErr := fmt.Errorf("ES returned an error: %s", resp.Status)
		if decodeErr != nil {
			return nil, statusErr
		}
		return &result, statusErr
	}
	if decodeErr != nil {
		return nil, decodeErr
	}
	return &result, nil
}
158 changes: 158 additions & 0 deletions outputs/elasticsearch/bulkapi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package elasticsearch

import (
"fmt"
"os"
"testing"

"github.com/elastic/libbeat/logp"
)

// TestBulk indexes a single document through Bulk and checks that a URI
// search on its field finds exactly one hit. It talks to an Elasticsearch
// instance on localhost:9200 and is therefore skipped in short mode.
func TestBulk(t *testing.T) {
	if testing.Verbose() {
		logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"})
	}
	if testing.Short() {
		t.Skip("Skipping in short mode, because it requires Elasticsearch")
	}
	es := NewElasticsearch("http://localhost:9200")
	index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())

	// One action line followed by the document source.
	ops := []map[string]interface{}{
		{
			"index": map[string]interface{}{
				"_index": index,
				"_type":  "type1",
				"_id":    "1",
			},
		},
		{
			"field1": "value1",
		},
	}

	// Buffered so that every operation can be queued before Bulk starts
	// reading; Bulk consumes the channel until it is closed.
	body := make(chan interface{}, 10)
	for _, op := range ops {
		body <- op
	}
	close(body)

	// Remove the test index on every exit path, including the fatal ones
	// below.
	defer func() {
		if _, err := es.Delete(index, "", "", nil); err != nil {
			t.Errorf("Delete() returns error: %s", err)
		}
	}()

	params := map[string]string{
		"refresh": "true",
	}
	if _, err := es.Bulk(index, "type1", params, body); err != nil {
		t.Fatalf("Bulk() returned error: %s", err)
	}

	params = map[string]string{
		"q": "field1:value1",
	}
	result, err := es.SearchUri(index, "type1", params)
	if err != nil {
		// result must not be inspected after a failed search.
		t.Fatalf("SearchUri() returns an error: %s", err)
	}
	if result.Hits.Total != 1 {
		t.Errorf("Wrong number of search results: %d", result.Hits.Total)
	}
}

// TestBulkMoreOperations sends index, delete, create and update operations
// in one Bulk call and checks with URI searches that the created document
// and the updated field are each found exactly once. It talks to an
// Elasticsearch instance on localhost:9200 and is therefore skipped in
// short mode.
func TestBulkMoreOperations(t *testing.T) {
	if testing.Verbose() {
		logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"})
	}
	if testing.Short() {
		t.Skip("Skipping in short mode, because it requires Elasticsearch")
	}
	es := NewElasticsearch("http://localhost:9200")
	index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())

	// Action lines, each followed by its payload where the action takes
	// one (delete has none).
	ops := []map[string]interface{}{
		{
			"index": map[string]interface{}{
				"_index": index,
				"_type":  "type1",
				"_id":    "1",
			},
		},
		{
			"field1": "value1",
		},
		{
			"delete": map[string]interface{}{
				"_index": index,
				"_type":  "type1",
				"_id":    "2",
			},
		},
		{
			"create": map[string]interface{}{
				"_index": index,
				"_type":  "type1",
				"_id":    "3",
			},
		},
		{
			"field1": "value3",
		},
		{
			"update": map[string]interface{}{
				"_id":    "1",
				"_index": index,
				"_type":  "type1",
			},
		},
		{
			"doc": map[string]interface{}{
				"field2": "value2",
			},
		},
	}

	// Buffered so that every operation can be queued before Bulk starts
	// reading; Bulk consumes the channel until it is closed.
	body := make(chan interface{}, 10)
	for _, op := range ops {
		body <- op
	}
	close(body)

	// Remove the test index on every exit path, including the fatal ones
	// below.
	defer func() {
		if _, err := es.Delete(index, "", "", nil); err != nil {
			t.Errorf("Delete() returns error: %s", err)
		}
	}()

	params := map[string]string{
		"refresh": "true",
	}
	resp, err := es.Bulk(index, "type1", params, body)
	if err != nil {
		t.Fatalf("Bulk() returned error: %s [%v]", err, resp)
	}

	params = map[string]string{
		"q": "field1:value3",
	}
	result, err := es.SearchUri(index, "type1", params)
	if err != nil {
		// result must not be inspected after a failed search.
		t.Fatalf("SearchUri() returns an error: %s", err)
	}
	if result.Hits.Total != 1 {
		t.Errorf("Wrong number of search results: %d", result.Hits.Total)
	}

	params = map[string]string{
		"q": "field2:value2",
	}
	result, err = es.SearchUri(index, "type1", params)
	if err != nil {
		t.Fatalf("SearchUri() returns an error: %s", err)
	}
	if result.Hits.Total != 1 {
		t.Errorf("Wrong number of search results: %d", result.Hits.Total)
	}
}
Loading

0 comments on commit ae75cd7

Please sign in to comment.