This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Merge pull request #523 from raintank/replicatorUpdate
Replicator update
Anthony Woods authored Feb 16, 2017
2 parents e413081 + cbee7ee commit 02041d6
Showing 5 changed files with 367 additions and 260 deletions.
118 changes: 0 additions & 118 deletions cmd/mt-replicator/consume.go

This file was deleted.

115 changes: 71 additions & 44 deletions cmd/mt-replicator/main.go
@@ -18,22 +18,20 @@ var (
showVersion = flag.Bool("version", false, "print version string")
logLevel = flag.Int("log-level", 2, "log level. 0=TRACE|1=DEBUG|2=INFO|3=WARN|4=ERROR|5=CRITICAL|6=FATAL")

partitionScheme = flag.String("partition-scheme", "byOrg", "method used for partitioning metrics. (byOrg|bySeries)")
compression = flag.String("compression", "none", "compression: none|gzip|snappy")
group = flag.String("group", "mt-replicator", "Kafka consumer group")
srcTopic = flag.String("src-topic", "mdm", "topic name on source cluster")
dstTopic = flag.String("dst-topic", "mdm", "topic name on destination cluster")
srcBrokerStr = flag.String("src-brokers", "localhost:9092", "tcp address of source kafka cluster (may be given multiple times as a comma-separated list)")
dstBrokerStr = flag.String("dst-brokers", "localhost:9092", "tcp address of destination kafka cluster to publish to (may be given multiple times as a comma-separated list)")

wg sync.WaitGroup
partitionScheme = flag.String("partition-scheme", "bySeries", "method used for partitioning metrics. (byOrg|bySeries)")
compression = flag.String("compression", "snappy", "compression: none|gzip|snappy")
replicateMetrics = flag.Bool("metrics", false, "replicate metrics")
replicatePersist = flag.Bool("persist", false, "replicate persistMetrics")
group = flag.String("group", "mt-replicator", "Kafka consumer group")
metricSrcTopic = flag.String("metric-src-topic", "mdm", "metrics topic name on source cluster")
metricDstTopic = flag.String("metric-dst-topic", "mdm", "metrics topic name on destination cluster")
persistSrcTopic = flag.String("persist-src-topic", "metricpersist", "metricPersist topic name on source cluster")
persistDstTopic = flag.String("persist-dst-topic", "metricpersist", "metricPersist topic name on destination cluster")
initialOffset = flag.Int("initial-offset", -2, "initial offset to consume from. (-2=oldest, -1=newest)")
srcBrokerStr = flag.String("src-brokers", "localhost:9092", "tcp address of source kafka cluster (may be given multiple times as a comma-separated list)")
dstBrokerStr = flag.String("dst-brokers", "localhost:9092", "tcp address of destination kafka cluster to publish to (may be given multiple times as a comma-separated list)")
)

type topic struct {
src string
dst string
}

func main() {
flag.Usage = func() {
fmt.Fprintln(os.Stderr, "mt-replicator")
@@ -46,20 +44,16 @@ func main() {
log.NewLogger(0, "console", fmt.Sprintf(`{"level": %d, "formatting":false}`, *logLevel))

if *showVersion {
fmt.Printf("eventtank (built with %s, git hash %s)\n", runtime.Version(), GitHash)
fmt.Printf("mt-replicator (built with %s, git hash %s)\n", runtime.Version(), GitHash)
return
}

if *group == "" {
log.Fatal(4, "--group is required")
}

if *srcTopic == "" {
log.Fatal(4, "--src-topic is required")
if !*replicateMetrics && !*replicatePersist {
log.Fatal(4, "at least one of --metrics or --persist is needed.")
}

if *dstTopic == "" {
log.Fatal(4, "--dst-topic is required")
if *group == "" {
log.Fatal(4, "--group is required")
}

if *srcBrokerStr == "" {
@@ -69,35 +63,68 @@ func main() {
log.Fatal(4, "--dst-brokers required")
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

srcBrokers := strings.Split(*srcBrokerStr, ",")
dstBrokers := strings.Split(*dstBrokerStr, ",")
wg := new(sync.WaitGroup)
shutdown := make(chan struct{})
if *replicateMetrics {
if *metricSrcTopic == "" {
log.Fatal(4, "--metric-src-topic is required")
}

consumer, err := NewConsumer(srcBrokers, *group, *srcTopic)
if err != nil {
log.Fatal(4, err.Error())
}
publisher, err := NewPublisher(dstBrokers, *dstTopic, *compression, *partitionScheme)
if err != nil {
log.Fatal(4, err.Error())
if *metricDstTopic == "" {
log.Fatal(4, "--metric-dst-topic is required")
}

metrics, err := NewMetricsReplicator(srcBrokers, dstBrokers, *compression, *group, *metricSrcTopic, *metricDstTopic, *initialOffset, *partitionScheme)
if err != nil {
log.Fatal(4, err.Error())
}

log.Info("starting metrics replicator")
metrics.Start()
wg.Add(1)
go func() {
<-shutdown
log.Info("metrics replicator shutdown started.")
metrics.Stop()
log.Info("metrics replicator ended.")
wg.Done()
}()
}

log.Info("starting consumer")
consumer.Start(publisher)
if *replicatePersist {
if *persistSrcTopic == "" {
log.Fatal(4, "--persist-src-topic is required")
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
if *persistDstTopic == "" {
log.Fatal(4, "--persist-dst-topic is required")
}

LOOP:
for {
select {
case <-consumer.Done:
log.Info("consumer ended.")
break LOOP
case <-sigChan:
log.Info("shutdown started.")
consumer.Stop()
metricPersist, err := NewPersistReplicator(srcBrokers, dstBrokers, *group, *persistSrcTopic, *persistDstTopic, *initialOffset)
if err != nil {
log.Fatal(4, err.Error())
}

log.Info("starting metricPersist replicator")
metricPersist.Start()
wg.Add(1)
go func() {
<-shutdown
log.Info("metricPersist replicator shutdown started.")
metricPersist.Stop()
log.Info("metricPersist replicator ended.")
wg.Done()
}()
}
publisher.Stop()

<-sigChan
close(shutdown)
wg.Wait()
log.Info("shutdown complete")
log.Close()
}
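
The rewritten main.go runs the metrics and metricPersist replicators side by side and coordinates their shutdown through a shared channel: each enabled replicator adds a goroutine to a WaitGroup that blocks until main closes the shutdown channel on SIGINT/SIGTERM, then calls Stop. A minimal standalone sketch of that coordination pattern, with an illustrative replicator type standing in for MetricsReplicator/PersistReplicator (these names are not from the commit):

package main

import (
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
)

// replicator is an illustrative stand-in for MetricsReplicator / PersistReplicator.
type replicator struct{ name string }

func (r *replicator) Start() { log.Printf("%s replicator started", r.name) }
func (r *replicator) Stop()  { log.Printf("%s replicator stopped", r.name) }

func main() {
	wg := new(sync.WaitGroup)
	shutdown := make(chan struct{})

	for _, r := range []*replicator{{"metrics"}, {"metricPersist"}} {
		r.Start()
		wg.Add(1)
		go func(r *replicator) {
			defer wg.Done()
			<-shutdown // blocks until main closes the channel
			r.Stop()   // in the real code, Stop blocks until the consume loop drains
		}(r)
	}

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan       // wait for SIGINT/SIGTERM
	close(shutdown) // broadcast shutdown to every replicator goroutine
	wg.Wait()       // wait for all replicators to finish
	log.Println("shutdown complete")
}

Closing a channel broadcasts to every waiting goroutine at once, which is why the new code prefers it over sending one value per replicator.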
125 changes: 125 additions & 0 deletions cmd/mt-replicator/metricpersist.go
@@ -0,0 +1,125 @@
package main

import (
"fmt"
"time"

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/raintank/worldping-api/pkg/log"
)

type PersistReplicator struct {
consumer *cluster.Consumer
producer sarama.SyncProducer
destTopic string
done chan struct{}
}

func NewPersistReplicator(srcBrokers []string, dstBrokers []string, group, srcTopic, destTopic string, initialOffset int) (*PersistReplicator, error) {
config := cluster.NewConfig()
config.Consumer.Offsets.Initial = int64(initialOffset)
config.ClientID = "mt-persist-replicator"
config.Group.Return.Notifications = true
config.ChannelBufferSize = 1000
config.Consumer.Fetch.Min = 1
config.Consumer.Fetch.Default = 32768
config.Consumer.MaxWaitTime = time.Second
config.Consumer.MaxProcessingTime = time.Second
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
config.Producer.Compression = sarama.CompressionSnappy
config.Config.Version = sarama.V0_10_0_0
err := config.Validate()
if err != nil {
return nil, err
}
consumer, err := cluster.NewConsumer(srcBrokers, fmt.Sprintf("%s-persist", group), []string{srcTopic}, config)
if err != nil {
return nil, err
}

producer, err := sarama.NewSyncProducer(dstBrokers, &config.Config)
if err != nil {
return nil, err
}

return &PersistReplicator{
consumer: consumer,
producer: producer,
destTopic: destTopic,
done: make(chan struct{}),
}, nil
}

func (r *PersistReplicator) Consume() {
accountingTicker := time.NewTicker(time.Second * 10)
flushTicker := time.NewTicker(time.Second)
counter := 0
counterTs := time.Now()
msgChan := r.consumer.Messages()
var m *sarama.ConsumerMessage
var ok bool

buf := make([]*sarama.ProducerMessage, 0)
defer close(r.done)
for {
select {
case m, ok = <-msgChan:
if !ok {
if len(buf) != 0 {
r.Flush(buf)
}
return
}
log.Debug("received metricPersist message with key: %s", m.Key)
msg := &sarama.ProducerMessage{
Key: sarama.ByteEncoder(m.Key),
Topic: r.destTopic,
Value: sarama.ByteEncoder(m.Value),
}
buf = append(buf, msg)
if len(buf) >= 1000 {
r.Flush(buf)
counter += len(buf)
buf = buf[:0]
r.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
}
case <-flushTicker.C:
if len(buf) == 0 {
continue
}
r.Flush(buf)
counter += len(buf)
buf = buf[:0]
r.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
case t := <-accountingTicker.C:
log.Info("%d metricPersist messages processed in last %.1f seconds.", counter, t.Sub(counterTs).Seconds())
counter = 0
counterTs = t
}
}

}

func (r *PersistReplicator) Flush(buf []*sarama.ProducerMessage) {
for {
err := r.producer.SendMessages(buf)
if err != nil {
log.Error(3, "failed to publish metricPersist message. %s. trying again in 1 second", err)
time.Sleep(time.Second)
} else {
return
}
}
}

func (r *PersistReplicator) Stop() {
r.consumer.Close()
<-r.done
r.producer.Close()
}

func (r *PersistReplicator) Start() {
go r.Consume()
}
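
The Consume loop batches up to 1000 producer messages and flushes either when the buffer fills or when the one-second ticker fires, marking the consumer offset only after SendMessages succeeds; an unflushed batch is simply re-consumed after a restart, giving at-least-once delivery. A minimal sketch of that flush policy, with plain functions standing in for the sarama producer and offset-marking calls (msg, batchLoop, flush, and commit are illustrative names, not part of the commit):

package main

import (
	"fmt"
	"time"
)

type msg struct {
	offset int64
	value  string
}

// batchLoop mirrors the Consume loop's policy: flush when the buffer reaches
// batchSize or when the ticker fires, and only commit the offset after a
// successful flush. flush and commit stand in for producer.SendMessages and
// consumer.MarkPartitionOffset.
func batchLoop(in <-chan msg, batchSize int, flush func([]msg) error, commit func(int64)) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	buf := make([]msg, 0, batchSize)
	var last int64

	doFlush := func() {
		for flush(buf) != nil { // retry forever, like PersistReplicator.Flush
			time.Sleep(time.Second)
		}
		commit(last) // offsets advance only after the batch is safely produced
		buf = buf[:0]
	}

	for {
		select {
		case m, ok := <-in:
			if !ok {
				if len(buf) > 0 {
					doFlush()
				}
				return
			}
			buf = append(buf, m)
			last = m.offset
			if len(buf) >= batchSize {
				doFlush()
			}
		case <-ticker.C:
			if len(buf) > 0 {
				doFlush()
			}
		}
	}
}

func main() {
	in := make(chan msg)
	go func() {
		for i := 0; i < 5; i++ {
			in <- msg{offset: int64(i), value: fmt.Sprintf("m%d", i)}
		}
		close(in)
	}()
	batchLoop(in, 2,
		func(b []msg) error { fmt.Println("flushed", len(b), "messages"); return nil },
		func(o int64) { fmt.Println("committed offset", o) })
}

Because the flush retries forever and offsets only advance after a successful send, duplicates are possible on restart but data loss is not.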
