Skip to content

Commit

Permalink
[WIP] Etcd integration for configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
titilambert committed Jan 17, 2016
1 parent c13927f commit 121fd61
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 17 deletions.
63 changes: 46 additions & 17 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdb/telegraf"
"github.com/influxdb/telegraf/internal/config"
"github.com/influxdb/telegraf/internal/etcd"
_ "github.com/influxdb/telegraf/plugins/inputs/all"
_ "github.com/influxdb/telegraf/plugins/outputs/all"
)
Expand All @@ -20,6 +21,10 @@ var fDebug = flag.Bool("debug", false,
var fQuiet = flag.Bool("quiet", false,
"run in quiet mode")
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
var fEtcd = flag.String("etcd", "", "etcd url where configuration is stored")
var fEtcdSendConfig = flag.String("etcdwriteconfig", "", "store the following config file to etcd")
var fEtcdSendLabel = flag.String("etcdwritelabel", "", "store config file to etcd with this label")
var fEtcdReadLabels = flag.String("etcdreadlabels", "", "read config from etcd using labels (comma-separated)")
var fConfig = flag.String("config", "", "configuration file to load")
var fConfigDirectory = flag.String("config-directory", "",
"directory containing additional *.conf files")
Expand Down Expand Up @@ -83,10 +88,29 @@ Examples:
`

func main() {
// Prepare signals handling
reload := make(chan bool, 1)
reload <- true
shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
go func() {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
} else if sig == syscall.SIGHUP {
log.Print("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}()

for <-reload {
// Reset signal handler vars
shutdown = make(chan struct{})
reload <- false

flag.Usage = usageExit
flag.Parse()

Expand Down Expand Up @@ -136,10 +160,31 @@ func main() {

var (
c *config.Config
e *etcd.EtcdClient
err error
)

if *fConfig != "" {
if *fEtcd != "" {
e = etcd.NewEtcdClient(*fEtcd, "/telegraf")
if *fEtcdSendConfig != "" && *fEtcdSendLabel != "" {
// TODO check config format before write it
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
err = c.LoadConfig(*fEtcdSendConfig)
if err != nil {
log.Fatal(err)
return
}
// Write config to etcd
e.WriteConfig(*fEtcdSendLabel, *fEtcdSendConfig)
return
} else if *fEtcdReadLabels != "" {
c, _ = e.ReadConfig(*fEtcdReadLabels)
go e.LaunchWatcher(shutdown, reload)
log.Printf("Config read from etcd with labels %s\n", *fEtcdReadLabels)
}
} else if *fConfig != "" {
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
Expand Down Expand Up @@ -199,22 +244,6 @@ func main() {
log.Fatal(err)
}

shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
go func() {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
}
if sig == syscall.SIGHUP {
log.Printf("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}()

log.Printf("Starting Telegraf (version %s)\n", Version)
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("Loaded plugins: %s", strings.Join(c.InputNames(), " "))
Expand Down
4 changes: 4 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,11 @@ func (c *Config) LoadConfig(path string) error {
if err != nil {
return err
}
err = c.LoadConfigFromText(data)
return err
}

func (c *Config) LoadConfigFromText(data []byte) error {
tbl, err := toml.Parse(data)
if err != nil {
return err
Expand Down
174 changes: 174 additions & 0 deletions internal/etcd/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package etcd

import (
"bytes"
"encoding/json"
"golang.org/x/net/context"
"io/ioutil"
"log"
"os"
"strings"

"github.com/BurntSushi/toml"
"github.com/coreos/etcd/client"

"github.com/influxdb/telegraf/internal/config"
)

type EtcdClient struct {
Kapi client.KeysAPI
Folder string
}

func (e *EtcdClient) LaunchWatcher(shutdown chan struct{}, reload chan bool) {
// TODO: All telegraf client will reload for each changes...
// Maybe we want to reload on those we need to ???
// So we need to create a watcher by labels ??
watcherOpts := client.WatcherOptions{AfterIndex: 0, Recursive: true}
w := e.Kapi.Watcher(e.Folder, &watcherOpts)
r, err := w.Next(context.Background())
if err != nil {
// TODO What we have to do here ????
log.Fatal("Error occurred", err)
}
if r.Action == "set" || r.Action == "update" {
// do something with Response r here
log.Printf("Changes detected in etcd (%s action detected)\n", r.Action)
log.Print("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}

func NewEtcdClient(urls string, folder string) *EtcdClient {

cfg := client.Config{
Endpoints: []string{"http://127.0.0.1:2379"},
Transport: client.DefaultTransport,
}

e := &EtcdClient{}
c, err := client.New(cfg)
if err != nil {
log.Fatal(err)
}
kapi := client.NewKeysAPI(c)

e.Kapi = kapi
e.Folder = folder

return e
}

func (e *EtcdClient) Connect() error {
//c, err := eclient.New(cfg)
return nil
}

func (e *EtcdClient) WriteConfig(label string, path string) error {
// Read config file, get conf in tomlformat, convert to json
// Then write it to etcd
// TODO: Maybe we just want to store toml in etcd ? Is json really needed ????
// Read file
raw_data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
// Get toml
var data interface{}
_, err = toml.Decode(string(raw_data), &data)
if err != nil {
return err
}
// Get json
json_data, _ := json.Marshal(&data)
// Write it
key := e.Folder + "/labels/" + label
resp, _ := e.Kapi.Get(context.Background(), key, nil)
if resp == nil {
_, err = e.Kapi.Set(context.Background(), key, string(json_data), nil)
} else {
_, err = e.Kapi.Update(context.Background(), key, string(json_data))
}
if err != nil {
log.Fatal(err)
return err
} else {
log.Printf("Config written with label %s\n", label)
}
return nil
}

//func (e *EtcdClient) ReadConfig(labels []string) (*config.Config, error) {
func (e *EtcdClient) ReadConfig(labels string) (*config.Config, error) {
c := config.NewConfig()

// Get default config in etcd
// key = /telegraf/default
key := e.Folder + "/default"
resp, err := e.Kapi.Get(context.Background(), key, nil)
if err != nil {
log.Printf("WARNING: [etcd] %s", err)
} else {
// Put it in toml
data, err := json2toml(resp)
if err != nil {
log.Printf("WARNING: [etcd] %s", err)
}
c.LoadConfigFromText(data)
}

// Get specific host config
// key = /telegraf/hosts/HOSTNAME
hostname, err := os.Hostname()
if err != nil {
log.Printf("WARNING: [etcd] %s", err)
} else if hostname != "" {
key = e.Folder + "/hosts/" + hostname
resp, err := e.Kapi.Get(context.Background(), key, nil)
if err != nil {
log.Printf("WARNING: [etcd] %s", err)
} else {
// Put it in toml
data, err := json2toml(resp)
if err != nil {
log.Print(err)
}
c.LoadConfigFromText(data)
}
}

// Iterate on all labels
// TODO check label order of importance ?
for _, label := range strings.Split(labels, ",") {
// Read from etcd
// key = /telegraf/labels/LABEL
key := e.Folder + "/labels/" + label
resp, err := e.Kapi.Get(context.Background(), key, nil)
if err != nil {
log.Fatal(err)
continue
}
// Put it in toml
data, err := json2toml(resp)
if err != nil {
log.Fatal(err)
continue
}
// Load config
c.LoadConfigFromText(data)
}

return c, nil
}

func json2toml(resp *client.Response) ([]byte, error) {
var json_data interface{}
var data []byte
json.Unmarshal([]byte(resp.Node.Value), &json_data)
buf := new(bytes.Buffer)
err := toml.NewEncoder(buf).Encode(json_data)
data = buf.Bytes()
return data, err
}

0 comments on commit 121fd61

Please sign in to comment.