This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Kafka notifier: do not wait until timeout when there is no backlog to process, i.e. no message in the partition queue #1315

Merged: 3 commits, May 23, 2019

fkaleo
Contributor

@fkaleo fkaleo commented May 17, 2019

Rationalised NotifierKafka's backlog processing:

  • kept bootTimeOffsets local to consumePartition and renamed it to lastAvailableOffsetAtStartup
  • at startup, invoke GetOffset() to retrieve the explicit offset in all cases ('oldest', 'newest', timestamp)
  • factored duplicated code into updateProcessBacklog() and updateMetrics()
  • when consumePartition() starts, check whether the backlog has already been processed; this fixes all cases where backlog processing got stuck because no messages needed to be received.

Note: partitionLagMetric was previously set to GetOffset(..., OffsetNewest) - msg.Offset and is now set to GetOffset(..., OffsetNewest) - msg.Offset - 1. Other metrics remained unchanged.
Fixes #1316
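
To make the metric change in the note concrete, here is a minimal sketch of the new lag arithmetic. The function name partitionLag is illustrative and not taken from the patch; it assumes newestOffset is the value returned by GetOffset(..., OffsetNewest), i.e. the offset the next published message will receive.

```go
package main

// partitionLag returns how many messages in a partition are still unread.
// newestOffset is assumed to be the "next offset to be written" reported by
// the broker; lastReadOffset is the offset of the last message consumed.
// The extra -1 accounts for newestOffset pointing one past the last message.
func partitionLag(newestOffset, lastReadOffset int64) int64 {
	return newestOffset - lastReadOffset - 1
}
```

With 10 messages in a partition (offsets 0 to 9), newestOffset is 10. Once offset 9 has been read the lag is 0, where the previous formula (without the -1) would have reported 1.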

@woodsaj
Member

woodsaj commented May 17, 2019

While this approach works, I think we can resolve this issue in a more reliable way. E.g., if the user sets the "offset" config item to 'newest' then the bootTimeOffset would be > 0, but we should also call processBacklog.Done() as soon as we start.

I suggest a few changes:

  • in start() (func (c *NotifierKafka) start()) we should always get the explicit offset to start from, i.e. if the user requests 'newest' (-1) or 'oldest' (-2) then we should pass those through to c.client.GetOffset().
  • Instead of fetching and storing bootTimeOffsets when ConfigProcess is called, we should just fetch them in start(), e.g. just call bootTimeOffset, err = c.client.GetOffset(topic, partition, sarama.OffsetNewest)
  • we should then just pass bootTimeOffset through to consumePartition()
  • when consumePartition() starts, we can call processBacklog.Done() if currentOffset == bootTimeOffset
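
The four steps suggested above could be sketched roughly as follows. This is not the code from the pull request: offsetGetter, fakeClient, backlogEmptyAtStart and startPartition are hypothetical names, the done callback stands in for processBacklog.Done, and the two constants mirror sarama.OffsetNewest and sarama.OffsetOldest. Only the GetOffset signature is meant to match the sarama client.

```go
package main

import "errors"

// Sentinels with the same values as sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// offsetGetter is a hypothetical stand-in for the one client method used here.
type offsetGetter interface {
	GetOffset(topic string, partition int32, time int64) (int64, error)
}

// fakeClient is an illustration-only client for a partition that currently
// holds the offsets oldest..newest-1.
type fakeClient struct {
	oldest, newest int64
}

func (f fakeClient) GetOffset(topic string, partition int32, time int64) (int64, error) {
	switch time {
	case offsetNewest:
		return f.newest, nil
	case offsetOldest:
		return f.oldest, nil
	default:
		// A real broker would resolve a timestamp; the fake does not model that.
		return 0, errors.New("timestamp lookup not modelled in this sketch")
	}
}

// backlogEmptyAtStart resolves both the requested start position and the
// newest position to explicit offsets and reports whether they coincide,
// in which case there is nothing to consume at startup.
func backlogEmptyAtStart(c offsetGetter, topic string, partition int32, requested int64) (bool, error) {
	startOffset, err := c.GetOffset(topic, partition, requested)
	if err != nil {
		return false, err
	}
	bootTimeOffset, err := c.GetOffset(topic, partition, offsetNewest)
	if err != nil {
		return false, err
	}
	return startOffset == bootTimeOffset, nil
}

// startPartition calls done immediately when the backlog is empty at startup.
// Otherwise the consumer loop (not shown) is expected to call it later.
func startPartition(c offsetGetter, topic string, partition int32, requested int64, done func()) error {
	empty, err := backlogEmptyAtStart(c, topic, partition, requested)
	if err != nil {
		return err
	}
	if empty {
		done()
	}
	return nil
}
```

With a partition holding offsets 0 to 9, a requested offset of 'newest' resolves to 10 on both calls, so done is invoked right away; 'oldest' resolves to 0 and the backlog is left for the consumer loop.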

@fkaleo
Contributor Author

fkaleo commented May 17, 2019

While this approach works, I think we can resolve this issue in a more reliable way. E.g., if the user sets the "offset" config item to 'newest' then the bootTimeOffset would be > 0, but we should also call processBacklog.Done() as soon as we start.

I'm not sure why in this case bootTimeOffsets[partition] would be > 0. It seems to be independent from what the "offset" config item is set to. For that reason I believe my change to be reliable, even in that case.

  • in start() (func (c *NotifierKafka) start()) we should always get the explicit offset to start from, i.e. if the user requests 'newest' (-1) or 'oldest' (-2) then we should pass those through to c.client.GetOffset().

That seems like a good idea to make the code more obvious.
I assume you mean setting consumePartition's currentOffset to the value returned by c.client.GetOffset() in all cases.
However, if I'm not mistaken, c.client.GetOffset() returns the offset of the next message that c.consumer.ConsumePartition() will read whereas currentOffset at the moment represents the offset of the last message read by c.consumer.ConsumePartition(), which is a very useful definition both to check if reading the backlog is done (if currentOffset == bootTimeOffset) and to compute the lag.

  • Instead of fetching and storing bootTimeOffsets when ConfigProcess is called, we should just fetch them in start(), e.g. just call bootTimeOffset, err = c.client.GetOffset(topic, partition, sarama.OffsetNewest)

  • we should then just pass bootTimeOffset through to consumePartition()

I agree this code belongs in start() and not in ConfigProcess(), and we can avoid having a map (bootTimeOffsets) entirely.

  • when consumePartition() starts, we can call processBacklog.Done() if currentOffset == bootTimeOffset

If currentOffset always represents the offset of the last message read (and bootTimeOffset represents the offset of the last message available), then this feels correct. At the moment, currentOffset does not represent the offset of the last message read at startup, only after having received one message:

  • when offsetStr is "oldest": currentOffset is -2 before receiving any message
  • when offsetStr is "newest": currentOffset is -1 before receiving any message
  • when offsetStr is a timestamp: currentOffset is the offset of the first message that will be read, before receiving any message. It probably means partitionOffsetMetric and partitionLagMetric are off by one at startup in that case (bug).
  • in all 3 cases, after receiving the first message, currentOffset starts representing the offset of the last message read

I'm going to think that through a little more to figure out what an elegant solution would be. Perhaps just subtracting 1 from the result of GetOffset() when setting currentOffset.
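
The subtraction idea can be illustrated with a small sketch. It assumes both inputs come from GetOffset() and therefore use "next message" semantics; the function name startupOffsets is hypothetical and does not appear in the patch.

```go
package main

// startupOffsets normalises two GetOffset results into "last message" terms.
//
// nextOffset is the resolved offset for the configured start position and
// newestOffset is the resolved offset for 'newest'. Both point at the next
// message, so subtracting 1 yields:
//   - currentOffset: offset of the last message considered read at startup
//   - lastAvailable: offset of the last message present at startup
func startupOffsets(nextOffset, newestOffset int64) (currentOffset, lastAvailable int64) {
	return nextOffset - 1, newestOffset - 1
}
```

For a partition holding offsets 0 to 9 (newest resolves to 10): 'oldest' gives currentOffset -1 and lastAvailable 9, 'newest' gives 9 and 9, and a timestamp resolving to offset 4 gives 3 and 9. Only in the 'newest' case are the two equal at startup. Note that after this normalisation a currentOffset of -1 means "nothing read yet" and is no longer the 'newest' sentinel.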

@woodsaj
Member

woodsaj commented May 17, 2019

I'm not sure why in this case bootTimeOffsets[partition] would be > 0

bootTimeOffsets[partition] is set to the response from client.GetOffset(topic, part, sarama.OffsetNewest). If a partition has ever received messages before, the returned offset will be >0

I assume you mean setting consumePartition's currentOffset to the value returned by c.client.GetOffset() in all cases.

Yes, exactly.

currentOffset at the moment represents the offset of the last message read by c.consumer.ConsumePartition()

Not when consumePartition() starts. Currently, the currentOffset passed to consumePartition() is -2 (oldest), -1 (newest) or the response from c.client.GetOffset() called with the desired offset timestamp.

Simply put, my idea here is to just call client.GetOffset(topic, part, sarama.OffsetNewest) and client.GetOffset(topic, part, <desired start offset>). If those offsets are the same, then there are no messages in the queue to consume.

@fkaleo
Contributor Author

fkaleo commented May 17, 2019

bootTimeOffsets[partition] is set to the response from client.GetOffset(topic, part, sarama.OffsetNewest). If a partition has ever received messages before, the returned offset will be >0

Yes but that is unrelated to the "offset" config item. Hence I don't see why it would break.

@fkaleo
Contributor Author

fkaleo commented May 17, 2019

Not when consumePartition() starts. Currently, the currentOffset passed to consumePartition() is -2 (oldest), -1 (newest) or the response from c.client.GetOffset() called with the desired offset timestamp.

Yep, agreed. That's exactly what I described in:

At the moment, currentOffset does not represent the offset of the last message read at startup, only after having received one message: [...]

@fkaleo
Contributor Author

fkaleo commented May 17, 2019

Simply put, my idea here is to just call client.GetOffset(topic, part, sarama.OffsetNewest) and client.GetOffset(topic, part, <desired start offset>). If those offsets are the same, then there are no messages in the queue to consume.

What I was trying to say was that client.GetOffset(topic, part, <desired start offset>) will return the offset of the next message we will consume, not the last one we consumed. Hence I think doing what you suggest without subtraction would make us fire the processBacklog.Done() one message too soon. I think, not 100% sure :)

I am working on a patch doing what you are suggesting with a slight twist.
Incidentally cleaning up the meaning of the variables seems to remove quite a bit of duplicated code.
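
One way to check the "one message too soon" concern is to track the offset of the last message read and compare it with the offset of the last message available at startup. The sketch below does that with a hypothetical backlogTracker type; it is not the updateProcessBacklog() from the patch, whose body is not shown in this conversation, and it is not safe for concurrent use.

```go
package main

// backlogTracker fires onDone exactly once, as soon as the last message that
// was available at startup has been read.
type backlogTracker struct {
	lastAvailableOffsetAtStartup int64  // newest "next" offset at startup, minus 1
	done                         bool   // whether onDone has already fired
	onDone                       func() // stands in for processBacklog.Done
}

// update is meant to be called once at startup and then after every message,
// with currentOffset being the offset of the last message read so far.
func (b *backlogTracker) update(currentOffset int64) {
	if b.done || currentOffset < b.lastAvailableOffsetAtStartup {
		return
	}
	b.done = true
	b.onDone()
}
```

With three messages available at startup (offsets 0 to 2) and a start from 'oldest', update(-1) at startup does nothing, update(0) and update(1) do nothing, and update(2) fires onDone. Later messages do not fire it again.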

@woodsaj
Member

woodsaj commented May 17, 2019

Yes but that is unrelated to the "offset" config item. Hence I don't see why it would break.

Your proposed changes wouldn't break anything. They just don't cover all scenarios where there are no messages to consume, e.g. if the config has 'offset' set to 'newest', nothing will be consumed until a new message is published to the partition.

I think the confusion here is due to what c.client.GetOffset() returns. If you ask Kafka for the "newest" offset, it will give you the offset number that will be used for the next message to be received, not the offset of the last message received by the broker.
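
A toy model of a partition makes the "newest" semantics easier to see. This is only an in-memory illustration with hypothetical names (partitionLog, publish, newestOffset); it assumes offsets start at 0 and that no messages have been expired by retention.

```go
package main

// partitionLog is a toy, in-memory model of a single partition.
type partitionLog struct {
	messages []string
}

// newestOffset returns the offset the next published message will receive,
// which is what a "newest" offset query reports.
func (p *partitionLog) newestOffset() int64 {
	return int64(len(p.messages))
}

// publish appends a message and returns the offset it was stored at.
func (p *partitionLog) publish(msg string) int64 {
	offset := p.newestOffset()
	p.messages = append(p.messages, msg)
	return offset
}
```

In this model an empty partition reports 0 as newest, the first published message is stored at offset 0, and afterwards newest is 1. The last stored message is therefore always at newestOffset() - 1.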

@fkaleo
Contributor Author

fkaleo commented May 17, 2019

Your proposed changes wouldn't break anything. They just don't cover all scenarios where there are no messages to consume, e.g. if the config has 'offset' set to 'newest', nothing will be consumed until a new message is published to the partition.

Yes! I understood my mistake now. I had the return value of c.client.GetOffset() right but I simply did not consider the case you mentioned (where there are messages in the queue but we set the config option to 'newest' in order to not consume them).

So the patch I have locally takes care of that.

@fkaleo
Contributor Author

fkaleo commented May 17, 2019

Ok, I took a stab at it. @woodsaj what do you think of it?

@@ -125,16 +124,9 @@ func ConfigProcess(instance string) {
partitionLogSize = make(map[int32]*stats.Gauge64)
partitionLag = make(map[int32]*stats.Gauge64)

// get the "newest" offset for all partitions.
// when booting up, we will delay consuming metrics until we have
Member

The rest of this comment needs to be cleaned up. All this loop does now is initialise the stats for each partition.

Contributor Author

Done

Member

@woodsaj woodsaj left a comment

this looks great.

@fkaleo fkaleo merged commit 146b5e6 into master May 23, 2019
@Dieterbe Dieterbe deleted the nowait_when_no_messages branch May 27, 2019 07:30
Development

Successfully merging this pull request may close these issues.

When there is no backlog to process, NotifierKafka waits until timeout