
Get rid of sarama-cluster, use sarama consumer group instead.
Signed-off-by: Ruben Vargas <ruben.vp8510@gmail.com>
rubenvp8510 committed Jan 7, 2020
1 parent 08a0a4b commit 5688027
Showing 5 changed files with 120 additions and 141 deletions.
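
Background for the diff below: bsm/sarama-cluster exposed partition consumers through a Partitions() channel that the ingester drained with per-partition goroutines, while Sarama's own consumer-group API delivers partition claims to a sarama.ConsumerGroupHandler through Setup, ConsumeClaim and Cleanup callbacks. The following sketch illustrates only that API, not the Jaeger ingester code; the broker address, group ID and topic name are placeholders.

package main

import (
    "context"
    "log"

    "github.com/Shopify/sarama"
)

// exampleHandler has the same shape as the consumerGroupHandler added in this commit.
type exampleHandler struct{}

func (exampleHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (exampleHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim is invoked once per claimed partition and drains its message channel.
func (exampleHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        log.Printf("partition=%d offset=%d", msg.Partition, msg.Offset)
        sess.MarkMessage(msg, "") // record progress for this partition
    }
    return nil
}

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_0_0_0 // consumer groups require Kafka >= 0.10.2
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
    if err != nil {
        log.Fatal(err)
    }
    defer group.Close()

    // Consume blocks for one session and returns on rebalance, so it is called
    // in a loop, which is the same pattern the rewritten Start() uses.
    for {
        if err := group.Consume(context.Background(), []string{"example-topic"}, exampleHandler{}); err != nil {
            log.Fatal(err)
        }
    }
}
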
2 changes: 1 addition & 1 deletion Gopkg.toml
@@ -162,7 +162,7 @@ required = [
 
 [[constraint]]
   name = "github.com/Shopify/sarama"
-  revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
+  revision = "1.24.1"
 
 [[constraint]]
   name = "github.com/grpc-ecosystem/go-grpc-middleware"
2 changes: 0 additions & 2 deletions cmd/ingester/app/builder/builder.go
@@ -66,7 +66,6 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
     factoryParams := consumer.ProcessorFactoryParams{
         Topic:          options.Topic,
         Parallelism:    options.Parallelism,
-        SaramaConsumer: saramaConsumer,
         BaseProcessor:  spanProcessor,
         Logger:         logger,
         Factory:        metricsFactory,
@@ -81,7 +80,6 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
         ProcessorFactory:      *processorFactory,
         MetricsFactory:        metricsFactory,
         Logger:                logger,
-        DeadlockCheckInterval: options.DeadlockInterval,
     }
     return consumer.New(consumerParams)
 }
194 changes: 78 additions & 116 deletions cmd/ingester/app/consumer/consumer.go
@@ -16,10 +16,9 @@ package consumer
 
 import (
     "sync"
-    "time"
+    "context"
 
     "github.com/Shopify/sarama"
-    sc "github.com/bsm/sarama-cluster"
     "github.com/uber/jaeger-lib/metrics"
     "go.uber.org/zap"
 
@@ -29,153 +28,116 @@ import (
 
 // Params are the parameters of a Consumer
 type Params struct {
-    ProcessorFactory      ProcessorFactory
-    MetricsFactory        metrics.Factory
-    Logger                *zap.Logger
-    InternalConsumer      consumer.Consumer
-    DeadlockCheckInterval time.Duration
+    ProcessorFactory ProcessorFactory
+    MetricsFactory   metrics.Factory
+    Logger           *zap.Logger
+    InternalConsumer consumer.Consumer
 }
 
 // Consumer uses sarama to consume and handle messages from kafka
 type Consumer struct {
-    metricsFactory metrics.Factory
-    logger         *zap.Logger
-
-    internalConsumer consumer.Consumer
-    processorFactory ProcessorFactory
-
-    deadlockDetector deadlockDetector
-
-    partitionIDToState  map[int32]*consumerState
-    partitionMapLock    sync.Mutex
-    partitionsHeld      int64
-    partitionsHeldGauge metrics.Gauge
-}
-
-type consumerState struct {
-    wg                sync.WaitGroup
-    partitionConsumer sc.PartitionConsumer
-}
+    metricsFactory      metrics.Factory
+    logger              *zap.Logger
+    internalConsumer    consumer.Consumer
+    processorFactory    ProcessorFactory
+    partitionsHeld      int64
+    partitionsHeldGauge metrics.Gauge
+}
+
+type consumerGroupHandler struct {
+    processorFactory         ProcessorFactory
+    partitionToProcessor     map[int32]processor.SpanProcessor
+    logger                   *zap.Logger
+    consumer                 *Consumer
+    partitionToProcessorLock sync.RWMutex
+}
+
+func (h *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
+    h.partitionToProcessor = map[int32]processor.SpanProcessor{}
+    return nil
+}
+
+func (h *consumerGroupHandler) getProcessFactory(session sarama.ConsumerGroupSession, partition int32, offset int64) processor.SpanProcessor {
+    h.partitionToProcessorLock.RLock()
+    msgProcessor := h.partitionToProcessor[partition]
+    h.partitionToProcessorLock.RUnlock()
+    if msgProcessor == nil {
+        msgProcessor = h.processorFactory.new(session, partition, offset-1)
+        h.partitionToProcessorLock.Lock()
+        h.partitionToProcessor[partition] = msgProcessor
+        h.partitionToProcessorLock.Unlock()
+    }
+    return msgProcessor
+}
+
+func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
+
+func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+    msgMetrics := h.consumer.newMsgMetrics(claim.Partition())
+    for {
+        select {
+        case msg, ok := <-claim.Messages():
+            if !ok {
+                h.logger.Info("Message channel closed. ", zap.Int32("partition", claim.Partition()))
+                return nil
+            }
+            h.logger.Debug("Got msg", zap.Any("msg", msg))
+            msgMetrics.counter.Inc(1)
+            msgMetrics.offsetGauge.Update(msg.Offset)
+            msgMetrics.lagGauge.Update(claim.HighWaterMarkOffset() - msg.Offset - 1)
+            msgProcessor := h.getProcessFactory(sess, claim.Partition(), msg.Offset)
+            msgProcessor.Process(&saramaMessageWrapper{msg})
+        }
+    }
+    return nil
+}
 
 // New is a constructor for a Consumer
 func New(params Params) (*Consumer, error) {
-    deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
     return &Consumer{
         metricsFactory:      params.MetricsFactory,
         logger:              params.Logger,
         internalConsumer:    params.InternalConsumer,
         processorFactory:    params.ProcessorFactory,
-        deadlockDetector:    deadlockDetector,
-        partitionIDToState:  make(map[int32]*consumerState),
         partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
     }, nil
 }
 
 // Start begins consuming messages in a go routine
 func (c *Consumer) Start() {
-    c.deadlockDetector.start()
     go func() {
         c.logger.Info("Starting main loop")
-        for pc := range c.internalConsumer.Partitions() {
-            c.partitionMapLock.Lock()
-            if p, ok := c.partitionIDToState[pc.Partition()]; ok {
-                // This is a guard against simultaneously draining messages
-                // from the last time the partition was assigned and
-                // processing new messages for the same partition, which may lead
-                // to the cleanup process not completing
-                p.wg.Wait()
-            }
-            c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
-            c.partitionIDToState[pc.Partition()].wg.Add(2)
-            c.partitionMapLock.Unlock()
-            c.partitionMetrics(pc.Partition()).startCounter.Inc(1)
-            go c.handleMessages(pc)
-            go c.handleErrors(pc.Partition(), pc.Errors())
-        }
+        ctx := context.Background()
+        handler := consumerGroupHandler{
+            processorFactory: c.processorFactory,
+            logger:           c.logger,
+            consumer:         c,
+        }
+        defer func() { _ = c.internalConsumer.Close() }()
+
+        go func() {
+            for err := range c.internalConsumer.Errors() {
+                if error, ok := err.(*sarama.ConsumerError); ok {
+                    c.logger.Info("Starting error handler", zap.Int32("partition", error.Partition))
+                    errMetrics := c.newErrMetrics(error.Partition)
+                    errMetrics.errCounter.Inc(1)
+                    c.logger.Error("Error consuming from Kafka", zap.Error(err))
+                }
+            }
+            c.logger.Info("Finished handling errors")
+        }()
+
+        for {
+            err := c.internalConsumer.Consume(ctx, &handler)
+            if err != nil {
+                panic(err)
+            }
+        }
     }()
 }
 
 // Close closes the Consumer and underlying sarama consumer
 func (c *Consumer) Close() error {
-    c.partitionMapLock.Lock()
-    for _, p := range c.partitionIDToState {
-        c.closePartition(p.partitionConsumer)
-        p.wg.Wait()
-    }
-    c.partitionMapLock.Unlock()
-    c.deadlockDetector.close()
     c.logger.Info("Closing parent consumer")
     return c.internalConsumer.Close()
 }
-
-func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
-    c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
-    c.partitionMapLock.Lock()
-    c.partitionsHeld++
-    c.partitionsHeldGauge.Update(c.partitionsHeld)
-    wg := &c.partitionIDToState[pc.Partition()].wg
-    c.partitionMapLock.Unlock()
-    defer func() {
-        c.closePartition(pc)
-        wg.Done()
-        c.partitionMapLock.Lock()
-        c.partitionsHeld--
-        c.partitionsHeldGauge.Update(c.partitionsHeld)
-        c.partitionMapLock.Unlock()
-    }()
-
-    msgMetrics := c.newMsgMetrics(pc.Partition())
-
-    var msgProcessor processor.SpanProcessor
-
-    deadlockDetector := c.deadlockDetector.startMonitoringForPartition(pc.Partition())
-    defer deadlockDetector.close()
-
-    for {
-        select {
-        case msg, ok := <-pc.Messages():
-            if !ok {
-                c.logger.Info("Message channel closed. ", zap.Int32("partition", pc.Partition()))
-                return
-            }
-            c.logger.Debug("Got msg", zap.Any("msg", msg))
-            msgMetrics.counter.Inc(1)
-            msgMetrics.offsetGauge.Update(msg.Offset)
-            msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
-            deadlockDetector.incrementMsgCount()
-
-            if msgProcessor == nil {
-                msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1)
-                defer msgProcessor.Close()
-            }
-
-            msgProcessor.Process(saramaMessageWrapper{msg})
-
-        case <-deadlockDetector.closePartitionChannel():
-            c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", pc.Partition()))
-            return
-        }
-    }
-}
-
-func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
-    c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
-    partitionConsumer.Close() // blocks until messages channel is drained
-    c.partitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
-    c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
-}
-
-func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
-    c.logger.Info("Starting error handler", zap.Int32("partition", partition))
-    c.partitionMapLock.Lock()
-    wg := &c.partitionIDToState[partition].wg
-    c.partitionMapLock.Unlock()
-    defer wg.Done()
-
-    errMetrics := c.newErrMetrics(partition)
-    for err := range errChan {
-        errMetrics.errCounter.Inc(1)
-        c.logger.Error("Error consuming from Kafka", zap.Error(err))
-    }
-    c.logger.Info("Finished handling errors", zap.Int32("partition", partition))
-}
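
Two points about the rewritten Start() worth spelling out: ConsumerGroup.Consume returns at the end of each session (for example after a rebalance), which is why it is called in an endless for loop, and the Errors() channel drained by the inner goroutine only receives errors when the Sarama config sets Consumer.Return.Errors to true (otherwise Sarama merely logs them). The sketch below illustrates both points under assumed names; it is not the Jaeger builder code, and the cancellable context and non-panicking error handling intentionally differ from the committed version.

package ingester

import (
    "context"
    "log"

    "github.com/Shopify/sarama"
)

// newGroupConfig sketches the settings the rewritten consumer relies on.
func newGroupConfig() *sarama.Config {
    config := sarama.NewConfig()
    config.Version = sarama.V2_0_0_0                      // consumer groups need Kafka >= 0.10.2
    config.Consumer.Return.Errors = true                  // feed ConsumerGroup.Errors()
    config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the earliest offset on first run
    return config
}

// runGroup re-joins the group after every session until ctx is cancelled.
func runGroup(ctx context.Context, group sarama.ConsumerGroup, topics []string, handler sarama.ConsumerGroupHandler) {
    for {
        // Consume blocks for the lifetime of one session and returns on rebalance.
        if err := group.Consume(ctx, topics, handler); err != nil {
            log.Printf("consume error: %v", err)
            return
        }
        if ctx.Err() != nil { // stop re-joining once the context is cancelled
            return
        }
    }
}
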
9 changes: 3 additions & 6 deletions cmd/ingester/app/consumer/processor_factory.go
@@ -15,6 +15,7 @@
 package consumer
 
 import (
+    "github.com/Shopify/sarama"
     "io"
 
     "github.com/uber/jaeger-lib/metrics"
@@ -23,15 +24,13 @@ import (
     "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset"
     "github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
     "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator"
-    "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
 )
 
 // ProcessorFactoryParams are the parameters of a ProcessorFactory
 type ProcessorFactoryParams struct {
     Parallelism    int
     Topic          string
     BaseProcessor  processor.SpanProcessor
-    SaramaConsumer consumer.Consumer
     Factory        metrics.Factory
     Logger         *zap.Logger
     RetryOptions   []decorator.RetryOption
@@ -40,7 +39,6 @@ type ProcessorFactoryParams struct {
 // ProcessorFactory is a factory for creating startedProcessors
 type ProcessorFactory struct {
     topic          string
-    consumer       consumer.Consumer
     metricsFactory metrics.Factory
     logger         *zap.Logger
     baseProcessor  processor.SpanProcessor
@@ -52,7 +50,6 @@ type ProcessorFactory struct {
 func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, error) {
     return &ProcessorFactory{
         topic:          params.Topic,
-        consumer:       params.SaramaConsumer,
         metricsFactory: params.Factory,
         logger:         params.Logger,
         baseProcessor:  params.BaseProcessor,
@@ -61,11 +58,11 @@ func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, erro
     }, nil
 }
 
-func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor {
+func (c *ProcessorFactory) new(session sarama.ConsumerGroupSession, partition int32, minOffset int64) processor.SpanProcessor {
     c.logger.Info("Creating new processors", zap.Int32("partition", partition))
 
     markOffset := func(offset int64) {
-        c.consumer.MarkPartitionOffset(c.topic, partition, offset, "")
+        session.MarkOffset(c.topic, partition, offset, "")
     }
 
     om := offset.NewManager(minOffset, markOffset, partition, c.metricsFactory)
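
With this change, offset acknowledgement goes through the sarama.ConsumerGroupSession handed to the handler rather than through sarama-cluster's MarkPartitionOffset. As a general reference (not a restatement of Jaeger's offset-manager semantics), a handler can record progress in either of the two ways sketched below; marked offsets are committed according to the config's commit settings or when the session ends.

package ingester

import "github.com/Shopify/sarama"

// markProgress shows the two session-based marking calls. MarkOffset is the
// primitive the new markOffset callback uses; MarkMessage is a convenience
// wrapper that marks msg.Offset+1 for you.
func markProgress(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
    // Mark the message itself.
    sess.MarkMessage(msg, "")

    // Or mark an explicit offset for a topic/partition; by convention this is
    // the offset of the next message to be consumed.
    sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")
}
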