Skip to content

Commit

Permalink
Proper shutdown of kafka consumer impl and fix test (#5712)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Mar 5, 2024
1 parent 8bab585 commit e3b3997
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
15 changes: 10 additions & 5 deletions common/messaging/kafka/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type (
consumerHandler *consumerHandlerImpl
consumerGroup sarama.ConsumerGroup
msgChan <-chan messaging.Message
wg sync.WaitGroup
cancelFunc context.CancelFunc

logger log.Logger
Expand Down Expand Up @@ -95,23 +96,22 @@ func NewKafkaConsumer(
consumerHandler := newConsumerHandlerImpl(dlqProducer, topic, msgChan, metricsClient, logger)

return &consumerImpl{
topic: topic,

topic: topic,
consumerHandler: consumerHandler,
consumerGroup: consumerGroup,
msgChan: msgChan,

logger: logger,
logger: logger,
}, nil
}

func (c *consumerImpl) Start() error {

ctx, cancel := context.WithCancel(context.Background())
c.cancelFunc = cancel
c.wg.Add(1)

// consumer loop
go func() {
defer c.wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
Expand All @@ -133,7 +133,12 @@ func (c *consumerImpl) Start() error {
func (c *consumerImpl) Stop() {
c.logger.Info("Stopping consumer")
c.cancelFunc()
c.logger.Info("Waiting consumer goroutines to complete")
c.wg.Wait()
c.logger.Info("Stopping consumer handler and group")
c.consumerHandler.stop()
c.consumerGroup.Close()
c.logger.Info("Stopped consumer")
}

// Messages return the message channel for this consumer
Expand Down
2 changes: 2 additions & 0 deletions common/messaging/kafka/consumer_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ func initMockBroker(t *testing.T, group string) *sarama.MockBroker {
SetBroker(mockBroker.Addr(), mockBroker.BrokerID()).
SetLeader(topics[0], 0, mockBroker.BrokerID()).
SetController(mockBroker.BrokerID()),
"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
SetCoordinator(sarama.CoordinatorGroup, group, mockBroker),
})
return mockBroker
}

0 comments on commit e3b3997

Please sign in to comment.