From 7c9717f542184e9f5448468eb1890047db8144cb Mon Sep 17 00:00:00 2001
From: woodsaj
Date: Wed, 8 Feb 2017 21:02:20 +0800
Subject: [PATCH 1/6] add config option for initialOffset

---
 cmd/mt-replicator/consume.go | 4 ++--
 cmd/mt-replicator/main.go    | 3 ++-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/cmd/mt-replicator/consume.go b/cmd/mt-replicator/consume.go
index a7c0fd0dff..7ed47016bc 100644
--- a/cmd/mt-replicator/consume.go
+++ b/cmd/mt-replicator/consume.go
@@ -14,9 +14,9 @@ type Consumer struct {
 	Done chan struct{}
 }
 
-func NewConsumer(brokers []string, group, topic string) (*Consumer, error) {
+func NewConsumer(brokers []string, group, topic string, initialOffset int) (*Consumer, error) {
 	config := cluster.NewConfig()
-	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	config.Consumer.Offsets.Initial = int64(initialOffset)
 	config.ClientID = "mt-replicator"
 	config.Group.Return.Notifications = true
 	config.ChannelBufferSize = 1000
diff --git a/cmd/mt-replicator/main.go b/cmd/mt-replicator/main.go
index 9f6ec8de6d..51b509a776 100644
--- a/cmd/mt-replicator/main.go
+++ b/cmd/mt-replicator/main.go
@@ -23,6 +23,7 @@ var (
 	group           = flag.String("group", "mt-replicator", "Kafka consumer group")
 	srcTopic        = flag.String("src-topic", "mdm", "topic name on source cluster")
 	dstTopic        = flag.String("dst-topic", "mdm", "topic name on destination cluster")
+	initialOffset   = flag.Int("initial-offset", -2, "initial offset to consume from. (-2=oldest, -1=newest)")
 	srcBrokerStr    = flag.String("src-brokers", "localhost:9092", "tcp address of source kafka cluster (may be given multiple times as a comma-separated list)")
 	dstBrokerStr    = flag.String("dst-brokers", "localhost:9092", "tcp address of destination kafka cluster (may be given multiple times as a comma-separated list)")
 
@@ -72,7 +73,7 @@ func main() {
 	srcBrokers := strings.Split(*srcBrokerStr, ",")
 	dstBrokers := strings.Split(*dstBrokerStr, ",")
 
-	consumer, err := NewConsumer(srcBrokers, *group, *srcTopic)
+	consumer, err := NewConsumer(srcBrokers, *group, *srcTopic, *initialOffset)
 	if err != nil {
 		log.Fatal(4, err.Error())
 	}
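Note: the -2/-1 defaults are not arbitrary; they match sarama's sentinel offsets (sarama.OffsetOldest = -2, sarama.OffsetNewest = -1), which is why NewConsumer can cast the flag value straight to an int64. A minimal sketch of that mapping, assuming the same sarama package the patch imports (validateInitialOffset is a hypothetical helper, not part of the patch, which simply does int64(initialOffset)):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// validateInitialOffset maps the CLI flag onto sarama's sentinel offsets
// and rejects anything else.
func validateInitialOffset(offset int) (int64, error) {
	switch int64(offset) {
	case sarama.OffsetOldest: // -2: replay from the oldest retained message
		return sarama.OffsetOldest, nil
	case sarama.OffsetNewest: // -1: only consume messages produced from now on
		return sarama.OffsetNewest, nil
	}
	return 0, fmt.Errorf("initial-offset must be -2 (oldest) or -1 (newest), got %d", offset)
}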
(-2=oldest, -1=newest)") - srcBrokerStr = flag.String("src-brokers", "localhost:9092", "tcp address of source kafka cluster (may be be given multiple times as a comma-separated list)") - dstBrokerStr = flag.String("dst-brokers", "localhost:9092", "tcp address for kafka cluster to consume from (may be be given multiple times as a comma-separated list)") + partitionScheme = flag.String("partition-scheme", "byOrg", "method used for partitioning metrics. (byOrg|bySeries)") + compression = flag.String("compression", "none", "compression: none|gzip|snappy") + replicateMetrics = flag.Bool("metrics", false, "replicate metrics") + replicatePersist = flag.Bool("persist", false, "replicate persistMetrics") + group = flag.String("group", "mt-replicator", "Kafka consumer group") + srcTopic = flag.String("src-topic", "mdm", "topic name on source cluster") + dstTopic = flag.String("dst-topic", "mdm", "topic name on destination cluster") + persistSrcTopic = flag.String("persist-src-topic", "metricpersist", "metricPersist topic name on source cluster") + persistDstTopic = flag.String("persist-dst-topic", "metricpersist", "metricPersist topic name on destination cluster") + initialOffset = flag.Int("initial-offset", -2, "initial offset to consume from. (-2=oldest, -1=newest)") + srcBrokerStr = flag.String("src-brokers", "localhost:9092", "tcp address of source kafka cluster (may be be given multiple times as a comma-separated list)") + dstBrokerStr = flag.String("dst-brokers", "localhost:9092", "tcp address for kafka cluster to consume from (may be be given multiple times as a comma-separated list)") wg sync.WaitGroup ) @@ -51,18 +55,13 @@ func main() { return } + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + if *group == "" { log.Fatal(4, "--group is required") } - if *srcTopic == "" { - log.Fatal(4, "--src-topic is required") - } - - if *dstTopic == "" { - log.Fatal(4, "--dst-topic is required") - } - if *srcBrokerStr == "" { log.Fatal(4, "--src-brokers required") } @@ -72,33 +71,78 @@ func main() { srcBrokers := strings.Split(*srcBrokerStr, ",") dstBrokers := strings.Split(*dstBrokerStr, ",") + wg := new(sync.WaitGroup) - consumer, err := NewConsumer(srcBrokers, *group, *srcTopic, *initialOffset) - if err != nil { - log.Fatal(4, err.Error()) - } - publisher, err := NewPublisher(dstBrokers, *dstTopic, *compression, *partitionScheme) - if err != nil { - log.Fatal(4, err.Error()) + if *replicateMetrics { + + if *srcTopic == "" { + log.Fatal(4, "--src-topic is required") + } + + if *dstTopic == "" { + log.Fatal(4, "--dst-topic is required") + } + + consumer, err := NewConsumer(srcBrokers, *group, *srcTopic, *initialOffset) + if err != nil { + log.Fatal(4, err.Error()) + } + publisher, err := NewPublisher(dstBrokers, *dstTopic, *compression, *partitionScheme) + if err != nil { + log.Fatal(4, err.Error()) + } + log.Info("starting metrics consumer") + consumer.Start(publisher) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-consumer.Done: + log.Info("metrics consumer ended.") + return + case <-sigChan: + log.Info("metrics shutdown started.") + consumer.Stop() + } + } + publisher.Stop() + }() } - log.Info("starting consumer") - consumer.Start(publisher) + if *replicatePersist { + if *persistSrcTopic == "" { + log.Fatal(4, "--persist-src-topic is required") + } - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + if *persistDstTopic == "" { + log.Fatal(4, "--persist-dst-topic is required") + } 
diff --git a/cmd/mt-replicator/metricpersist.go b/cmd/mt-replicator/metricpersist.go
new file mode 100644
index 0000000000..d45aef5af5
--- /dev/null
+++ b/cmd/mt-replicator/metricpersist.go
@@ -0,0 +1,127 @@
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"github.com/bsm/sarama-cluster"
+	"github.com/raintank/worldping-api/pkg/log"
+)
+
+type PersistRelay struct {
+	consumer  *cluster.Consumer
+	producer  sarama.SyncProducer
+	destTopic string
+	Done      chan struct{}
+}
+
+func NewPersistRelay(srcBrokers []string, dstBrokers []string, group, srcTopic, destTopic string, initialOffset int) (*PersistRelay, error) {
+	config := cluster.NewConfig()
+	config.Consumer.Offsets.Initial = int64(initialOffset)
+	config.ClientID = "mt-persist-replicator"
+	config.Group.Return.Notifications = true
+	config.ChannelBufferSize = 1000
+	config.Consumer.Fetch.Min = 1
+	config.Consumer.Fetch.Default = 32768
+	config.Consumer.MaxWaitTime = time.Second
+	config.Consumer.MaxProcessingTime = time.Second
+	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
+	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
+	config.Producer.Compression = sarama.CompressionSnappy
+	config.Config.Version = sarama.V0_10_0_0
+	err := config.Validate()
+	if err != nil {
+		return nil, err
+	}
+	consumer, err := cluster.NewConsumer(srcBrokers, fmt.Sprintf("%s-persist", group), []string{srcTopic}, config)
+	if err != nil {
+		return nil, err
+	}
+
+	producer, err := sarama.NewSyncProducer(dstBrokers, &config.Config)
+	if err != nil {
+		return nil, err
+	}
+
+	return &PersistRelay{
+		consumer:  consumer,
+		producer:  producer,
+		destTopic: destTopic,
+		Done:      make(chan struct{}),
+	}, nil
+}
+
+func (c *PersistRelay) Consume() {
+	ticker := time.NewTicker(time.Second * 10)
+	counter := 0
+	counterTs := time.Now()
+	msgChan := c.consumer.Messages()
+	complete := false
+	defer close(c.Done)
+	for {
+		select {
+		case m, ok := <-msgChan:
+			if !ok {
+				return
+			}
+			log.Debug("received metricPersist message with key: %s", m.Key)
+			msg := &sarama.ProducerMessage{
+				Key:   sarama.ByteEncoder(m.Key),
+				Topic: c.destTopic,
+				Value: sarama.ByteEncoder(m.Value),
+			}
+			complete = false
+			for !complete {
+				_, _, err := c.producer.SendMessage(msg)
+				if err != nil {
+					log.Error(3, "failed to publish metricPersist message. %s. trying again in 1 second", err)
+					time.Sleep(time.Second)
+				} else {
+					complete = true
+				}
+			}
+			counter++
+			c.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
+		case t := <-ticker.C:
+			log.Info("%d metricpersist messages processed in last %.1f seconds.", counter, t.Sub(counterTs).Seconds())
+			counter = 0
+			counterTs = t
+		}
+	}
+
+}
+
+func (c *PersistRelay) Stop() {
+	c.consumer.Close()
+	c.producer.Close()
+}
+
+func (c *PersistRelay) Start() {
+	go c.Consume()
+}
+
+func (c *PersistRelay) kafkaNotifications() {
+	for msg := range c.consumer.Notifications() {
+		if len(msg.Claimed) > 0 {
+			for topic, partitions := range msg.Claimed {
+				log.Info("kafka consumer claimed %d partitions on topic: %s", len(partitions), topic)
+			}
+		}
+		if len(msg.Released) > 0 {
+			for topic, partitions := range msg.Released {
+				log.Info("kafka consumer released %d partitions on topic: %s", len(partitions), topic)
+			}
+		}
+
+		if len(msg.Current) == 0 {
+			log.Info("kafka consumer is no longer consuming from any partitions.")
+		} else {
+			log.Info("kafka Current partitions:")
+			for topic, partitions := range msg.Current {
+				log.Info("kafka Current partitions: %s: %v", topic, partitions)
+			}
+		}
+	}
+	log.Info("kafka notification processing stopped")
+}
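Note: the relay only calls MarkPartitionOffset after the destination cluster has acked the copy, which gives at-least-once delivery: a crash mid-retry replays messages on restart, but never silently drops them. A condensed sketch of that contract, with assumed names (relayOnce is illustrative, not the patch's code), using the same sarama / sarama-cluster APIs the patch uses:

package main

import (
	"time"

	"github.com/Shopify/sarama"
	"github.com/bsm/sarama-cluster"
)

// relayOnce copies messages one at a time, retrying forever on publish
// failure and only advancing the consumer group offset after an ack.
func relayOnce(consumer *cluster.Consumer, producer sarama.SyncProducer, destTopic string) {
	for m := range consumer.Messages() {
		msg := &sarama.ProducerMessage{
			Topic: destTopic,
			Key:   sarama.ByteEncoder(m.Key),
			Value: sarama.ByteEncoder(m.Value),
		}
		for {
			if _, _, err := producer.SendMessage(msg); err == nil {
				break // acked by the destination; safe to mark the offset
			}
			time.Sleep(time.Second) // destination unavailable: keep retrying
		}
		consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
	}
}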
trying again in 1second", err) + time.Sleep(time.Second) + } else { + complete = true + } + } + counter++ + c.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "") + case t := <-ticker.C: + log.Info("%d metricpersist messages procesed in last %.1fseconds.", counter, t.Sub(counterTs).Seconds()) + counter = 0 + counterTs = t + } + } + +} + +func (c *PersistRelay) Stop() { + c.consumer.Close() + c.producer.Close() +} + +func (c *PersistRelay) Start() { + go c.Consume() +} + +func (c *PersistRelay) kafkaNotifications() { + for msg := range c.consumer.Notifications() { + if len(msg.Claimed) > 0 { + for topic, partitions := range msg.Claimed { + log.Info("kafka consumer claimed %d partitions on topic: %s", len(partitions), topic) + } + } + if len(msg.Released) > 0 { + for topic, partitions := range msg.Released { + log.Info("kafka consumer released %d partitions on topic: %s", len(partitions), topic) + } + } + + if len(msg.Current) == 0 { + log.Info("kafka consumer is no longer consuming from any partitions.") + } else { + log.Info("kafka Current partitions:") + for topic, partitions := range msg.Current { + log.Info("kafka Current partitions: %s: %v", topic, partitions) + } + } + } + log.Info("kafka notification processing stopped") +} From 12cc0bf06768ed5ff5811eab5be90e3cf565f133 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Mon, 13 Feb 2017 20:39:20 +0800 Subject: [PATCH 3/6] close producer when consumer closes --- cmd/mt-replicator/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-replicator/main.go b/cmd/mt-replicator/main.go index 7574acff54..47784559f8 100644 --- a/cmd/mt-replicator/main.go +++ b/cmd/mt-replicator/main.go @@ -100,13 +100,13 @@ func main() { select { case <-consumer.Done: log.Info("metrics consumer ended.") + publisher.Stop() return case <-sigChan: log.Info("metrics shutdown started.") consumer.Stop() } } - publisher.Stop() }() } From 807823a1f97843b60d99cefb4726acce9fd5cf9c Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 13 Feb 2017 15:16:51 +0100 Subject: [PATCH 4/6] clarify src/dst topic If metric relay was more important or common than persist relaying, or the default one, then we could have it as it was. However since both are disabled by default, it makes more sense to be explicit --- cmd/mt-replicator/main.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/mt-replicator/main.go b/cmd/mt-replicator/main.go index 47784559f8..2e43421605 100644 --- a/cmd/mt-replicator/main.go +++ b/cmd/mt-replicator/main.go @@ -23,8 +23,8 @@ var ( replicateMetrics = flag.Bool("metrics", false, "replicate metrics") replicatePersist = flag.Bool("persist", false, "replicate persistMetrics") group = flag.String("group", "mt-replicator", "Kafka consumer group") - srcTopic = flag.String("src-topic", "mdm", "topic name on source cluster") - dstTopic = flag.String("dst-topic", "mdm", "topic name on destination cluster") + metricSrcTopic = flag.String("metric-src-topic", "mdm", "metrics topic name on source cluster") + metricDstTopic = flag.String("metric-dst-topic", "mdm", "metrics topic name on destination cluster") persistSrcTopic = flag.String("persist-src-topic", "metricpersist", "metricPersist topic name on source cluster") persistDstTopic = flag.String("persist-dst-topic", "metricpersist", "metricPersist topic name on destination cluster") initialOffset = flag.Int("initial-offset", -2, "initial offset to consume from. 
(-2=oldest, -1=newest)") @@ -75,19 +75,19 @@ func main() { if *replicateMetrics { - if *srcTopic == "" { - log.Fatal(4, "--src-topic is required") + if *metricSrcTopic == "" { + log.Fatal(4, "--metric-src-topic is required") } - if *dstTopic == "" { - log.Fatal(4, "--dst-topic is required") + if *metricDstTopic == "" { + log.Fatal(4, "--metric-dst-topic is required") } - consumer, err := NewConsumer(srcBrokers, *group, *srcTopic, *initialOffset) + consumer, err := NewConsumer(srcBrokers, *group, *metricSrcTopic, *initialOffset) if err != nil { log.Fatal(4, err.Error()) } - publisher, err := NewPublisher(dstBrokers, *dstTopic, *compression, *partitionScheme) + publisher, err := NewPublisher(dstBrokers, *metricDstTopic, *compression, *partitionScheme) if err != nil { log.Fatal(4, err.Error()) } From e764d9634e1582965b0f61c7d2ae14ade517bf49 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Thu, 16 Feb 2017 12:32:48 +0800 Subject: [PATCH 5/6] refactor mt-replicator. - use similar code structure for replicating metrics and persist messages. - send persistMessages in batches to greatly improve performance --- cmd/mt-replicator/consume.go | 118 ------------------- cmd/mt-replicator/main.go | 74 +++++------- cmd/mt-replicator/metricpersist.go | 102 +++++++++-------- cmd/mt-replicator/metrics.go | 175 +++++++++++++++++++++++++++++ cmd/mt-replicator/publish.go | 98 ---------------- 5 files changed, 253 insertions(+), 314 deletions(-) delete mode 100644 cmd/mt-replicator/consume.go create mode 100644 cmd/mt-replicator/metrics.go delete mode 100644 cmd/mt-replicator/publish.go diff --git a/cmd/mt-replicator/consume.go b/cmd/mt-replicator/consume.go deleted file mode 100644 index 7ed47016bc..0000000000 --- a/cmd/mt-replicator/consume.go +++ /dev/null @@ -1,118 +0,0 @@ -package main - -import ( - "time" - - "github.com/Shopify/sarama" - "github.com/bsm/sarama-cluster" - "github.com/raintank/worldping-api/pkg/log" - "gopkg.in/raintank/schema.v1" -) - -type Consumer struct { - consumer *cluster.Consumer - Done chan struct{} -} - -func NewConsumer(brokers []string, group, topic string, initialOffset int) (*Consumer, error) { - config := cluster.NewConfig() - config.Consumer.Offsets.Initial = int64(initialOffset) - config.ClientID = "mt-replicator" - config.Group.Return.Notifications = true - config.ChannelBufferSize = 1000 - config.Consumer.Fetch.Min = 1 - config.Consumer.Fetch.Default = 32768 - config.Consumer.MaxWaitTime = time.Second - config.Consumer.MaxProcessingTime = time.Second - config.Config.Version = sarama.V0_10_0_0 - err := config.Validate() - if err != nil { - return nil, err - } - consumer, err := cluster.NewConsumer(brokers, group, []string{topic}, config) - if err != nil { - return nil, err - } - - return &Consumer{ - consumer: consumer, - Done: make(chan struct{}), - }, nil -} - -func (c *Consumer) Consume(publisher *Publisher) { - buf := make([]*schema.MetricData, 0) - ticker := time.NewTicker(time.Second * 10) - counter := 0 - counterTs := time.Now() - msgChan := c.consumer.Messages() - defer close(c.Done) - for { - select { - case m, ok := <-msgChan: - if !ok { - return - } - md := &schema.MetricData{} - _, err := md.UnmarshalMsg(m.Value) - if err != nil { - log.Error(3, "kafka-mdm decode error, skipping message. %s", err) - continue - } - counter++ - buf = append(buf, md) - if len(buf) > 1000 { - log.Debug("flushing metricData buffer to kafka.") - complete := false - for !complete { - if err = publisher.Send(buf); err != nil { - log.Error(3, "failed to publish %d metrics. 
trying again in 1second", len(buf)) - time.Sleep(time.Second) - } else { - complete = true - } - } - buf = buf[:0] - c.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "") - } - case t := <-ticker.C: - log.Info("%d metrics procesed in last %.1fseconds.", counter, t.Sub(counterTs).Seconds()) - counter = 0 - counterTs = t - } - } - -} - -func (c *Consumer) Stop() { - c.consumer.Close() -} - -func (c *Consumer) Start(publisher *Publisher) { - go c.Consume(publisher) -} - -func (c *Consumer) kafkaNotifications() { - for msg := range c.consumer.Notifications() { - if len(msg.Claimed) > 0 { - for topic, partitions := range msg.Claimed { - log.Info("kafka consumer claimed %d partitions on topic: %s", len(partitions), topic) - } - } - if len(msg.Released) > 0 { - for topic, partitions := range msg.Released { - log.Info("kafka consumer released %d partitions on topic: %s", len(partitions), topic) - } - } - - if len(msg.Current) == 0 { - log.Info("kafka consumer is no longer consuming from any partitions.") - } else { - log.Info("kafka Current partitions:") - for topic, partitions := range msg.Current { - log.Info("kafka Current partitions: %s: %v", topic, partitions) - } - } - } - log.Info("kafka notification processing stopped") -} diff --git a/cmd/mt-replicator/main.go b/cmd/mt-replicator/main.go index 2e43421605..cdfb68786b 100644 --- a/cmd/mt-replicator/main.go +++ b/cmd/mt-replicator/main.go @@ -18,8 +18,8 @@ var ( showVersion = flag.Bool("version", false, "print version string") logLevel = flag.Int("log-level", 2, "log level. 0=TRACE|1=DEBUG|2=INFO|3=WARN|4=ERROR|5=CRITICAL|6=FATAL") - partitionScheme = flag.String("partition-scheme", "byOrg", "method used for partitioning metrics. (byOrg|bySeries)") - compression = flag.String("compression", "none", "compression: none|gzip|snappy") + partitionScheme = flag.String("partition-scheme", "bySeries", "method used for partitioning metrics. (byOrg|bySeries)") + compression = flag.String("compression", "snappy", "compression: none|gzip|snappy") replicateMetrics = flag.Bool("metrics", false, "replicate metrics") replicatePersist = flag.Bool("persist", false, "replicate persistMetrics") group = flag.String("group", "mt-replicator", "Kafka consumer group") @@ -30,15 +30,8 @@ var ( initialOffset = flag.Int("initial-offset", -2, "initial offset to consume from. 
(-2=oldest, -1=newest)") srcBrokerStr = flag.String("src-brokers", "localhost:9092", "tcp address of source kafka cluster (may be be given multiple times as a comma-separated list)") dstBrokerStr = flag.String("dst-brokers", "localhost:9092", "tcp address for kafka cluster to consume from (may be be given multiple times as a comma-separated list)") - - wg sync.WaitGroup ) -type topic struct { - src string - dst string -} - func main() { flag.Usage = func() { fmt.Fprintln(os.Stderr, "mt-replicator") @@ -51,12 +44,13 @@ func main() { log.NewLogger(0, "console", fmt.Sprintf(`{"level": %d, "formatting":false}`, *logLevel)) if *showVersion { - fmt.Printf("eventtank (built with %s, git hash %s)\n", runtime.Version(), GitHash) + fmt.Printf("mt-replicator (built with %s, git hash %s)\n", runtime.Version(), GitHash) return } - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + if !*replicateMetrics && !*replicatePersist { + log.Fatal(4, "at least one of --metrics or --persist is needed.") + } if *group == "" { log.Fatal(4, "--group is required") @@ -69,12 +63,14 @@ func main() { log.Fatal(4, "--dst-brokers required") } + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + srcBrokers := strings.Split(*srcBrokerStr, ",") dstBrokers := strings.Split(*dstBrokerStr, ",") wg := new(sync.WaitGroup) - + shutdown := make(chan struct{}) if *replicateMetrics { - if *metricSrcTopic == "" { log.Fatal(4, "--metric-src-topic is required") } @@ -83,30 +79,20 @@ func main() { log.Fatal(4, "--metric-dst-topic is required") } - consumer, err := NewConsumer(srcBrokers, *group, *metricSrcTopic, *initialOffset) + metrics, err := NewMetricsReplicator(srcBrokers, dstBrokers, *compression, *group, *metricSrcTopic, *metricDstTopic, *initialOffset, *partitionScheme) if err != nil { log.Fatal(4, err.Error()) } - publisher, err := NewPublisher(dstBrokers, *metricDstTopic, *compression, *partitionScheme) - if err != nil { - log.Fatal(4, err.Error()) - } - log.Info("starting metrics consumer") - consumer.Start(publisher) + + log.Info("starting metrics replicator") + metrics.Start() wg.Add(1) go func() { - defer wg.Done() - for { - select { - case <-consumer.Done: - log.Info("metrics consumer ended.") - publisher.Stop() - return - case <-sigChan: - log.Info("metrics shutdown started.") - consumer.Stop() - } - } + <-shutdown + log.Info("metrics replicator shutdown started.") + metrics.Stop() + log.Info("metrics replicator ended.") + wg.Done() }() } @@ -119,29 +105,25 @@ func main() { log.Fatal(4, "--persist-dst-topic is required") } - metricPersist, err := NewPersistRelay(srcBrokers, dstBrokers, *group, *persistSrcTopic, *persistDstTopic, *initialOffset) + metricPersist, err := NewPersistReplicator(srcBrokers, dstBrokers, *group, *persistSrcTopic, *persistDstTopic, *initialOffset) if err != nil { log.Fatal(4, err.Error()) } - log.Info("starting metricPersist relay") + log.Info("starting metricPersist replicator") metricPersist.Start() wg.Add(1) go func() { - defer wg.Done() - for { - select { - case <-metricPersist.Done: - log.Info("metricPersist ended.") - return - case <-sigChan: - log.Info("metricPersist shutdown started.") - metricPersist.Stop() - } - } + <-shutdown + log.Info("metricPersist replicator shutdown started.") + metricPersist.Stop() + log.Info("metricPersist replicator ended.") + wg.Done() }() } + <-sigChan + close(shutdown) wg.Wait() log.Info("shutdown complete") log.Close() diff --git a/cmd/mt-replicator/metricpersist.go 
diff --git a/cmd/mt-replicator/metricpersist.go b/cmd/mt-replicator/metricpersist.go
index d45aef5af5..db2402b73f 100644
--- a/cmd/mt-replicator/metricpersist.go
+++ b/cmd/mt-replicator/metricpersist.go
@@ -9,14 +9,14 @@ import (
 	"github.com/raintank/worldping-api/pkg/log"
 )
 
-type PersistRelay struct {
+type PersistReplicator struct {
 	consumer  *cluster.Consumer
 	producer  sarama.SyncProducer
 	destTopic string
-	Done      chan struct{}
+	done      chan struct{}
 }
 
-func NewPersistRelay(srcBrokers []string, dstBrokers []string, group, srcTopic, destTopic string, initialOffset int) (*PersistRelay, error) {
+func NewPersistReplicator(srcBrokers []string, dstBrokers []string, group, srcTopic, destTopic string, initialOffset int) (*PersistReplicator, error) {
 	config := cluster.NewConfig()
 	config.Consumer.Offsets.Initial = int64(initialOffset)
 	config.ClientID = "mt-persist-replicator"
@@ -44,46 +44,56 @@ func NewPersistRelay(srcBrokers []string, dstBrokers []string, group, srcTopic,
 		return nil, err
 	}
 
-	return &PersistRelay{
+	return &PersistReplicator{
 		consumer:  consumer,
 		producer:  producer,
 		destTopic: destTopic,
-		Done:      make(chan struct{}),
+		done:      make(chan struct{}),
 	}, nil
 }
 
-func (c *PersistRelay) Consume() {
-	ticker := time.NewTicker(time.Second * 10)
+func (r *PersistReplicator) Consume() {
+	accountingTicker := time.NewTicker(time.Second * 10)
+	flushTicker := time.NewTicker(time.Second)
 	counter := 0
 	counterTs := time.Now()
-	msgChan := c.consumer.Messages()
-	complete := false
-	defer close(c.Done)
+	msgChan := r.consumer.Messages()
+	var m *sarama.ConsumerMessage
+	var ok bool
+
+	buf := make([]*sarama.ProducerMessage, 0)
+	defer close(r.done)
 	for {
 		select {
-		case m, ok := <-msgChan:
+		case m, ok = <-msgChan:
 			if !ok {
+				if len(buf) != 0 {
+					r.Flush(buf)
+				}
 				return
 			}
 			log.Debug("received metricPersist message with key: %s", m.Key)
 			msg := &sarama.ProducerMessage{
 				Key:   sarama.ByteEncoder(m.Key),
-				Topic: c.destTopic,
+				Topic: r.destTopic,
 				Value: sarama.ByteEncoder(m.Value),
 			}
-			complete = false
-			for !complete {
-				_, _, err := c.producer.SendMessage(msg)
-				if err != nil {
-					log.Error(3, "failed to publish metricPersist message. %s. trying again in 1 second", err)
-					time.Sleep(time.Second)
-				} else {
-					complete = true
-				}
+			buf = append(buf, msg)
+			if len(buf) >= 1000 {
+				r.Flush(buf)
+				counter += len(buf)
+				buf = buf[:0]
+				r.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
 			}
-			counter++
-			c.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
-		case t := <-ticker.C:
+		case <-flushTicker.C:
+			if len(buf) == 0 {
+				continue
+			}
+			r.Flush(buf)
+			counter += len(buf)
+			buf = buf[:0]
+			r.consumer.MarkPartitionOffset(m.Topic, m.Partition, m.Offset, "")
+		case t := <-accountingTicker.C:
 			log.Info("%d metricpersist messages processed in last %.1f seconds.", counter, t.Sub(counterTs).Seconds())
 			counter = 0
 			counterTs = t
@@ -92,36 +102,24 @@ func (c *PersistRelay) Consume() {
 
 }
 
-func (c *PersistRelay) Stop() {
-	c.consumer.Close()
-	c.producer.Close()
+func (r *PersistReplicator) Flush(buf []*sarama.ProducerMessage) {
+	for {
+		err := r.producer.SendMessages(buf)
+		if err != nil {
+			log.Error(3, "failed to publish metricPersist message. %s. trying again in 1 second", err)
+			time.Sleep(time.Second)
+		} else {
+			return
+		}
+	}
 }
 
-func (c *PersistRelay) Start() {
-	go c.Consume()
+func (r *PersistReplicator) Stop() {
+	r.consumer.Close()
+	<-r.done
+	r.producer.Close()
 }
 
-func (c *PersistRelay) kafkaNotifications() {
-	for msg := range c.consumer.Notifications() {
-		if len(msg.Claimed) > 0 {
-			for topic, partitions := range msg.Claimed {
-				log.Info("kafka consumer claimed %d partitions on topic: %s", len(partitions), topic)
-			}
-		}
-		if len(msg.Released) > 0 {
-			for topic, partitions := range msg.Released {
-				log.Info("kafka consumer released %d partitions on topic: %s", len(partitions), topic)
-			}
-		}
-
-		if len(msg.Current) == 0 {
-			log.Info("kafka consumer is no longer consuming from any partitions.")
-		} else {
-			log.Info("kafka Current partitions:")
-			for topic, partitions := range msg.Current {
-				log.Info("kafka Current partitions: %s: %v", topic, partitions)
-			}
-		}
-	}
-	log.Info("kafka notification processing stopped")
+func (r *PersistReplicator) Start() {
+	go r.Consume()
 }
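Note: batching changes the delivery bookkeeping slightly. SendMessages publishes up to 1000 messages per round trip, and only the offset of the most recently consumed message is marked afterwards; partitions whose messages sat earlier in the batch get marked on a later flush, so a crash can replay them, which stays within the relay's at-least-once model. A generic sketch of the flush-on-size-or-interval loop, with assumed names (relayBatched, send, batchSize, flushEvery are illustrative, not the patch's code):

package main

import (
	"time"

	"github.com/Shopify/sarama"
)

// relayBatched accumulates messages and flushes either when the buffer
// reaches batchSize or when flushEvery elapses, so busy topics get large
// batches while quiet topics still see bounded publish latency.
func relayBatched(msgs <-chan *sarama.ProducerMessage, send func([]*sarama.ProducerMessage), batchSize int, flushEvery time.Duration) {
	buf := make([]*sarama.ProducerMessage, 0, batchSize)
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				if len(buf) > 0 {
					send(buf) // flush the tail before shutting down
				}
				return
			}
			buf = append(buf, m)
			if len(buf) >= batchSize {
				send(buf) // send retries internally until the batch is acked
				buf = buf[:0]
			}
		case <-ticker.C:
			if len(buf) > 0 {
				send(buf)
				buf = buf[:0]
			}
		}
	}
}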
trying again in 1second", err) + time.Sleep(time.Second) + } else { + return + } + } } -func (c *PersistRelay) Start() { - go c.Consume() +func (r *PersistReplicator) Stop() { + r.consumer.Close() + <-r.done + r.producer.Close() } -func (c *PersistRelay) kafkaNotifications() { - for msg := range c.consumer.Notifications() { - if len(msg.Claimed) > 0 { - for topic, partitions := range msg.Claimed { - log.Info("kafka consumer claimed %d partitions on topic: %s", len(partitions), topic) - } - } - if len(msg.Released) > 0 { - for topic, partitions := range msg.Released { - log.Info("kafka consumer released %d partitions on topic: %s", len(partitions), topic) - } - } - - if len(msg.Current) == 0 { - log.Info("kafka consumer is no longer consuming from any partitions.") - } else { - log.Info("kafka Current partitions:") - for topic, partitions := range msg.Current { - log.Info("kafka Current partitions: %s: %v", topic, partitions) - } - } - } - log.Info("kafka notification processing stopped") +func (r *PersistReplicator) Start() { + go r.Consume() } diff --git a/cmd/mt-replicator/metrics.go b/cmd/mt-replicator/metrics.go new file mode 100644 index 0000000000..761c523d00 --- /dev/null +++ b/cmd/mt-replicator/metrics.go @@ -0,0 +1,175 @@ +package main + +import ( + "time" + + "github.com/Shopify/sarama" + "github.com/bsm/sarama-cluster" + "github.com/raintank/metrictank/cluster/partitioner" + "github.com/raintank/worldping-api/pkg/log" + "gopkg.in/raintank/schema.v1" +) + +func GetCompression(codec string) sarama.CompressionCodec { + switch codec { + case "none": + return sarama.CompressionNone + case "gzip": + return sarama.CompressionGZIP + case "snappy": + return sarama.CompressionSnappy + default: + log.Fatal(5, "unknown compression codec %q", codec) + return 0 // make go compiler happy, needs a return *roll eyes* + } +} + +type MetricsReplicator struct { + consumer *cluster.Consumer + producer sarama.SyncProducer + partitioner *partitioner.Kafka + destTopic string + done chan struct{} +} + +func NewMetricsReplicator(srcBrokers, dstBrokers []string, compression, group, srcTopic, destTopic string, initialOffset int, partitionScheme string) (*MetricsReplicator, error) { + config := cluster.NewConfig() + config.Consumer.Offsets.Initial = int64(initialOffset) + config.ClientID = "mt-replicator" + config.Group.Return.Notifications = true + config.ChannelBufferSize = 1000 + config.Consumer.Fetch.Min = 1 + config.Consumer.Fetch.Default = 32768 + config.Consumer.MaxWaitTime = time.Second + config.Consumer.MaxProcessingTime = time.Second + config.Config.Version = sarama.V0_10_0_0 + config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message + config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message + config.Producer.Compression = GetCompression(compression) + err := config.Validate() + if err != nil { + return nil, err + } + consumer, err := cluster.NewConsumer(srcBrokers, group, []string{srcTopic}, config) + if err != nil { + return nil, err + } + producer, err := sarama.NewSyncProducer(dstBrokers, &config.Config) + if err != nil { + return nil, err + } + partitioner, err := partitioner.NewKafka(partitionScheme) + if err != nil { + return nil, err + } + + return &MetricsReplicator{ + consumer: consumer, + producer: producer, + partitioner: partitioner, + destTopic: destTopic, + done: make(chan struct{}), + }, nil +} + +func (r *MetricsReplicator) Consume() { + buf := make([]*schema.MetricData, 0) + accountingTicker := 
diff --git a/cmd/mt-replicator/publish.go b/cmd/mt-replicator/publish.go
deleted file mode 100644
index 30741d3772..0000000000
--- a/cmd/mt-replicator/publish.go
+++ /dev/null
@@ -1,98 +0,0 @@
-package main
-
-import (
-	"github.com/Shopify/sarama"
-	part "github.com/raintank/metrictank/cluster/partitioner"
-	"github.com/raintank/worldping-api/pkg/log"
-	"gopkg.in/raintank/schema.v1"
-)
-
-type Publisher struct {
-	topic       string
-	producer    sarama.SyncProducer
-	partitioner *part.Kafka
-}
-
-func GetCompression(codec string) sarama.CompressionCodec {
-	switch codec {
-	case "none":
-		return sarama.CompressionNone
-	case "gzip":
-		return sarama.CompressionGZIP
-	case "snappy":
-		return sarama.CompressionSnappy
-	default:
-		log.Fatal(5, "unknown compression codec %q", codec)
-		return 0 // make go compiler happy, needs a return *roll eyes*
-	}
-}
-
-func NewPublisher(brokers []string, topic string, compression string, partitionScheme string) (*Publisher, error) {
-	// We are looking for strong consistency semantics.
-	// Because we don't change the flush settings, sarama will try to produce messages
-	// as fast as possible to keep latency low.
-	config := sarama.NewConfig()
-	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
-	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
-	config.Producer.Compression = GetCompression(compression)
-	err := config.Validate()
-	if err != nil {
-		return nil, err
-	}
-
-	client, err := sarama.NewSyncProducer(brokers, config)
-	if err != nil {
-		return nil, err
-	}
-	partitioner, err := part.NewKafka(partitionScheme)
-	if err != nil {
-		return nil, err
-	}
-
-	return &Publisher{
-		topic:       topic,
-		producer:    client,
-		partitioner: partitioner,
-	}, nil
-}
-
-func (p *Publisher) Stop() error {
-	return p.producer.Close()
-}
-
-func (p *Publisher) Send(metrics []*schema.MetricData) error {
-	if len(metrics) == 0 {
-		return nil
-	}
-
-	payload := make([]*sarama.ProducerMessage, len(metrics))
-
-	for i, metric := range metrics {
-		data, err := metric.MarshalMsg(nil)
-		if err != nil {
-			return err
-		}
-
-		key, err := p.partitioner.GetPartitionKey(metric, nil)
-		if err != nil {
-			return err
-		}
-
-		payload[i] = &sarama.ProducerMessage{
-			Key:   sarama.ByteEncoder(key),
-			Topic: p.topic,
-			Value: sarama.ByteEncoder(data),
-		}
-
-	}
-	err := p.producer.SendMessages(payload)
-	if err != nil {
-		if errors, ok := err.(sarama.ProducerErrors); ok {
-			for i := 0; i < 10 && i < len(errors); i++ {
-				log.Error(4, "ProducerError %d/%d: %s", i, len(errors), errors[i].Error())
-			}
-		}
-		return err
-	}
-	return nil
-}
From cbee7ee6a967810d6833f61b50f09443a81cad3c Mon Sep 17 00:00:00 2001
From: Anthony Woods
Date: Thu, 16 Feb 2017 23:45:37 +0800
Subject: [PATCH 6/6] remove redundant check of buffer length

---
 cmd/mt-replicator/metrics.go | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/cmd/mt-replicator/metrics.go b/cmd/mt-replicator/metrics.go
index 761c523d00..9939353423 100644
--- a/cmd/mt-replicator/metrics.go
+++ b/cmd/mt-replicator/metrics.go
@@ -134,10 +134,6 @@ func (r *MetricsReplicator) Start() {
 }
 
 func (r *MetricsReplicator) Flush(metrics []*schema.MetricData) {
-	if len(metrics) == 0 {
-		return
-	}
-
 	payload := make([]*sarama.ProducerMessage, len(metrics))
 
 	for i, metric := range metrics {
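Note: the removed guard was dead code by this point; every caller of Flush in metrics.go already ensures a non-empty buffer (the size-threshold path only fires above 1000 entries, the ticker path continues when the buffer is empty, and the shutdown path flushes only when len(buf) != 0).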