-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathForwarderKafka.go
60 lines (53 loc) · 1.46 KB
/
ForwarderKafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package canarytools
import (
"crypto/tls"
"github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"
)
// KafkaForwarder sends alerts to kafka
type KafkaForwarder struct {
// w *kafka.Writer
p sarama.SyncProducer
topic string
l *log.Logger
// TODO: TLS!
}
// NewKafkaForwarder creates a new KafkaForwarder
func NewKafkaForwarder(brokers []string, topic string, tlsconfig *tls.Config, l *log.Logger) (kafkaforwarder *KafkaForwarder, err error) {
kafkaforwarder = &KafkaForwarder{}
kafkaforwarder.l = l
kafkaforwarder.topic = topic
sarama.Logger = l
// producer config
config := sarama.NewConfig()
config.Producer.Retry.Max = 5
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Return.Successes = true
config.ClientID = "CanaryChirpForwarder"
if tlsconfig != nil {
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsconfig
}
kafkaforwarder.p, err = sarama.NewSyncProducer(brokers, config)
return
}
func (kf KafkaForwarder) Forward(outChan <-chan []byte, incidentAckerChan chan<- []byte) {
for i := range outChan {
kf.l.WithFields(log.Fields{
"source": "KafkaForwarder",
"stage": "forward",
}).Debug("Kafka out incident")
_, _, err := kf.p.SendMessage(&sarama.ProducerMessage{
Topic: kf.topic,
Value: sarama.ByteEncoder(i),
})
if err != nil {
kf.l.WithFields(log.Fields{
"source": "KafkaForwarder",
"stage": "forward",
"err": err,
}).Error("Kafka error")
}
incidentAckerChan <- i
}
}