From d601de23626f364811d44de8f67b21f66bfd835e Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Tue, 19 Apr 2016 07:46:20 +0200 Subject: [PATCH] Load the ES template on connect. (#1381) It used to try loading it only once on init, causing bug #1321. This change moves the call to loadTemplate at connection time, immediately after successful connection. This has the effect that if overwrite is true, the template will be loaded on each new established connection. The template is read on init time and sent to Elasticsearch at connect time. This means that if the template path is wrong, it will be discovered at startup (including `-configtest`). In case there is an error loading the template, the Connect call fails. This commit includes an integration test for the behaviour. --- CHANGELOG.asciidoc | 1 + filebeat/invalid.json | 1 + .../outputs/elasticsearch/api_mock_test.go | 6 +- libbeat/outputs/elasticsearch/api_test.go | 2 +- .../elasticsearch/bulkapi_mock_test.go | 6 +- libbeat/outputs/elasticsearch/client.go | 20 ++++- .../elasticsearch/client_integration_test.go | 59 +++++++++++- libbeat/outputs/elasticsearch/output.go | 89 ++++++++++++------- .../logstash/logstash_integration_test.go | 2 +- libbeat/tests/system/beatname.template.json | 1 + libbeat/tests/system/mockbeat.template.json | 1 + libbeat/tests/system/test_base.py | 12 ++- 12 files changed, 151 insertions(+), 49 deletions(-) create mode 100644 filebeat/invalid.json create mode 100644 libbeat/tests/system/beatname.template.json create mode 100644 libbeat/tests/system/mockbeat.template.json diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c9060be47ef..1b12e16d1b7 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d *Affecting all Beats* - Drain response buffers when pipelining is used by redis output. {pull}1353[1353] - Unterminated environment variable expressions in config files will now cause an error {pull}1389[1389] +- Fix issue with the automatic template loading when Elasticsearch is not available on Beat start. {issue}1321[1321] *Packetbeat* diff --git a/filebeat/invalid.json b/filebeat/invalid.json new file mode 100644 index 00000000000..407357c16c4 --- /dev/null +++ b/filebeat/invalid.json @@ -0,0 +1 @@ +sdasda diff --git a/libbeat/outputs/elasticsearch/api_mock_test.go b/libbeat/outputs/elasticsearch/api_mock_test.go index 4b0d13788b5..0c21a193f16 100644 --- a/libbeat/outputs/elasticsearch/api_mock_test.go +++ b/libbeat/outputs/elasticsearch/api_mock_test.go @@ -50,7 +50,7 @@ func TestOneHostSuccessResp(t *testing.T) { server := ElasticsearchMock(200, expectedResp) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", @@ -79,7 +79,7 @@ func TestOneHost500Resp(t *testing.T) { server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened")) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) err := client.Connect(1 * time.Second) if err != nil { t.Fatalf("Failed to connect: %v", err) @@ -114,7 +114,7 @@ func TestOneHost503Resp(t *testing.T) { server := ElasticsearchMock(503, []byte("Something wrong happened")) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", diff --git a/libbeat/outputs/elasticsearch/api_test.go b/libbeat/outputs/elasticsearch/api_test.go index fbcbcc608f2..31ca3f2317d 100644 --- a/libbeat/outputs/elasticsearch/api_test.go +++ b/libbeat/outputs/elasticsearch/api_test.go @@ -36,7 +36,7 @@ func GetTestingElasticsearch() *Client { var address = "http://" + GetEsHost() + ":" + GetEsPort() username := os.Getenv("ES_USER") pass := os.Getenv("ES_PASS") - return NewClient(address, "", nil, nil, username, pass, nil) + return NewClient(address, "", nil, nil, username, pass, nil, nil) } func GetValidQueryResult() QueryResult { diff --git a/libbeat/outputs/elasticsearch/bulkapi_mock_test.go b/libbeat/outputs/elasticsearch/bulkapi_mock_test.go index 56ced1a8f85..706e65ca798 100644 --- a/libbeat/outputs/elasticsearch/bulkapi_mock_test.go +++ b/libbeat/outputs/elasticsearch/bulkapi_mock_test.go @@ -42,7 +42,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) { server := ElasticsearchMock(200, expectedResp) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", @@ -83,7 +83,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) { server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened")) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", @@ -125,7 +125,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) { server := ElasticsearchMock(503, []byte("Something wrong happened")) - client := NewClient(server.URL, "", nil, nil, "", "", nil) + client := NewClient(server.URL, "", nil, nil, "", "", nil, nil) params := map[string]string{ "refresh": "true", diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 6ae856278f3..1bc71342106 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -35,13 +35,16 @@ type Client struct { json jsonReader } +type connectCallback func(client *Client) error + type Connection struct { URL string Username string Password string - http *http.Client - connected bool + http *http.Client + connected bool + onConnectCallback func() error } var ( @@ -61,6 +64,7 @@ func NewClient( esURL, index string, proxyURL *url.URL, tls *tls.Config, username, password string, params map[string]string, + onConnectCallback connectCallback, ) *Client { proxy := http.ProxyFromEnvironment if proxyURL != nil { @@ -82,6 +86,13 @@ func NewClient( index: index, params: params, } + + client.Connection.onConnectCallback = func() error { + if onConnectCallback != nil { + return onConnectCallback(client) + } + return nil + } return client } @@ -424,6 +435,11 @@ func (conn *Connection) Connect(timeout time.Duration) error { if !conn.connected { return ErrNotConnected } + + err = conn.onConnectCallback() + if err != nil { + return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err) + } return nil } diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 6f77a67a801..ee6c35adb8d 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -10,6 +10,8 @@ import ( "io/ioutil" "path/filepath" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs" "github.com/stretchr/testify/assert" ) @@ -29,7 +31,7 @@ func TestCheckTemplate(t *testing.T) { assert.Nil(t, err) // Check for non existant template - assert.False(t, client.CheckTemplate("libbeat")) + assert.False(t, client.CheckTemplate("libbeat-notexists")) } func TestLoadTemplate(t *testing.T) { @@ -129,3 +131,58 @@ func TestLoadBeatsTemplate(t *testing.T) { assert.False(t, client.CheckTemplate(templateName)) } } + +// TestOutputLoadTemplate checks that the template is inserted before +// the first event is published. +func TestOutputLoadTemplate(t *testing.T) { + + client := GetTestingElasticsearch() + err := client.Connect(5 * time.Second) + if err != nil { + t.Fatal(err) + } + + // delete template if it exists + client.request("DELETE", "/_template/libbeat", nil, nil) + + // Make sure template is not yet there + assert.False(t, client.CheckTemplate("libbeat")) + + tPath, err := filepath.Abs("../../../topbeat/topbeat.template.json") + if err != nil { + t.Fatal(err) + } + config := map[string]interface{}{ + "hosts": GetEsHost(), + "template": map[string]interface{}{ + "name": "libbeat", + "path": tPath, + }, + } + + cfg, err := common.NewConfigFrom(config) + if err != nil { + t.Fatal(err) + } + + output, err := New(cfg, 0) + if err != nil { + t.Fatal(err) + } + event := common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": "libbeat", + "message": "Test message from libbeat", + } + + err = output.PublishEvent(nil, outputs.Options{Guaranteed: true}, event) + if err != nil { + t.Fatal(err) + } + + // Guaranteed publish, so the template should be there + + assert.True(t, client.CheckTemplate("libbeat")) + +} diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index 9fd5a4fa7b4..4dcde4e0594 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -4,9 +4,11 @@ import ( "bytes" "crypto/tls" "errors" + "fmt" "io/ioutil" "net/url" "strings" + "sync" "time" "github.com/elastic/beats/libbeat/common" @@ -20,6 +22,9 @@ type elasticsearchOutput struct { index string mode mode.ConnectionMode topology + + templateContents []byte + templateMutex sync.Mutex } func init() { @@ -69,7 +74,12 @@ func (out *elasticsearchOutput) init( return err } - clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config)) + err = out.readTemplate(config.Template) + if err != nil { + return err + } + + clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config, out)) if err != nil { return err } @@ -91,8 +101,6 @@ func (out *elasticsearchOutput) init( return err } - loadTemplate(config.Template, clients) - if config.SaveTopology { err := out.EnableTTL() if err != nil { @@ -122,52 +130,56 @@ func (out *elasticsearchOutput) init( return nil } -// loadTemplate checks if the index mapping template should be loaded -// In case template loading is enabled, template is written to index -func loadTemplate(config Template, clients []mode.ProtocolClient) { - // Check if template should be loaded - // Not being able to load the template will output an error but will not stop execution - if config.Name != "" && len(clients) > 0 { - - // Always takes the first client - esClient := clients[0].(*Client) - +// readTemplates reads the ES mapping template from the disk, if configured. +func (out *elasticsearchOutput) readTemplate(config Template) error { + if len(config.Name) > 0 { // Look for the template in the configuration path, if it's not absolute templatePath := paths.Resolve(paths.Config, config.Path) - logp.Info("Loading template enabled. Trying to load template: %v", templatePath) + logp.Info("Loading template enabled. Reading template file: %v", templatePath) - exists := esClient.CheckTemplate(config.Name) + var err error + out.templateContents, err = ioutil.ReadFile(templatePath) + if err != nil { + return fmt.Errorf("Error loading template %s: %v", templatePath, err) + } + } + return nil +} - // Check if template already exist or should be overwritten - if !exists || config.Overwrite { +// loadTemplate checks if the index mapping template should be loaded +// In case the template is not already loaded or overwritting is enabled, the +// template is written to index +func (out *elasticsearchOutput) loadTemplate(config Template, client *Client) error { + out.templateMutex.Lock() + defer out.templateMutex.Unlock() - if config.Overwrite { - logp.Info("Existing template will be overwritten, as overwrite is enabled.") - } + logp.Info("Trying to load template for client: %s", client) - // Load template from file - content, err := ioutil.ReadFile(templatePath) - if err != nil { - logp.Err("Could not load template from file path: %s; Error: %s", templatePath, err) - } else { - reader := bytes.NewReader(content) - err = esClient.LoadTemplate(config.Name, reader) + // Check if template already exist or should be overwritten + exists := client.CheckTemplate(config.Name) + if !exists || config.Overwrite { - if err != nil { - logp.Err("Could not load template: %v", err) - } - } - } else { - logp.Info("Template already exists and will not be overwritten.") + if config.Overwrite { + logp.Info("Existing template will be overwritten, as overwrite is enabled.") } + reader := bytes.NewReader(out.templateContents) + err := client.LoadTemplate(config.Name, reader) + if err != nil { + return fmt.Errorf("Could not load template: %v", err) + } + } else { + logp.Info("Template already exists and will not be overwritten.") } + + return nil } func makeClientFactory( tls *tls.Config, config *elasticsearchConfig, + out *elasticsearchOutput, ) func(string) (mode.ProtocolClient, error) { return func(host string) (mode.ProtocolClient, error) { esURL, err := getURL(config.Protocol, config.Path, host) @@ -196,10 +208,19 @@ func makeClientFactory( if len(params) == 0 { params = nil } + + // define a callback to be called on connection + var onConnected connectCallback + if len(out.templateContents) > 0 { + onConnected = func(client *Client) error { + return out.loadTemplate(config.Template, client) + } + } + client := NewClient( esURL, config.Index, proxyURL, tls, config.Username, config.Password, - params) + params, onConnected) return client, nil } } diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index af1b32328c5..a353d0038b3 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -72,7 +72,7 @@ func esConnect(t *testing.T, index string) *esConnection { username := os.Getenv("ES_USER") password := os.Getenv("ES_PASS") - client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil) + client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil, nil) // try to drop old index if left over from failed test _, _, _ = client.Delete(index, "", "", nil) // ignore error diff --git a/libbeat/tests/system/beatname.template.json b/libbeat/tests/system/beatname.template.json new file mode 100644 index 00000000000..2e9e1b84e89 --- /dev/null +++ b/libbeat/tests/system/beatname.template.json @@ -0,0 +1 @@ +{"template": true} \ No newline at end of file diff --git a/libbeat/tests/system/mockbeat.template.json b/libbeat/tests/system/mockbeat.template.json new file mode 100644 index 00000000000..2e9e1b84e89 --- /dev/null +++ b/libbeat/tests/system/mockbeat.template.json @@ -0,0 +1 @@ +{"template": true} \ No newline at end of file diff --git a/libbeat/tests/system/test_base.py b/libbeat/tests/system/test_base.py index a29449f815d..248883e550c 100644 --- a/libbeat/tests/system/test_base.py +++ b/libbeat/tests/system/test_base.py @@ -46,9 +46,13 @@ def test_config_test(self): """ shutil.copy("../../etc/libbeat.yml", os.path.join(self.working_dir, "libbeat.yml")) + with open(self.working_dir + "/beatname.template.json", "w") as f: + f.write('{"template": true}') - exit_code = self.run_beat(config="libbeat.yml", - extra_args=["-configtest"]) + exit_code = self.run_beat( + config="libbeat.yml", + extra_args=["-configtest", + "-path.config", self.working_dir]) assert exit_code == 0 assert self.log_contains("Config OK") is True @@ -70,8 +74,8 @@ def test_version(self): assert self.log_contains("error loading config file") is False - with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \ - as outputfile: + with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \ + as outputfile: proc = subprocess.Popen(args, stdout=outputfile, stderr=subprocess.STDOUT)