diff --git a/receiver/kafka/kafka.go b/receiver/kafka/kafka.go index 0bf4daac5..dbaf86ce6 100644 --- a/receiver/kafka/kafka.go +++ b/receiver/kafka/kafka.go @@ -330,6 +330,13 @@ func (rcv *Kafka) consume() { ) return } + defer func() { + if err := client.Close(); err != nil { + rcv.logger.Error("failed to close client", + zap.Error(err), + ) + } + }() if rcv.kafkaState.offsetIsTimestamp { rcv.kafkaState.offsetIsTimestamp = false @@ -356,15 +363,41 @@ func (rcv *Kafka) consume() { ) return } + defer func() { + if err := consumer.Close(); err != nil { + rcv.logger.Error("failed to close consumer", + zap.Error(err), + ) + } + }() partitionConsumer, err := consumer.ConsumePartition(rcv.connectOptions.topic, rcv.connectOptions.partition, rcv.kafkaState.Offset) if err != nil { - rcv.logger.Error("failed to connect to kafka", - zap.Duration("reconnect_interval", rcv.reconnectInterval), - zap.Error(err), - ) - return + if err == sarama.ErrOffsetOutOfRange { + rcv.logger.Error( + "kafka state offset out of range, restart from the oldest offset", + zap.Int64("kafka_state_offset", rcv.kafkaState.Offset), + ) + partitionConsumer, err = consumer.ConsumePartition(rcv.connectOptions.topic, rcv.connectOptions.partition, sarama.OffsetOldest) + } + + if err != nil { + rcv.logger.Error("failed to consume from partition", + zap.String("topic", rcv.connectOptions.topic), + zap.Int32("partition", rcv.connectOptions.partition), + zap.Int64("offset", rcv.kafkaState.Offset), + zap.Error(err), + ) + return + } } + defer func() { + if err := partitionConsumer.Close(); err != nil { + rcv.logger.Error("failed to close partition consumer", + zap.Error(err), + ) + } + }() rcv.consumer = partitionConsumer // Stop old worker @@ -381,8 +414,8 @@ func (rcv *Kafka) consume() { } func (rcv *Kafka) connect() { + reconnectTimer := time.NewTicker(rcv.reconnectInterval) for { - reconnectTimer := time.NewTicker(rcv.reconnectInterval) select { case <-rcv.closed: close(rcv.workerClosed) @@ -465,6 +498,8 @@ func (rcv *Kafka) worker() { var payload []*points.Points var err error msgChan := rcv.consumer.Messages() + // TODO: + // * handle reconnect (probably not necessary)? for { messageReceived := true select {