Fix pulsar intrinsic bugs #176
Conversation
```java
@Override
// currently each liiklus consumer consumes all partitions, and leaves the client to rebalance;
// in pulsar, the pulsar client does not rebalance naturally on failover or exclusive subscriptions
public void testMultipleGroups() throws Exception {
```
this test is pretty storage-agnostic, but does it have to be overridden? I am afraid we're introducing a regression here
I am not sure it's a regression; if anything, I would say that the test depends too much on the intrinsic behaviour of the underlying storage.
e.g. for kafka, we can have two consumers for a group name, and the rebalancing works properly: if one consumer disconnects, the second consumer will reset the offset based on the liiklus offset and continue consuming. However, in pulsar's failover mode with two consumers, doing an offset seek disconnects all consumers and causes the second consumer to also get stuck.
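To make that concrete, here is a minimal sketch of the failover setup in question, using the plain pulsar client (the service URL, topic and subscription names are made up for illustration):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;

public class FailoverSeekSketch {

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumed local broker
                .build();

        // two consumers share one Failover subscription: one is "active" per
        // partition, the other is a standby that takes over on disconnect
        Consumer<byte[]> active = newFailoverConsumer(client);
        Consumer<byte[]> standby = newFailoverConsumer(client);

        // the manual seek discussed above: at the time of this PR it
        // disconnects *all* consumers of the subscription, not just the caller
        active.seek(MessageId.earliest);

        client.close();
    }

    static Consumer<byte[]> newFailoverConsumer(PulsarClient client) throws PulsarClientException {
        return client.newConsumer()
                .topic("my-topic")
                .subscriptionName("liiklus-group")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();
    }
}
```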
Doesn't exclusive mode allow only one consumer to be connected at a time? So that we get assignment-like behaviour, assumed by this test
it differs at subscription time; let me check the tests in more detail
testExclusiveRecordDistribution
here the test checks that if two consumers connect at the same time, each consumer will have consumed some messages. however this is not correct, as the rebalancing can happen such that only one consumer ends up handling all partitions. this is not very liiklus-specific, but dependent on the records storage's native client implementation.
testMultipleGroups
here the test checks that if one consumer is connected, it gets all messages. the next thing tested is that if a second consumer is connected, the second consumer has also consumed some messages. this is the same wrong assumption as above, since liiklus does not do any balancing, but the records storage's native client implementation does.
checking the above tests: since liiklus always assigns all partitions to any connecting consumer, the rebalancing happens based on the records storage's native client implementation. So it is the test which is not consistent, or in other words, the test is expecting some native client implementation behaviour (in this case pulsar).
or am I missing something here?
it rebalances only if the consumer of a partition dies for some reason. but this rebalancing is more on the liiklus side, which connects the pulsar consumer with an exclusive subscription.
well, it falls back, but the "rebalancing" is not happening (where "rebalancing" is re-assigning partitions between consumers to balance the load)
Which makes me wonder... unlike in Kafka, in Pulsar partitions are topics, and there are N consumers for each "partitioned topic". Which means that Pulsar cannot detect that N consumers are coming from the same "process", hence no rebalancing.
Maybe there is some property to set when "exclusive" is used to make it detect over-assigned instances and do the "rebalancing"?
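As a sketch of what I mean (Pulsar exposes every partition of a partitioned topic as an internal topic named `<topic>-partition-<n>`; the client and names below are assumed, as in the earlier sketch):

```java
// assuming a connected PulsarClient `client`; "my-topic" is hypothetical
Consumer<byte[]> partition0Consumer = client.newConsumer()
        // partition 0 of the partitioned topic "my-topic"
        .topic("persistent://public/default/my-topic-partition-0")
        .subscriptionName("liiklus-group")
        .subscriptionType(SubscriptionType.Failover)
        .subscribe();
```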
Pulsar's abstraction is a bit different than Kafka's. In Kafka, there is partition -> consumer. I don't think the partition abstraction itself is that different (except that in pulsar a partition is internally exposed as another topic). In pulsar, there is partition -> subscription -> consumer.
So a pulsar client does not really connect to a partition, but to a subscription (based on a subscription name). In turn, a subscription is connected to 1 topic or more (in our case it's only 1 subscription, per subscription name, per partition). Exclusive means that only one consumer can be connected to a subscription. With Failover, we can have multiple consumers connected to a subscription, and if a consumer dies, pulsar will push all unacked messages to another consumer. So there is not really any rebalancing happening: once the first consumer is connected, the messages in the partition will go to that consumer in any subscription mode.
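A small sketch of the difference (again with made-up names, assuming a connected `client` as above): with Exclusive, a second subscribe on the same subscription is rejected while the first consumer is connected; with Failover it would instead become a standby.

```java
Consumer<byte[]> first = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("exclusive-sub")
        .subscriptionType(SubscriptionType.Exclusive)
        .subscribe();

try {
    // Exclusive allows at most one connected consumer per subscription,
    // so this second subscribe fails (typically with a ConsumerBusy error)
    client.newConsumer()
            .topic("my-topic")
            .subscriptionName("exclusive-sub")
            .subscriptionType(SubscriptionType.Exclusive)
            .subscribe();
} catch (PulsarClientException e) {
    // second consumer rejected while `first` is still connected
}
```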
However, in liiklus we do a manual seek with the consumer. The offset of a partition actually lies with the subscription; hence, when a manual seek is done, pulsar disconnects all consumers of the subscription. A rebalancing might then happen on reconnect, since the one to reconnect first might be another consumer; this is, however, a side effect of the current pulsar records storage implementation. The next issue is that, because of this seek, for some reason, if the receiving consumer is disconnected, the other connected consumer won't get any messages.
Additionally, as extra information (this is not further investigated yet): since the seek only happens on the initial subscription, in case of a failover switch the second consumer will get all messages since the last seek, because liiklus does not ack any messages in pulsar, and on failover pulsar sends all unacked messages (based on the last saved offset in the subscription) to the other consumer. As the other connected liiklus consumer can't detect when its counterpart disconnects, it won't know when to do a new seek based on the liiklus offset, and may just receive all messages the other consumer has previously received. However, as mentioned above, this does not happen in the test, since we only test new consumer connections and never test what happens on failover, i.e. when the consumer receiving messages is killed.
So in short, to my understanding pulsar does not really rebalance automatically, and we can't assume it will happen on new consumer creation.
p.s. there is actually a way to always have rebalancing, and that is the Key_Shared or Shared subscription mode. However, liiklus offsets and acks do not really support this in a good way: in pulsar, acking is done on a per-message basis, while in liiklus it's done on an offset basis.
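To illustrate that last point with the real pulsar client API (assuming a connected `Consumer<byte[]> consumer`; the mapping to liiklus offsets is my interpretation):

```java
Message<byte[]> msg = consumer.receive();

// pulsar's native model: acknowledge this single message only
consumer.acknowledge(msg);

// the offset-style alternative: acknowledge everything up to and including
// this message's position. Note that cumulative acks are not permitted on
// Shared / Key_Shared subscriptions, which is part of why an offset-based
// model doesn't map onto them.
// consumer.acknowledgeCumulative(msg);
```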
> pulsar disconnects all consumers of the subscription
FYI it is fixed in master: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1567098757051900?thread_ts=1567090931.044200&cid=C5Z4T36F7
(I will comment more, just wanted to point out eagerly)
```java
@Override
// currently each liiklus consumer consumes all partitions, and leaves the client to rebalance;
// in pulsar, the pulsar client does not rebalance naturally on failover or exclusive subscriptions
public void testExclusiveRecordDistribution() throws Exception {
```
Same comment as in testMultipleGroups
same comment as in testMultipleGroups
this test verifies that:
- Eventually, both subscriptions will receive assignments
- their received records are not overlapping
this is not Kafka-specific, and if this test fails, there seems to be a bug in the storage implementation
```java
@Test
@Override
// since pulsar behaves differently for closed ledgers, we need to override this test to include the 1-entry setback
```
since the whole point of Liiklus is to abstract away the semantics of different event storages, I don't understand why this test does not pass as it is in the TCK?
Also, TCK tests should not be changed; only new tests can be added. There are assumptions for storages that do not support more than 1 partition, but other than that, the storages should behave similarly.
For this one, it's indeed because there is an inconsistency in pulsar's behaviour when checking what a valid entry id is. This would mean that we would need to patch pulsar and wait for a new release.
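For readers without the diff at hand, a purely hypothetical sketch of what such a one-entry setback could look like (the actual logic lives in PulsarRecordsStorage's adaptForSeek method, mentioned below):

```java
// hypothetical sketch only; not the actual implementation in this PR
static MessageId adaptForSeek(MessageIdImpl messageId) {
    // on an already-closed ledger pulsar validates entry ids differently,
    // so seek one entry earlier to avoid losing the first record
    return new MessageIdImpl(
            messageId.getLedgerId(),
            messageId.getEntryId() - 1,
            messageId.getPartitionIndex()
    );
}
```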
FYI I'm trying to get help in Pulsar's Slack about it, will report back as soon as I have any info
you can check the PulsarRecordsStorage comment on the adaptForSeek method.
I have no access.
I would highly suggest getting it if you plan to work with Pulsar; they have a very active community and help with different kinds of questions :)
got it
Resolved review threads (outdated):
- ...ar-records-storage/src/main/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java (2 threads)
- tck/src/main/java/com/github/bsideup/liiklus/records/tests/SubscribeTest.java
- ...ecords-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java
```java
        toOffset(message.getMessageId())
    );
})
.delaySubscription(initialOffset.flatMap(offset -> resetSubscriptionOffset(consumer, offset)));
```
WDYT if we change the code so that on the current ledger we just don't subtract the offset?
that's maybe not the safest way in theory, since while we are asking about the latest message the ledger could be closed right after, but quick tests are working:
```java
.map(PulsarRecordsStorage::fromOffset)
.cast(MessageIdImpl.class)
.flatMap(messageId -> {
    return Mono.fromCompletionStage(() -> ConsumerImplAccessor.getLastMessageIdAsync(consumer))
            .map(last -> {
                var lastMessageId = (MessageIdImpl) last;
                // same ledger as the latest message => the ledger is still
                // open, so we can seek to the stored offset as-is
                if (lastMessageId.getLedgerId() == messageId.getLedgerId()) {
                    return messageId;
                }
                // the message lives on a closed ledger => apply the setback
                return adaptForSeek(messageId);
            });
})
```
There are currently a few bugs to fix in order to run pulsar with liiklus: