Add Consumer group data, with offsets and lag #108
Conversation
Two solutions to this problem arrived on the same day (the Walmart one, #109, and this one). Unlike the topics cache and my solution's consumer cache, Walmart's doesn't use Curator tree caches, which reduces memory usage. Walmart also adds filtering of inactive consumers. However, its consumer and offset code is minimally integrated into the existing code, so some features, like modifying the topic view to add offsets and a list of consumers reading from the topic, are less feasible. I am thinking of replacing my method of getting consumer data with Walmart's, to take advantage of the memory efficiency while maintaining the integration with the current kafka-manager actor system. Calls to the consumer list view would then be faster thanks to the caching in BrokerViewCacheActor, and the consumer and offset information could be integrated more completely into the UI. Please reply to let me know if this sounds like a good solution.
I might be wrong, but I prefer your original approach of using the consumer cache, despite the fact that it takes more memory than fetching data directly from Zookeeper. One of the things I like about Kafka Manager is that it maintains a cache of Kafka state, so it doesn't have to fetch data from Zookeeper every time I want information about brokers or topics. I think the consumer cache fits the design of Kafka Manager better, and it could also serve future usage: if I wanted to add a REST API to Kafka Manager to retrieve consumer information, Kafka Manager wouldn't have to fetch information from Zookeeper but could return values directly from its cache to the client. It's just my two cents.
I agree; that was why I used the cache in the first place. But I thought it was worth trying to call Zookeeper directly, to check whether the performance improved significantly. At first glance, calling Zookeeper directly doesn't make a big difference in memory usage anyway.
If I were to add filtering of inactive consumers (a consumer that doesn't list any topics it reads from), should I make it an option, say as part of ClusterConfig?
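For illustration, a minimal sketch of what such a per-cluster option might look like; the field and type names here are hypothetical, not the project's actual ClusterConfig:

```scala
// Hypothetical sketch only: field and type names are illustrative,
// not the actual kafka-manager ClusterConfig.
case class CuratorConfig(zkConnect: String)

case class ClusterConfig(
  name: String,
  curatorConfig: CuratorConfig,
  // When true, consumers with no listed topics are treated as inactive
  // and hidden from the consumer views.
  filterInactiveConsumers: Boolean = false
)

// Usage: opt in per cluster.
val cluster = ClusterConfig("dev", CuratorConfig("localhost:2181"), filterInactiveConsumers = true)
```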
Can you add tests?
I can add tests that simply verify there aren't any consumers; at least that would test that the call went through. I'm not sure how to modify your SeededBroker to seed a consumer as well as a topic. In the meantime, here's a simple addition to the KafkaStateActor test that gets empty lists of consumers.
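A rough sketch of the kind of test fragment described, meant to sit inside the existing KafkaStateActor spec; `kafkaStateActor` would be the actor under test from the surrounding suite, and the message and response names (`KSGetConsumers`, an empty consumer list) are assumptions about the actor protocol:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.ask
import akka.util.Timeout

// Illustrative fragment: message/response names are assumptions, not
// verified API. kafkaStateActor comes from the enclosing test spec.
implicit val timeout: Timeout = Timeout(10.seconds)

val future = kafkaStateActor ? KSGetConsumers
val consumers = Await.result(future, timeout.duration)
// With a freshly seeded broker and no consumer groups registered in
// Zookeeper, the returned list should simply be empty.
```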
Could you squash the commits?
Currently, the call to get the partition offsets with SimpleConsumer is blocking; I'm going to move all of those operations into Futures. @patelh Do you have a preference as to how I get an ExecutionContext? If I'm going to have futures in the ActorModel, I'll need a local one to do some of the processing.
@cvcal where would you like to have futures in the ActorModel? I'm not sure having an execution context in ActorModel is a good idea. Also, could you please squash the commits into one when you think you won't need to reset to a previous commit? It would be nice to keep the number of commits under 5 per PR, ideally 1 or 2.
Ok, I will squash everything; I just want to finish this first. I had initially thought you were referring to the most recent commits. Currently, with a sufficient number of topics and partitions, having to create a SimpleConsumer for each can trigger timeouts, especially with calls like KSGetAllTopicDescriptions. I thought putting futures within the returned value (within the topic description, and maybe within TopicIdentity too, though the from() method could handle the block-until-complete) would be the cleanest and most resilient solution. Is there another solution you would prefer? The KafkaStateActor would need an execution context regardless, even if ActorModel doesn't, since it would be placing the SimpleConsumer calls in futures. @jisookim0513 Would this be acceptable? If so, is there a way to get an execution context that would be better than another?
@cvcal sorry, I have been very busy with other issues, so I haven't had a chance to read through your entire PR. I will try to do it tonight, but for the execution context, you can consider changing KafkaStateActor to a LongRunningActor and using longRunningExecutionContext.
Ah, I hadn't noticed that LongRunningPoolActor was a trait and could be mixed into BaseQueryCommandActor, thanks! I'll try to have my update out by tonight then. (Update: I might not have a version I'm happy with by tonight, because it currently creates too many futures and I haven't gotten it to work by grouping the calls by topic yet, but if you want to see the modification of KafkaStateActor that I've been asking about, here's a rough version: cvcal@6609b65)
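To make the idea being discussed concrete, here is a small stand-in sketch of the pattern: blocking SimpleConsumer work goes into Futures on a dedicated bounded pool rather than the actor system's default dispatcher. The pool setup below only approximates what the LongRunningPoolActor mixin provides; the names and sizes are illustrative:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Illustrative stand-in for what the LongRunningPoolActor mixin provides:
// a bounded pool dedicated to blocking calls, kept separate from the
// actor system's default dispatcher.
val longRunningExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10))

// Blocking SimpleConsumer work wrapped in a Future on that pool, so the
// actor's own dispatcher never blocks. The fetch body is a placeholder.
def latestOffsetFor(topic: String, partition: Int): Future[Long] =
  Future {
    // ... blocking SimpleConsumer offset fetch would go here ...
    0L
  }(longRunningExecutionContext)
```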
Cool, thanks. Could you ping me when you have the version you like? I'd like to try it on my laptop as well.
@jisookim0513: This version is better. It still has a problem with creating too many futures, because it asks for fresh offsets for a topic every time you request the topic description, or the consumer description of a consumer that reads from it; if a topic gets consumed from a lot, the redundant work piles up and can approach the task limit. The fix for that is caching the results, which I'm working on, but this should work for now.
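A hypothetical sketch of the caching fix being described: remember the last offset fetch per topic and reuse it while fresh, so repeated topic/consumer description requests do not each spawn a new round of futures. The class and method names are illustrative, and the 5-second refresh interval matches the hard-coded frequency mentioned later in the commit message:

```scala
import scala.concurrent.Future

// Hypothetical sketch of the caching idea; names are illustrative.
class TopicOffsetCache(refreshIntervalMillis: Long = 5000L) {
  private case class Entry(fetchedAt: Long, offsets: Future[Map[Int, Long]])
  private var cache = Map.empty[String, Entry]

  def offsetsFor(topic: String)(fetch: => Future[Map[Int, Long]]): Future[Map[Int, Long]] =
    synchronized {
      val now = System.currentTimeMillis()
      cache.get(topic) match {
        // Fresh enough: hand back the cached (possibly still in-flight) future.
        case Some(entry) if now - entry.fetchedAt < refreshIntervalMillis =>
          entry.offsets
        // Stale or missing: trigger exactly one new fetch and cache it.
        case _ =>
          val fresh = fetch
          cache = cache.updated(topic, Entry(now, fresh))
          fresh
      }
    }
}
```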
case KSGetConsumedTopicDescription(consumer, topic) =>
  sender ! getConsumedTopicDescription(consumer, topic)

case KSGetConsumerDescriptions(consumers) =>
Where is this used?
You're right, I guess KSGetConsumerDescriptions doesn't get used. I added it automatically when I was thinking of having a list of valid consumer groups that the user could curate, and forgot to get rid of it.
Hmm... I tried the latest version on your branch, and it worked with my local testing cluster, which had fewer than three topics. However, it doesn't seem to be working with a remote cluster that has quite a few topics and partitions. Caching and polling the information sounds like a good idea instead of fetching fresh data every time. However, I am a bit concerned that I wasn't able to see the topic list with the use of futures. Would it be insufficient if we didn't have the topic offset information on TopicDescription (and TopicIdentity)? @patelh what do you think?
It didn't work at all? I'm currently having a problem with Java IO exceptions; is the problem you're having linked to ClosedChannelExceptions? I'm trying to fix that right now, so if the problem you were having is unrelated, could you let me know?
@cvcal with my local test cluster I remember having a problem with ClosedChannelExceptions, but I was able to see consumer information and topic offsets. For the remote cluster, a time-out occurred, so I couldn't access the topic list (only the broker list worked).
Ok, that's what I thought. This should fix it. If you disagree with my decision to put offsets in the consumer and topic identities, feel free to change it, but my internship ends this week, so I probably won't have a chance to implement any changes you suggest at this point. I do want to clean up a few things on the UI end that I had been neglecting while trying to resolve the bigger problem, so I'll squash all the commits once that is done. If you're curious, the problem was that I was creating too many channels (one for each SimpleConsumer) and was running up against a Java limit. I had modeled the offset-fetching SimpleConsumer code after Kafka's GetOffsetShell, which does not close the connections it creates, probably because it can get away with it; in this case, though, the limit was a problem. The reason you could still see the broker list was that it was the only page that did not try to get topic or consumer identities, so it relied only on the channels created for the Curator caches, which were still open. All other calls required an update to the topic identities, which is what timed out, since no new consumers could be created. Caching delayed the channel limit, since it creates fewer SimpleConsumers, and adding consumer.close() fixed the underlying problem.
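To illustrate the fix described above, here is a hedged sketch of a per-partition offset fetch, assuming the Kafka 0.8-era Scala consumer API that GetOffsetShell is built on (constructor timeouts, buffer size, and client id are illustrative). The essential change relative to GetOffsetShell is the try/finally close:

```scala
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// Sketch of the fixed fetch: unlike GetOffsetShell, the consumer (and its
// underlying channel) is always closed, even if the request throws.
def latestOffset(host: String, port: Int, topic: String, partition: Int): Long = {
  val consumer = new SimpleConsumer(host, port, 10000, 100000, "kafka-manager")
  try {
    val tap = TopicAndPartition(topic, partition)
    val request = OffsetRequest(
      Map(tap -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
    consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tap).offsets.head
  } finally {
    consumer.close() // the actual fix: never leak the channel
  }
}
```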
0bf93d5 to 93f04e1
…s to the topics. The KafkaStateActor now tracks the following additional information:
- adds a Curator treeCache to cache the consumer information, which includes the names of the consumer instances that own each partition, and the offsets at every partition.
- adds cached offsets for the topics. These must be fetched directly from Kafka, as they are not stored in Zookeeper, so the call works by using a SimpleConsumer for every partition. Currently, the frequency of updates to these offsets is hard-coded to every 5 seconds or less frequently.
- configurable in the ClusterConfig is the possibility to exclude, as "inactive", any consumer that does not have an offsets/, owners/, AND ids/ subdirectory.

The UI now has a consumer list view, which lists all consumers and the topics they consume from; an individual page for each consumer, which lists each topic with the cumulative lag and the percentage of partitions that have an owner (if all is well, this should be 100%); and a page for every consumed topic, which lists the offset, lag, and owner information at every partition for that consumer-topic combination. The topic views now show the most recent offset at every partition and the cumulative log size, as well as links to the consumed state of the topic inside a list of the consumers that consume from it.
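For clarity, the lag figures described in the commit message reduce to simple arithmetic; a sketch with illustrative names:

```scala
// Illustrative arithmetic behind the lag figures in the consumer views:
// per-partition lag is the topic's latest offset minus the consumer's
// committed offset, and the topic-level number is the sum over partitions.
def partitionLag(latestOffset: Long, committedOffset: Long): Long =
  math.max(0L, latestOffset - committedOffset)

def cumulativeLag(latest: Map[Int, Long], committed: Map[Int, Long]): Long =
  latest.map { case (partition, end) =>
    partitionLag(end, committed.getOrElse(partition, 0L))
  }.sum
```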
Please rebase off master. Let me know if you need help (please add me as a contributor to your repo so I can push to your branch).
I've started fixing merge conflicts on my local branch.
Thanks @patelh! I'm also no longer a contributor to Knewton's repo, but I'll see what I can do.
@patelh If Knewton were to restore my write access temporarily, could you give me access to where you were fixing the conflicts, so that I can push it to this repo?
I don't know if this helps, but if you push your merge-conflict fixes to my repository, I can ask Knewton to accept my pull request to their fork. I'm sorry this is more complicated and slower than it could have been. Thanks for your patience, @patelh.
I've pushed a new branch to your repo. Please push to knewton, or close this PR and create a new one, or I can create a new PR with your branch (your commits would be preserved).
@cvcal It may be faster to create a new pull request to merge the cvcal branch from your repo to Yahoo's. Please just create a new PR.
I agree. I wanted to see if it could happen quickly enough to be worth it, but I doubt it.
Add Consumer group data, with offsets and lag - #108 but direct
This addresses issue #80