Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Etcd integration for configuration #465

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034
github.com/BurntSushi/toml 5c4df71dfe9ac89ef6287afc05e4c1b16ae65a1e
github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef
github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252
github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339
Expand All @@ -8,6 +9,7 @@ github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d
github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70
github.com/coreos/etcd bdee27b19e8601ffd7bd4f0481abe9bbae04bd09
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
Expand Down
11 changes: 9 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ endif
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379

# Run docker containers necessary for CircleCI unit tests
docker-run-circle:
Expand All @@ -97,11 +98,17 @@ docker-run-circle:
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379

# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd

# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann etcd
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann etcd

# Run full unit tests using docker containers (includes setup and teardown)
test: docker-kill docker-run
Expand Down
204 changes: 138 additions & 66 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/etcd"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
)
Expand All @@ -20,6 +21,11 @@ var fDebug = flag.Bool("debug", false,
var fQuiet = flag.Bool("quiet", false,
"run in quiet mode")
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
var fEtcd = flag.String("etcd", "", "etcd url where configuration is stored")
var fEtcdSendConfigDir = flag.String("etcdwriteconfigdir", "", "store the following config dir to etcd")
var fEtcdSendConfig = flag.String("etcdwriteconfig", "", "store the following config file to etcd")
var fEtcdSendLabel = flag.String("etcdwritelabel", "", "store config file to etcd with this label")
var fEtcdReadLabels = flag.String("etcdreadlabels", "", "read config from etcd using labels (comma-separated)")
var fConfig = flag.String("config", "", "configuration file to load")
var fConfigDirectory = flag.String("config-directory", "",
"directory containing additional *.conf files")
Expand Down Expand Up @@ -53,16 +59,20 @@ Usage:

The flags are:

-config <file> configuration file to load
-test gather metrics once, print them to stdout, and exit
-sample-config print out full sample configuration to stdout
-config-directory directory containing additional *.conf files
-input-filter filter the input plugins to enable, separator is :
-output-filter filter the output plugins to enable, separator is :
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
-version print the version to stdout
-config <file> configuration file to load
-test gather metrics once, print them to stdout, and exit
-sample-config print out full sample configuration to stdout
-config-directory directory containing additional *.conf files
-etcdwriteconfigdir store the following config dir to etcd
-etcdwriteconfig store the following config file to etcd
-etcdwritelabel store config file to etcd with this label
-etcdreadlabels read config from etcd using labels (comma-separated)
-input-filter filter the input plugins to enable, separator is :
-output-filter filter the output plugins to enable, separator is :
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
-version print the version to stdout

Examples:

Expand All @@ -83,63 +93,138 @@ Examples:
`

func main() {
// Read flags
flag.Usage = func() { usageExit(0) }
flag.Parse()
if flag.NFlag() == 0 {
usageExit(0)
}

// Prepare signals handling
reload := make(chan bool, 1)
reload <- true
for <-reload {
reload <- false
flag.Usage = func() { usageExit(0) }
flag.Parse()

if flag.NFlag() == 0 {
usageExit(0)
shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)

// Prepare etcd if needed
var e *etcd.EtcdClient
if *fEtcd != "" {
e = etcd.NewEtcdClient(*fEtcd, "/telegraf")
if *fEtcdSendConfig == "" && *fEtcdSendLabel == "" && *fEtcdSendConfigDir == "" {
go e.LaunchWatcher(shutdown, signals)
}
}

var inputFilters []string
if *fInputFiltersLegacy != "" {
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":")
// Handle signals
go func() {
for {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
} else if sig == syscall.SIGHUP {
log.Print("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}
}()

var outputFilters []string
if *fOutputFiltersLegacy != "" {
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
// Prepare inputs
var inputFilters []string
if *fInputFiltersLegacy != "" {
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}

if *fVersion {
v := fmt.Sprintf("Telegraf - Version %s", Version)
fmt.Println(v)
return
}
// Prepare outputs
var outputFilters []string
if *fOutputFiltersLegacy != "" {
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}

if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters)
return
}
// Print version
if *fVersion {
v := fmt.Sprintf("Telegraf - Version %s", Version)
fmt.Println(v)
return
}

if *fUsage != "" {
if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2)
}
// Print sample config
if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters)
return
}

// Print usage
if *fUsage != "" {
if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2)
}
return
}
return
}

for <-reload {
// Reset signal handler vars
shutdown = make(chan struct{})
reload <- false

// Prepare config
var (
c *config.Config
err error
)

if *fConfig != "" {
if *fEtcd != "" {
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
if *fEtcdSendConfigDir != "" {
// TODO check config format before write it
// Write config dir to etcd
err = c.LoadDirectory(*fEtcdSendConfigDir)
if err != nil {
log.Fatal(err)
}
err = e.WriteConfigDir(*fEtcdSendConfigDir)
if err != nil {
log.Fatal(err)
}
return
} else if *fEtcdSendConfig != "" && *fEtcdSendLabel != "" {
// TODO check config format before write it
// Write config to etcd
err = c.LoadConfig(*fEtcdSendConfig)
if err != nil {
log.Fatal(err)
}
err = e.WriteLabelConfig(*fEtcdSendLabel, *fEtcdSendConfig)
if err != nil {
log.Fatal(err)
}
return
} else {
// Read config to etcd
log.Printf("Config read from etcd with labels %s\n", *fEtcdReadLabels)
c, err = e.ReadConfig(c, *fEtcdReadLabels)
if err != nil {
log.Fatal(err)
}
}
} else if *fConfig != "" {
// Read config from file
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
Expand All @@ -152,19 +237,22 @@ func main() {
os.Exit(1)
}

// Read config dir
if *fConfigDirectoryLegacy != "" {
err = c.LoadDirectory(*fConfigDirectoryLegacy)
if err != nil {
log.Fatal(err)
}
}

// Read config dir
if *fConfigDirectory != "" {
err = c.LoadDirectory(*fConfigDirectory)
if err != nil {
log.Fatal(err)
}
}
// check config
if len(c.Outputs) == 0 {
log.Fatalf("Error: no outputs found, did you provide a valid config file?")
}
Expand Down Expand Up @@ -198,22 +286,6 @@ func main() {
log.Fatal(err)
}

shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
go func() {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
}
if sig == syscall.SIGHUP {
log.Printf("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}()

log.Printf("Starting Telegraf (version %s)\n", Version)
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " "))
Expand Down
16 changes: 15 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"

"github.com/influxdata/config"
"github.com/naoina/toml"
"github.com/naoina/toml/ast"
)

Expand All @@ -39,6 +40,7 @@ func NewConfig() *Config {
Agent: &AgentConfig{
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
Labels: make([]string, 0),
FlushInterval: internal.Duration{Duration: 10 * time.Second},
FlushJitter: internal.Duration{Duration: 5 * time.Second},
},
Expand Down Expand Up @@ -92,6 +94,9 @@ type AgentConfig struct {
// Quiet is the option for running in quiet mode
Quiet bool
Hostname string

// Etcd labels
Labels []string
}

// Inputs returns a list of strings of the configured inputs.
Expand Down Expand Up @@ -316,7 +321,16 @@ func (c *Config) LoadDirectory(path string) error {

// LoadConfig loads the given config file and applies it to c
func (c *Config) LoadConfig(path string) error {
tbl, err := config.ParseFile(path)
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
err = c.LoadConfigFromText(data)
return err
}

func (c *Config) LoadConfigFromText(data []byte) error {
tbl, err := toml.Parse(data)
if err != nil {
return err
}
Expand Down
Loading