package kafka_consumer

import (
	"fmt"
	"log"
	"strings"
	"sync"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/parsers"
)

type Kafka struct {
	ConsumerGroup string
	Topics        []string
	Brokers       string
	MaxMessageLen int
	Consumer      *kafka.Consumer

	// If SSLEnabled is true, all SSL settings below are applied.
	// An SSL connection is required when the broker is configured with
	// ssl.client.auth=required: fill in ca, cert, key, and keypwd.
	// With ssl.client.auth=none, only ca is needed.
	SSLEnabled bool `toml:"ssl_enabled"`
	// Path to CA file
	SSLCA string `toml:"ssl_ca"`
	// Path to host cert file
	SSLCert string `toml:"ssl_cert"`
	// Path to cert key file
	SSLKey string `toml:"ssl_key"`
	// Password for the key file, if it has one
	SSLKeypwd string `toml:"ssl_keypwd"`

	// Legacy metric buffer support
	MetricBuffer int
	// TODO remove PointBuffer, legacy support
	PointBuffer int

	Offset string
	parser parsers.Parser
	sync.Mutex
	done chan struct{}

	// keep the accumulator internally:
	acc telegraf.Accumulator

	// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer.
	// This is mostly for test purposes, but there may be a use case for it later.
	doNotCommitMsgs bool
}

var sampleConfig = `
  ## kafka servers
  brokers = "localhost:9092"
  ## topic(s) to consume
  topics = ["telegraf"]

  ## Optional SSL Config
  ssl_enabled = false
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  # ssl_keypwd = ""

  ## the name of the consumer group
  consumer_group = "telegraf_metrics_consumers"
  ## Offset (must be one of: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored")
  offset = "latest"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

  ## Maximum length of a message to consume, in bytes (default 0 = unlimited);
  ## larger messages are dropped
  max_message_len = 65536
`

func (k *Kafka) SampleConfig() string {
	return sampleConfig
}

func (k *Kafka) Description() string {
	return "Read metrics from Kafka topic(s)"
}

func (k *Kafka) SetParser(parser parsers.Parser) {
	k.parser = parser
}

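// exampleWiring is an illustrative sketch, not part of the original plugin:
// it shows how a caller such as a test might construct the plugin and wire
// in a parser by hand instead of going through telegraf's config loader.
// It assumes telegraf's parsers.NewInfluxParser constructor, a broker
// reachable at localhost:9092, and a telegraf.Accumulator supplied by the
// caller.
func exampleWiring(acc telegraf.Accumulator) error {
	k := &Kafka{
		Brokers:       "localhost:9092",
		Topics:        []string{"telegraf"},
		ConsumerGroup: "telegraf_metrics_consumers",
		Offset:        "latest",
	}
	// Parse each Kafka message payload as influx line protocol.
	parser, err := parsers.NewInfluxParser()
	if err != nil {
		return err
	}
	k.SetParser(parser)
	// Start launches the background receiver goroutine; a real caller would
	// let it consume for some time before calling Stop.
	if err := k.Start(acc); err != nil {
		return err
	}
	defer k.Stop()
	return nil
}
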
func (k *Kafka) Start(acc telegraf.Accumulator) error {
	k.Lock()
	defer k.Unlock()

	k.acc = acc

	conf := kafka.ConfigMap{"bootstrap.servers": k.Brokers}
	conf["group.id"] = k.ConsumerGroup
	conf["session.timeout.ms"] = 6000
	conf["go.events.channel.enable"] = true
	conf["go.application.rebalance.enable"] = true

	if k.SSLEnabled {
		log.Printf("I! Connecting to kafka with SSL configuration.")
		conf["security.protocol"] = "ssl"
		conf["ssl.ca.location"] = k.SSLCA
		conf["ssl.certificate.location"] = k.SSLCert
		conf["ssl.key.location"] = k.SSLKey
		if len(k.SSLKeypwd) > 0 {
			conf["ssl.key.password"] = k.SSLKeypwd
		}
	}

	switch strings.ToLower(k.Offset) {
	case "beginning", "earliest", "end", "latest", "unset", "invalid", "stored":
		log.Printf("I! Kafka consumer offset will be set to '%s'\n", k.Offset)
		conf["default.topic.config"] = kafka.ConfigMap{"auto.offset.reset": strings.ToLower(k.Offset)}
	default:
		log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'latest'\n",
			k.Offset)
		conf["default.topic.config"] = kafka.ConfigMap{"auto.offset.reset": "latest"}
	}

	if k.Consumer == nil {
		c, err := kafka.NewConsumer(&conf)
		if err != nil {
			return fmt.Errorf("failed to create consumer: %s", err)
		}
		k.Consumer = c
		log.Printf("I! Created Consumer %v\n", c)
		if err := k.Consumer.SubscribeTopics(k.Topics, nil); err != nil {
			return fmt.Errorf("failed to subscribe to topics %v: %s", k.Topics, err)
		}
	}

	k.done = make(chan struct{})

	// Start the kafka message reader
	go k.receiver()
	log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v\n",
		k.Brokers, k.Topics)
	return nil
}

// receiver() reads all incoming messages from the consumer and parses them
// into influxdb metric points.
func (k *Kafka) receiver() {
	for {
		select {
		case <-k.done:
			log.Printf("I! Done! Terminated.\n")
			return
		case ev := <-k.Consumer.Events():
			switch e := ev.(type) {
			case kafka.AssignedPartitions:
				k.Consumer.Assign(e.Partitions)
			case kafka.RevokedPartitions:
				k.Consumer.Unassign()
			case *kafka.Message:
				// Drop messages longer than max_message_len, as documented
				// in the sample config.
				if k.MaxMessageLen != 0 && len(e.Value) > k.MaxMessageLen {
					k.acc.AddError(fmt.Errorf("message longer than max_message_len (%d > %d)",
						len(e.Value), k.MaxMessageLen))
					continue
				}
				log.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
				metrics, err := k.parser.Parse(e.Value)
				if err != nil {
					k.acc.AddError(fmt.Errorf("message parse error\nmessage: %s\nerror: %s",
						string(e.Value), err.Error()))
				}
				for _, metric := range metrics {
					k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
				}
			case kafka.PartitionEOF:
				log.Printf("%% Reached %v\n", e)
			case kafka.Error:
				k.acc.AddError(fmt.Errorf("%% Error: %v", e))
			}
		}
	}
}

func (k *Kafka) Stop() {
	k.Lock()
	defer k.Unlock()
	close(k.done)
	if err := k.Consumer.Close(); err != nil {
		k.acc.AddError(fmt.Errorf("error closing consumer: %s", err.Error()))
	}
}

// Gather is a no-op: this is a service input, so metrics are pushed to the
// accumulator by receiver() as messages arrive rather than collected on the
// gather interval.
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
	return nil
}

func init() {
	inputs.Add("kafka_consumer", func() telegraf.Input {
		return &Kafka{}
	})
}