Improve throttled processing strategy #546

Merged 7 commits on Jul 3, 2019

Conversation

martincostello
Member

TL;DR In high throughput scenarios, the Throttled class' implementation causes Task backlogs that consume CPU and memory that cannot be recovered when message rates drop off.

This PR attempts to improve the behaviour of the Throttled implementation by using the async methods on the SemaphoreSlim class rather than wrapping wait handles in tasks and manually queuing work to the thread pool.
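
In outline, the change is to await the semaphore directly, roughly like this (a minimal sketch with placeholder names, not the actual JustSaying code):

using System;
using System.Threading;
using System.Threading.Tasks;

// Minimal sketch only: the type and member names here are placeholders,
// not the actual JustSaying API.
public sealed class AsyncThrottle
{
    private readonly SemaphoreSlim _semaphore;

    public AsyncThrottle(int maxConcurrency)
    {
        _semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    }

    public async Task ProcessAsync(Func<Task> action, CancellationToken cancellationToken)
    {
        // Wait asynchronously for a free slot rather than blocking a thread
        // or queuing a wrapper task to the thread pool.
        await _semaphore.WaitAsync(cancellationToken);

        try
        {
            await action();
        }
        finally
        {
            // Always release the slot, even if the handler throws.
            _semaphore.Release();
        }
    }
}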

It also makes changes to account for the fact that code using the processing strategy can race: it can observe that there are available workers, but then find none available when it attempts to process the message.

This last point in particular needs some thought and discussion, as the implementation seems quite dependent on that assumption, which in retrospect seems to be asking for race conditions.

The implementation does not appear to take into account that reads of the "free workers" value are non-atomic with respect to the time it takes to request messages from SQS and start processing them.
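
For illustration, the consuming pattern that races looks roughly like this (a sketch only; WaitForAvailableWorkers and AvailableWorkers are the interface members discussed later in this thread, while ReceiveMessagesAsync and DispatchToWorker are placeholders for the listener's own SQS call and handler dispatch):

// Sketch of the racy pattern, not the actual listener code.
private async Task ListenOnceAsync(IMessageProcessingStrategy strategy)
{
    await strategy.WaitForAvailableWorkers();

    // Non-atomic: the free-worker count can change between this read,
    // the SQS receive call, and the point at which processing starts.
    int freeWorkers = strategy.AvailableWorkers;

    var messages = await ReceiveMessagesAsync(freeWorkers);

    foreach (var message in messages)
    {
        // By the time this runs, there may no longer be any free workers.
        DispatchToWorker(message);
    }
}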

Details

These changes stem from a production issue where a worker that processes a single SQS queue was migrated from .NET Framework 4.7.2 to .NET Core 2.2.5. This change was canary'd on a single EC2 instance for ~24 hours on a queue that has a particularly high message throughput rate.

Below are the CPU and memory consumption charts for a 24-hour period where the code using JustSaying 6.0.1 is in use:

[chart: CPU and memory consumption over 24 hours with JustSaying 6.0.1]

As can be seen, there is a slow memory leak in the service, as well as increasing CPU usage from approximately 5pm, which climbs to ~50% by 9pm and never recovers, even when the rate of messages being delivered drops off to near zero, as shown below.

[chart: message delivery rate dropping off to near zero]

There were otherwise no obvious signs of the worker being in distress, apart from the memory and CPU. The mean time to process messages in the canary was actually more consistent, and messages were being handled faster than with the .NET Framework version:

[chart: mean message processing time compared to the .NET Framework version]

Capturing a process dump from the canary showed that there were hundreds of tasks scheduled to be run with various stacks and start points, suggesting the thread pool was reaching its limits.

Some sniffing around in the code led to looking at the Throttled class for performance bottlenecks. The application is small and only consumes one queue with a single handler, making an HTTP call to an external service based on the contents of the message, so it is quite simple and lightweight.

Digging into the history leads to some refactoring in #440, but that was merged to master after 6.0.1 was released to NuGet, and there are no significant changes in the rest of the commit history (as far back as 5th April 2017; it's hard to look before that due to renaming).

My working hunch is that the processing rate improved to a degree that the throttling became the bottleneck, and tasks to process messages were backing up due to the indefinite wait for the semaphore here:

https://github.com/justeat/JustSaying/blob/88f241c8a6bcb5768e2d57fae4aef906aacad53a/JustSaying/Messaging/MessageProcessingStrategies/Throttled.cs#L26

The attached refactoring is based on changes made directly to the application yesterday (along with increasing the throttle factor from 8 to 16), which were run overnight to test the hypothesis. They showed the same throughput improvements, but without the CPU or memory issues. In fact, the CPU used is now generally less than with the .NET Framework version.

Below are the charts comparing against the above with the fix applied. The CPU is the lowest orange line (look near 8pm) and the memory is the lowest yellow line, showing that while slightly more memory is used, there's no leak.

[chart: CPU comparison with the fix applied]

[chart: memory comparison with the fix applied]

codecov bot commented Jun 20, 2019

Codecov Report

Merging #546 into master will increase coverage by 0.39%.
The diff coverage is 60.4%.


@@            Coverage Diff             @@
##           master     #546      +/-   ##
==========================================
+ Coverage   38.12%   38.52%   +0.39%     
==========================================
  Files          78       78              
  Lines        2662     2754      +92     
  Branches      465      487      +22     
==========================================
+ Hits         1015     1061      +46     
- Misses       1498     1536      +38     
- Partials      149      157       +8
Impacted Files Coverage Δ
...ProcessingStrategies/DefaultThrottledThroughput.cs 100% <100%> (ø) ⬆️
...wsTools/MessageHandling/SqsNotificationListener.cs 61.25% <36.73%> (-7.2%) ⬇️
...Messaging/MessageProcessingStrategies/Throttled.cs 69.9% <68.53%> (-16.15%) ⬇️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@maurofranchi (Contributor) left a comment

Seems legit. Only a couple of comments

var counter = new ThreadSafeCounter();

var actions = BuildFakeIncomingMessages(messageCount, counter);

await ListenLoopExecuted(actions, messageProcessingStrategy);

fakeMonitor.DidNotReceive().IncrementThrottlingStatistic();
if (messageCount == capacity)
Contributor

Can you please separate the tests where the messageCount == capacity into another test method so you can avoid the if and make what you want to assert more clear?

Member Author

Behaviour needs discussion overall, for now I just wanted it passing.

Member Author

Changed to separate the two cases in later commits.

messageProcessingStrategy.AvailableWorkers.ShouldBeGreaterThan(0);

stopwatch.Elapsed.ShouldBeLessThanOrEqualTo(timeout,
$"ListenLoopExecuted took longer than timeout of {timeoutSeconds}s, with {actions.Count} of {initalActionCount} messages remaining");
stopwatch.Elapsed.ShouldBeLessThanOrEqualTo((timeout * 3),
Contributor

where does the * 3 come from?

Member Author

Threading means things don't take exact amounts of time, so this was just a big enough number to keep the elapsed time within some bounds and get the test passing, pending a proper discussion on exactly how this should work, because the overall approach to the message processing seems a bit flawed to me.

Contributor

OK, I suspected it was just made big enough to make it pass. Will wait for the refactoring (eventually) after the discussion.

Member Author

Updated the test to not validate the timings anymore.


iancooper commented Jun 21, 2019 via email


slang25 commented Jun 21, 2019

This is some really nice analysis 😄
Just to add a bit of context and history: I did a quick write-up on IMessageProcessingStrategy; the interface is basically broken because it pretty much guarantees there will be a race condition in how it is consumed (calling WaitForAvailableWorkers then getting the number available). The more you can get away from the existing shape of this interface, the better!
#422 (comment)


slang25 commented Jun 21, 2019

Another thing to make you aware of (you may have already read it) is this great read from Marc Gravell on the topic of SemaphoreSlim and WaitAsync:
https://blog.marcgravell.com/2019/02/fun-with-spiral-of-death.html
and this upcoming change to corefx:
dotnet/corefx@ecba811

@martincostello
Member Author

Thanks @slang25 - sounds like I did understand what was going on properly then, and that it is setting itself up for a race condition.

As 7's going to be a breaking change anyway, sounds like this is something I can try and fix properly here as part of this PR then, rather than fudge it and come back to it later.

Removing it and changing the "worker" design sounds like a sensible approach to make this nicer then, in some way.

One thing I'd considered is, instead of:

  1. Get X messages
  2. Process X messages in parallel
  3. Wait for free workers
  4. Repeat

We could change it to:

  1. Create Y workers
  2. Get Z messages
  3. Each worker Y processes Z messages sequentially
  4. Repeat

The idea being that it's easier to have many things each doing something serially than one thing trying to manage many parallel things.

Thoughts?

I'll also check out that blog post and the related PR later on.

@martincostello
Member Author

@slang25 Sounds like the app was experiencing a variant of the resource exhaustion described in Marc Gravell's post, as the old code was using a TCS too, except we were still processing messages so the instance wasn't totally dead.


slang25 commented Jun 23, 2019

I added the TCS code when adding the semaphore implementation; I wanted an async way of waiting for the semaphore value to change and then to peek at the count.
At the time I considered doing a WaitAsync and Release, but worried that a change to the semaphore could happen in between, tripping things up a bit. That change was to mitigate a bug that would lose count of workers, potentially creating an entire platform outage at peak, so I was mega-nervous and favouring correctness over perf (even though this whole interface is broken).
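
Roughly, the pattern being described is (a sketch only):

// Wait asynchronously for the count to become non-zero, then immediately
// give the slot back and peek at the current count.
await _semaphore.WaitAsync();
_semaphore.Release();

// The worry: another caller can change the semaphore between the Release
// above and this read, so the value may already be stale.
int available = _semaphore.CurrentCount;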

I've given this some more thought, and I can't think of a way this would get tripped up, so I should have gone with this approach first time around.

On your suggestion about changing the processing pattern, I think what you're suggesting is a better approach. What can happen now with a shared throttled strategy of let's say 10 workers, under queue back pressure, is that every other message fetch tries to get 1, and because of concurrency they are still fetching too many, so if you have multiple queues the fetch counts can look like:

Q1 - ... 1, 9, 1, 1, 8, ...
Q2 - ... 1, 1, 1, 6, 1, ...
Q3 - ... 1, 1, 1, 1, 1, ...

or something as random as that.

Part of me wants to smooth out these numbers by tracking a rolling average of the rate of messages being processed. This would potentially introduce some (bounded) back pressure into the service (and out to SQS), which is less desirable, but if kept minimal it is manageable (messages could time out more easily, for example). With this suggestion, the listen loop would have a steady cadence, which can also help reduce latency during quiet periods: rather than waiting 20 seconds to receive a message batch containing 1 message, you could ask for 1 message and get it as soon as it's in the queue. However, all this comes at the cost of yet more complexity, which is where I always drop the idea. I think your suggestion wins in terms of simplicity and sanity.

If TFMs were no barrier, what you are suggesting would be really nice exposed as an IAsyncEnumerable<Message>; people would be able to use Ix.NET to throttle or do whatever. I've persuaded Steve Gordon that it's a great fit for his high-perf SQS client (see here: https://github.com/highperfcloud/AwsSqsClient/blob/e13c65862cee61523ba986cd900421691b925f77/src/Aws.Sqs.Core/LightweightMessageReader.cs#L11), and this would all fit together so nicely (perhaps better suited to a new project).

@iancooper
Contributor

We could change it to:

  1. Create Y workers
  2. Get Z messages
  3. Each worker Y processes Z messages sequentially
  4. Repeat

FWIW Brighter uses the model of multiple performers, where each thread is a message pump i.e.

  1. Create one or more threads. Each thread does:
    (i) Get Message
    (ii) Translate Message
    (iii) Dispatch Message

And so it works serially, with parallelism an explicit option. We use long-running threads, not thread pool threads, as we have a long-running loop, which is an anti-pattern for the thread pool, which expects threads to do short jobs and optimizes for that, AFAIK.

We are a bit more old school: instead of a cancellation token, the loop reads from an in-memory queue into which we can insert a quit message so that we exit after the next job is taken. The queue itself can act as a shared buffer on some transports, so you can fill that buffer and have multiple performers read the same pool of work (it won't work on RMQ, for example, where there is thread affinity with a channel and the delivery tag that you ack with, but it does for something like SQS, which just uses HTTP and doesn't have open channels etc.).
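
A minimal sketch of that shape (placeholder names, not Brighter's actual API):

using System.Collections.Concurrent;
using System.Threading;

// Sketch only: Message, IsQuit, Translate and Dispatch are placeholders.
var buffer = new BlockingCollection<Message>();

var performer = new Thread(() =>
{
    while (true)
    {
        var message = buffer.Take();      // (i) get message

        if (message.IsQuit)               // a quit message ends the loop
        {
            break;
        }

        var request = Translate(message); // (ii) translate message
        Dispatch(request);                // (iii) dispatch message
    }
})
{
    IsBackground = false                  // a dedicated long-running thread, not the pool
};

performer.Start();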

Our model reflects the need to preserve ordering, which requires a single performer, and lets you manage that. It is overall simpler than trying to build back pressure on top of the thread pool, which is possible, but having done it a few times it's not something I really like compared to using explicit threads, as they are long-running.

The biggest problem I need to fix, though, is that we have one thread for both the pump and the handler, which makes it easy (you only get the next item when you finish work), but we want to support an async message pump (for scenarios where you don't need ordering).

We want to support thread affinity on a callback, and we can do that by using our own queue to implement a synchronization context so that your callback becomes another item to execute via the loop, but that's not finished and has some tricky edge cases.

Anyway, this is mentioned just to compare notes on approaches, but we steered away from the thread pool right from the beginning due to less than positive experiences with it for this kind of scenario.


slang25 commented Jun 24, 2019

Having a synchronous message pump makes a lot of sense, certainly better than blocking on thread pool threads as you say.

Here the pump is per queue and asynchronous, which saves us a thread per queue when idle. I'd certainly trade that for better-performing code. This PR hopefully gives us both (or at least to some extent).

Out of curiosity, are you saying that your message handler async callbacks are run on one thread? What do you want/need that for?

@iancooper
Contributor

Having a synchronous message pump makes a lot of sense, certainly better than blocking on thread pool threads as you say.

Here the pump is per queue and asynchronous, which saves us a thread per queue when idle. I'd certainly trade that for better-performing code. This PR hopefully gives us both (or at least to some extent).

Out of curiosity, are you saying that your message handler async callbacks are run on one thread? What do you want/need that for?

It's more "can be": we let you set ConfigureAwait to true so that you can have thread affinity on your callback. We have a number of users that want that, mostly for legacy reasons, and even HttpClient is not quite as thread-safe as most of us might like. In addition, in some cases you might not want to be pushed back onto the thread pool for your callbacks, which is where ASP.NET went in the end, as there is no synchronisation context in ASP.NET Core. I'm not entirely sure that dropping back onto the thread pool is what you always want to do, as it comes with its own gotchas.

But for the moment it is parked because it is hard to get right, and the fact that the ASP.NET team quit on making it work was not encouraging...

@martincostello
Member Author

Thanks for the input everyone - when I get the chance this week I'll come back to this PR and rework it some more based on the above discussion, changing to something like this for the pump:

while (!cancellationToken.IsCancellationRequested)
{
    var messages = await GetMessagesAsync();

    foreach (var message in messages)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            break;
        }

        await DoMessageStuffAsync(message);
    }
}

Then there just needs to be something opt-in to let you do that in parallel on several threads, which would let you do more in a worker that only processes one queue, so that the app makes efficient use of its resources (assuming multiple cores).
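
For the parallel opt-in, that could be as simple as running several copies of the pump concurrently (a sketch only; RunPumpAsync is assumed to wrap the loop above, and degreeOfParallelism is an illustrative name rather than the actual option):

// Sketch only (uses System.Linq): runs N copies of the pump loop above concurrently.
var pumps = Enumerable.Range(0, degreeOfParallelism)
    .Select(_ => Task.Run(() => RunPumpAsync(cancellationToken)))
    .ToArray();

await Task.WhenAll(pumps);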

martincostello changed the title from "[WIP] Improve throttled processing strategy" to "Improve throttled processing strategy" on Jun 29, 2019
@martincostello
Member Author

I think this is done and ready for review now.

@martincostello
Member Author

I thought it didn’t queue anything else up further down the chain, so it would be actually handling the message before the await completed. Basically turning the strategy into “Do message, do message, do message, ..., get more messages, ...” rather than “get messages, despatch messages, get more messages, ...”.


slang25 commented Jun 30, 2019

Sorry, I shouldn't try reviewing code on my phone 😳. I missed the await of RunActionAsync; this makes total sense. I'll have a proper look this evening.

slang25 previously approved these changes Jul 1, 2019

@slang25 (Member) left a comment

Fantastic work @martincostello, this addresses head-on some of the things that have bugged me for years, and the sequential processing option is a great addition as well. 😃

@martincostello
Member Author

Thanks @slang25 😄

Will rebase onto #550 and then merge once I've gotten another ✔️

Improve the behaviour of the Throttled implementation to use the async methods on the SemaphoreSlim class rather than wrapping wait handles in tasks and manually queuing to the thread pool. Also make changes to account for the fact that code using the processing strategy can race and think there are available workers, only for none to be available when it attempts to process the message.
Simplify the message loop and remove the concept of AvailableWorkers from IMessageProcessingStrategy.
Refactor the message loop to make it easier to follow.
Remove AvailableWorkers property.
Rename MaxWorkers to MaxConcurrency.
Add ThrottledOptions class.
Support serial processing as well as fire-and-forget using the thread pool.
Rename UseThreadPool to ProcessMessagesSequentially to make it more explicit, and make it opt-in so that the default matches v6 behaviour.
Add unit tests that verify that messages are processed sequentially (or not) in Throttled.
Add a unit test that verifies that Throttled does not allow more concurrent work than specified by the options.
Rename method after changes in master.
@martincostello
Member Author

Rebased.

@slang25 (Member) left a comment

🎉

@maurofranchi (Contributor) left a comment

It looks good! Well done!
