From a13277cd4bd46a8b0b3d7277e25fc702eb918346 Mon Sep 17 00:00:00 2001
From: bom-d-van
Date: Fri, 14 May 2021 20:44:53 +0200
Subject: [PATCH 1/3] receiver/kafka: fix kafka connection leaks when failing to consume partition

Clients and consumers are not closed when consumer.ConsumePartition fails,
which creates a resource leak. If the error is sticky due to a bad offset
saved in the state file, the receiver would keep creating new consumer
connections without ever closing them.

This commit fixes the issue by making sure the connections and consumers are
closed if there is an error when trying to consume a partition. At the same
time, if a sarama.ErrOffsetOutOfRange error is returned, the receiver just
falls back to the oldest offset instead of retrying forever.
---
 receiver/kafka/kafka.go | 46 ++++++++++++++++++++++++++++++++++++-----
 1 file changed, 41 insertions(+), 5 deletions(-)

diff --git a/receiver/kafka/kafka.go b/receiver/kafka/kafka.go
index 0bf4daac5..b85718103 100644
--- a/receiver/kafka/kafka.go
+++ b/receiver/kafka/kafka.go
@@ -330,6 +330,13 @@ func (rcv *Kafka) consume() {
 		)
 		return
 	}
+	defer func() {
+		if err := client.Close(); err != nil {
+			rcv.logger.Error("failed to close client",
+				zap.Error(err),
+			)
+		}
+	}()
 
 	if rcv.kafkaState.offsetIsTimestamp {
 		rcv.kafkaState.offsetIsTimestamp = false
@@ -356,15 +363,42 @@ func (rcv *Kafka) consume() {
 		)
 		return
 	}
+	defer func() {
+		if err := consumer.Close(); err != nil {
+			rcv.logger.Error("failed to close consumer",
+				zap.Error(err),
+			)
+		}
+	}()
 
 	partitionConsumer, err := consumer.ConsumePartition(rcv.connectOptions.topic, rcv.connectOptions.partition, rcv.kafkaState.Offset)
 	if err != nil {
-		rcv.logger.Error("failed to connect to kafka",
-			zap.Duration("reconnect_interval", rcv.reconnectInterval),
-			zap.Error(err),
-		)
-		return
+		switch err {
+		case sarama.ErrOffsetOutOfRange:
+			rcv.logger.Error(
+				"kafka state offset out of range, restart from the oldest offset",
+				zap.Int64("kafka_state_offset", rcv.kafkaState.Offset),
+			)
+			partitionConsumer, err = consumer.ConsumePartition(rcv.connectOptions.topic, rcv.connectOptions.partition, sarama.OffsetOldest)
+		}
+
+		if err != nil {
+			rcv.logger.Error("failed to consume from partition",
+				zap.String("topic", rcv.connectOptions.topic),
+				zap.Int32("partition", rcv.connectOptions.partition),
+				zap.Int64("offset", rcv.kafkaState.Offset),
+				zap.Error(err),
+			)
+			return
+		}
 	}
+	defer func() {
+		if err := partitionConsumer.Close(); err != nil {
+			rcv.logger.Error("failed to close partition consumer",
+				zap.Error(err),
+			)
+		}
+	}()
 	rcv.consumer = partitionConsumer
 
 	// Stop old worker
@@ -465,6 +499,8 @@ func (rcv *Kafka) worker() {
 	var payload []*points.Points
 	var err error
 	msgChan := rcv.consumer.Messages()
+	// TODO:
+	//   * handle reconnect (probably not necessary)?
 	for {
 		messageReceived := true
 		select {

From 5815e54cd626faf6a12e61578f28fc8ff8480ebb Mon Sep 17 00:00:00 2001
From: bom-d-van
Date: Fri, 14 May 2021 20:46:11 +0200
Subject: [PATCH 2/3] receiver/kafka: moves time.NewTicker out of for loop

It is not necessary to create a new ticker on every loop iteration.
---
 receiver/kafka/kafka.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/receiver/kafka/kafka.go b/receiver/kafka/kafka.go
index b85718103..f0286c572 100644
--- a/receiver/kafka/kafka.go
+++ b/receiver/kafka/kafka.go
@@ -415,8 +415,8 @@ func (rcv *Kafka) consume() {
 }
 
 func (rcv *Kafka) connect() {
+	reconnectTimer := time.NewTicker(rcv.reconnectInterval)
 	for {
-		reconnectTimer := time.NewTicker(rcv.reconnectInterval)
 		select {
 		case <-rcv.closed:
 			close(rcv.workerClosed)

From 8032bdc081839b831e31f337e594bf71bd764ad0 Mon Sep 17 00:00:00 2001
From: bom-d-van
Date: Fri, 14 May 2021 21:32:54 +0200
Subject: [PATCH 3/3] receiver/kafka: fix ci lint errors

---
 receiver/kafka/kafka.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/receiver/kafka/kafka.go b/receiver/kafka/kafka.go
index f0286c572..dbaf86ce6 100644
--- a/receiver/kafka/kafka.go
+++ b/receiver/kafka/kafka.go
@@ -373,8 +373,7 @@ func (rcv *Kafka) consume() {
 	partitionConsumer, err := consumer.ConsumePartition(rcv.connectOptions.topic, rcv.connectOptions.partition, rcv.kafkaState.Offset)
 	if err != nil {
-		switch err {
-		case sarama.ErrOffsetOutOfRange:
+		if err == sarama.ErrOffsetOutOfRange {
 			rcv.logger.Error(
 				"kafka state offset out of range, restart from the oldest offset",
 				zap.Int64("kafka_state_offset", rcv.kafkaState.Offset),
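
Addendum (not part of the patches): a minimal standalone sketch of the cleanup-and-fallback pattern that PATCH 1/3 applies, assuming the github.com/Shopify/sarama client. Every successfully created resource gets a deferred Close so nothing leaks when a later step fails, and a stale stored offset falls back to sarama.OffsetOldest instead of being retried forever. The consumePartition helper, broker address, and topic name below are illustrative and do not come from the patched receiver.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

// consumePartition demonstrates the pattern from PATCH 1/3: close whatever
// was successfully created even if a later step fails, and restart from the
// oldest offset when the saved offset is out of range.
func consumePartition(brokers []string, topic string, partition int32, offset int64) error {
	client, err := sarama.NewClient(brokers, sarama.NewConfig())
	if err != nil {
		return err
	}
	defer client.Close() // no longer leaked when ConsumePartition fails

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return err
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition(topic, partition, offset)
	if err == sarama.ErrOffsetOutOfRange {
		// The stored offset is stale (e.g. from an old state file);
		// fall back to the oldest available offset instead of retrying.
		pc, err = consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
	}
	if err != nil {
		return err
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("partition=%d offset=%d value=%q", msg.Partition, msg.Offset, msg.Value)
	}
	return nil
}

func main() {
	// Illustrative values only.
	if err := consumePartition([]string{"localhost:9092"}, "graphite", 0, sarama.OffsetOldest); err != nil {
		log.Fatal(err)
	}
}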
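
The ticker change in PATCH 2/3 can be illustrated with a tiny self-contained sketch of the corrected shape; the interval, the stand-in for rcv.closed, and the loop body are made up for illustration.

package main

import (
	"fmt"
	"time"
)

func main() {
	closed := time.After(350 * time.Millisecond) // stands in for rcv.closed

	// Create the ticker once, outside the loop; allocating a new ticker on
	// every iteration is wasteful and restarts the interval each time.
	reconnectTimer := time.NewTicker(100 * time.Millisecond)
	defer reconnectTimer.Stop()

	for {
		select {
		case <-closed:
			fmt.Println("receiver closed")
			return
		case t := <-reconnectTimer.C:
			fmt.Println("reconnect tick at", t.Format(time.RFC3339Nano))
		}
	}
}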