Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load the ES template on connect. #1381

Merged
merged 1 commit into from
Apr 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d
*Affecting all Beats*
- Drain response buffers when pipelining is used by redis output. {pull}1353[1353]
- Unterminated environment variable expressions in config files will now cause an error {pull}1389[1389]
- Fix issue with the automatic template loading when Elasticsearch is not available on Beat start. {issue}1321[1321]

*Packetbeat*

Expand Down
1 change: 1 addition & 0 deletions filebeat/invalid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sdasda
6 changes: 3 additions & 3 deletions libbeat/outputs/elasticsearch/api_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestOneHostSuccessResp(t *testing.T) {

server := ElasticsearchMock(200, expectedResp)

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestOneHost500Resp(t *testing.T) {

server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)
err := client.Connect(1 * time.Second)
if err != nil {
t.Fatalf("Failed to connect: %v", err)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestOneHost503Resp(t *testing.T) {

server := ElasticsearchMock(503, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func GetTestingElasticsearch() *Client {
var address = "http://" + GetEsHost() + ":" + GetEsPort()
username := os.Getenv("ES_USER")
pass := os.Getenv("ES_PASS")
return NewClient(address, "", nil, nil, username, pass, nil)
return NewClient(address, "", nil, nil, username, pass, nil, nil)
}

func GetValidQueryResult() QueryResult {
Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/elasticsearch/bulkapi_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {

server := ElasticsearchMock(200, expectedResp)

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) {

server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) {

server := ElasticsearchMock(503, []byte("Something wrong happened"))

client := NewClient(server.URL, "", nil, nil, "", "", nil)
client := NewClient(server.URL, "", nil, nil, "", "", nil, nil)

params := map[string]string{
"refresh": "true",
Expand Down
20 changes: 18 additions & 2 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ type Client struct {
json jsonReader
}

type connectCallback func(client *Client) error

type Connection struct {
URL string
Username string
Password string

http *http.Client
connected bool
http *http.Client
connected bool
onConnectCallback func() error
}

var (
Expand All @@ -61,6 +64,7 @@ func NewClient(
esURL, index string, proxyURL *url.URL, tls *tls.Config,
username, password string,
params map[string]string,
onConnectCallback connectCallback,
) *Client {
proxy := http.ProxyFromEnvironment
if proxyURL != nil {
Expand All @@ -82,6 +86,13 @@ func NewClient(
index: index,
params: params,
}

client.Connection.onConnectCallback = func() error {
if onConnectCallback != nil {
return onConnectCallback(client)
}
return nil
}
return client
}

Expand Down Expand Up @@ -424,6 +435,11 @@ func (conn *Connection) Connect(timeout time.Duration) error {
if !conn.connected {
return ErrNotConnected
}

err = conn.onConnectCallback()
if err != nil {
return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err)
}
return nil
}

Expand Down
59 changes: 58 additions & 1 deletion libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"io/ioutil"
"path/filepath"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
"github.com/stretchr/testify/assert"
)

Expand All @@ -29,7 +31,7 @@ func TestCheckTemplate(t *testing.T) {
assert.Nil(t, err)

// Check for non existant template
assert.False(t, client.CheckTemplate("libbeat"))
assert.False(t, client.CheckTemplate("libbeat-notexists"))
}

func TestLoadTemplate(t *testing.T) {
Expand Down Expand Up @@ -129,3 +131,58 @@ func TestLoadBeatsTemplate(t *testing.T) {
assert.False(t, client.CheckTemplate(templateName))
}
}

// TestOutputLoadTemplate checks that the template is inserted before
// the first event is published.
func TestOutputLoadTemplate(t *testing.T) {

client := GetTestingElasticsearch()
err := client.Connect(5 * time.Second)
if err != nil {
t.Fatal(err)
}

// delete template if it exists
client.request("DELETE", "/_template/libbeat", nil, nil)

// Make sure template is not yet there
assert.False(t, client.CheckTemplate("libbeat"))

tPath, err := filepath.Abs("../../../topbeat/topbeat.template.json")
if err != nil {
t.Fatal(err)
}
config := map[string]interface{}{
"hosts": GetEsHost(),
"template": map[string]interface{}{
"name": "libbeat",
"path": tPath,
},
}

cfg, err := common.NewConfigFrom(config)
if err != nil {
t.Fatal(err)
}

output, err := New(cfg, 0)
if err != nil {
t.Fatal(err)
}
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"host": "test-host",
"type": "libbeat",
"message": "Test message from libbeat",
}

err = output.PublishEvent(nil, outputs.Options{Guaranteed: true}, event)
if err != nil {
t.Fatal(err)
}

// Guaranteed publish, so the template should be there

assert.True(t, client.CheckTemplate("libbeat"))

}
89 changes: 55 additions & 34 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"net/url"
"strings"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -20,6 +22,9 @@ type elasticsearchOutput struct {
index string
mode mode.ConnectionMode
topology

templateContents []byte
templateMutex sync.Mutex
}

func init() {
Expand Down Expand Up @@ -69,7 +74,12 @@ func (out *elasticsearchOutput) init(
return err
}

clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config))
err = out.readTemplate(config.Template)
if err != nil {
return err
}

clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config, out))
if err != nil {
return err
}
Expand All @@ -91,8 +101,6 @@ func (out *elasticsearchOutput) init(
return err
}

loadTemplate(config.Template, clients)

if config.SaveTopology {
err := out.EnableTTL()
if err != nil {
Expand Down Expand Up @@ -122,52 +130,56 @@ func (out *elasticsearchOutput) init(
return nil
}

// loadTemplate checks if the index mapping template should be loaded
// In case template loading is enabled, template is written to index
func loadTemplate(config Template, clients []mode.ProtocolClient) {
// Check if template should be loaded
// Not being able to load the template will output an error but will not stop execution
if config.Name != "" && len(clients) > 0 {

// Always takes the first client
esClient := clients[0].(*Client)

// readTemplates reads the ES mapping template from the disk, if configured.
func (out *elasticsearchOutput) readTemplate(config Template) error {
if len(config.Name) > 0 {
// Look for the template in the configuration path, if it's not absolute
templatePath := paths.Resolve(paths.Config, config.Path)

logp.Info("Loading template enabled. Trying to load template: %v", templatePath)
logp.Info("Loading template enabled. Reading template file: %v", templatePath)

exists := esClient.CheckTemplate(config.Name)
var err error
out.templateContents, err = ioutil.ReadFile(templatePath)
if err != nil {
return fmt.Errorf("Error loading template %s: %v", templatePath, err)
}
}
return nil
}

// Check if template already exist or should be overwritten
if !exists || config.Overwrite {
// loadTemplate checks if the index mapping template should be loaded
// In case the template is not already loaded or overwritting is enabled, the
// template is written to index
func (out *elasticsearchOutput) loadTemplate(config Template, client *Client) error {
out.templateMutex.Lock()
defer out.templateMutex.Unlock()

if config.Overwrite {
logp.Info("Existing template will be overwritten, as overwrite is enabled.")
}
logp.Info("Trying to load template for client: %s", client)

// Load template from file
content, err := ioutil.ReadFile(templatePath)
if err != nil {
logp.Err("Could not load template from file path: %s; Error: %s", templatePath, err)
} else {
reader := bytes.NewReader(content)
err = esClient.LoadTemplate(config.Name, reader)
// Check if template already exist or should be overwritten
exists := client.CheckTemplate(config.Name)
if !exists || config.Overwrite {

if err != nil {
logp.Err("Could not load template: %v", err)
}
}
} else {
logp.Info("Template already exists and will not be overwritten.")
if config.Overwrite {
logp.Info("Existing template will be overwritten, as overwrite is enabled.")
}

reader := bytes.NewReader(out.templateContents)
err := client.LoadTemplate(config.Name, reader)
if err != nil {
return fmt.Errorf("Could not load template: %v", err)
}
} else {
logp.Info("Template already exists and will not be overwritten.")
}

return nil
}

func makeClientFactory(
tls *tls.Config,
config *elasticsearchConfig,
out *elasticsearchOutput,
) func(string) (mode.ProtocolClient, error) {
return func(host string) (mode.ProtocolClient, error) {
esURL, err := getURL(config.Protocol, config.Path, host)
Expand Down Expand Up @@ -196,10 +208,19 @@ func makeClientFactory(
if len(params) == 0 {
params = nil
}

// define a callback to be called on connection
var onConnected connectCallback
if len(out.templateContents) > 0 {
onConnected = func(client *Client) error {
return out.loadTemplate(config.Template, client)
}
}

client := NewClient(
esURL, config.Index, proxyURL, tls,
config.Username, config.Password,
params)
params, onConnected)
return client, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func esConnect(t *testing.T, index string) *esConnection {

username := os.Getenv("ES_USER")
password := os.Getenv("ES_PASS")
client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil)
client := elasticsearch.NewClient(host, "", nil, nil, username, password, nil, nil)

// try to drop old index if left over from failed test
_, _, _ = client.Delete(index, "", "", nil) // ignore error
Expand Down
1 change: 1 addition & 0 deletions libbeat/tests/system/beatname.template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"template": true}
1 change: 1 addition & 0 deletions libbeat/tests/system/mockbeat.template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"template": true}
12 changes: 8 additions & 4 deletions libbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ def test_config_test(self):
"""
shutil.copy("../../etc/libbeat.yml",
os.path.join(self.working_dir, "libbeat.yml"))
with open(self.working_dir + "/beatname.template.json", "w") as f:
f.write('{"template": true}')

exit_code = self.run_beat(config="libbeat.yml",
extra_args=["-configtest"])
exit_code = self.run_beat(
config="libbeat.yml",
extra_args=["-configtest",
"-path.config", self.working_dir])

assert exit_code == 0
assert self.log_contains("Config OK") is True
Expand All @@ -70,8 +74,8 @@ def test_version(self):

assert self.log_contains("error loading config file") is False

with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \
as outputfile:
with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \
as outputfile:
proc = subprocess.Popen(args,
stdout=outputfile,
stderr=subprocess.STDOUT)
Expand Down