Skip to content

Commit

Permalink
Kafka output producer, send telegraf metrics to Kafka brokers
Browse files Browse the repository at this point in the history
Closes #38
  • Loading branch information
sparrc committed Aug 26, 2015
1 parent 4342678 commit 63c5320
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 19 deletions.
31 changes: 16 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,37 +103,38 @@ at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
measurements at a 10s interval and will collect totalcpu & percpu data.

```
[outputs]
[outputs.influxdb]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
[tags]
dc = "denver-1"
dc = "denver-1"
[agent]
interval = "10s"
interval = "10s"
# OUTPUTS
[outputs]
[outputs.influxdb]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
# PLUGINS
[cpu]
percpu = true
totalcpu = true
percpu = true
totalcpu = true
```

Below is how to configure `tagpass` parameters (added in 0.1.4)

```
# Don't collect CPU data for cpu6 & cpu7
[cpu.tagdrop]
cpu = [ "cpu6", "cpu7" ]
cpu = [ "cpu6", "cpu7" ]
[disk]
[disk.tagpass]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
path = [ "/opt", "/home" ]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
path = [ "/opt", "/home" ]
```

## Supported Plugins
Expand Down
3 changes: 3 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func (a *Agent) Connect() error {
if err != nil {
return err
}
if a.Debug {
log.Printf("Successfully connected to output: %s\n", o.name)
}
}
return nil
}
Expand Down
3 changes: 0 additions & 3 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,8 @@ func main() {
}

shutdown := make(chan struct{})

signals := make(chan os.Signal)

signal.Notify(signals, os.Interrupt)

go func() {
<-signals
close(shutdown)
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ var header = `# Telegraf configuration
# debug = false
# hostname = "prod3241"
###############################################################################
# OUTPUTS #
###############################################################################
Expand All @@ -368,7 +369,6 @@ var header2 = `
###############################################################################
# PLUGINS #
###############################################################################
`

// PrintSampleConfig prints the sample config!
Expand Down
1 change: 1 addition & 0 deletions outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package all
import (
_ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb"
_ "github.com/influxdb/telegraf/outputs/kafka"
)
53 changes: 53 additions & 0 deletions outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package kafka

import (
"github.com/Shopify/sarama"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
)

type Kafka struct {
// Kafka brokers to send metrics to
Brokers []string

producer sarama.SyncProducer
}

var sampleConfig = `
# URLs of kafka brokers
brokers = ["localhost:9092"]
`

func (k *Kafka) Connect() error {
producer, err := sarama.NewSyncProducer(k.Brokers, nil)
if err != nil {
return err
}
k.producer = producer
return nil
}

func (k *Kafka) Close() error {
return k.producer.Close()
}

func (k *Kafka) SampleConfig() string {
return sampleConfig
}

func (k *Kafka) Description() string {
return "Configuration for the Kafka server to send metrics to"
}

func (k *Kafka) Write(bp client.BatchPoints) error {
if len(bp.Points) == 0 {
return nil
}
return nil
}

func init() {
outputs.Add("kafka", func() outputs.Output {
return &Kafka{}
})
}
11 changes: 11 additions & 0 deletions outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package kafka

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestFoo(t *testing.T) {
require.NoError(t, nil)
}

0 comments on commit 63c5320

Please sign in to comment.