This repository has been archived by the owner on May 13, 2019. It is now read-only.

Leader change during re-balance may cause partition not being consumed #125

Open
waterlx opened this issue Nov 30, 2017 · 4 comments

waterlx commented Nov 30, 2017

I simplified it into the following scenario:
3 brokers, 1 topic with 4 partitions, and 2 consumer instances consuming that topic. Brokers, partitions, and consumers are all numbered from 0.

When c0 (consumer instance 0) calls utils.go#dividePartitionsBetweenConsumers(), the partition leaders are:

  • p0 on b0
  • p1 on b1
  • p2 on b2
  • p3 on b0

After sorting (by leader, then by partition id), partitions looks like this:

[
    {p0 b0 xxxx}  // {partition leader address}
    {p3 b0 xxxx}
    {p1 b1 xxxx}
    {p2 b2 xxxx}
] 

So c0 computes its myPartitions (the partitions to claim) as p0 and p3.

Then p0's leader changes to b2 for some reason. The leaders are now:

  • p0 on b2
  • p1 on b1
  • p2 on b2
  • p3 on b0

Then another consumer instance, c1, calls utils.go#dividePartitionsBetweenConsumers().
After sorting (by leader, then by partition id), partitions looks like this:

[
    {p3 b0 xxxx}
    {p1 b1 xxxx}
    {p0 b2 xxxx}
    {p2 b2 xxxx}
]

c1 computes its myPartitions (the partitions to claim) as p0 and p2.

As a result, c0 tries to claim p0 and p3 while c1 tries to claim p0 and p2:

  • c0 and c1 both fight for p0, and c0 wins (as it tries to claim it first).
  • But no one tries to claim p1.

In utils.go, partitionLeaders is sorted by leader first, then by partition id:

func (pls partitionLeaders) Less(i, j int) bool {
	return pls[i].leader < pls[j].leader || (pls[i].leader == pls[j].leader && pls[i].id < pls[j].id)
}

When a leader changes between two calls of dividePartitionsBetweenConsumers(), the sorted order changes, so the two consumers divide different lists. I believe the root cause is here.

waterlx commented Nov 30, 2017

I am not sure whether Less() could be changed to drop the leader comparison and compare partition.id only.
Sorting by leader has the following benefit, according to https://kafka.apache.org/documentation/#impl_consumerrebalance

we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.

Dropping the leader comparison would break that. But as long as we keep comparing by leader, we cannot avoid the condition described in this issue when a leader changes.

waterlx commented Nov 30, 2017

On second thought, should we trigger a re-balance when there is a leader change? Could that be a solution for this issue?

waterlx commented Nov 30, 2017

If I understand correctly, the Kafka Java client does not consider the leader and sorts partitions in numeric order. See RangeAssignor in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/PartitionAssignor.scala

secfree commented Dec 6, 2018

@waterlx Thank you very much for your detailed analysis. One of my legacy projects is using this library and encountered the same problem.
