From e80365bba3a0b48bddbfcb7702b1444fb477a067 Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Fri, 5 Feb 2016 01:03:05 -0500 Subject: [PATCH] [WIP] Etcd integration for configuration --- Godeps | 1 + Makefile | 6 +- cmd/telegraf/telegraf.go | 205 ++++++++++----- internal/config/config.go | 9 + internal/etcd/etcd.go | 248 ++++++++++++++++++ internal/etcd/etcd_test.go | 195 ++++++++++++++ .../etcd/testdata/test1/hosts/localhost.conf | 9 + .../etcd/testdata/test1/labels/influx.conf | 4 + .../etcd/testdata/test1/labels/network.conf | 2 + .../etcd/testdata/test1/labels/network2.conf | 4 + internal/etcd/testdata/test1/main.conf | 10 + internal/etcd/testdata/test2/main.conf | 12 + 12 files changed, 637 insertions(+), 68 deletions(-) create mode 100644 internal/etcd/etcd.go create mode 100644 internal/etcd/etcd_test.go create mode 100644 internal/etcd/testdata/test1/hosts/localhost.conf create mode 100644 internal/etcd/testdata/test1/labels/influx.conf create mode 100644 internal/etcd/testdata/test1/labels/network.conf create mode 100644 internal/etcd/testdata/test1/labels/network2.conf create mode 100644 internal/etcd/testdata/test1/main.conf create mode 100644 internal/etcd/testdata/test2/main.conf diff --git a/Godeps b/Godeps index 7389b1cb85093..56cb66cd241ab 100644 --- a/Godeps +++ b/Godeps @@ -6,6 +6,7 @@ github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804 github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70 +github.com/coreos/etcd bdee27b19e8601ffd7bd4f0481abe9bbae04bd09 github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 diff --git a/Makefile b/Makefile index ef316bd03f42f..a71a040709d44 100644 --- a/Makefile +++ b/Makefile @@ -71,6 +71,7 @@ endif docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim + docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379 # Run docker containers necessary for CircleCI unit tests docker-run-circle: @@ -85,11 +86,12 @@ docker-run-circle: docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim + docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379 # Kill all docker containers, ignore errors docker-kill: - -docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp - -docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp + -docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd + -docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd # Run full unit tests using docker containers (includes setup and teardown) test: vet docker-kill docker-run diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index a65c5607c7162..216d0519a27ee 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/internal/etcd" _ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all" @@ -21,6 +22,12 @@ var fDebug = flag.Bool("debug", false, var fQuiet = flag.Bool("quiet", false, "run in quiet mode") var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit") +var fEtcd = flag.String("etcd", "", "etcd url where configuration is stored") +var fEtcdFolder = flag.String("etcdfolder", "/telegraf", "etcd root folder where configuration is stored") +var fEtcdSendConfigDir = flag.String("etcdwriteconfigdir", "", "store the following config dir to etcd") +var fEtcdSendConfig = flag.String("etcdwriteconfig", "", "store the following config file to etcd") +var fEtcdSendLabel = flag.String("etcdwritelabel", "", "store config file to etcd with this label") +var fEtcdReadLabels = flag.String("etcdreadlabels", "", "read config from etcd using labels (comma-separated)") var fConfig = flag.String("config", "", "configuration file to load") var fConfigDirectory = flag.String("config-directory", "", "directory containing additional *.conf files") @@ -54,16 +61,20 @@ Usage: The flags are: - -config configuration file to load - -test gather metrics once, print them to stdout, and exit - -sample-config print out full sample configuration to stdout - -config-directory directory containing additional *.conf files - -input-filter filter the input plugins to enable, separator is : - -output-filter filter the output plugins to enable, separator is : - -usage print usage for a plugin, ie, 'telegraf -usage mysql' - -debug print metrics as they're generated to stdout - -quiet run in quiet mode - -version print the version to stdout + -config configuration file to load + -test gather metrics once, print them to stdout, and exit + -sample-config print out full sample configuration to stdout + -config-directory directory containing additional *.conf files + -etcdwriteconfigdir store the following config dir to etcd + -etcdwriteconfig store the following config file to etcd + -etcdwritelabel store config file to etcd with this label + -etcdreadlabels read config from etcd using labels (comma-separated) + -input-filter filter the input plugins to enable, separator is : + -output-filter filter the output plugins to enable, separator is : + -usage print usage for a plugin, ie, 'telegraf -usage mysql' + -debug print metrics as they're generated to stdout + -quiet run in quiet mode + -version print the version to stdout Examples: @@ -84,63 +95,138 @@ Examples: ` func main() { + // Read flags + flag.Usage = func() { usageExit(0) } + flag.Parse() + if flag.NFlag() == 0 { + usageExit(0) + } + + // Prepare signals handling reload := make(chan bool, 1) reload <- true - for <-reload { - reload <- false - flag.Usage = func() { usageExit(0) } - flag.Parse() - - if flag.NFlag() == 0 { - usageExit(0) + shutdown := make(chan struct{}) + signals := make(chan os.Signal) + signal.Notify(signals, os.Interrupt, syscall.SIGHUP) + + // Prepare etcd if needed + var e *etcd.EtcdClient + if *fEtcd != "" { + e = etcd.NewEtcdClient(*fEtcd, *fEtcdFolder) + if *fEtcdSendConfig == "" && *fEtcdSendLabel == "" && *fEtcdSendConfigDir == "" { + go e.LaunchWatcher(shutdown, signals) } + } - var inputFilters []string - if *fInputFiltersLegacy != "" { - inputFilter := strings.TrimSpace(*fInputFiltersLegacy) - inputFilters = strings.Split(":"+inputFilter+":", ":") - } - if *fInputFilters != "" { - inputFilter := strings.TrimSpace(*fInputFilters) - inputFilters = strings.Split(":"+inputFilter+":", ":") + // Handle signals + go func() { + for { + sig := <-signals + if sig == os.Interrupt { + close(shutdown) + } else if sig == syscall.SIGHUP { + log.Print("Reloading Telegraf config\n") + <-reload + reload <- true + close(shutdown) + } } + }() - var outputFilters []string - if *fOutputFiltersLegacy != "" { - outputFilter := strings.TrimSpace(*fOutputFiltersLegacy) - outputFilters = strings.Split(":"+outputFilter+":", ":") - } - if *fOutputFilters != "" { - outputFilter := strings.TrimSpace(*fOutputFilters) - outputFilters = strings.Split(":"+outputFilter+":", ":") - } + // Prepare inputs + var inputFilters []string + if *fInputFiltersLegacy != "" { + inputFilter := strings.TrimSpace(*fInputFiltersLegacy) + inputFilters = strings.Split(":"+inputFilter+":", ":") + } + if *fInputFilters != "" { + inputFilter := strings.TrimSpace(*fInputFilters) + inputFilters = strings.Split(":"+inputFilter+":", ":") + } - if *fVersion { - v := fmt.Sprintf("Telegraf - Version %s", Version) - fmt.Println(v) - return - } + // Prepare outputs + var outputFilters []string + if *fOutputFiltersLegacy != "" { + outputFilter := strings.TrimSpace(*fOutputFiltersLegacy) + outputFilters = strings.Split(":"+outputFilter+":", ":") + } + if *fOutputFilters != "" { + outputFilter := strings.TrimSpace(*fOutputFilters) + outputFilters = strings.Split(":"+outputFilter+":", ":") + } - if *fSampleConfig { - config.PrintSampleConfig(inputFilters, outputFilters) - return - } + // Print version + if *fVersion { + v := fmt.Sprintf("Telegraf - Version %s", Version) + fmt.Println(v) + return + } - if *fUsage != "" { - if err := config.PrintInputConfig(*fUsage); err != nil { - if err2 := config.PrintOutputConfig(*fUsage); err2 != nil { - log.Fatalf("%s and %s", err, err2) - } + // Print sample config + if *fSampleConfig { + config.PrintSampleConfig(inputFilters, outputFilters) + return + } + + // Print usage + if *fUsage != "" { + if err := config.PrintInputConfig(*fUsage); err != nil { + if err2 := config.PrintOutputConfig(*fUsage); err2 != nil { + log.Fatalf("%s and %s", err, err2) } - return } + return + } + + for <-reload { + // Reset signal handler vars + shutdown = make(chan struct{}) + reload <- false + // Prepare config var ( c *config.Config err error ) - if *fConfig != "" { + if *fEtcd != "" { + c = config.NewConfig() + c.OutputFilters = outputFilters + c.InputFilters = inputFilters + if *fEtcdSendConfigDir != "" { + // TODO check config format before write it + // Write config dir to etcd + err = c.LoadDirectory(*fEtcdSendConfigDir) + if err != nil { + log.Fatal(err) + } + err = e.WriteConfigDir(*fEtcdSendConfigDir) + if err != nil { + log.Fatal(err) + } + return + } else if *fEtcdSendConfig != "" && *fEtcdSendLabel != "" { + // TODO check config format before write it + // Write config to etcd + err = c.LoadConfig(*fEtcdSendConfig) + if err != nil { + log.Fatal(err) + } + err = e.WriteLabelConfig(*fEtcdSendLabel, *fEtcdSendConfig) + if err != nil { + log.Fatal(err) + } + return + } else { + // Read config to etcd + log.Printf("Config read from etcd with labels %s\n", *fEtcdReadLabels) + c, err = e.ReadConfig(c, *fEtcdReadLabels) + if err != nil { + log.Fatal(err) + } + } + } else if *fConfig != "" { + // Read config from file c = config.NewConfig() c.OutputFilters = outputFilters c.InputFilters = inputFilters @@ -153,6 +239,7 @@ func main() { os.Exit(1) } + // Read config dir if *fConfigDirectoryLegacy != "" { err = c.LoadDirectory(*fConfigDirectoryLegacy) if err != nil { @@ -160,12 +247,14 @@ func main() { } } + // Read config dir if *fConfigDirectory != "" { err = c.LoadDirectory(*fConfigDirectory) if err != nil { log.Fatal(err) } } + // check config if len(c.Outputs) == 0 { log.Fatalf("Error: no outputs found, did you provide a valid config file?") } @@ -199,22 +288,6 @@ func main() { log.Fatal(err) } - shutdown := make(chan struct{}) - signals := make(chan os.Signal) - signal.Notify(signals, os.Interrupt, syscall.SIGHUP) - go func() { - sig := <-signals - if sig == os.Interrupt { - close(shutdown) - } - if sig == syscall.SIGHUP { - log.Printf("Reloading Telegraf config\n") - <-reload - reload <- true - close(shutdown) - } - }() - log.Printf("Starting Telegraf (version %s)\n", Version) log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " ")) log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " ")) diff --git a/internal/config/config.go b/internal/config/config.go index a7a9eaab479e7..d1adccc61ff20 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -41,6 +41,7 @@ func NewConfig() *Config { Agent: &AgentConfig{ Interval: internal.Duration{Duration: 10 * time.Second}, RoundInterval: true, + Labels: make([]string, 0), FlushInterval: internal.Duration{Duration: 10 * time.Second}, FlushJitter: internal.Duration{Duration: 5 * time.Second}, }, @@ -99,6 +100,9 @@ type AgentConfig struct { // Quiet is the option for running in quiet mode Quiet bool Hostname string + + // Etcd labels + Labels []string } // Inputs returns a list of strings of the configured inputs. @@ -326,7 +330,12 @@ func (c *Config) LoadConfig(path string) error { if err != nil { return err } + err = c.LoadConfigFromTable(tbl) + return err +} +func (c *Config) LoadConfigFromTable(tbl *ast.Table) error { + var err error for name, val := range tbl.Fields { subTable, ok := val.(*ast.Table) if !ok { diff --git a/internal/etcd/etcd.go b/internal/etcd/etcd.go new file mode 100644 index 0000000000000..40e16bc1bfabb --- /dev/null +++ b/internal/etcd/etcd.go @@ -0,0 +1,248 @@ +package etcd + +import ( + "golang.org/x/net/context" + "io/ioutil" + "log" + "os" + "path" + "strings" + "syscall" + "time" + + "github.com/coreos/etcd/client" + influxconfig "github.com/influxdata/config" + "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" +) + +type EtcdClient struct { + Kapi client.KeysAPI + Folder string +} + +func (e *EtcdClient) LaunchWatcher(shutdown chan struct{}, signals chan os.Signal) { + // TODO: All telegraf client will reload for each changes... + // Maybe we want to reload on those we need to ??? + // So we need to create a watcher by labels ?? + for { + watcherOpts := client.WatcherOptions{AfterIndex: 0, Recursive: true} + w := e.Kapi.Watcher(e.Folder, &watcherOpts) + r, err := w.Next(context.Background()) + if err != nil { + // TODO What we have to do here ???? + log.Fatal("Error occurred", err) + } + if r.Action == "set" || r.Action == "update" { + // do something with Response r here + log.Printf("Changes detected in etcd (%s action detected)\n", r.Action) + log.Print("Reloading Telegraf config\n") + signals <- syscall.SIGHUP + time.Sleep(time.Duration(1) * time.Second) + } + } +} + +func NewEtcdClient(urls string, folder string) *EtcdClient { + // Create a new etcd client + cfg := client.Config{ + Endpoints: []string{"http://127.0.0.1:2379"}, + Transport: client.DefaultTransport, + } + + e := &EtcdClient{} + c, err := client.New(cfg) + if err != nil { + log.Fatal(err) + } + kapi := client.NewKeysAPI(c) + + e.Kapi = kapi + e.Folder = folder + + return e +} + +func (e *EtcdClient) WriteConfigDir(configdir string) error { + directoryEntries, err := ioutil.ReadDir(configdir) + if err != nil { + return err + } + for _, entry := range directoryEntries { + name := entry.Name() + if entry.IsDir() { + if name == "labels" { + // Handle labels + directoryEntries, err := ioutil.ReadDir(path.Join(configdir, name)) + if err != nil { + return err + } + for _, entry := range directoryEntries { + filename := entry.Name() + if len(filename) < 6 || filename[len(filename)-5:] != ".conf" { + continue + } + label := filename[:len(filename)-5] + err = e.WriteLabelConfig(label, path.Join(configdir, name, filename)) + if err != nil { + return err + } + } + } else if name == "hosts" { + // Handle hosts specific config + directoryEntries, err := ioutil.ReadDir(path.Join(configdir, name)) + if err != nil { + return err + } + + for _, entry := range directoryEntries { + filename := entry.Name() + if len(filename) < 6 || filename[len(filename)-5:] != ".conf" { + continue + } + hostname := filename[:len(filename)-5] + err = e.WriteHostConfig(hostname, path.Join(configdir, name, filename)) + if err != nil { + return err + } + } + } + continue + } + if name == "main.conf" { + // Handle main config + err := e.WriteMainConfig(path.Join(configdir, name)) + if err != nil { + return err + } + } else { + continue + } + } + + return nil +} + +func (e *EtcdClient) WriteMainConfig(path string) error { + // Write main config file in etcd + key := "main" + err := e.WriteConfig(key, path) + return err +} + +func (e *EtcdClient) WriteLabelConfig(label string, path string) error { + // Write label config file in etcd + key := "labels/" + label + err := e.WriteConfig(key, path) + return err +} + +func (e *EtcdClient) WriteHostConfig(host string, path string) error { + // Write host config file in etcd + key := "hosts/" + host + err := e.WriteConfig(key, path) + return err +} + +func (e *EtcdClient) WriteConfig(relative_key string, path string) error { + // Read config file, get conf in tomlformat, convert to json + // Then write it to etcd + // Read file + tbl, err := influxconfig.ParseFile(path) + if err != nil { + return err + } + // Get toml + toml_data := tbl.Source() + // Write it + key := e.Folder + "/" + relative_key + resp, _ := e.Kapi.Get(context.Background(), key, nil) + if resp == nil { + _, err = e.Kapi.Set(context.Background(), key, string(toml_data), nil) + } else { + _, err = e.Kapi.Update(context.Background(), key, string(toml_data)) + } + if err != nil { + log.Fatal(err) + return err + } else { + log.Printf("Config written with key %s\n", key) + } + return nil +} + +//func (e *EtcdClient) ReadConfig(labels []string) (*config.Config, error) { +func (e *EtcdClient) ReadConfig(c *config.Config, labels string) (*config.Config, error) { + // Get default config in etcd + // key = /telegraf/default + key := e.Folder + "/main" + resp, err := e.Kapi.Get(context.Background(), key, nil) + if err != nil { + log.Printf("WARNING: [etcd] %s", err) + } else { + // Put it in toml + tbl, err := toml2table(resp) + if err != nil { + log.Printf("WARNING: [etcd] %s", err) + } + c.LoadConfigFromTable(tbl) + } + + // Get specific host config + // key = /telegraf/hosts/HOSTNAME + hostname, err := os.Hostname() + if err != nil { + log.Printf("WARNING: [etcd] %s", err) + } else if hostname != "" { + key = e.Folder + "/hosts/" + hostname + resp, err := e.Kapi.Get(context.Background(), key, nil) + if err != nil { + log.Printf("WARNING: [etcd] %s", err) + } else { + // Put it in toml + tbl, err := toml2table(resp) + if err != nil { + log.Print(err) + } + c.LoadConfigFromTable(tbl) + } + } + + // Concat labels from etcd and labels from command line + labels_list := c.Agent.Labels + if labels != "" { + labels_list = append(labels_list, strings.Split(labels, ",")...) + } + + // Iterate on all labels + // TODO check label order of importance ? + for _, label := range labels_list { + // Read from etcd + // key = /telegraf/labels/LABEL + key := e.Folder + "/labels/" + label + resp, err := e.Kapi.Get(context.Background(), key, nil) + if err != nil { + log.Print(err) + continue + } + // Put it in toml + tbl, err := toml2table(resp) + if err != nil { + log.Print(err) + continue + } + // Load config + err = c.LoadConfigFromTable(tbl) + if err != nil { + log.Print(err) + } + } + + return c, nil +} + +func toml2table(resp *client.Response) (*ast.Table, error) { + // Convert json to toml + return toml.Parse([]byte(resp.Node.Value)) +} diff --git a/internal/etcd/etcd_test.go b/internal/etcd/etcd_test.go new file mode 100644 index 0000000000000..40824d597e919 --- /dev/null +++ b/internal/etcd/etcd_test.go @@ -0,0 +1,195 @@ +package etcd + +import ( + "golang.org/x/net/context" + "io" + "os" + "syscall" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/outputs" + + "github.com/influxdata/telegraf/plugins/inputs/system" + "github.com/influxdata/telegraf/plugins/outputs/influxdb" + + eclient "github.com/coreos/etcd/client" +) + +func Test1Write(t *testing.T) { + // Delete hostname conf file + hostname, _ := os.Hostname() + os.Remove("./testdata/test1/hosts/" + hostname + ".conf") + // Get etcd client + e := NewEtcdClient("http://localhost:2379", "/telegraf") + // Delete old conf from etcd + delOptions := &eclient.DeleteOptions{ + Recursive: true, + Dir: true, + } + e.Kapi.Delete(context.Background(), "/telegraf", delOptions) + + // Test write dir + err := e.WriteConfigDir("./testdata/test1") + require.NoError(t, err) + resp, _ := e.Kapi.Get(context.Background(), "/telegraf/main", nil) + assert.Equal(t, + "[tags]\n dc = \"us-east-1\"\n\n[agent]\n interval = \"2s\"\n round_interval = true\n flush_interval = \"10s\"\n flush_jitter = \"0s\"\n debug = false\n hostname = \"\"\n", + resp.Node.Value) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/hosts/localhost", nil) + assert.Equal(t, + "\n[agent]\n interval = \"2s\"\n labels = [\"influx\"]\n\n[[inputs.cpu]]\n percpu = true\n totalcpu = true\n drop = [\"cpu_time*\"]\n", + resp.Node.Value) + + // Test write conf + err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network.conf") + require.NoError(t, err) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil) + assert.Equal(t, + "[[inputs.net]]\n\n", + resp.Node.Value) + + // Test read + c := config.NewConfig() + var inputFilters []string + var outputFilters []string + c.OutputFilters = outputFilters + c.InputFilters = inputFilters + + net := inputs.Inputs["net"]().(*system.NetIOStats) + influx := outputs.Outputs["influxdb"]().(*influxdb.InfluxDB) + influx.URLs = []string{"http://localhost:8086"} + influx.Database = "telegraf" + influx.Precision = "s" + + c, err = e.ReadConfig(c, "mylabel,influx") + require.NoError(t, err) + assert.Equal(t, net, c.Inputs[0].Input, + "Testdata did not produce a correct net struct.") + assert.Equal(t, influx, c.Outputs[0].Output, + "Testdata did not produce a correct influxdb struct.") + + // Test reload + shutdown := make(chan struct{}) + signals := make(chan os.Signal) + go e.LaunchWatcher(shutdown, signals) + // Test write conf + err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network2.conf") + require.NoError(t, err) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil) + assert.Equal(t, + "[[inputs.net]]\n\n interfaces = [\"eth0\"]\n\n", + resp.Node.Value) + // TODO found a way to test reload .... + sig := <-signals + assert.Equal(t, syscall.SIGHUP, sig) + +} + +func Test2Error(t *testing.T) { + e := NewEtcdClient("http://localhost:2379", "/telegraf") + + // Test write dir + err := e.WriteConfigDir("./testdata/test2") + require.Error(t, err) +} + +func Test3Write(t *testing.T) { + // Delete old hostname conf file + hostname, _ := os.Hostname() + os.Remove("./testdata/test1/hosts/" + hostname + ".conf") + // Write host file + if hostname != "" { + r, err := os.Open("./testdata/test1/hosts/localhost.conf") + if err != nil { + panic(err) + } + defer r.Close() + + w, err := os.Create("./testdata/test1/hosts/" + hostname + ".conf") + if err != nil { + panic(err) + } + defer w.Close() + + // do the actual work + _, err = io.Copy(w, r) + if err != nil { + panic(err) + } + } + // Get tcd client + e := NewEtcdClient("http://localhost:2379", "/telegraf") + // Delete old conf from etcd + delOptions := &eclient.DeleteOptions{ + Recursive: true, + Dir: true, + } + e.Kapi.Delete(context.Background(), "/telegraf", delOptions) + + // Test write dir + err := e.WriteConfigDir("./testdata/test1") + require.NoError(t, err) + resp, _ := e.Kapi.Get(context.Background(), "/telegraf/main", nil) + assert.Equal(t, + "[tags]\n dc = \"us-east-1\"\n\n[agent]\n interval = \"2s\"\n round_interval = true\n flush_interval = \"10s\"\n flush_jitter = \"0s\"\n debug = false\n hostname = \"\"\n", + resp.Node.Value) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/hosts/localhost", nil) + assert.Equal(t, + "\n[agent]\n interval = \"2s\"\n labels = [\"influx\"]\n\n[[inputs.cpu]]\n percpu = true\n totalcpu = true\n drop = [\"cpu_time*\"]\n", + resp.Node.Value) + + // Test write conf + err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network.conf") + require.NoError(t, err) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil) + assert.Equal(t, + "[[inputs.net]]\n\n", + resp.Node.Value) + + // Test read + c := config.NewConfig() + var inputFilters []string + var outputFilters []string + c.OutputFilters = outputFilters + c.InputFilters = inputFilters + + cpu := inputs.Inputs["cpu"]().(*system.CPUStats) + cpu.PerCPU = true + cpu.TotalCPU = true + net := inputs.Inputs["net"]().(*system.NetIOStats) + influx := outputs.Outputs["influxdb"]().(*influxdb.InfluxDB) + influx.URLs = []string{"http://localhost:8086"} + influx.Database = "telegraf" + influx.Precision = "s" + + c, err = e.ReadConfig(c, "mylabel,influx") + require.NoError(t, err) + assert.Equal(t, cpu, c.Inputs[0].Input, + "Testdata did not produce a correct net struct.") + assert.Equal(t, net, c.Inputs[1].Input, + "Testdata did not produce a correct net struct.") + assert.Equal(t, influx, c.Outputs[0].Output, + "Testdata did not produce a correct influxdb struct.") + + // Test reload + shutdown := make(chan struct{}) + signals := make(chan os.Signal) + go e.LaunchWatcher(shutdown, signals) + // Test write conf + err = e.WriteLabelConfig("mylabel", "./testdata/test1/labels/network2.conf") + require.NoError(t, err) + resp, _ = e.Kapi.Get(context.Background(), "/telegraf/labels/mylabel", nil) + assert.Equal(t, + "[[inputs.net]]\n\n interfaces = [\"eth0\"]\n\n", + resp.Node.Value) + // TODO found a way to test reload .... + sig := <-signals + assert.Equal(t, syscall.SIGHUP, sig) + // Delete hostname conf file + os.Remove("./testdata/test1/hosts/" + hostname + ".conf") +} diff --git a/internal/etcd/testdata/test1/hosts/localhost.conf b/internal/etcd/testdata/test1/hosts/localhost.conf new file mode 100644 index 0000000000000..d157dd2f52478 --- /dev/null +++ b/internal/etcd/testdata/test1/hosts/localhost.conf @@ -0,0 +1,9 @@ + +[agent] + interval = "2s" + labels = ["influx"] + +[[inputs.cpu]] + percpu = true + totalcpu = true + drop = ["cpu_time*"] diff --git a/internal/etcd/testdata/test1/labels/influx.conf b/internal/etcd/testdata/test1/labels/influx.conf new file mode 100644 index 0000000000000..100ca02647485 --- /dev/null +++ b/internal/etcd/testdata/test1/labels/influx.conf @@ -0,0 +1,4 @@ +[[outputs.influxdb]] + urls = ["http://localhost:8086"] + database = "telegraf" + precision = "s" diff --git a/internal/etcd/testdata/test1/labels/network.conf b/internal/etcd/testdata/test1/labels/network.conf new file mode 100644 index 0000000000000..848a9702f0ee6 --- /dev/null +++ b/internal/etcd/testdata/test1/labels/network.conf @@ -0,0 +1,2 @@ +[[inputs.net]] + diff --git a/internal/etcd/testdata/test1/labels/network2.conf b/internal/etcd/testdata/test1/labels/network2.conf new file mode 100644 index 0000000000000..49ad3d94a17c9 --- /dev/null +++ b/internal/etcd/testdata/test1/labels/network2.conf @@ -0,0 +1,4 @@ +[[inputs.net]] + + interfaces = ["eth0"] + diff --git a/internal/etcd/testdata/test1/main.conf b/internal/etcd/testdata/test1/main.conf new file mode 100644 index 0000000000000..0adf151a8906b --- /dev/null +++ b/internal/etcd/testdata/test1/main.conf @@ -0,0 +1,10 @@ +[tags] + dc = "us-east-1" + +[agent] + interval = "2s" + round_interval = true + flush_interval = "10s" + flush_jitter = "0s" + debug = false + hostname = "" diff --git a/internal/etcd/testdata/test2/main.conf b/internal/etcd/testdata/test2/main.conf new file mode 100644 index 0000000000000..9e85000fefd88 --- /dev/null +++ b/internal/etcd/testdata/test2/main.conf @@ -0,0 +1,12 @@ +[tags] + dc = "us-east-1" + +[agent] + interval = "2s" + round_interval = true + flush_interval = "10s" + flush_jitter = "0s" + debug = false + hostname = "" + +sdag: sdg