Skip to content

Commit

Permalink
Avoid filling the channel up to the maximum size (elastic#10)
Browse files Browse the repository at this point in the history
The problem appears when the bulkChannel reaches its maximum size: the goroutine blocks
and no more data is consumed from the channel.
To fix the problem, make sure the bulkChannel never reaches the maximum size by flushing
the buffered data before the channel is full.
  • Loading branch information
monicasarbu committed May 19, 2015
1 parent 3b61800 commit 86053cc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 25 deletions.
25 changes: 17 additions & 8 deletions outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ func (out *ElasticsearchOutput) GetNameByIP(ip string) string {
return name
}

// InsertBulkMessage closes the given bulk channel and asynchronously sends
// its buffered index operations to Elasticsearch in a single Bulk API call.
// Errors from the bulk request are logged rather than returned.
func (out *ElasticsearchOutput) InsertBulkMessage(bulkChannel chan interface{}) {
	// Close first so the Bulk call can drain the channel to completion.
	close(bulkChannel)

	go func(ch chan interface{}) {
		if _, err := out.Conn.Bulk("", "", nil, ch); err != nil {
			logp.Err("Fail to perform many index operations in a single API call: %s", err)
		}
	}(bulkChannel)
}

func (out *ElasticsearchOutput) SendMessagesGoroutine() {
flushChannel := make(<-chan time.Time)

Expand All @@ -115,7 +125,12 @@ func (out *ElasticsearchOutput) SendMessagesGoroutine() {
case msg := <-out.sendingQueue:
index := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, msg.Ts.Year(), msg.Ts.Month(), msg.Ts.Day())
if out.FlushInterval > 0 {
logp.Debug("output_elasticsearch", "Insert bulk messages.")
logp.Debug("output_elasticsearch", "Insert %d bulk messages.", len(bulkChannel))
if len(bulkChannel)+2 > out.BulkMaxSize {
logp.Debug("output_elasticsearch", "Channel size reached. Calling bulk")
out.InsertBulkMessage(bulkChannel)
bulkChannel = make(chan interface{}, out.BulkMaxSize)
}
bulkChannel <- map[string]interface{}{
"index": map[string]interface{}{
"_index": index,
Expand All @@ -131,13 +146,7 @@ func (out *ElasticsearchOutput) SendMessagesGoroutine() {
}
}
case _ = <-flushChannel:
close(bulkChannel)
go func(channel chan interface{}) {
_, err := out.Conn.Bulk("", "", nil, channel)
if err != nil {
logp.Err("Fail to perform many index operations in a single API call: %s", err)
}
}(bulkChannel)
out.InsertBulkMessage(bulkChannel)
bulkChannel = make(chan interface{}, out.BulkMaxSize)
}
}
Expand Down
44 changes: 27 additions & 17 deletions outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
const elasticsearchAddr = "localhost"
const elasticsearchPort = 9200

func createElasticsearchConnection(flush_interval int) ElasticsearchOutput {
func createElasticsearchConnection(flush_interval int, bulk_size int) ElasticsearchOutput {

index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())

Expand All @@ -31,6 +31,7 @@ func createElasticsearchConnection(flush_interval int) ElasticsearchOutput {
Index: index,
Protocol: "",
Flush_interval: &flush_interval,
BulkMaxSize: &bulk_size,
}, 10)

return elasticsearchOutput
Expand All @@ -44,9 +45,9 @@ func TestTopologyInES(t *testing.T) {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"})
}

elasticsearchOutput1 := createElasticsearchConnection(0)
elasticsearchOutput2 := createElasticsearchConnection(0)
elasticsearchOutput3 := createElasticsearchConnection(0)
elasticsearchOutput1 := createElasticsearchConnection(0, 0)
elasticsearchOutput2 := createElasticsearchConnection(0, 0)
elasticsearchOutput3 := createElasticsearchConnection(0, 0)

elasticsearchOutput1.PublishIPs("proxy1", []string{"10.1.0.4"})
elasticsearchOutput2.PublishIPs("proxy2", []string{"10.1.0.9",
Expand Down Expand Up @@ -88,7 +89,7 @@ func TestOneEvent(t *testing.T) {

ts := time.Now()

elasticsearchOutput := createElasticsearchConnection(0)
elasticsearchOutput := createElasticsearchConnection(0, 0)

event := common.MapStr{}
event["type"] = "redis"
Expand Down Expand Up @@ -153,7 +154,7 @@ func TestEvents(t *testing.T) {

ts := time.Now()

elasticsearchOutput := createElasticsearchConnection(0)
elasticsearchOutput := createElasticsearchConnection(0, 0)

event := common.MapStr{}
event["type"] = "redis"
Expand Down Expand Up @@ -213,18 +214,8 @@ func TestEvents(t *testing.T) {
}
}

func TestBulkEvents(t *testing.T) {
if testing.Short() {
t.Skip("Skipping events publish in short mode, because they require Elasticsearch")
}
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"})
}

func test_bulk_with_params(t *testing.T, elasticsearchOutput ElasticsearchOutput) {
ts := time.Now()

elasticsearchOutput := createElasticsearchConnection(50)

index := fmt.Sprintf("%s-%d.%02d.%02d", elasticsearchOutput.Index, ts.Year(), ts.Month(), ts.Day())

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -271,8 +262,27 @@ func TestBulkEvents(t *testing.T) {

if err != nil {
t.Errorf("Failed to query elasticsearch: %s", err)
return
}
if resp.Hits.Total != 10 {
t.Errorf("Wrong number of results: %d", resp.Hits.Total)
}
}

// TestBulkEvents runs the bulk publishing scenario against a live
// Elasticsearch with several BulkMaxSize settings, covering sizes both
// smaller and larger than the number of published events.
func TestBulkEvents(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping events publish in short mode, because they require Elasticsearch")
	}
	if testing.Verbose() {
		logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch", "elasticsearch"})
	}

	// Same flush interval, varying channel capacities.
	for _, bulkSize := range []int{2, 1000, 5} {
		elasticsearchOutput := createElasticsearchConnection(50, bulkSize)
		test_bulk_with_params(t, elasticsearchOutput)
	}
}

0 comments on commit 86053cc

Please sign in to comment.