Skip to content

Commit

Permalink
Improve some debug messages (elastic#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
monicasarbu committed May 19, 2015
1 parent e4183f0 commit e887cb4
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
3 changes: 2 additions & 1 deletion outputs/elasticsearch/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (es *Elasticsearch) Request(method string, url string,
return nil, err
}

logp.Debug("elasticsearch", "Request %s", req)

req.Header.Add("Accept", "application/json")
if es.Username != "" || es.Password != "" {
req.SetBasicAuth(es.Username, es.Password)
Expand Down Expand Up @@ -178,7 +180,6 @@ func (es *Elasticsearch) Index(index string, doc_type string, id string,
} else {
method = "PUT"
}
logp.Debug("output_elasticsearch", "method=%s path=%s", method, path)
resp, err := es.Request(method, path, params, body)
if err != nil {
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ func (es *Elasticsearch) Bulk(index string, doc_type string,
enc := json.NewEncoder(&buf)
for obj := range body {
enc.Encode(obj)
logp.Debug("elasticsearch", "obj %s", obj)
}

if buf.Len() == 0 {
logp.Debug("elasticsearch", "Empty channel. Wait for more data.")
return nil, nil
}

logp.Debug("elasticsearch", "Insert bulk messages: %s\n", buf)
logp.Debug("elasticsearch", "Insert bulk messages:\n%s\n", buf)

path, err := MakePath(index, doc_type, "_bulk")
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ func (out *ElasticsearchOutput) Init(config outputs.MothershipConfig, topology_e
logp.Info("[ElasticsearchOutput] Using Elasticsearch %s", url)
logp.Info("[ElasticsearchOutput] Using index pattern [%s-]YYYY.MM.DD", out.Index)
logp.Info("[ElasticsearchOutput] Topology expires after %ds", out.TopologyExpire/1000)
logp.Info("[ElasticsearchOutput] Flush interval %s", out.FlushInterval)
logp.Info("[ElasticsearchOutput] Bulk size %d", out.BulkMaxSize)
if out.FlushInterval > 0 {
logp.Info("[ElasticsearchOutput] Insert events in batches. Flush interval is %s. Bulk size is %d.", out.FlushInterval, out.BulkMaxSize)
} else {
logp.Info("[ElasticsearchOutput] Insert events one by one. This might affect the performance of the shipper.")
}

return nil
}
Expand Down Expand Up @@ -126,7 +129,7 @@ func (out *ElasticsearchOutput) SendMessagesGoroutine() {
case msg := <-out.sendingQueue:
index := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, msg.Ts.Year(), msg.Ts.Month(), msg.Ts.Day())
if out.FlushInterval > 0 {
logp.Debug("output_elasticsearch", "Insert %d bulk messages.", len(bulkChannel))
logp.Debug("output_elasticsearch", "Insert bulk messages in channel of size %d.", len(bulkChannel))
if len(bulkChannel)+2 > out.BulkMaxSize {
logp.Debug("output_elasticsearch", "Channel size reached. Calling bulk")
out.InsertBulkMessage(bulkChannel)
Expand Down

0 comments on commit e887cb4

Please sign in to comment.