Replace LinkedBlockingQueue with ConcurrentLinkedQueue #90

Open
prudhvi opened this issue Oct 30, 2019 · 19 comments

@prudhvi

prudhvi commented Oct 30, 2019

Hi Team
We are noticing a performance issue when using this client in a Java app that emits a high volume of metric updates, on the order of more than 600k (600 thousand) metric updates/sec. We saw an improvement in the app's throughput when we replaced LinkedBlockingQueue with ConcurrentLinkedQueue.

Our app consumes messages from Kafka and writes to Elasticsearch. At peak it has a throughput of 70k-90k messages per host per second, and it emits/updates metrics for every consumed message.

The performance issue with LinkedBlockingQueue arises because multiple threads of the app contend for the queue's lock and sit in a waiting state (as shown by thread dumps and flame graphs). Are there any reasons not to use ConcurrentLinkedQueue? Would you accept an upstream PR that replaces the queue implementation or makes it configurable?
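To make the "configurable queue" idea concrete, here is a minimal sketch. `MetricBuffer` is a hypothetical stand-in and not the client's actual class; the only assumption is that the producer path needs nothing beyond `java.util.Queue#offer` and the consumer path nothing beyond `poll`, so the implementation can be injected.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

public class ConfigurableQueueSketch {
    /** Hypothetical stand-in for the client's internal buffer; not the real dogstatsd class. */
    static final class MetricBuffer {
        private final Queue<String> queue;

        MetricBuffer(Supplier<Queue<String>> queueFactory) {
            this.queue = queueFactory.get();
        }

        /** Non-blocking enqueue; returns false if a bounded queue is full. */
        boolean submit(String message) {
            return queue.offer(message);
        }

        /** Non-blocking dequeue; returns null when the queue is empty. */
        String next() {
            return queue.poll();
        }
    }

    public static void main(String[] args) {
        // Lock-based, bounded queue (producers contend on its put lock).
        MetricBuffer lockBased = new MetricBuffer(() -> new LinkedBlockingQueue<String>(10_000));
        // Lock-free, unbounded queue (CAS-based, no producer lock).
        MetricBuffer lockFree = new MetricBuffer(() -> new ConcurrentLinkedQueue<String>());

        for (MetricBuffer buffer : new MetricBuffer[] {lockBased, lockFree}) {
            buffer.submit("messages.consumed:1|c");
            System.out.println(buffer.next());
        }
    }
}
```

Both buffers behave the same from the caller's point of view; only the contention characteristics and the bound differ.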

@prudhvi
Author

prudhvi commented Nov 6, 2019

cc @baojr

@dsahni

dsahni commented Nov 6, 2019

Hi @prudhvi, thanks for bringing this issue to our attention. We are reviewing your questions and will get back to you asap.

@prudhvi
Author

prudhvi commented Nov 12, 2019

Just a small update: in case the concern is that ConcurrentLinkedQueue is unbounded, what do you think of using http://javadox.com/org.jctools/jctools-core/1.0/org/jctools/queues/MpscArrayQueue.html from https://github.com/JCTools/JCTools? We tried this queue and it seems to do better than LinkedBlockingQueue, but not as well as ConcurrentLinkedQueue, which is unbounded and requires more heap. The bottleneck we ran into with MpscArrayQueue is that the single consumer thread that writes to statsd can't consume as fast as the producers write metrics to the queue (with a queue size of 10000), so more work might be needed there, either to remove overhead in the consumer thread or to use multiple consumer threads with an MPMC queue.

@olivielpeau
Member

Hi @prudhvi, thanks for your suggestions on the queue implementation! Just so you know, we have work planned to improve the performance of this client: the queue implementation is something we're looking into, but we're also looking into the way the client currently groups metrics in UDP datagrams, and into reducing the memory allocations done by the client.

  1. before we dig into details of the queue implementation, could you share the options with which you initialize the client? If you could share some code snippets that reflect how the dogstatsd client is used within your app, that could help as well. (If you'd prefer not to share this publicly here, please open a support case by emailing the info to support@datadoghq.com, and mention this github issue in the email).
  2. do you have more details on the metric types (gauge, count, distribution, etc) of the metrics that you send with the client? Depending on the metric type that your app uses, some simple aggregation in the app could significantly reduce the metric throughput.
  3. do you use one instance of the client for the whole app, or one instance per worker thread? Having one instance of the client per worker thread could improve performance as well.

If none of these questions helps identify low-hanging fruit to improve the client's performance, let's continue the conversation on the queue implementation and other code changes we could make to improve the client's performance.
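As an illustration of the "simple aggregation in the app" mentioned in question 2, here is a minimal sketch for counters. The class and method names are hypothetical and not part of the dogstatsd client; the assumption is that the app accumulates increments locally and sends one total per metric on a periodic flush instead of one message per increment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class CounterAggregatorSketch {
    // One accumulator per metric name; safe for concurrent producers.
    private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

    /** Called on the hot path instead of sending a statsd message per update. */
    public void increment(String name, long delta) {
        counters.computeIfAbsent(name, k -> new AtomicLong()).addAndGet(delta);
    }

    /** Atomically takes and resets each total; intended to run on a timer. */
    public Map<String, Long> flush() {
        Map<String, Long> snapshot = new HashMap<>();
        counters.forEach((name, total) -> {
            long value = total.getAndSet(0); // no increments are lost between read and reset
            if (value != 0) {
                snapshot.put(name, value);
            }
        });
        return snapshot;
    }

    public static void main(String[] args) {
        CounterAggregatorSketch aggregator = new CounterAggregatorSketch();
        for (int i = 0; i < 3; i++) {
            aggregator.increment("messages.consumed", 1);
        }
        // Three increments collapse into a single statsd counter line.
        aggregator.flush().forEach((name, value) ->
                System.out.println(name + ":" + value + "|c"));
    }
}
```

With tags, the map key would have to include the tag set, which is why a bounded set of tags makes this kind of aggregation easier.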

@prudhvi
Author

prudhvi commented Nov 19, 2019

Thanks for the follow up @olivielpeau

  1. Sure will send the snippets showing usage to your support email.

  2. For the app that I mentioned, 80% of the metrics are counters and the remaining 20% are histograms. Prior to the statsd integration we were using Yammer MetricsRegistry, from which we periodically scraped metrics and sent them to Kafka. With the statsd integration using this client, we intercept every call that updates a metric in MetricRegistry (e.g. counter inc, dec and histogram update) and send that update to the statsd agent (veneur) via this client. We are not periodically scraping from MetricsRegistry for the statsd integration because we want to support first-class tags in the future (where the counter inc and dec APIs should support adding arbitrary tags) and don't want to be tightly coupled to MetricRegistry (which doesn't have first-class tag support; even in the upcoming version 5 it supports tags by storing every uniquely tagged metric in MetricRegistry). I understand that MetricRegistry provides local aggregation of counters, which would reduce the number of updates. For now we are resorting to heavy downsampling to support this volume.

  3. Currently we use one instance for the whole app. As discussed in 2), to support an easier migration to statsd via existing code, we intercept every call to a MetricRegistry metric update and send that update to the statsd agent via a singleton statsd client. The interception is set up via Guice modules, where we bind MetricRegistry to a custom implementation, our MetricRegistryWrapper, which sends each update to the in-memory MetricRegistry (Yammer singleton) as well as to statsd.

@prudhvi
Author

prudhvi commented Nov 20, 2019

@olivielpeau FYI included snippets for 1) at https://help.datadoghq.com/hc/en-us/requests/278328

@olivielpeau
Member

Thanks Prudhvi for all these details!

With a volume of 600K points/sec on a single client, allowing for fully arbitrary tags is a challenge.

Have you identified the hot loop in the app that submits the highest volume of points? And would it be conceivable in the short-term to modify that part of the app so that it uses a slightly different interface with non-arbitrary tags (which would more easily allow for pre-aggregation, in particular for counters)? I don't know if you have access to the app's code itself, and if it's conceivable as part of the migration to modify the app's code.

Also, how many threads (roughly) does this app have?

@prudhvi
Author

prudhvi commented Nov 21, 2019

Thanks @olivielpeau .
Yeah, it's an app we maintain, so we can modify it. We also have all our Java apps in a monorepo, and this statsd metrics module (Guice) gets installed in every app. Sorry, when I mentioned adding tag support, I didn't mean specifically for this app; I meant adding support to our metrics module that all other apps in the monorepo can use. But yes, we will make sure not to cause a tag explosion for this use case.

Yes, we identified the part that produces such a high volume of metrics. As ours is a monorepo, we have a module for developing Kafka consumer apps, which provides a hook method for every message consumed from Kafka, and different Kafka consumer applications implement that hook based on their needs. In our case we are reading log messages from Kafka and updating 14-15 metrics for every message we process. Changing the interface of this Kafka consumer module isn't entirely in our control.

And would it be conceivable in the short-term to modify that part of the app so that it uses a slightly different interface with non-arbitrary tags (which would more easily allow for pre-aggregation, in particular for counters)?

Not sure I understand this question. Even if we limit the tag names we emit so that they're not arbitrary, how does the pre-aggregation work, given that the client still sends a statsd update message for every inc/dec operation even with limited tags? It would be nice if the dogstatsd Java client did aggregation similar to MetricRegistry for counters instead of parsing and processing each update message (lots of string parsing and formatting). Or are you suggesting we do the pre-aggregation before sending a counter update via the statsd client? If that's the case, as I said earlier, it's not an easy change, and many other Kafka consumer apps depend on the interface. Adding tags won't create a new problem for us as they will be limited; it's just the sheer volume of metric updates we are sending that's causing the bottleneck. I will share the flame graphs I took with https://github.com/jvm-profiling-tools/async-profiler before and after integrating the statsd client, which show that the CPU is spent mostly on statsd client updates.

We have 350-390 threads (including both daemon and non daemon from ThreadMXBean.getThreadCount() ) running in the app.
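For anyone wanting to collect the same number, a minimal sketch of reading thread counts through the standard `java.lang.management` API follows. The class name is made up for illustration; the output naturally depends on the JVM it runs in.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class ThreadCountSketch {
    /** Current number of live threads, daemon and non-daemon combined. */
    static int liveThreads() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }

    public static void main(String[] args) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        System.out.println("live threads:   " + threads.getThreadCount());
        System.out.println("daemon threads: " + threads.getDaemonThreadCount());
        System.out.println("peak threads:   " + threads.getPeakThreadCount());
    }
}
```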

@SerCeMan

SerCeMan commented Nov 23, 2019

Hey, folks! We've noticed a similar pattern in our applications where we observed these two issues:

  • Extensive CPU usage on EC2 spent sending UDP packets.

Depending on the application, the CPU consumption can vary from 5% to 25%. Here's an excerpt from a related CPU flame graph:
[Image: Screen Shot 2019-11-23 at 3.01.04 pm]

but we're also looking into the way the client currently groups metrics in UDP datagrams, and into reducing the memory allocations done by the client.

@olivielpeau, I assume that this is exactly that problem, and you're already on top of it?

  • CPU usage, and locking on the shared metrics queue.

I believe this is the exact issue that was discussed here initially. IMO, apart from ConcurrentLinkedQueue, a viable solution here would be to use an MPSC (Multiple-Producer Single-Consumer) queue from JCTools. This can give quick and significant benefits with minimal code changes. Or maybe even thread-local-based constructs, if we can assume that the data is completely independent. I think a small set of JMH benchmarks would work incredibly well for this project, as DD is used in many high-load projects.

@prudhvi
Author

prudhvi commented Nov 25, 2019

Thanks for the follow-up @SerCeMan. I tried an MPSC queue with a queue size of 10k and noticed that the single consumer wasn't able to consume fast enough when multiple producers send statsd messages to the queue at the rate I mentioned (> 600k metric updates per second), so we are seeing incorrect values because the queue drops messages when it's full. That's why I suggested looking into MPMC, or optimizing the consumer thread so that it can keep up with the producer rate.
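The drop behaviour described here can be reproduced in isolation. The sketch below uses `java.util.concurrent.ArrayBlockingQueue` purely as a standard-library stand-in for a bounded queue (it is not the JCTools MPSC queue); the relevant point is only that `offer` on a full bounded queue returns `false`, so the caller has to drop or retry. The capacity and message count are arbitrary illustration values.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class DropOnFullSketch {
    /** Offers `attempts` messages with no consumer draining; returns how many were rejected. */
    static int countDrops(Queue<String> queue, int attempts) {
        int dropped = 0;
        for (int i = 0; i < attempts; i++) {
            if (!queue.offer("metric:" + i + "|c")) {
                dropped++; // queue full: this update is lost unless the caller retries
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Capacity 4, 10 offers, nobody consuming: the last 6 offers are rejected.
        int dropped = countDrops(new ArrayBlockingQueue<String>(4), 10);
        System.out.println("dropped = " + dropped); // prints "dropped = 6"
    }
}
```

Counting rejected offers like this is one way to tell whether wrong metric values come from queue overflow rather than from the agent.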

@SerCeMan

Hey, @prudhvi! What do you mean by "wasn't able to consume fast enough"? As far as I can see, there is only a single consumer in the NonBlockingStatsDClient - c.t.s.NonBlockingStatsDClient#executor thread, that's why I suggested an MPSC queue.

Are you seeing thread starvation where the consumer thread doesn't get enough quota to send the data? If yes, can it be a sign that the system is now working much faster overall? Or are you seeing something different?

@prudhvi
Author

prudhvi commented Nov 25, 2019

Yeah, it only has a single consumer for now, but I'm suggesting it could be given multiple consumers so that the queue doesn't fill up and drop messages. We noticed that with MPSC, a queue size of 10k and maxPacketSizeBytes of 5000, metric updates from producers get dropped.

Not sure if it's starvation or just slowness in writing to the Unix Domain Socket in the consumer thread that prevents it from keeping up (we are using a Unix Domain Socket, not UDP).

@olivielpeau
Member

Thanks for all the additional details @prudhvi, and thanks @SerCeMan for chiming in (and the suggestions)

@prudhvi When using the MPSC queue, were the producers at least fast enough to not noticeably slow down your application? (I'm assuming they were not anymore, but double-checking)

When using dogstatsd with Unix Domain Socket, the thread that writes to the socket is blocked if the socket is full, and on most systems the socket has a relatively small max buffer size by default. So a small burst of data (or a small timeframe during which the statsd agent doesn't read from the socket) can easily end up with the writing thread filling up the socket and then having to wait on the statsd agent to read from the socket.

Therefore it may be worth looking into the kernel parameters defining the buffer sizes of the sockets. Increasing the buffer size can help when the writing thread is blocked waiting on the statsd agent to read from the socket. Could you run the following commands to find out the values of these parameters on your hosts:

  • sysctl net.unix.max_dgram_qlen: defines the max number of datagrams in the buffer of unix domain sockets
  • sysctl net.core.wmem_default and sysctl net.core.wmem_max: define the max buffer size (in bytes) a unix domain socket is allowed to have.

You can increase net.unix.max_dgram_qlen (with sysctl -w net.unix.max_dgram_qlen=X) roughly as long as net.unix.max_dgram_qlen * maxPacketSizeBytes remains below net.core.wmem_default.
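The rule of thumb above is simple arithmetic, sketched here as a helper. The class and method names are made up for illustration; the sample inputs (a `net.core.wmem_default` of 212992 bytes and a `maxPacketSizeBytes` of 5000) are values reported elsewhere in this thread, and the formula is just the stated inequality, not a statement about how the kernel accounts for buffer memory.

```java
public class DgramQlenSketch {
    /**
     * Largest queue length q such that q * maxPacketSizeBytes stays strictly
     * below wmemDefaultBytes, per the rule of thumb in the comment above.
     */
    static long maxQueueLength(long wmemDefaultBytes, long maxPacketSizeBytes) {
        return (wmemDefaultBytes - 1) / maxPacketSizeBytes;
    }

    public static void main(String[] args) {
        long wmemDefault = 212_992; // bytes
        long maxPacketSize = 5_000; // bytes
        // 42 * 5000 = 210000 < 212992, while 43 * 5000 = 215000 is too large.
        System.out.println(maxQueueLength(wmemDefault, maxPacketSize)); // prints 42
    }
}
```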

This may help increase the throughput; let us know either way, as the info will be helpful in any case. I'm not sure at the moment whether increasing the number of consumer threads would actually help here (as they may all end up blocking on the write to the socket).

@prudhvi
Author

prudhvi commented Dec 3, 2019

@olivielpeau when using the MPSC queue, it's still spending 30% CPU instead of 40% CPU (when I searched for yammer in the SVG), so there is still overhead for the application. I will attach the flame graphs for the MPSC queue to the ticket.

Thanks, we are aware of the socket size limitations, so we bumped net.core.wmem_max to 8MB on our hosts from a low value. We didn't want to bump the default as it would affect all sockets, consuming more memory irrespective of use case, and we run multiple LXC app containers on a single host.
Our defaults are net.core.wmem_default = 212992 and net.unix.max_dgram_qlen = 512.

Veneur (the statsd agent) starts listening on the socket by setting the socket size to 2MB (less than the 8MB max configured via net.core.wmem_max). However, I also tested with the socket size configured to 8MB and didn't see any difference in the consumer thread's CPU usage in the flame graphs. But yes, you are right that multiple consumer threads might not help if writing to the socket is the blocker. However, I am not sure how to check (or what tools to use to check) whether the socket buffer is full for Unix datagram sockets and is blocking the consumer thread when it writes to the socket. Let me know if there is a way to verify it.

@truthbk
Member

truthbk commented Jan 3, 2020

Hi All!

Once again, thank you @prudhvi and @SerCeMan for your feedback. We have started a bit of a revamp to improve client performance, first focusing on throughput (though we will later focus on resource efficiency, i.e. CPU/memory usage).

We have 3 PRs (with more work in the pipeline):

These try to address some of the issues described here, or at least to provide different options to our users who may need greater performance. As we can see from @SerCeMan's flame graph, we spend quite a bit of time writing to the socket, so the first quick gain was to move the socket writes to a different thread. Decoupling the datagram assembly logic from the actual submission should allow the dogstatsd application to continue to write statsd messages to the queue. The new approach with sender workers (writing to the socket) will also improve our client's behavior while waiting on IO, as the IO-blocked thread, now separate, should give up its time slice at the kernel level. The number of worker threads (i.e. consumers) is configurable, and so far we have not seen any gains from increasing the number of socket-writing consumers.

To deal with the architectural change we have introduced a direct buffer pool, configurable in size, that should allow us to curb memory allocations, helping performance, in the new scenario. Currently the only scenario that would block the client packet-assembling thread would be running out of byte buffers (ie. having them all queued up for submission, with none remaining in the pool). This is an unlikely scenario which can be dealt with by dropping incoming messages.
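A fixed-size pool of direct buffers along these lines could look roughly like the sketch below. This is an assumption-laden illustration, not the code from the PRs: the class name, method names and sizes are all hypothetical. It shows the one behaviour described above, namely that borrowing fails once every buffer is queued for submission, at which point the caller can drop the incoming message.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BufferPoolSketch {
    private final BlockingQueue<ByteBuffer> pool;

    /** Allocates all direct buffers up front so the hot path does no allocation. */
    public BufferPoolSketch(int poolSize, int bufferSizeBytes) {
        pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(ByteBuffer.allocateDirect(bufferSizeBytes));
        }
    }

    /** Returns a free buffer, or null if all buffers are currently in use. */
    public ByteBuffer tryBorrow() {
        return pool.poll();
    }

    /** Resets the buffer and makes it available again (called after the socket write). */
    public void giveBack(ByteBuffer buffer) {
        buffer.clear();
        pool.offer(buffer);
    }

    public int available() {
        return pool.size();
    }

    public static void main(String[] args) {
        BufferPoolSketch buffers = new BufferPoolSketch(2, 1_400);
        ByteBuffer first = buffers.tryBorrow();
        ByteBuffer second = buffers.tryBorrow();
        // Pool exhausted: the assembling thread would drop the message here.
        System.out.println("third borrow is null: " + (buffers.tryBorrow() == null));
        buffers.giveBack(first);
        System.out.println("available after return: " + buffers.available());
    }
}
```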

To further improve performance, an option to use a non-blocking queue has been introduced as well. This should definitely reduce contention. The actual queue implementation we might finally use is still undecided - we are currently testing a ConcurrentLinkedQueue with a soft upper bound to curb memory usage, but we have to try MPSC, and other array-based queues to make the final decision.
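One possible shape for a ConcurrentLinkedQueue with a soft upper bound is sketched below. It is a guess at the idea rather than the implementation under test: the class name is hypothetical, and "soft" here means that racing producers may briefly push the size slightly past the limit, because the size check and the increment are not one atomic step.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SoftBoundedQueueSketch<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();
    private final int softLimit;

    public SoftBoundedQueueSketch(int softLimit) {
        this.softLimit = softLimit;
    }

    /** Lock-free enqueue; rejects the item once the tracked size reaches the limit. */
    public boolean offer(T item) {
        if (size.get() >= softLimit) {
            return false; // caller drops the message
        }
        size.incrementAndGet(); // counted before the add, so size never under-reports
        queue.add(item);
        return true;
    }

    /** Lock-free dequeue; returns null when the queue is empty. */
    public T poll() {
        T item = queue.poll();
        if (item != null) {
            size.decrementAndGet();
        }
        return item;
    }

    public static void main(String[] args) {
        SoftBoundedQueueSketch<String> q = new SoftBoundedQueueSketch<>(2);
        System.out.println(q.offer("a")); // true
        System.out.println(q.offer("b")); // true
        System.out.println(q.offer("c")); // false, limit reached
        System.out.println(q.poll());     // a
        System.out.println(q.offer("c")); // true, space freed
    }
}
```

A separate counter is used because `ConcurrentLinkedQueue.size()` traverses the whole queue and is not a constant-time operation.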

I will post some numbers soon to give a sense of the gains introduced by the proposed changes. Thank you all for your feedback and ideas on the client and the ways we can improve it.

Feel free to test it out if you wish. The PRs are stacked meaning they build up on top of each other, so #96 includes the changes in the other PRs as well. Please note that there are some breaking changes (in particular in the way the client is instantiated - better documentation should be added to the PR; sorry about that), so please keep that in mind if you decide to test these with your applications.

@prudhvi
Author

prudhvi commented Jan 7, 2020

Thanks a lot for these improvements @truthbk , we will try these changes. It would be nice to merge in the benchmark tests too.

@prudhvi
Author

prudhvi commented Jan 7, 2020

cc @AndrooHan

@truthbk
Member

truthbk commented May 5, 2020

@prudhvi @AndrooHan we have released v2.10.0 with a large set of improvements, and a bunch of new knobs we can use to improve performance. We have also been able to reduce the latency considerably for metric submission. I wanted to let you guys know in case you wanted to give it a shot. It should help. Also, if you encounter any issues please let us know and we will make sure we address them ASAP.

I will leave the issue open a few more days, maybe a couple of weeks, and if there's no feedback I will likely go ahead and close it.

Regardless, please feel free to reach out if you need anything.

@franz1981

Sorry to join the discussion late, but if you need a fast unbounded queue from JCTools that works better under contention, in case that turns out to be the issue with the other MPSC queues (including the JCTools ones), we've added https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscUnboundedXaddArrayQueue.java (which has an MPMC variant as well). Feel free to try it and report any questions :)


6 participants