Partition offset rewinds during rebalances #209

Closed · 4 of 7 tasks
AlexJF opened this issue Jul 5, 2018 · 4 comments

AlexJF commented Jul 5, 2018

Description

We have a large pool (100+) of consumers using the high-level function-based consumer API, all in the same consumer group. These consumers use the auto-commit functionality, but we call StoreOffsets ourselves at the end of our processing pipeline. We also explicitly handle rebalance events.

We've noticed that whenever we do a deployment (we do a rolling deployment, 20 nodes at a time, so there are often a few partition assign+revoke events in succession) we sometimes witness offset rewinds in some partitions (we monitor offsets using Datadog integrations and have alerts for when the derivative of committed offsets goes negative).

It's not trivial to reproduce, and it's not really feasible for us to enable the extra debug logging since it quickly fills our disk given our traffic pattern. However, I attempted to capture the situation with a Datadog graph of assigned and committed offsets for an affected partition at the time its offset got rewound by approx 6.7k messages (10 minutes of data):

[image: offset graph for the affected partition]

The x-axis is time, the y-axis is offset. Solid lines represent offsets as reported by CommittedOffsets events. Dashed lines represent offsets as reported by consumer.Committed, which is called on the AssignedPartitions event; these are the offsets we pass to consumer.Assign.

The ideal representation for this would actually be points, not lines, but that's unfortunately not a display option in Datadog at the moment. I put some black dots around the "points" of interest to try to minimize the noise caused by all the line interpolation.

Hypothesis

Barring some misuse of the library on our end (we don't think that's the case, since our usage matches the examples we've seen everywhere else), the situation shown in the image above suggests that there might be two separate issues here:

  • Stored offsets are not reset automatically between partition rebalances. The fact that i-01d9b goes back to an offset very close to the one it had been assigned 10 minutes earlier suggests that the auto-commit mechanism committed an old stored offset for this partition.
  • The offset returned by consumer.Committed for i-0cd0a 10 minutes earlier was actually higher than the one returned by consumer.Committed for i-01d9b during the offset rewind. This suggests that the auto-commit that committed the old stored offset updated the broker's committed offset before i-01d9b even got the response to consumer.Committed, much less consumer.Assign.

We'll be experimenting with this hypothesis by explicitly calling consumer.Commit() and then storing -1001 (kafka.OffsetInvalid) as the offset for all assigned partitions before calling Unassign().

Code

Config

	kafkaCfg := &kafka.ConfigMap{
		"bootstrap.servers":  strings.Join(opts.KafkaServers, ","),
		"group.id":           opts.KafkaConsumerGroup,
		"session.timeout.ms": defaultSessionTimeoutMs, // 30 s

		// Auto-commit, in the background, the offsets we store.
		"enable.auto.commit":      true,
		"auto.commit.interval.ms": int(offsetCommitTicker / time.Millisecond), // 1 s
		// Don't automatically store the offset of each message delivered to the app; we store
		// offsets after successful processing.
		"enable.auto.offset.store": false,

		"go.application.rebalance.enable": true,

		"default.topic.config": kafka.ConfigMap{
			"auto.offset.reset": opts.AutoOffsetReset, // latest
		},

		// Ends up just being noisy
		"log.connection.close": false,

		// This value is per topic+partition, so split the total memory we want to use across all
		// topic+partition combinations.
		"queued.max.messages.kbytes": queueKBPerTopicPartition, // this is approx 10MB.

		// Set message max bytes to the max supported by our Kafka broker (5MB).
		// This influences other properties like fetch.max.bytes and fetch.message.max.bytes.
		"message.max.bytes": maxBytesPerMessage, // 5 MB

		"enable.partition.eof": true,
	}
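
For readers following along, here is a minimal sketch (not our actual code) of how a consumer built from this config might be created and polled, assuming a Poll-based loop rather than the events channel; opts.KafkaTopics and handleEvent are hypothetical names, and the events returned by Poll feed the switch shown under "Event handling" below:

	consumer, err := kafka.NewConsumer(kafkaCfg)
	if err != nil {
		log.Fatal("error creating consumer",
			log.String("context", "consumer"),
			log.ErrorField(err),
		)
	}

	// opts.KafkaTopics is a hypothetical list of topics to consume.
	if err := consumer.SubscribeTopics(opts.KafkaTopics, nil); err != nil {
		log.Fatal("error subscribing to topics",
			log.String("context", "consumer"),
			log.ErrorField(err),
		)
	}

	for {
		// Poll returns nil on timeout; everything else goes through the event switch below.
		ev := consumer.Poll(100) // 100 ms
		if ev == nil {
			continue
		}
		handleEvent(consumer, ev) // hypothetical wrapper around the switch under "Event handling"
	}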

Event handling

	switch e := ev.(type) {
	case kafka.AssignedPartitions:
		assignedOffsets, err := kc.Committed(e.Partitions, int(committedOffsetsTimeout/time.Millisecond))

		if err != nil {
			log.Fatal("error calculating assigned offsets",
				log.String("context", "consumer"),
				log.ErrorField(err),
			)
		}

		reportPartitionsChange("assigned", assignedOffsets)

		fr.Assign(assignedOffsets)
		kc.Assign(assignedOffsets)

	case kafka.RevokedPartitions:
		reportPartitionsChange("revoked", e.Partitions)
		// Wait until all inflight messages have been acked
		_, ok := fr.WaitNoFlight(noFlightTimeout) // the wait duration is unused in this excerpt
		if !ok {
			log.Fatal("timed out while waiting for no more requests to be in flight",
				log.String("context", "consumer"),
			)
		}
		fr.Unassign()
		kc.Unassign()

	case *kafka.Message:
		if _, ok := fr.Track(e.TopicPartition); !ok {
			log.Debug("skipping message as it's not in active assignment",
				log.String("context", "consumer"),
				log.Int32("partition", e.TopicPartition.Partition),
				log.String("topic", *e.TopicPartition.Topic),
				log.Int64("offset", int64(e.TopicPartition.Offset)),
			)
			return
		}

		if _, d, err := handleMessage(e); err == nil {
			stream <- d
		}
	case kafka.Error:
		log.Fatal("error while processing stream",
			log.String("context", "consumer"),
			log.ErrorField(e),
		)
	case kafka.OffsetsCommitted:
		// Submit committed offsets via gauge and report errors
		...
	case kafka.PartitionEOF:
		if fr.TrackEOF(e) {
			commitEOF(kc, e)
		}
	}

kc is the kafka.Consumer.
fr is a FlightRecorder, which basically keeps track of in-flight messages and EOFs. It also has some safeguards against processing messages with offsets earlier than those reported as committed during the AssignedPartitions events (we thought this might have been the cause of the offset rewinds before we had enough telemetry, but we have actually seen no instance where we received a message with an offset earlier than the one we passed to consumer.Assign). A much-simplified sketch of it follows.
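
The sketch below is hypothetical and heavily simplified (the real helper also tracks EOFs and its method signatures differ slightly); it only illustrates the assignment bookkeeping and in-flight counting described above, and assumes the standard sync and time packages plus the confluent-kafka-go kafka package are imported:

	// Hypothetical, simplified sketch; not the real FlightRecorder.
	type flightRecorder struct {
		mu       sync.Mutex
		inFlight int
		drained  chan struct{}
		// Committed offset per topic/partition, captured at assign time.
		assigned map[string]map[int32]kafka.Offset
	}

	// Assign remembers the offsets passed to consumer.Assign so Track can reject
	// messages from partitions we no longer own or that were already committed.
	func (f *flightRecorder) Assign(tps []kafka.TopicPartition) {
		f.mu.Lock()
		defer f.mu.Unlock()
		f.assigned = make(map[string]map[int32]kafka.Offset)
		for _, tp := range tps {
			parts := f.assigned[*tp.Topic]
			if parts == nil {
				parts = make(map[int32]kafka.Offset)
				f.assigned[*tp.Topic] = parts
			}
			parts[tp.Partition] = tp.Offset
		}
	}

	// Track registers a message as in flight; it reports false if the message's
	// partition is not in the active assignment or its offset precedes the
	// committed offset we assigned from.
	func (f *flightRecorder) Track(tp kafka.TopicPartition) bool {
		f.mu.Lock()
		defer f.mu.Unlock()
		assignedFrom, ok := f.assigned[*tp.Topic][tp.Partition]
		if !ok || (assignedFrom >= 0 && tp.Offset < assignedFrom) {
			return false
		}
		f.inFlight++
		return true
	}

	// Ack marks a tracked message as fully processed.
	func (f *flightRecorder) Ack(tp kafka.TopicPartition) {
		f.mu.Lock()
		defer f.mu.Unlock()
		f.inFlight--
		if f.inFlight == 0 && f.drained != nil {
			close(f.drained)
			f.drained = nil
		}
	}

	// WaitNoFlight blocks until all tracked messages have been acked or the
	// timeout expires, reporting how long it waited and whether it drained.
	func (f *flightRecorder) WaitNoFlight(timeout time.Duration) (time.Duration, bool) {
		start := time.Now()
		f.mu.Lock()
		if f.inFlight == 0 {
			f.mu.Unlock()
			return 0, true
		}
		if f.drained == nil {
			f.drained = make(chan struct{})
		}
		ch := f.drained
		f.mu.Unlock()
		select {
		case <-ch:
			return time.Since(start), true
		case <-time.After(timeout):
			return time.Since(start), false
		}
	}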

Committing

		go func() {
			for i := range in {
				ctx := i.(context.Context)

				tp, err := TopicPartitionFromContext(ctx)
				if err != nil {
					log.Warn("error retrieving topic partition from context",
						log.String("context", "consumer"),
						log.ErrorField(err),
					)
					continue
				}

				_, err = kc.StoreOffsets([]kafka.TopicPartition{tp})
				if err != nil {
					offsetStoreErrorsC.Incr()
					log.Error("error storing offsets",
						log.String("context", "consumer"),
						log.ErrorField(err),
					)
					continue
				}

				if _, eof := fr.Ack(tp); eof != nil {
					commitEOF(kc, *eof)
				}
			}

			log.Info("nothing more in flight",
				log.String("context", "consumer"),
			)

			// Close the consumer only once we are sure no more acks will come
			err := kc.Close()
			if err != nil {
				log.Error("error closing consumer",
					log.String("context", "consumer"),
					log.ErrorField(err))
			} else {
				log.Info("closed consumer",
					log.String("context", "consumer"),
				)
			}

			// Notify waiters that we are done
			close(out)
			log.Info("notified done",
				log.String("context", "consumer"),
			)
		}()
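
TopicPartitionFromContext and its counterpart are internal helpers; a hypothetical sketch of how such a pair might look (using the usual context-key pattern) is below, showing how a message's topic/partition/offset travels from the processing pipeline back into the committing goroutine above. It assumes the context and errors packages plus the kafka package are imported:

	// Hypothetical sketch of the context helpers referenced above.
	type tpContextKey struct{}

	// ContextWithTopicPartition attaches a message's TopicPartition to the context
	// at the end of the processing pipeline, before it is sent on the `in` channel.
	func ContextWithTopicPartition(ctx context.Context, tp kafka.TopicPartition) context.Context {
		return context.WithValue(ctx, tpContextKey{}, tp)
	}

	// TopicPartitionFromContext retrieves it again in the committing goroutine.
	func TopicPartitionFromContext(ctx context.Context) (kafka.TopicPartition, error) {
		tp, ok := ctx.Value(tpContextKey{}).(kafka.TopicPartition)
		if !ok {
			return kafka.TopicPartition{}, errors.New("no topic partition in context")
		}
		return tp, nil
	}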

How to reproduce

Not clear. We haven't been able to reproduce in a controlled environment yet.

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()):
  • Apache Kafka broker version: kafka_2.12-0.10.2.1
  • Client configuration: ConfigMap{...}
  • Operating system: Ubuntu 16.04.4 Xenial
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue

AlexJF commented Aug 6, 2018

An update on this: we don't seem to have gotten any new offset resets during deployments since we started executing the following in the RevokedPartitions event:

	// Start by committing any pending stored offsets
	_, err := kc.Commit()
	if err != nil {
		log.Error("error committing consumer",
			log.String("context", "consumer"),
			log.ErrorField(err))
	} else {
		log.Info("committed consumer",
			log.String("context", "consumer"),
		)
	}

	// Then reset all stored offsets to the invalid offset so they don't get committed by auto-commit.
	currentAssignment := fr.CurrentAssignment()
	if len(currentAssignment) > 0 {
		resetStoredOffsets := make([]kafka.TopicPartition, 0, len(currentAssignment))
		for k := range currentAssignment {
			resetStoredOffsets = append(resetStoredOffsets, kafka.TopicPartition{
				Partition: k.Partition,
				Topic:     &(k.Topic),
				Offset:    kafka.OffsetInvalid,
			})
		}
		_, err = kc.StoreOffsets(resetStoredOffsets)
		if err != nil {
			log.Error("error resetting stored offsets",
				log.String("context", "consumer"),
				log.ErrorField(err),
			)
		} else {
			log.Info("reset stored offsets",
				log.String("context", "consumer"),
			)
		}
	}

	// Finally, call Unassign on the Kafka consumer to relinquish the revoked partitions.
	err = kc.Unassign()
	if err != nil {
		log.Error("error unassigning consumer",
			log.String("context", "consumer"),
			log.ErrorField(err))
	} else {
		log.Info("unassigned consumer",
			log.String("context", "consumer"),
		)
	}

So it does seem like my initial hypothesis holds.

jeffwidman (Contributor) commented

@AlexJF I'm super curious: did you start noticing the negative lag due to https://github.com/DataDog/integrations-core/blob/181bfec3f7fdffc750f8ec82a9b36ae1c8734e56/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py#L336-L343 ?

Or do you use a different check internally to monitor for negative consumer lag?


AlexJF commented Jan 4, 2019

We never actually observed negative lag at any point here.

We noticed this problem through our tracking of the kafka.consumer.lag metric (dashboards and monitors); it manifested as a sudden jump to a very high positive value (but not one corresponding to an auto-reset to oldest, since the max lag we usually saw was between 1 and 3 hours and the retention on this cluster was set to 6 hours).

[image: graph showing the sudden lag increase]

jeffwidman (Contributor) commented

Oops, my bad, I misread your description as the lag going the other way.

I understand what you're saying now.
