Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Etcd integration for configuration #651

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ github.com/couchbase/go-couchbase cb664315a324d87d19c879d9cc67fda6be8c2ac1
github.com/couchbase/gomemcached a5ea6356f648fec6ab89add00edd09151455b4b2
github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6
github.com/dancannon/gorethink e7cac92ea2bc52638791a021f212145acfedb1fc
github.com/coreos/etcd bdee27b19e8601ffd7bd4f0481abe9bbae04bd09
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ endif
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379

# Run docker containers necessary for CircleCI unit tests
docker-run-circle:
Expand All @@ -85,11 +86,12 @@ docker-run-circle:
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379

# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd

# Run full unit tests using docker containers (includes setup and teardown)
test: vet docker-kill docker-run
Expand Down
274 changes: 184 additions & 90 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/etcd"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
Expand All @@ -22,6 +23,13 @@ var fDebug = flag.Bool("debug", false,
var fQuiet = flag.Bool("quiet", false,
"run in quiet mode")
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
var fEtcd = flag.String("etcd", "", "etcd urls where configuration is stored (comma separated)")
var fEtcdFolder = flag.String("etcdfolder", "/telegraf", "etcd root folder where configuration is stored")
var fEtcdSendConfigDir = flag.String("etcdwriteconfigdir", "", "store the following config dir to etcd")
var fEtcdSendConfig = flag.String("etcdwriteconfig", "", "store the following config file to etcd")
var fEtcdEraseConfig = flag.Bool("etcderaseconfig", false, "erase all telegraf config in etcd")
var fEtcdSendLabel = flag.String("etcdwritelabel", "", "store config file to etcd with this label")
var fEtcdReadLabels = flag.String("etcdreadlabels", "", "read config from etcd using labels (comma-separated)")
var fConfig = flag.String("config", "", "configuration file to load")
var fConfigDirectory = flag.String("config-directory", "",
"directory containing additional *.conf files")
Expand Down Expand Up @@ -58,18 +66,25 @@ Usage:

The flags are:

-config <file> configuration file to load
-test gather metrics once, print them to stdout, and exit
-sample-config print out full sample configuration to stdout
-config-directory directory containing additional *.conf files
-input-filter filter the input plugins to enable, separator is :
-input-list print all the plugins inputs
-output-filter filter the output plugins to enable, separator is :
-output-list print all the available outputs
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
-version print the version to stdout
-config <file> configuration file to load
-test gather metrics once, print them to stdout, and exit
-sample-config print out full sample configuration to stdout
-config-directory directory containing additional *.conf files
-etcd etcd urls where configuration is stored (comma separated)
-etcdfolder etcd folder where configuration is stored and read
-etcdwriteconfigdir store the following config dir to etcd
-etcdwriteconfig store the following config file to etcd
-etcdwritelabel store config file to etcd with this label
-etcdreadlabels read config from etcd using labels (comma-separated)
-etcderaseconfig erase all telegraf config in etcd
-input-filter filter the input plugins to enable, separator is :
-input-list print all the plugins inputs
-output-filter filter the output plugins to enable, separator is :
-output-list print all the available outputs
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
-version print the version to stdout

Examples:

Expand All @@ -90,92 +105,184 @@ Examples:
`

func main() {
// Read flags
flag.Usage = func() { usageExit(0) }
flag.Parse()
args := flag.Args()
if flag.NFlag() == 0 && len(args) == 0 {
usageExit(0)
}

// Prepare signals handling
reload := make(chan bool, 1)
reload <- true
for <-reload {
reload <- false
flag.Usage = func() { usageExit(0) }
flag.Parse()
args := flag.Args()

if flag.NFlag() == 0 && len(args) == 0 {
usageExit(0)
}

var inputFilters []string
if *fInputFiltersLegacy != "" {
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}

var outputFilters []string
if *fOutputFiltersLegacy != "" {
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":")
shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)

// Prepare etcd if needed
var e *etcd.EtcdClient
if *fEtcd != "" {
e = etcd.NewEtcdClient(*fEtcd, *fEtcdFolder)
if *fEtcdSendConfig == "" && *fEtcdSendLabel == "" && *fEtcdSendConfigDir == "" {
go e.LaunchWatcher(shutdown, signals)
}
}

if len(args) > 0 {
switch args[0] {
case "version":
v := fmt.Sprintf("Telegraf - Version %s", Version)
fmt.Println(v)
return
case "config":
config.PrintSampleConfig(inputFilters, outputFilters)
return
// Handle signals
go func() {
for {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
} else if sig == syscall.SIGHUP {
log.Print("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}
}()

if *fOutputList {
fmt.Println("Available Output Plugins:")
for k, _ := range outputs.Outputs {
fmt.Printf(" %s\n", k)
}
return
}
// Prepare inputs
var inputFilters []string
if *fInputFiltersLegacy != "" {
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}

if *fInputList {
fmt.Println("Available Input Plugins:")
for k, _ := range inputs.Inputs {
fmt.Printf(" %s\n", k)
}
return
}
// Prepare outputs
var outputFilters []string
if *fOutputFiltersLegacy != "" {
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}

if *fVersion {
if len(args) > 0 {
switch args[0] {
case "version":
v := fmt.Sprintf("Telegraf - Version %s", Version)
fmt.Println(v)
return
}

if *fSampleConfig {
case "config":
config.PrintSampleConfig(inputFilters, outputFilters)
return
}
}

if *fUsage != "" {
if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2)
}
if *fOutputList {
fmt.Println("Available Output Plugins:")
for k, _ := range outputs.Outputs {
fmt.Printf(" %s\n", k)
}
return
}

if *fInputList {
fmt.Println("Available Input Plugins:")
for k, _ := range inputs.Inputs {
fmt.Printf(" %s\n", k)
}
return
}

// Print version
if *fVersion {
v := fmt.Sprintf("Telegraf - Version %s", Version)
fmt.Println(v)
return
}

// Print sample config
if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters)
return
}

// Print usage
if *fUsage != "" {
if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2)
}
return
}
return
}

for <-reload {
// Reset signal handler vars
shutdown = make(chan struct{})
reload <- false

// Prepare config
var (
c *config.Config
err error
)

if *fConfig != "" {
if *fEtcd != "" {
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters

if *fEtcdSendConfigDir != "" {
// TODO check config format before write it
// Erase config in etcd
if *fEtcdEraseConfig {
err = e.DeleteConfig("")
if err != nil {
err = fmt.Errorf("Error erasing Telegraf Etcd Config: %s", err)
log.Fatal(err)
}
}
// Write config dir to etcd
err = c.LoadDirectory(*fEtcdSendConfigDir)
if err != nil {
log.Fatal(err)
}
err = e.WriteConfigDir(*fEtcdSendConfigDir)
if err != nil {
log.Fatal(err)
}
return
} else if *fEtcdSendConfig != "" && *fEtcdSendLabel != "" {
// TODO check config format before write it
// Write config to etcd
err = c.LoadConfig(*fEtcdSendConfig)
if err != nil {
log.Fatal(err)
}
err = e.WriteLabelConfig(*fEtcdSendLabel, *fEtcdSendConfig)
if err != nil {
log.Fatal(err)
}
return
} else if *fEtcdEraseConfig {
// Erase config in etcd
err = e.DeleteConfig("")
if err != nil {
err = fmt.Errorf("Error erasing Telegraf Etcd Config: %s", err)
log.Fatal(err)
}
return
} else {
// Read config to etcd
log.Printf("Config read from etcd with labels %s\n", *fEtcdReadLabels)
c, err = e.ReadConfig(c, *fEtcdReadLabels)
if err != nil {
log.Fatal(err)
}
}
} else if *fConfig != "" {
// Read config from file
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
Expand All @@ -188,19 +295,22 @@ func main() {
os.Exit(1)
}

// Read config dir
if *fConfigDirectoryLegacy != "" {
err = c.LoadDirectory(*fConfigDirectoryLegacy)
if err != nil {
log.Fatal(err)
}
}

// Read config dir
if *fConfigDirectory != "" {
err = c.LoadDirectory(*fConfigDirectory)
if err != nil {
log.Fatal(err)
}
}
// check config
if len(c.Outputs) == 0 {
log.Fatalf("Error: no outputs found, did you provide a valid config file?")
}
Expand Down Expand Up @@ -234,22 +344,6 @@ func main() {
log.Fatal(err)
}

shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
go func() {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
}
if sig == syscall.SIGHUP {
log.Printf("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}()

log.Printf("Starting Telegraf (version %s)\n", Version)
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " "))
Expand Down
Loading