diff --git a/outputs/elasticsearch/api.go b/outputs/elasticsearch/api.go index 02e8b08494d..28804aed8f5 100644 --- a/outputs/elasticsearch/api.go +++ b/outputs/elasticsearch/api.go @@ -143,6 +143,8 @@ func (es *Elasticsearch) Request(method string, url string, return nil, err } + logp.Debug("elasticsearch", "Request %s", req) + req.Header.Add("Accept", "application/json") if es.Username != "" || es.Password != "" { req.SetBasicAuth(es.Username, es.Password) @@ -178,7 +180,6 @@ func (es *Elasticsearch) Index(index string, doc_type string, id string, } else { method = "PUT" } - logp.Debug("output_elasticsearch", "method=%s path=%s", method, path) resp, err := es.Request(method, path, params, body) if err != nil { return nil, err diff --git a/outputs/elasticsearch/bulkapi.go b/outputs/elasticsearch/bulkapi.go index 79105703ac9..e86838d438f 100644 --- a/outputs/elasticsearch/bulkapi.go +++ b/outputs/elasticsearch/bulkapi.go @@ -24,7 +24,6 @@ func (es *Elasticsearch) Bulk(index string, doc_type string, enc := json.NewEncoder(&buf) for obj := range body { enc.Encode(obj) - logp.Debug("elasticsearch", "obj %s", obj) } if buf.Len() == 0 { @@ -32,7 +31,7 @@ func (es *Elasticsearch) Bulk(index string, doc_type string, return nil, nil } - logp.Debug("elasticsearch", "Insert bulk messages: %s\n", buf) + logp.Debug("elasticsearch", "Insert bulk messages:\n%s\n", buf) path, err := MakePath(index, doc_type, "_bulk") if err != nil { diff --git a/outputs/elasticsearch/output.go b/outputs/elasticsearch/output.go index e39fe5a4e73..e11d252a98e 100644 --- a/outputs/elasticsearch/output.go +++ b/outputs/elasticsearch/output.go @@ -71,8 +71,11 @@ func (out *ElasticsearchOutput) Init(config outputs.MothershipConfig, topology_e logp.Info("[ElasticsearchOutput] Using Elasticsearch %s", url) logp.Info("[ElasticsearchOutput] Using index pattern [%s-]YYYY.MM.DD", out.Index) logp.Info("[ElasticsearchOutput] Topology expires after %ds", out.TopologyExpire/1000) - logp.Info("[ElasticsearchOutput] Flush interval %s", out.FlushInterval) - logp.Info("[ElasticsearchOutput] Bulk size %d", out.BulkMaxSize) + if out.FlushInterval > 0 { + logp.Info("[ElasticsearchOutput] Insert events in batches. Flush interval is %s. Bulk size is %d.", out.FlushInterval, out.BulkMaxSize) + } else { + logp.Info("[ElasticsearchOutput] Insert events one by one. This might affect the performance of the shipper.") + } return nil } @@ -126,7 +129,7 @@ func (out *ElasticsearchOutput) SendMessagesGoroutine() { case msg := <-out.sendingQueue: index := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, msg.Ts.Year(), msg.Ts.Month(), msg.Ts.Day()) if out.FlushInterval > 0 { - logp.Debug("output_elasticsearch", "Insert %d bulk messages.", len(bulkChannel)) + logp.Debug("output_elasticsearch", "Insert bulk messages in channel of size %d.", len(bulkChannel)) if len(bulkChannel)+2 > out.BulkMaxSize { logp.Debug("output_elasticsearch", "Channel size reached. Calling bulk") out.InsertBulkMessage(bulkChannel)