Skip to content

Commit

Permalink
Export consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Loading branch information
Davit Yeghshatyan committed Jul 20, 2018
1 parent 933efb3 commit 1561fcb
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 36 deletions.
106 changes: 76 additions & 30 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,31 @@ import (
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
)

type consumer struct {
// SaramaConsumer is an interface to features of Sarama that we use
type SaramaConsumer interface {
Partitions() <-chan sc.PartitionConsumer
MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
io.Closer
}

// Config stores the configuration for a Consumer
type Config struct {
Topic string `yaml:"topic"`
GroupID string `yaml:"group_id"`
Brokers []string `yaml:"brokers"`
Parallelism int `yaml:"parallelism"`
}

// Params are the parameters of a Consumer
type Params struct {
Config Config
Processor processor.SpanProcessor
Factory metrics.Factory `name:"service_metrics"`
Logger *zap.Logger
}

// Consumer uses sarama to consume messages from kafka and handle
type Consumer struct {
metricsFactory metrics.Factory
logger *zap.Logger
processorFactory processorFactory
Expand All @@ -37,34 +61,62 @@ type consumer struct {
SaramaConsumer
}

// SaramaConsumer is an interface to features of Sarama that we use
type SaramaConsumer interface {
Partitions() <-chan sc.PartitionConsumer
MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
io.Closer
// New is a constructor for a Consumer
func New(params Params) (Consumer, error) {
saramaConfig := sc.NewConfig()
saramaConfig.Group.Mode = sc.ConsumerModePartitions
saramaConsumer, err := sc.NewConsumer(params.Config.Brokers, params.Config.GroupID, []string{params.Config.Topic}, saramaConfig)
if err != nil {
return Consumer{}, err
}
return Consumer{
metricsFactory: params.Factory,
logger: params.Logger,
close: make(chan struct{}, 1),
isClosed: sync.WaitGroup{},
SaramaConsumer: saramaConsumer,
processorFactory: processorFactory{
topic: params.Config.Topic,
consumer: saramaConsumer,
metricsFactory: params.Factory,
logger: params.Logger,
baseProcessor: params.Processor,
parallelism: params.Config.Parallelism,
},
}, nil
}

func (c *consumer) mainLoop() {
// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
c.isClosed.Add(1)
c.logger.Info("Starting main loop")
go func() {
for {
select {
case pc := <-c.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())

case <-c.close:
c.isClosed.Done()
return
}
go c.mainLoop()
}

// Close closes the Consumer and underlying sarama consumer
func (c *Consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.SaramaConsumer.Close()
}

func (c *Consumer) mainLoop() {
for {
select {
case pc := <-c.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())

case <-c.close:
c.isClosed.Done()
return
}
}()
}
}

func (c *consumer) handleMessages(pc sc.PartitionConsumer) {
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler")
defer c.isClosed.Done()
defer c.closePartition(pc)
Expand All @@ -87,13 +139,13 @@ func (c *consumer) handleMessages(pc sc.PartitionConsumer) {
}
}

func (c *consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
}

func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
c.logger.Info("Starting error handler")
defer c.isClosed.Done()

Expand All @@ -103,9 +155,3 @@ func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.Consumer
c.logger.Error("Error consuming from Kafka", zap.Error(err))
}
}

func (c *consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.SaramaConsumer.Close()
}
4 changes: 2 additions & 2 deletions cmd/ingester/app/consumer/consumer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type errMetrics struct {
errCounter metrics.Counter
}

func (c *consumer) newMsgMetrics(partition int32) msgMetrics {
func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return msgMetrics{
counter: f.Counter("messages", nil),
Expand All @@ -39,7 +39,7 @@ func (c *consumer) newMsgMetrics(partition int32) msgMetrics {
}
}

func (c *consumer) newErrMetrics(partition int32) errMetrics {
func (c *Consumer) newErrMetrics(partition int32) errMetrics {
f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return errMetrics{errCounter: f.Counter("errors", nil)}

Expand Down
8 changes: 4 additions & 4 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (

type consumerTest struct {
saramaConsumer *kmocks.SaramaConsumer
consumer *consumer
consumer *Consumer
partitionConsumer *kmocks.PartitionConsumer
}

Expand All @@ -46,7 +46,7 @@ func withWrappedConsumer(fn func(c *consumerTest)) {
metricsFactory := metrics.NewLocalFactory(0)
c := &consumerTest{
saramaConsumer: sc,
consumer: &consumer{
consumer: &Consumer{
metricsFactory: metricsFactory,
logger: logger,
close: make(chan struct{}),
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
mp.On("Process", &saramaMessageWrapper{msg}).Return(nil)
c.consumer.processorFactory.baseProcessor = mp

c.consumer.mainLoop()
c.consumer.Start()
time.Sleep(100 * time.Millisecond)
close(msgCh)
close(errCh)
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {
c.partitionConsumer.On("Messages").Return((<-chan *sarama.ConsumerMessage)(msgCh))
c.partitionConsumer.On("Close").Return(nil)

c.consumer.mainLoop()
c.consumer.Start()
time.Sleep(100 * time.Millisecond)
close(msgCh)
close(errCh)
Expand Down

0 comments on commit 1561fcb

Please sign in to comment.