From bddf37e2c3f7241c1398478c73205094507dac90 Mon Sep 17 00:00:00 2001 From: zxc111 Date: Sun, 5 Jun 2022 14:57:08 +0800 Subject: [PATCH] fix examples/consumergroup --- examples/consumergroup/main.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index 0edc06cc7..88edd738c 100644 --- a/examples/consumergroup/main.go +++ b/examples/consumergroup/main.go @@ -179,10 +179,17 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai // Do not move the code below to a goroutine. // The `ConsumeClaim` itself is called within a goroutine, see: // https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29 - for message := range claim.Messages() { - log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic) - session.MarkMessage(message, "") + for { + select { + case message := <-claim.Messages(): + log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic) + session.MarkMessage(message, "") + + // Should return when `session.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // https://github.com/Shopify/sarama/issues/1192 + case <-session.Context().Done(): + return nil + } } - - return nil }