Skip to content

Commit

Permalink
[WIP] Etcd integration for configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
titilambert committed Jan 21, 2016
1 parent f24f5e9 commit f4580bd
Show file tree
Hide file tree
Showing 11 changed files with 524 additions and 68 deletions.
2 changes: 2 additions & 0 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034
github.com/BurntSushi/toml 5c4df71dfe9ac89ef6287afc05e4c1b16ae65a1e
github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef
github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252
github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339
Expand All @@ -7,6 +8,7 @@ github.com/aws/aws-sdk-go 3ad0b07b44c22c21c734d1094981540b7a11e942
github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d
github.com/boltdb/bolt 6465994716bf6400605746e79224cf1e7ed68725
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
github.com/coreos/etcd bdee27b19e8601ffd7bd4f0481abe9bbae04bd09
github.com/dancannon/gorethink ff457cac6a529d9749d841a733d76e8305cba3c8
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ endif
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379

# Run docker containers necessary for CircleCI unit tests
docker-run-circle:
Expand All @@ -65,11 +66,12 @@ docker-run-circle:
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379

# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann etcd
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann etcd

# Run full unit tests using docker containers (includes setup and teardown)
test: docker-kill docker-run
Expand Down
205 changes: 139 additions & 66 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/etcd"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
)
Expand All @@ -20,6 +21,11 @@ var fDebug = flag.Bool("debug", false,
var fQuiet = flag.Bool("quiet", false,
"run in quiet mode")
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
var fEtcd = flag.String("etcd", "", "etcd url where configuration is stored")
var fEtcdSendConfigDir = flag.String("etcdwriteconfigdir", "", "store the following config dir to etcd")
var fEtcdSendConfig = flag.String("etcdwriteconfig", "", "store the following config file to etcd")
var fEtcdSendLabel = flag.String("etcdwritelabel", "", "store config file to etcd with this label")
var fEtcdReadLabels = flag.String("etcdreadlabels", "", "read config from etcd using labels (comma-separated)")
var fConfig = flag.String("config", "", "configuration file to load")
var fConfigDirectory = flag.String("config-directory", "",
"directory containing additional *.conf files")
Expand Down Expand Up @@ -53,16 +59,20 @@ Usage:
The flags are:
-config <file> configuration file to load
-test gather metrics once, print them to stdout, and exit
-sample-config print out full sample configuration to stdout
-config-directory directory containing additional *.conf files
-input-filter filter the input plugins to enable, separator is :
-output-filter filter the output plugins to enable, separator is :
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
-version print the version to stdout
-config <file> configuration file to load
-test gather metrics once, print them to stdout, and exit
-sample-config print out full sample configuration to stdout
-config-directory directory containing additional *.conf files
-etcdwriteconfigdir store the following config dir to etcd
-etcdwriteconfig store the following config file to etcd
-etcdwritelabel store config file to etcd with this label
-etcdreadlabels read config from etcd using labels (comma-separated)
-input-filter filter the input plugins to enable, separator is :
-output-filter filter the output plugins to enable, separator is :
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
-version print the version to stdout
Examples:
Expand All @@ -83,63 +93,138 @@ Examples:
`

func main() {
// Read flags
flag.Usage = usageExit
flag.Parse()
if flag.NFlag() == 0 {
usageExit()
}

// Prepare signals handling
reload := make(chan bool, 1)
reload <- true
for <-reload {
reload <- false
flag.Usage = usageExit
flag.Parse()

if flag.NFlag() == 0 {
usageExit()
shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)

// Prepare etcd if needed
var e *etcd.EtcdClient
if *fEtcd != "" {
e = etcd.NewEtcdClient(*fEtcd, "/telegraf")
if *fEtcdSendConfig == "" && *fEtcdSendLabel == "" && *fEtcdSendConfigDir == "" {
go e.LaunchWatcher(shutdown, signals)
}
}

var inputFilters []string
if *fInputFiltersLegacy != "" {
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":")
// Handle signals
go func() {
for {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
} else if sig == syscall.SIGHUP {
log.Print("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}
}()

var outputFilters []string
if *fOutputFiltersLegacy != "" {
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
// Prepare inputs
var inputFilters []string
if *fInputFiltersLegacy != "" {
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}

if *fVersion {
v := fmt.Sprintf("Telegraf - Version %s", Version)
fmt.Println(v)
return
}
// Prepare outputs
var outputFilters []string
if *fOutputFiltersLegacy != "" {
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}

if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters)
return
}
// Print version
if *fVersion {
v := fmt.Sprintf("Telegraf - Version %s", Version)
fmt.Println(v)
return
}

if *fUsage != "" {
if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2)
}
// Print sample config
if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters)
return
}

// Print usage
if *fUsage != "" {
if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2)
}
return
}
return
}

for <-reload {
// Reset signal handler vars
shutdown = make(chan struct{})
reload <- false

// Prepare config
var (
c *config.Config
err error
)

if *fConfig != "" {
if *fEtcd != "" {
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
if *fEtcdSendConfigDir != "" {
// TODO check config format before write it
// Write config dir to etcd
err = c.LoadDirectory(*fEtcdSendConfigDir)
if err != nil {
log.Fatal(err)
}
err = e.WriteConfigDir(*fEtcdSendConfigDir)
if err != nil {
log.Fatal(err)
}
return
} else if *fEtcdSendConfig != "" && *fEtcdSendLabel != "" {
// TODO check config format before write it
// Write config to etcd
err = c.LoadConfig(*fEtcdSendConfig)
if err != nil {
log.Fatal(err)
}
err = e.WriteLabelConfig(*fEtcdSendLabel, *fEtcdSendConfig)
if err != nil {
log.Fatal(err)
}
return
} else {
// Read config to etcd
log.Printf("Config read from etcd with labels %s\n", *fEtcdReadLabels)
c, err = e.ReadConfig(c, *fEtcdReadLabels)
if err != nil {
log.Fatal(err)
}
}
} else if *fConfig != "" {
// Read config from file
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
Expand All @@ -148,24 +233,28 @@ func main() {
log.Fatal(err)
}
} else {
// Print usage
fmt.Println("Usage: Telegraf")
flag.PrintDefaults()
return
}

// Read config dir
if *fConfigDirectoryLegacy != "" {
err = c.LoadDirectory(*fConfigDirectoryLegacy)
if err != nil {
log.Fatal(err)
}
}

// Read config dir
if *fConfigDirectory != "" {
err = c.LoadDirectory(*fConfigDirectory)
if err != nil {
log.Fatal(err)
}
}
// check config
if len(c.Outputs) == 0 {
log.Fatalf("Error: no outputs found, did you provide a valid config file?")
}
Expand Down Expand Up @@ -199,22 +288,6 @@ func main() {
log.Fatal(err)
}

shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
go func() {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
}
if sig == syscall.SIGHUP {
log.Printf("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}()

log.Printf("Starting Telegraf (version %s)\n", Version)
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " "))
Expand Down
8 changes: 8 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func NewConfig() *Config {
Agent: &AgentConfig{
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
Labels: make([]string, 0),
FlushInterval: internal.Duration{Duration: 10 * time.Second},
FlushRetries: 2,
FlushJitter: internal.Duration{Duration: 5 * time.Second},
Expand Down Expand Up @@ -91,6 +92,9 @@ type AgentConfig struct {
// Quiet is the option for running in quiet mode
Quiet bool
Hostname string

// Etcd labels
Labels []string
}

// TagFilter is the name of a tag, and the values on which to filter
Expand Down Expand Up @@ -446,7 +450,11 @@ func (c *Config) LoadConfig(path string) error {
if err != nil {
return err
}
err = c.LoadConfigFromText(data)
return err
}

func (c *Config) LoadConfigFromText(data []byte) error {
tbl, err := toml.Parse(data)
if err != nil {
return err
Expand Down
Loading

0 comments on commit f4580bd

Please sign in to comment.