diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c9060be47ef..1b12e16d1b7 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d *Affecting all Beats* - Drain response buffers when pipelining is used by redis output. {pull}1353[1353] - Unterminated environment variable expressions in config files will now cause an error {pull}1389[1389] +- Fix issue with the automatic template loading when Elasticsearch is not available on Beat start. {issue}1321[1321] *Packetbeat* diff --git a/filebeat/invalid.json b/filebeat/invalid.json new file mode 100644 index 00000000000..407357c16c4 --- /dev/null +++ b/filebeat/invalid.json @@ -0,0 +1 @@ +sdasda diff --git a/libbeat/outputs/elasticsearch/api_mock_test.go b/libbeat/outputs/elasticsearch/api_mock_test.go index 4b0d13788b5..0c21a193f16 100644 --- a/libbeat/outputs/elasticsearch/api_mock_test.go +++ b/libbeat/outputs/elasticsearch/api_mock_test.go @@ -50,7 +50,7 @@ func TestOneHostSuccessResp(t *testing.T) { server := ElasticsearchMock(200, expectedResp) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", @@ -79,7 +79,7 @@ func TestOneHost500Resp(t *testing.T) { server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened")) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) err := client.Connect(1 * time.Second) if err != nil { t.Fatalf("Failed to connect: %v", err) @@ -114,7 +114,7 @@ func TestOneHost503Resp(t *testing.T) { server := ElasticsearchMock(503, []byte("Something wrong happened")) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", diff --git a/libbeat/outputs/elasticsearch/api_test.go b/libbeat/outputs/elasticsearch/api_test.go index fbcbcc608f2..31ca3f2317d 100644 --- a/libbeat/outputs/elasticsearch/api_test.go +++ b/libbeat/outputs/elasticsearch/api_test.go @@ -36,7 +36,7 @@ func GetTestingElasticsearch() *Client { var address = "http://" + GetEsHost() + ":" + GetEsPort() username := os.Getenv("ES_USER") pass := os.Getenv("ES_PASS") - return NewClient(address, "", nil, nil, username, pass, nil) + return NewClient(address, "", nil, nil, username, pass, nil, nil) } func GetValidQueryResult() QueryResult { diff --git a/libbeat/outputs/elasticsearch/bulkapi_mock_test.go b/libbeat/outputs/elasticsearch/bulkapi_mock_test.go index 56ced1a8f85..706e65ca798 100644 --- a/libbeat/outputs/elasticsearch/bulkapi_mock_test.go +++ b/libbeat/outputs/elasticsearch/bulkapi_mock_test.go @@ -42,7 +42,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) { server := ElasticsearchMock(200, expectedResp) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", @@ -83,7 +83,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) { server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened")) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", @@ -125,7 +125,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) { server := ElasticsearchMock(503, []byte("Something wrong happened")) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 6ae856278f3..1bc71342106 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -35,13 +35,16 @@ type Client struct { json jsonReader } +type connectCallback func(client *Client) error + type Connection struct { URL string Username string Password string - http *http.Client - connected bool + http *http.Client + connected bool + onConnectCallback func() error } var ( @@ -61,6 +64,7 @@ func NewClient( esURL, index string, proxyURL *url.URL, tls *tls.Config, username, password string, params map[string]string, + onConnectCallback connectCallback, ) *Client { proxy := http.ProxyFromEnvironment if proxyURL != nil { @@ -82,6 +86,13 @@ func NewClient( index: index, params: params, } + + client.Connection.onConnectCallback = func() error { + if onConnectCallback != nil { + return onConnectCallback(client) + } + return nil + } return client } @@ -424,6 +435,11 @@ func (conn *Connection) Connect(timeout time.Duration) error { if !conn.connected { return ErrNotConnected } + + err = conn.onConnectCallback() + if err != nil { + return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err) + } return nil } diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 6f77a67a801..ee6c35adb8d 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -10,6 +10,8 @@ import ( "io/ioutil" "path/filepath" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs" "github.com/stretchr/testify/assert" ) @@ -29,7 +31,7 @@ func TestCheckTemplate(t *testing.T) { assert.Nil(t, err) // Check for non existant template - assert.False(t, client.CheckTemplate("libbeat")) + assert.False(t, client.CheckTemplate("libbeat-notexists")) } func TestLoadTemplate(t *testing.T) { @@ -129,3 +131,58 @@ func TestLoadBeatsTemplate(t *testing.T) { assert.False(t, client.CheckTemplate(templateName)) } } + +// TestOutputLoadTemplate checks that the template is inserted before +// the first event is published. +func TestOutputLoadTemplate(t *testing.T) { + + client := GetTestingElasticsearch() + err := client.Connect(5 * time.Second) + if err != nil { + t.Fatal(err) + } + + // delete template if it exists + client.request("DELETE", "/_template/libbeat", nil, nil) + + // Make sure template is not yet there + assert.False(t, client.CheckTemplate("libbeat")) + + tPath, err := filepath.Abs("../../../topbeat/topbeat.template.json") + if err != nil { + t.Fatal(err) + } + config := map[string]interface{}{ + "hosts": GetEsHost(), + "template": map[string]interface{}{ + "name": "libbeat", + "path": tPath, + }, + } + + cfg, err := common.NewConfigFrom(config) + if err != nil { + t.Fatal(err) + } + + output, err := New(cfg, 0) + if err != nil { + t.Fatal(err) + } + event := common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": "libbeat", + "message": "Test message from libbeat", + } + + err = output.PublishEvent(nil, outputs.Options{Guaranteed: true}, event) + if err != nil { + t.Fatal(err) + } + + // Guaranteed publish, so the template should be there + + assert.True(t, client.CheckTemplate("libbeat")) + +} diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index 9fd5a4fa7b4..4dcde4e0594 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -4,9 +4,11 @@ import ( "bytes" "crypto/tls" "errors" + "fmt" "io/ioutil" "net/url" "strings" + "sync" "time" "github.com/elastic/beats/libbeat/common" @@ -20,6 +22,9 @@ type elasticsearchOutput struct { index string mode mode.ConnectionMode topology + + templateContents []byte + templateMutex sync.Mutex } func init() { @@ -69,7 +74,12 @@ func (out *elasticsearchOutput) init( return err } - clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config)) + err = out.readTemplate(config.Template) + if err != nil { + return err + } + + clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config, out)) if err != nil { return err } @@ -91,8 +101,6 @@ func (out *elasticsearchOutput) init( return err } - loadTemplate(config.Template, clients) - if config.SaveTopology { err := out.EnableTTL() if err != nil { @@ -122,52 +130,56 @@ func (out *elasticsearchOutput) init( return nil } -// loadTemplate checks if the index mapping template should be loaded -// In case template loading is enabled, template is written to index -func loadTemplate(config Template, clients []mode.ProtocolClient) { - // Check if template should be loaded - // Not being able to load the template will output an error but will not stop execution - if config.Name != "" && len(clients) > 0 { - - // Always takes the first client - esClient := clients[0].(*Client) - +// readTemplates reads the ES mapping template from the disk, if configured. +func (out *elasticsearchOutput) readTemplate(config Template) error { + if len(config.Name) > 0 { // Look for the template in the configuration path, if it's not absolute templatePath := paths.Resolve(paths.Config, config.Path) - logp.Info("Loading template enabled. Trying to load template: %v", templatePath) + logp.Info("Loading template enabled. Reading template file: %v", templatePath) - exists := esClient.CheckTemplate(config.Name) + var err error + out.templateContents, err = ioutil.ReadFile(templatePath) + if err != nil { + return fmt.Errorf("Error loading template %s: %v", templatePath, err) + } + } + return nil +} - // Check if template already exist or should be overwritten - if !exists || config.Overwrite { +// loadTemplate checks if the index mapping template should be loaded +// In case the template is not already loaded or overwritting is enabled, the +// template is written to index +func (out *elasticsearchOutput) loadTemplate(config Template, client *Client) error { + out.templateMutex.Lock() + defer out.templateMutex.Unlock() - if config.Overwrite { - logp.Info("Existing template will be overwritten, as overwrite is enabled.") - } + logp.Info("Trying to load template for client: %s", client) - // Load template from file - content, err := ioutil.ReadFile(templatePath) - if err != nil { - logp.Err("Could not load template from file path: %s; Error: %s", templatePath, err) - } else { - reader := bytes.NewReader(content) - err = esClient.LoadTemplate(config.Name, reader) + // Check if template already exist or should be overwritten + exists := client.CheckTemplate(config.Name) + if !exists || config.Overwrite { - if err != nil { - logp.Err("Could not load template: %v", err) - } - } - } else { - logp.Info("Template already exists and will not be overwritten.") + if config.Overwrite { + logp.Info("Existing template will be overwritten, as overwrite is enabled.") } + reader := bytes.NewReader(out.templateContents) + err := client.LoadTemplate(config.Name, reader) + if err != nil { + return fmt.Errorf("Could not load template: %v", err) + } + } else { + logp.Info("Template already exists and will not be overwritten.") } + + return nil } func makeClientFactory( tls *tls.Config, config *elasticsearchConfig, + out *elasticsearchOutput, ) func(string) (mode.ProtocolClient, error) { return func(host string) (mode.ProtocolClient, error) { esURL, err := getURL(config.Protocol, config.Path, host) @@ -196,10 +208,19 @@ func makeClientFactory( if len(params) == 0 { params = nil } + + // define a callback to be called on connection + var onConnected connectCallback + if len(out.templateContents) > 0 { + onConnected = func(client *Client) error { + return out.loadTemplate(config.Template, client) + } + } + client := NewClient( esURL, config.Index, proxyURL, tls, config.Username, config.Password, - params) + params, onConnected) return client, nil } } diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index af1b32328c5..a353d0038b3 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -72,7 +72,7 @@ func esConnect(t *testing.T, index string) *esConnection { username := os.Getenv("ES_USER") password := os.Getenv("ES_PASS") - client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil) + client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil, nil) // try to drop old index if left over from failed test _, _, _ = client.Delete(index, "", "", nil) // ignore error diff --git a/libbeat/tests/system/beatname.template.json b/libbeat/tests/system/beatname.template.json new file mode 100644 index 00000000000..2e9e1b84e89 --- /dev/null +++ b/libbeat/tests/system/beatname.template.json @@ -0,0 +1 @@ +{"template": true} \ No newline at end of file diff --git a/libbeat/tests/system/mockbeat.template.json b/libbeat/tests/system/mockbeat.template.json new file mode 100644 index 00000000000..2e9e1b84e89 --- /dev/null +++ b/libbeat/tests/system/mockbeat.template.json @@ -0,0 +1 @@ +{"template": true} \ No newline at end of file diff --git a/libbeat/tests/system/test_base.py b/libbeat/tests/system/test_base.py index a29449f815d..248883e550c 100644 --- a/libbeat/tests/system/test_base.py +++ b/libbeat/tests/system/test_base.py @@ -46,9 +46,13 @@ def test_config_test(self): """ shutil.copy("../../etc/libbeat.yml", os.path.join(self.working_dir, "libbeat.yml")) + with open(self.working_dir + "/beatname.template.json", "w") as f: + f.write('{"template": true}') - exit_code = self.run_beat(config="libbeat.yml", - extra_args=["-configtest"]) + exit_code = self.run_beat( + config="libbeat.yml", + extra_args=["-configtest", + "-path.config", self.working_dir]) assert exit_code == 0 assert self.log_contains("Config OK") is True @@ -70,8 +74,8 @@ def test_version(self): assert self.log_contains("error loading config file") is False - with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \ - as outputfile: + with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \ + as outputfile: proc = subprocess.Popen(args, stdout=outputfile, stderr=subprocess.STDOUT)