Skip to content

Commit

Permalink
Merge pull request #879 from urso/feature/805-es-http-params
Browse files Browse the repository at this point in the history
Add URL parameters to ES output
  • Loading branch information
ruflin committed Jan 29, 2016
2 parents 3e82f43 + 0e1f535 commit 3dd0bb1
Show file tree
Hide file tree
Showing 14 changed files with 55 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
- Add ability to override configuration settings using environment variables {issue}114[114]
- Libbeat now always exits through a single exit method for proper cleanup and control {pull}736[736]
- Add ability to create Elasticsearch mapping on startup {pull}639[639]
- Add option to elasticsearch output to pass http parameters in index operations {issue}805[805]

*Packetbeat*
- Change the DNS library used throughout the dns package to github.com/miekg/dns. {pull}803[803]
Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ output:
#username: "admin"
#password: "s3cr3t"

# Dictionary of HTTP parameters to pass within the url with index operations.
#parameters:
#param1: value1
#param2: value2

# Number of workers per Elasticsearch host.
#worker: 1

Expand Down
4 changes: 4 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ The basic authentication username for connecting to Elasticsearch.

The basic authentication password for connecting to Elasticsearch.

===== parameters

Dictionary of HTTP parameters to pass within the url with index operations.

[[protocol-option]]
===== protocol

Expand Down
5 changes: 5 additions & 0 deletions libbeat/etc/libbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ output:
#username: "admin"
#password: "s3cr3t"

# Dictionary of HTTP parameters to pass within the url with index operations.
#parameters:
#param1: value1
#param2: value2

# Number of workers per Elasticsearch host.
#worker: 1

Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/elasticsearch/api_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestOneHostSuccessResp(t *testing.T) {

server := ElasticsearchMock(200, expectedResp)

client := NewClient(server.URL, "", nil, nil, "", "")
client := NewClient(server.URL, "", nil, nil, "", "", nil)

params := map[string]string{
"refresh": "true",
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestOneHost500Resp(t *testing.T) {

server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "")
client := NewClient(server.URL, "", nil, nil, "", "", nil)
err := client.Connect(1 * time.Second)
if err != nil {
t.Fatalf("Failed to connect: %v", err)
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestOneHost503Resp(t *testing.T) {

server := ElasticsearchMock(503, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "")
client := NewClient(server.URL, "", nil, nil, "", "", nil)

params := map[string]string{
"refresh": "true",
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func GetTestingElasticsearch() *Client {
var address = "http://" + GetEsHost() + ":" + GetEsPort()
username := os.Getenv("ES_USER")
pass := os.Getenv("ES_PASS")
return NewClient(address, "", nil, nil, username, pass)
return NewClient(address, "", nil, nil, username, pass, nil)
}

func GetValidQueryResult() QueryResult {
Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/elasticsearch/bulkapi_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {

server := ElasticsearchMock(200, expectedResp)

client := NewClient(server.URL, "", nil, nil, "", "")
client := NewClient(server.URL, "", nil, nil, "", "", nil)

params := map[string]string{
"refresh": "true",
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) {

server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "")
client := NewClient(server.URL, "", nil, nil, "", "", nil)

params := map[string]string{
"refresh": "true",
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) {

server := ElasticsearchMock(503, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "")
client := NewClient(server.URL, "", nil, nil, "", "", nil)

params := map[string]string{
"refresh": "true",
Expand Down
12 changes: 8 additions & 4 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var (

type Client struct {
Connection
index string
index string
params map[string]string

json jsonReader
}
Expand Down Expand Up @@ -57,6 +58,7 @@ var (
func NewClient(
esURL, index string, proxyURL *url.URL, tls *tls.Config,
username, password string,
params map[string]string,
) *Client {
proxy := http.ProxyFromEnvironment
if proxyURL != nil {
Expand All @@ -75,7 +77,8 @@ func NewClient(
},
},
},
index: index,
index: index,
params: params,
}
return client
}
Expand Down Expand Up @@ -109,7 +112,7 @@ func (client *Client) PublishEvents(
}

// new request to store all events into
request, err := client.startBulkRequest("", "", nil)
request, err := client.startBulkRequest("", "", client.params)
if err != nil {
logp.Err("Failed to perform any bulk index operations: %s", err)
return events, err
Expand Down Expand Up @@ -328,7 +331,8 @@ func (client *Client) PublishEvent(event common.MapStr) error {
logp.Debug("output_elasticsearch", "Publish event: %s", event)

// insert the events one by one
status, _, err := client.Index(index, event["type"].(string), "", nil, event)
status, _, err := client.Index(
index, event["type"].(string), "", client.params, event)
if err != nil {
logp.Warn("Fail to insert a single event: %s", err)
if err == ErrJSONEncodeFailed {
Expand Down
9 changes: 8 additions & 1 deletion libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,14 @@ func makeClientFactory(
logp.Info("Using proxy URL: %s", proxyURL)
}

client := NewClient(esURL, config.Index, proxyURL, tls, config.Username, config.Password)
params := config.Params
if len(params) == 0 {
params = nil
}
client := NewClient(
esURL, config.Index, proxyURL, tls,
config.Username, config.Password,
params)
return client, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func esConnect(t *testing.T, index string) *esConnection {

username := os.Getenv("ES_USER")
password := os.Getenv("ES_PASS")
client := elasticsearch.NewClient(host, "", nil, nil, username, password)
client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil)

// try to drop old index if left over from failed test
_, _, _ = client.Delete(index, "", "", nil) // ignore error
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type MothershipConfig struct {
Index string
Path string
Template Template
Params map[string]string `yaml:"parameters"`
Db int
Db_topology int
Timeout int
Expand Down
5 changes: 5 additions & 0 deletions packetbeat/packetbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ output:
#username: "admin"
#password: "s3cr3t"

# Dictionary of HTTP parameters to pass within the url with index operations.
#parameters:
#param1: value1
#param2: value2

# Number of workers per Elasticsearch host.
#worker: 1

Expand Down
5 changes: 5 additions & 0 deletions topbeat/topbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ output:
#username: "admin"
#password: "s3cr3t"

# Dictionary of HTTP parameters to pass within the url with index operations.
#parameters:
#param1: value1
#param2: value2

# Number of workers per Elasticsearch host.
#worker: 1

Expand Down
5 changes: 5 additions & 0 deletions winlogbeat/winlogbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ output:
#username: "admin"
#password: "s3cr3t"

# Dictionary of HTTP parameters to pass within the url with index operations.
#parameters:
#param1: value1
#param2: value2

# Number of workers per Elasticsearch host.
#worker: 1

Expand Down

0 comments on commit 3dd0bb1

Please sign in to comment.