diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index eee1f6d3e5e0..3d03ec213419 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -26,7 +26,31 @@ import ( "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" ) -type consumer struct { +// SaramaConsumer is an interface to features of Sarama that we use +type SaramaConsumer interface { + Partitions() <-chan sc.PartitionConsumer + MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) + io.Closer +} + +// Config stores the configuration for a Consumer +type Config struct { + Topic string `yaml:"topic"` + GroupID string `yaml:"group_id"` + Brokers []string `yaml:"brokers"` + Parallelism int `yaml:"parallelism"` +} + +// Params are the parameters of a Consumer +type Params struct { + Config Config + Processor processor.SpanProcessor + Factory metrics.Factory `name:"service_metrics"` + Logger *zap.Logger +} + +// Consumer uses sarama to consume messages from kafka and handle +type Consumer struct { metricsFactory metrics.Factory logger *zap.Logger processorFactory processorFactory @@ -37,34 +61,62 @@ type consumer struct { SaramaConsumer } -// SaramaConsumer is an interface to features of Sarama that we use -type SaramaConsumer interface { - Partitions() <-chan sc.PartitionConsumer - MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) - io.Closer +// New is a constructor for a Consumer +func New(params Params) (Consumer, error) { + saramaConfig := sc.NewConfig() + saramaConfig.Group.Mode = sc.ConsumerModePartitions + saramaConsumer, err := sc.NewConsumer(params.Config.Brokers, params.Config.GroupID, []string{params.Config.Topic}, saramaConfig) + if err != nil { + return Consumer{}, err + } + return Consumer{ + metricsFactory: params.Factory, + logger: params.Logger, + close: make(chan struct{}, 1), + isClosed: sync.WaitGroup{}, + SaramaConsumer: saramaConsumer, + processorFactory: processorFactory{ + topic: params.Config.Topic, + consumer: saramaConsumer, + metricsFactory: params.Factory, + logger: params.Logger, + baseProcessor: params.Processor, + parallelism: params.Config.Parallelism, + }, + }, nil } -func (c *consumer) mainLoop() { +// Start begins consuming messages in a go routine +func (c *Consumer) Start() { c.isClosed.Add(1) c.logger.Info("Starting main loop") - go func() { - for { - select { - case pc := <-c.Partitions(): - c.isClosed.Add(2) - - go c.handleMessages(pc) - go c.handleErrors(pc.Partition(), pc.Errors()) - - case <-c.close: - c.isClosed.Done() - return - } + go c.mainLoop() +} + +// Close closes the Consumer and underlying sarama consumer +func (c *Consumer) Close() error { + close(c.close) + c.isClosed.Wait() + return c.SaramaConsumer.Close() +} + +func (c *Consumer) mainLoop() { + for { + select { + case pc := <-c.Partitions(): + c.isClosed.Add(2) + + go c.handleMessages(pc) + go c.handleErrors(pc.Partition(), pc.Errors()) + + case <-c.close: + c.isClosed.Done() + return } - }() + } } -func (c *consumer) handleMessages(pc sc.PartitionConsumer) { +func (c *Consumer) handleMessages(pc sc.PartitionConsumer) { c.logger.Info("Starting message handler") defer c.isClosed.Done() defer c.closePartition(pc) @@ -87,13 +139,13 @@ func (c *consumer) handleMessages(pc sc.PartitionConsumer) { } } -func (c *consumer) closePartition(partitionConsumer sc.PartitionConsumer) { +func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) { c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition())) partitionConsumer.Close() // blocks until messages channel is drained c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition())) } -func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) { +func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) { c.logger.Info("Starting error handler") defer c.isClosed.Done() @@ -103,9 +155,3 @@ func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.Consumer c.logger.Error("Error consuming from Kafka", zap.Error(err)) } } - -func (c *consumer) Close() error { - close(c.close) - c.isClosed.Wait() - return c.SaramaConsumer.Close() -} diff --git a/cmd/ingester/app/consumer/consumer_metrics.go b/cmd/ingester/app/consumer/consumer_metrics.go index 526856a2d548..4d760978e118 100644 --- a/cmd/ingester/app/consumer/consumer_metrics.go +++ b/cmd/ingester/app/consumer/consumer_metrics.go @@ -30,7 +30,7 @@ type errMetrics struct { errCounter metrics.Counter } -func (c *consumer) newMsgMetrics(partition int32) msgMetrics { +func (c *Consumer) newMsgMetrics(partition int32) msgMetrics { f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))}) return msgMetrics{ counter: f.Counter("messages", nil), @@ -39,7 +39,7 @@ func (c *consumer) newMsgMetrics(partition int32) msgMetrics { } } -func (c *consumer) newErrMetrics(partition int32) errMetrics { +func (c *Consumer) newErrMetrics(partition int32) errMetrics { f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))}) return errMetrics{errCounter: f.Counter("errors", nil)} diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index e85eb2842349..39432c71e563 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -36,7 +36,7 @@ import ( type consumerTest struct { saramaConsumer *kmocks.SaramaConsumer - consumer *consumer + consumer *Consumer partitionConsumer *kmocks.PartitionConsumer } @@ -46,7 +46,7 @@ func withWrappedConsumer(fn func(c *consumerTest)) { metricsFactory := metrics.NewLocalFactory(0) c := &consumerTest{ saramaConsumer: sc, - consumer: &consumer{ + consumer: &Consumer{ metricsFactory: metricsFactory, logger: logger, close: make(chan struct{}), @@ -106,7 +106,7 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) { mp.On("Process", &saramaMessageWrapper{msg}).Return(nil) c.consumer.processorFactory.baseProcessor = mp - c.consumer.mainLoop() + c.consumer.Start() time.Sleep(100 * time.Millisecond) close(msgCh) close(errCh) @@ -149,7 +149,7 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) { c.partitionConsumer.On("Messages").Return((<-chan *sarama.ConsumerMessage)(msgCh)) c.partitionConsumer.On("Close").Return(nil) - c.consumer.mainLoop() + c.consumer.Start() time.Sleep(100 * time.Millisecond) close(msgCh) close(errCh)