Get rid of sarama-cluster, use sarama consumer group instead. #2009

Closed
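For context, the sarama consumer-group API that this change migrates to looks roughly like the sketch below. This is a minimal, illustrative example rather than code from the PR; the broker address, topic, and group ID are placeholders.

```go
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler, the interface that the
// PR's consumerGroupHandler is built around.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("partition=%d offset=%d", msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "") // record progress through the group session
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // consumer groups require a broker version >= 0.10.2

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	ctx := context.Background()
	for {
		// Consume blocks for one session and returns on rebalance or error.
		if err := group.Consume(ctx, []string{"example-topic"}, handler{}); err != nil {
			log.Println("consume error:", err)
		}
		if ctx.Err() != nil {
			return
		}
	}
}
```

In this PR, consumerGroupHandler fills the role of handler, creating one span processor per partition inside ConsumeClaim.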
2 changes: 1 addition & 1 deletion Gopkg.toml
@@ -162,7 +162,7 @@ required = [

[[constraint]]
name = "github.com/Shopify/sarama"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
revision = "1.24.1"

[[constraint]]
name = "github.com/grpc-ecosystem/go-grpc-middleware"
2 changes: 0 additions & 2 deletions cmd/ingester/app/builder/builder.go
@@ -66,7 +66,6 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
factoryParams := consumer.ProcessorFactoryParams{
Topic: options.Topic,
Parallelism: options.Parallelism,
SaramaConsumer: saramaConsumer,
BaseProcessor: spanProcessor,
Logger: logger,
Factory: metricsFactory,
@@ -81,7 +80,6 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
ProcessorFactory: *processorFactory,
MetricsFactory: metricsFactory,
Logger: logger,
DeadlockCheckInterval: options.DeadlockInterval,
Contributor:

IIRC, this can be configured via CLI. You might need to mark it as "deprecated" in the CLI help text + add an entry to the changelog.
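A rough sketch of what that deprecation could look like, assuming the option is registered through spf13/pflag; the flag name and usage text below are illustrative, not taken from this PR.

```go
package app

import (
	"time"

	"github.com/spf13/pflag"
)

// deadlockIntervalFlag is an illustrative flag name, not the real one.
const deadlockIntervalFlag = "ingester.deadlock-interval"

// AddFlags keeps the old flag registered but marks it deprecated, so existing
// deployments keep working while the help text points users away from it.
func AddFlags(flagSet *pflag.FlagSet) {
	flagSet.Duration(deadlockIntervalFlag, time.Duration(0),
		"(deprecated) Interval for the ingester deadlock check; 0 disables it")
	_ = flagSet.MarkDeprecated(deadlockIntervalFlag,
		"the deadlock detector is no longer needed with the sarama consumer group")
}
```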

}
return consumer.New(consumerParams)
}
194 changes: 78 additions & 116 deletions cmd/ingester/app/consumer/consumer.go
@@ -16,10 +16,9 @@ package consumer

import (
"sync"
"time"
"context"

"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

@@ -29,153 +28,116 @@ import (

// Params are the parameters of a Consumer
type Params struct {
ProcessorFactory ProcessorFactory
MetricsFactory metrics.Factory
Logger *zap.Logger
InternalConsumer consumer.Consumer
DeadlockCheckInterval time.Duration
ProcessorFactory ProcessorFactory
MetricsFactory metrics.Factory
Logger *zap.Logger
InternalConsumer consumer.Consumer
}

// Consumer uses sarama to consume and handle messages from kafka
type Consumer struct {
metricsFactory metrics.Factory
logger *zap.Logger
metricsFactory metrics.Factory
logger *zap.Logger
internalConsumer consumer.Consumer
processorFactory ProcessorFactory
partitionsHeld int64
partitionsHeldGauge metrics.Gauge
}

internalConsumer consumer.Consumer
processorFactory ProcessorFactory
type consumerGroupHandler struct {
processorFactory ProcessorFactory
partitionToProcessor map[int32]processor.SpanProcessor
logger *zap.Logger
consumer *Consumer
partitionToProcessorLock sync.RWMutex
}

deadlockDetector deadlockDetector
func (h *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
h.partitionToProcessor = map[int32]processor.SpanProcessor{}
return nil
}

partitionIDToState map[int32]*consumerState
partitionMapLock sync.Mutex
partitionsHeld int64
partitionsHeldGauge metrics.Gauge
func (h *consumerGroupHandler) getProcessFactory(session sarama.ConsumerGroupSession, partition int32, offset int64) processor.SpanProcessor {
h.partitionToProcessorLock.RLock()
msgProcessor := h.partitionToProcessor[partition]
h.partitionToProcessorLock.RUnlock()
if msgProcessor == nil {
msgProcessor = h.processorFactory.new(session, partition, offset-1)
h.partitionToProcessorLock.Lock()
h.partitionToProcessor[partition] = msgProcessor
h.partitionToProcessorLock.Unlock()
}
return msgProcessor
}
func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
msgMetrics := h.consumer.newMsgMetrics(claim.Partition())

type consumerState struct {
wg sync.WaitGroup
partitionConsumer sc.PartitionConsumer
for {
select {
case msg, ok := <-claim.Messages():
if !ok {
h.logger.Info("Message channel closed. ", zap.Int32("partition", claim.Partition()))
return nil
}
h.logger.Debug("Got msg", zap.Any("msg", msg))
msgMetrics.counter.Inc(1)
msgMetrics.offsetGauge.Update(msg.Offset)
msgMetrics.lagGauge.Update(claim.HighWaterMarkOffset() - msg.Offset - 1)
msgProcessor := h.getProcessFactory(sess, claim.Partition(), msg.Offset)
msgProcessor.Process(&saramaMessageWrapper{msg})
}
}
return nil
}

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
return &Consumer{
metricsFactory: params.MetricsFactory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
}, nil
}

// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
c.deadlockDetector.start()
Contributor:

I know this was added after the first implementation because of a specific problem that Uber experienced, although I don't remember exactly what the problem was. Are you aware of that? Is it safe to remove this?

Contributor Author:

I'm aware of that, and I'm still not sure whether this should be removed; it's something I'm investigating. It looks like the problem was solved in this implementation, but I'm still not 100% sure.

Contributor:

Maybe deprecate as Juca suggests, but only start the deadlock detector if the interval > 0 (I think 0 is the default and means no deadlock detection is used)?

Contributor Author:

It seems like this is not necessary anymore; the issue was fixed in sarama. IBM/sarama#1156

Anyway, I agree with the idea of preserving this deadlockDetector, just in case, and marking it as deprecated. I'll reintroduce it.
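If the detector is kept behind a deprecated flag, the conditional start suggested above could look roughly like this; the detector type here is a stand-in for the real deadlockDetector, not the PR's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// detector is a stand-in for the real deadlockDetector in this sketch.
type detector struct{}

func (d *detector) start() { fmt.Println("deadlock detector started") }

// maybeStartDeadlockDetector starts the detector only when the configured
// interval is positive; 0 (the suggested default) means no deadlock detection.
func maybeStartDeadlockDetector(d *detector, interval time.Duration) {
	if interval <= 0 {
		fmt.Println("deadlock detection disabled")
		return
	}
	d.start()
}

func main() {
	maybeStartDeadlockDetector(&detector{}, 0)             // disabled by default
	maybeStartDeadlockDetector(&detector{}, 1*time.Minute) // enabled explicitly
}
```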

go func() {
c.logger.Info("Starting main loop")
for pc := range c.internalConsumer.Partitions() {
c.partitionMapLock.Lock()
if p, ok := c.partitionIDToState[pc.Partition()]; ok {
// This is a guard against simultaneously draining messages
// from the last time the partition was assigned and
// processing new messages for the same partition, which may lead
// to the cleanup process not completing
p.wg.Wait()
ctx := context.Background()
handler := consumerGroupHandler{
processorFactory: c.processorFactory,
logger: c.logger,
consumer: c,
}
defer func() { _ = c.internalConsumer.Close() }()

go func() {
for err := range c.internalConsumer.Errors() {
if error, ok := err.(*sarama.ConsumerError); ok {
c.logger.Info("Starting error handler", zap.Int32("partition", error.Partition))
errMetrics := c.newErrMetrics(error.Partition)
errMetrics.errCounter.Inc(1)
c.logger.Error("Error consuming from Kafka", zap.Error(err))
}
}
c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
c.partitionIDToState[pc.Partition()].wg.Add(2)
c.partitionMapLock.Unlock()
c.partitionMetrics(pc.Partition()).startCounter.Inc(1)
go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())
c.logger.Info("Finished handling errors")
}()

for {
err := c.internalConsumer.Consume(ctx, &handler)
if err != nil {
panic(err)
Contributor:

That's rude. Do you really want a panic here?

Contributor Author (@rubenvp8510, Jan 8, 2020):

Ha, not really, I'll remove it. I copied it from an example of how to use the consumer group, but it shouldn't be here.
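One possible shape for the loop above without the panic, reusing the names from this diff (c, ctx, handler); the log-and-retry strategy is only a suggestion, not the change the author ended up making.

```go
// Sketch of the same loop without the panic: log the error, retry the
// consumer-group session, and stop once the surrounding context is cancelled.
for {
	if err := c.internalConsumer.Consume(ctx, &handler); err != nil {
		c.logger.Error("Error from consumer group", zap.Error(err))
	}
	if ctx.Err() != nil {
		return
	}
}
```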

}

}
}()
}

// Close closes the Consumer and underlying sarama consumer
func (c *Consumer) Close() error {
c.partitionMapLock.Lock()
for _, p := range c.partitionIDToState {
c.closePartition(p.partitionConsumer)
p.wg.Wait()
}
c.partitionMapLock.Unlock()
c.deadlockDetector.close()
c.logger.Info("Closing parent consumer")
return c.internalConsumer.Close()
}

func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionMapLock.Lock()
c.partitionsHeld++
c.partitionsHeldGauge.Update(c.partitionsHeld)
wg := &c.partitionIDToState[pc.Partition()].wg
c.partitionMapLock.Unlock()
defer func() {
c.closePartition(pc)
wg.Done()
c.partitionMapLock.Lock()
c.partitionsHeld--
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()
}()

msgMetrics := c.newMsgMetrics(pc.Partition())

var msgProcessor processor.SpanProcessor

deadlockDetector := c.deadlockDetector.startMonitoringForPartition(pc.Partition())
defer deadlockDetector.close()

for {
select {
case msg, ok := <-pc.Messages():
if !ok {
c.logger.Info("Message channel closed. ", zap.Int32("partition", pc.Partition()))
return
}
c.logger.Debug("Got msg", zap.Any("msg", msg))
msgMetrics.counter.Inc(1)
msgMetrics.offsetGauge.Update(msg.Offset)
msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
deadlockDetector.incrementMsgCount()

if msgProcessor == nil {
msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1)
defer msgProcessor.Close()
}

msgProcessor.Process(saramaMessageWrapper{msg})

case <-deadlockDetector.closePartitionChannel():
c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", pc.Partition()))
return
}
}
}

func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.partitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
}

func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
c.logger.Info("Starting error handler", zap.Int32("partition", partition))
c.partitionMapLock.Lock()
wg := &c.partitionIDToState[partition].wg
c.partitionMapLock.Unlock()
defer wg.Done()

errMetrics := c.newErrMetrics(partition)
for err := range errChan {
errMetrics.errCounter.Inc(1)
c.logger.Error("Error consuming from Kafka", zap.Error(err))
}
c.logger.Info("Finished handling errors", zap.Int32("partition", partition))
}
9 changes: 3 additions & 6 deletions cmd/ingester/app/consumer/processor_factory.go
@@ -15,6 +15,7 @@
package consumer

import (
"github.com/Shopify/sarama"
"io"

"github.com/uber/jaeger-lib/metrics"
@@ -23,15 +24,13 @@ import (
"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
)

// ProcessorFactoryParams are the parameters of a ProcessorFactory
type ProcessorFactoryParams struct {
Parallelism int
Topic string
BaseProcessor processor.SpanProcessor
SaramaConsumer consumer.Consumer
Factory metrics.Factory
Logger *zap.Logger
RetryOptions []decorator.RetryOption
@@ -40,7 +39,6 @@ type ProcessorFactoryParams struct {
// ProcessorFactory is a factory for creating startedProcessors
type ProcessorFactory struct {
topic string
consumer consumer.Consumer
metricsFactory metrics.Factory
logger *zap.Logger
baseProcessor processor.SpanProcessor
@@ -52,7 +50,6 @@ type ProcessorFactory struct {
func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, error) {
return &ProcessorFactory{
topic: params.Topic,
consumer: params.SaramaConsumer,
metricsFactory: params.Factory,
logger: params.Logger,
baseProcessor: params.BaseProcessor,
@@ -61,11 +58,11 @@ func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, erro
}, nil
}

func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor {
func (c *ProcessorFactory) new(session sarama.ConsumerGroupSession, partition int32, minOffset int64) processor.SpanProcessor {
c.logger.Info("Creating new processors", zap.Int32("partition", partition))

markOffset := func(offset int64) {
c.consumer.MarkPartitionOffset(c.topic, partition, offset, "")
session.MarkOffset(c.topic, partition, offset, "")
}

om := offset.NewManager(minOffset, markOffset, partition, c.metricsFactory)