
Profiling Scheduler Performance #4443

Open
mrocklin opened this issue Jan 19, 2021 · 87 comments

Comments

@mrocklin
Member

mrocklin commented Jan 19, 2021

When running Dask with TLS security turned on, the scheduler can easily become bottlenecked by SSL communication. See the performance report below, in particular the "Scheduler Profile" tab.

https://gistcdn.githack.com/mrocklin/1d24fbac2c66364d8717952ee8827c64/raw/9c87343cb358b54eb26dbfe8b0714120c0f5ad81/report.html

I ran this on my laptop with the following code:

import dask
from dask.distributed import Client, performance_report, wait

client = Client(security=True)

dask.config.set({"optimization.fuse.active": False})

df = dask.datasets.timeseries(start="2020-01-01", end="2020-01-31", partition_freq="1h", freq="60s").persist()

with performance_report("report.html"):
    df2 = df.set_index("x").persist()
    wait(df2)

When this is run on a larger cluster with many workers, this problem becomes significantly worse. What are some ways that we can reduce the cost of secure communication?

@mrocklin
Member Author

Hrm, here is another performance report for when security is turned off. Tornado is still a significant cost.

https://gist.githubusercontent.com/mrocklin/5b1e870bc37875f03bf0a6fe0aaec4ba/raw/35cb4a2a803ebe6683f551fed1a17f07fb515c32/insecure.html

@mrocklin
Member Author

OK, I think that this is just down to Python's socket.send costs.

I put a timer around the socket.send calls in tornado/iostream.py and came away with:

 3.37 s / 10764 = 313.11 us

We're spending 300-500us per call and making lots of calls. I can try to batch things a little bit on the worker side, but that will only give us a constant-factor improvement. I'm curious how we can take communication overhead off of the main thread.

For reference, I got these numbers by instrumenting Tornado in the following way:

diff --git a/tornado/iostream.py b/tornado/iostream.py
index 768b404b..f3d000bc 100644
--- a/tornado/iostream.py
+++ b/tornado/iostream.py
@@ -81,6 +81,38 @@ if sys.platform == "darwin":
 _WINDOWS = sys.platform.startswith("win")
 
 
+
+import contextlib
+from collections import defaultdict
+from time import time
+from dask.utils import format_time
+
+total_time_data = defaultdict(float)
+counts_data = defaultdict(int)
+
+
+@contextlib.contextmanager
+def duration(name: str) -> None:
+    start = time()
+
+    yield
+
+    stop = time()
+
+    total_time_data[name] += stop - start
+    counts_data[name] += 1
+
+
+import atexit
+
+@atexit.register
+def _():
+    for name in total_time_data:
+        duration = total_time_data[name]
+        count = counts_data[name]
+        print(name, format_time(duration), "/", count, "=", format_time(duration / count))
+
+
 class StreamClosedError(IOError):
     """Exception raised by `IOStream` methods when the stream is closed.
 
@@ -1144,7 +1176,8 @@ class IOStream(BaseIOStream):
 
     def write_to_fd(self, data: memoryview) -> int:
         try:
-            return self.socket.send(data)  # type: ignore
+            with duration("send"):
+                return self.socket.send(data)  # type: ignore
         finally:
             # Avoid keeping to data, which can be a memoryview.
             # See https://github.com/tornadoweb/tornado/pull/2008
@@ -1564,7 +1597,8 @@ class SSLIOStream(IOStream):
 
     def write_to_fd(self, data: memoryview) -> int:
         try:
-            return self.socket.send(data)  # type: ignore
+            with duration("send"):
+                return self.socket.send(data)  # type: ignore
         except ssl.SSLError as e:
             if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                 # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if

@pitrou do you have thoughts on whether it is possible to avoid this 300us cost?

@pitrou
Member

pitrou commented Jan 20, 2021

Do you witness different numbers when TLS is turned off?

@pitrou
Member

pitrou commented Jan 20, 2021

Also, to get a better idea of what's happening, can you print all deciles rather than simply the average? The actual distribution should be insightful.

@pitrou
Member

pitrou commented Jan 20, 2021

Oh, and account IOStream and SSLIOStream durations separately!
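A minimal sketch of an instrumentation helper that addresses both requests: unlike the patch above, it keeps every sample (so deciles can be reported, not just the mean) and keys samples by label (so IOStream and SSLIOStream sends can be accounted separately). The names record, samples and summary are hypothetical, and time.perf_counter is used instead of time.time because it is monotonic and higher resolution.

```python
import contextlib
import statistics
import time
from collections import defaultdict

# Hypothetical helper: keep every sample, keyed by label, so the full
# distribution (not just the mean) can be inspected afterwards.
samples: dict[str, list[float]] = defaultdict(list)


@contextlib.contextmanager
def record(name: str):
    """Time the enclosed block and append the duration (seconds) to samples[name]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # finally also covers blocks that `return` from inside the with statement
        samples[name].append(time.perf_counter() - start)


def summary(name: str) -> dict[str, float]:
    """Return count, total, mean, deciles and max (in seconds) for one label."""
    data = samples[name]
    deciles = statistics.quantiles(data, n=10)  # 9 cut points: 10%, 20%, ..., 90%
    result = {"count": len(data), "total": sum(data), "mean": statistics.fmean(data)}
    for i, q in enumerate(deciles, start=1):
        result[f"p{i * 10}"] = q
    result["max"] = max(data)
    return result


# In Tornado one would use e.g. record("send-tcp") inside IOStream.write_to_fd
# and record("send-tls") inside SSLIOStream.write_to_fd; here we time a stand-in.
for _ in range(100):
    with record("send-tcp"):
        sum(range(1000))

print(summary("send-tcp"))
```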

@mrocklin
Member Author

These numbers are actually with SSL turned off. I realize now that the title of the issue is confusing: when diving into this I found that it is also slow with plain TCP without security, so I've focused on that for now.

And yes, I'll get deciles shortly.

@pitrou
Member

pitrou commented Jan 20, 2021

If SSL is turned off and this is a non-blocking socket (as it should be, since we're using Tornado), then the only reasonable explanation is GIL-induced measurement bias. The quantiles should probably help validate this hypothesis.

@mrocklin
Member Author

--------------------
send : 10562 events
0.01 %:  0.95 us
25.0 %:  20.98 us
50.0 %:  35.52 us
75.0 %:  47.21 us
99.9 %:  18.56 ms

@mrocklin
Member Author

This is surprising though. I would expect the 50% value to be around 300us. I'm double-checking the instrumentation.

@pitrou
Member

pitrou commented Jan 20, 2021

I'm not surprised. Most calls are quite fast (it's just a non-blocking system call). A small fraction of the calls have to wait for the GIL before returning, and therefore take more than 10 ms.

@pitrou
Member

pitrou commented Jan 20, 2021

In other words, you're just seeing the effects of the GIL on performance of a single thread in a multi-threaded Python program. System calls like socket.send release the GIL, so they tend to attract GIL switches, which is why they can look so costly :-)

@pitrou
Member

pitrou commented Jan 20, 2021

This tooling might be useful to better detect such situations:
https://www.maartenbreddels.com/perf/jupyter/python/tracing/gil/2021/01/14/Tracing-the-Python-GIL.html

cc @maartenbreddels

@mrocklin
Member Author

Yeah, you've mentioned before that calls like socket.send are likely masking some other call. I think that this is the first time that I fully understand what is going on. Seeing the quantiles helped me. Thank you for directing me to that.

I'm still left with the question of "what is taking time and making things slow?" I don't have much experience profiling code at this level when the GIL is involved. @maartenbreddels, any suggestions?

@maartenbreddels

I tried to reproduce it locally, but I get a wildly different report. Here is a dump of what I've done:

$ pip install distributed==2021.01.0
$ pip install dask==2020.12.0
$ dask-scheduler --host 0.0.0.0
$ dask-worker localhost:8786 --nthreads=3 --nprocs=4 --memory-limit=10G

The script (run below as use_case/dask-scheduler.py):

import dask
from dask.distributed import Client, performance_report, wait


def main(args=None):
    client = Client('127.0.0.1:8786')

    dask.config.set({"optimization.fuse.active": False})

    df = dask.datasets.timeseries(start="2020-01-01", end="2020-01-31", partition_freq="1h", freq="60s").persist()
    with performance_report("report.html"):
        df2 = df.set_index("x").persist()
        wait(df2)

if __name__ == "__main__":
    main()
$ /usr/bin/time -v python use_case/dask-scheduler.py
        Command being timed: "python use_case/dask-scheduler.py"
        User time (seconds): 2.02
        System time (seconds): 3.61
        Percent of CPU this job got: 21%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:26.09
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 133176
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 41883
        Voluntary context switches: 3903
        Involuntary context switches: 123225
        Swaps: 0
        File system inputs: 0
        File system outputs: 20776
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

This results in the following report (screenshot not preserved), which shows >10x more tasks.

Am I doing something wrong here?

@maartenbreddels

A small fraction of the calls have to wait for the GIL before returning, and therefore take more than 10 ms.

@pitrou Just curious why you think 10ms? I'd guess 5ms, from the default of sys.getswitchinterval().

System calls like socket.send release the GIL so tend to attract GIL switches on them, which is why they can look so costly :-)

I've been thinking a bit about the best way to describe why it is (or looks) so costly. Would you agree that it's not per se the GIL switch that is costly (e.g. the thread context switch is relatively cheap), but rather that a thread which releases the GIL often (well within 5ms), while other threads don't (e.g. those running pure Python code), will too often end up waiting for the GIL?

"what is taking time and making things slow?"

I think it is the attempt to return from the (Python) send function, which has to obtain the GIL. The (Python) send function releases the GIL and does the send syscall, which returns in ~1us, but before it can return to Python land it first needs to reacquire the GIL. If other threads are not as eager to release the GIL (such as pure Python code, which will do so only after 5ms, or a long-running C function that does not release the GIL), send will effectively take 5ms.
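The effect described above can be reproduced in a small, self-contained experiment. This is a sketch under stated assumptions, not a benchmark of Dask: the main thread times small sends on a non-blocking socket pair (each send releases the GIL around the system call and must reacquire it afterwards), optionally while a second thread (the hypothetical _spin helper) runs pure Python. Exact numbers depend on the machine and Python version, but the busy run would be expected to show a much heavier tail.

```python
import socket
import statistics
import threading
import time


def _spin(stop: threading.Event) -> None:
    # Pure Python: this thread only hands over the GIL after another thread
    # has waited for the switch interval (sys.getswitchinterval(), 5 ms default).
    x = 0
    while not stop.is_set():
        x += 1


def time_sends(n: int, busy: bool, payload: bytes = b"x" * 100) -> list[float]:
    """Time n small sends on a non-blocking socket, optionally with a busy thread."""
    a, b = socket.socketpair()
    a.setblocking(False)
    stop = threading.Event()
    spinner = threading.Thread(target=_spin, args=(stop,), daemon=True)
    if busy:
        spinner.start()
    durations = []
    try:
        for _ in range(n):
            start = time.perf_counter()
            a.send(payload)  # release GIL -> send() syscall -> reacquire GIL
            durations.append(time.perf_counter() - start)
            b.recv(len(payload))  # drain so the send buffer never fills up
    finally:
        stop.set()
        if busy:
            spinner.join()
        a.close()
        b.close()
    return durations


for label, busy in [("idle", False), ("busy", True)]:
    d = time_sends(100, busy)
    p99 = statistics.quantiles(d, n=100)[98]  # 99th percentile
    print(f"{label}: median {statistics.median(d) * 1e6:.1f} us, "
          f"p99 {p99 * 1e6:.1f} us, max {max(d) * 1e6:.1f} us")
```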

This is how I understand the situation described in https://bugs.python.org/issue7946, which I've gone over a bit in maartenbreddels/fastblog#3 (comment) (the comment section of https://www.maartenbreddels.com/perf/jupyter/python/tracing/gil/2021/01/14/Tracing-the-Python-GIL.html ).

I think giltracer might tell you if the picture painted here is correct. If that is the case, there are no good solutions, I think. Workarounds/band-aids I can think of:

  • call a GIL-releasing function (e.g. time.sleep(0)) from the other threads
  • call/change sys.setswitchinterval
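Both workarounds use standard-library calls. A minimal sketch, assuming a CPU-bound loop of our own (the crunch function, the yield frequency and the 1 ms interval are illustrative choices, not recommendations): time.sleep(0) voluntarily releases the GIL from inside the busy loop, and a lower switch interval makes a thread that is waiting for the GIL get it sooner.

```python
import sys
import time


def crunch(n: int, yield_every: int = 10_000) -> int:
    """Hypothetical CPU-bound loop that periodically yields the GIL."""
    total = 0
    for i in range(n):
        total += i * i
        if i % yield_every == 0:
            # Releases the GIL, so an IO thread waiting for it can run now
            # instead of waiting for the switch interval to expire.
            time.sleep(0)
    return total


old_interval = sys.getswitchinterval()  # 0.005 s by default
sys.setswitchinterval(0.001)  # process-wide: request GIL hand-offs after ~1 ms
try:
    result = crunch(100_000)
finally:
    sys.setswitchinterval(old_interval)  # restore the original setting
```

Note that sys.setswitchinterval affects every thread in the process, so a shorter interval trades more frequent (and individually cheap) switches for lower wait times.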

You may be able to identify a C function that does not release the GIL (unlikely), or you may be able to make a strong case for writing a C extension to work around this. This is actually the primary reason I built this tool and wrote the article: before building a C version of a ThreadPoolExecutor, I want to make sure that it's worth it (and I'm still not sure it is).

@pitrou
Member

pitrou commented Jan 21, 2021

@pitrou Just curious why you think 10ms, I'd guess 5ms from the default of sys.getswitchinterval

Hmm, yes, you're right. I was thinking about the typical OS timeslice.

Would you agree that it's not perse the GIL switch that is costly (e.g the thread context switch is relatively cheap), but that often releasing the GIL in a thread within 5ms, while other threads don't do that (like running pure Python code) will lead that thread too often having to wait for the GIL.

I don't know if it's "too often". It depends what other threads are doing too. The distribution of durations can tell us how "often" that happens.

there are no good solutions I think

Before finding solutions, one would have to ask the question "does this need solving?". Even if you find ways to balance scheduling between the two Python threads (the one that does IO and the one that runs pure Python code), you'll still be executing the same workload in the same process execution time, just ordered differently.

So the question is: would it improve your overall performance (for example by providing data earlier to other nodes) if you managed to prioritize IO calls before pure Python code in the scheduler process?

@mrocklin
Copy link
Member Author

I tried to reproduce it locally, but I get a wildly different report, dump of what I've done:

Yes, when I produced the original performance reports I forgot to call wait(df2) inside of the performance_report context manager. I fixed this so that when others ran the example they would get correct results, but I didn't update my wrong results in the links. Your results are more correct.

With regards to thread switching, I'm not sure I understand what is happening. My understanding from what I read above is that if you have two threads trading off the GIL, there is a multi-millisecond delay in handing off the GIL. This would surprise me. I generally expect lock-style handoffs to take 10us or so in Python. Why would the GIL be so much slower?

Before finding solutions, one would have to ask the question "does this need solving?". Even if you find ways to balance scheduling between the two Python threads (the one that does IO and the one that runs pure Python code), you'll still be executing the same workload in the same process execution time, just ordered differently.

Even if this doesn't improve performance, I'm very curious about what would improve performance. I care as much about visibility here as anything else. People today are very curious about how to make the scheduler run faster. They're happy to pour engineering resources into it. Currently they're targeting various aspects of the scheduler, but I'm not confident that we're working in the right place. I am searching for more visibility into what is taking up time.

Another thing we could do here, if it would help, is try to keep the scheduler single-threaded. Currently we intentionally offload compression/decompression to a separate thread. To me this seems like a good idea, but that's because I assumed that engaging multiple threads didn't cause significant GIL issues. The 5-10ms number above has me confused (it's way higher than I would have expected). Would keeping things single-threaded improve performance and/or visibility?

@pitrou
Member

pitrou commented Jan 21, 2021

My understanding from what I read above is that if you have two threads trading off the GIL there is a multi-millisecond delay in handing off the GIL.

Releasing the GIL is fast. Acquiring the GIL can lead to waits if the GIL is already held by someone else, which is what is being witnessed here :-)

What socket.send does is:

  1. release the GIL (fast)
  2. call the system call send() to put data on the TCP buffer (medium-fast? a system call is not costless, and depending on the data size the copy may be slightly expensive as well)
  3. acquire the GIL (slow if need to wait for some other thread to release it!)

Even if this doesn't improve performance I'm very curious about what would improve performance.

My opinion is that it would need one or both of these things:

  • try to vectorize the scheduling algorithm (as discussed on another issue)
  • rewrite the scheduler in another language such as Rust

Both things obviously non-trivial...
By the way, given the amount of time that's regularly been consumed in investigating scheduler performance issues (or perceived as such), perhaps the "write in another language" possibility should be explored seriously.

The 5-10ms number above has me confused (it's way higher than I would have expected). Would keeping things single-threaded improve performance and/or visibility?

It would certainly make things easier to understand, but also would give worse performance. Offloading compression probably benefits overall performance, but also makes it slightly less predictable.
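As a rough illustration of that trade-off, here is a minimal sketch that assumes an asyncio event loop and zlib as the compressor (Dask's own compression settings and code paths differ; the executor and function names are hypothetical). Compression runs in one dedicated worker thread via loop.run_in_executor, so the event loop stays responsive, and CPython's zlib releases the GIL while deflating, which is what makes the offload pay off. The price is the unpredictability mentioned above: the event loop thread has to reacquire the GIL every time it resumes.

```python
import asyncio
import zlib
from concurrent.futures import ThreadPoolExecutor

# One dedicated offload thread (hypothetical name).
_offload = ThreadPoolExecutor(max_workers=1, thread_name_prefix="compress")


async def compress_offloaded(payload: bytes) -> bytes:
    loop = asyncio.get_running_loop()
    # zlib.compress releases the GIL during the heavy lifting, so the event
    # loop thread can keep servicing sockets while this runs.
    return await loop.run_in_executor(_offload, zlib.compress, payload)


async def main() -> None:
    payload = b"dask" * 1_000_000
    compressed = await compress_offloaded(payload)
    assert zlib.decompress(compressed) == payload
    print(f"{len(payload)} -> {len(compressed)} bytes")


asyncio.run(main())
```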

@mrocklin
Member Author

try to vectorize the scheduling algorithm (as discussed on another issue)

Yeah, I don't think this is feasible given Dask's execution model.

rewrite the scheduler in another language such as Rust

We're currently using Cython (you might find this work interesting, see recent PRs from @jakirkham). If we need to go to C++ we're open to that, and the state machine logic is being isolated with this option in mind.

However I'm also not convinced that that logic is the slow part of the system. I am comfortable rewriting in C++ if necessary, but I want to avoid the situation where we do that, and then find that it's not significantly faster. John has been able to increase the performance of the tricky scheduling logic considerably (the stuff that I would expect to be slow in Python) but this hasn't resulted in significant performance improvements overall, which has me concerned.

I'm open to moving to a lower level language, but I first want to understand the scope of code that we need to move over to have a good effect.

@pitrou
Member

pitrou commented Jan 21, 2021

However I'm also not convinced that that logic is the slow part of the system.

I didn't mean that. There's probably no obvious hot spot (which is why an entire rewrite may be the solution, rather than some select optimizations in Cython).

@mrocklin
Member Author

There are two options for the scheduler, I think:

  1. Rewrite everything in C++
  2. Keep the networking and event management in Python, and rewrite only the scheduling logic in C++

Option 2 would be nice, if it makes sense.

@pitrou
Member

pitrou commented Jan 21, 2021

My intuition is that the "death of a thousand cuts" that's the main performance limiter applies as much to the networking and event management as to the core scheduling logic.

But don't take my word for it, I haven't tried to profile the dask scheduler in years.

@mrocklin
Member Author

Adding @jcrist, who might find this conversation interesting. I think that he was looking at efficient Python networking recently.

@jakirkham
Member

Well one observation I've shared with Ben and maybe you as well is within the Scheduler there are a bunch of sends in transitions themselves. PR ( #4365 ) fixes that by moving the sends out of the transitions and grouping them together. IDK if it would help the issues identified here, but I do think grouping communication a bit more closely together may generally be helpful regardless of what we do next.

Another thing to consider is that when we send a serialized message currently we do a few sends. First to tell how many buffers need to be sent. Next how big those buffers are (with UCX we also include whether they are on the CPU or GPU). Finally the buffers themselves. We may want to figure out how we can aggregate all of these into one message. Admittedly it's not entirely clear how one would do this with TCP. I believe (but could be wrong about this) there is a way to do this with UCX.
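One way to fold the three sends into one, sketched for a plain Unix socket rather than Tornado or UCX: pack the frame count and frame lengths into a small header with struct, then hand the header and all frames to socket.sendmsg, which performs a single vectored (scatter-gather) system call. The header layout (little-endian unsigned 64-bit integers) and the send_frames name are assumptions for illustration, not Dask's wire format, and socket.sendmsg is not available on Windows.

```python
import socket
import struct


def send_frames(sock: socket.socket, frames: list[bytes]) -> int:
    """Send a header plus all frames with one sendmsg call (hypothetical format).

    Header: number of frames, then the length of each frame, all as
    little-endian unsigned 64-bit integers.
    """
    header = struct.pack(f"<Q{len(frames)}Q", len(frames), *(len(f) for f in frames))
    buffers = [header, *frames]
    total = sum(len(b) for b in buffers)
    # sendmsg gathers every buffer in one system call, i.e. one GIL
    # release/reacquire instead of one per buffer. A production version must
    # loop on partial sends; this sketch assumes a blocking socket.
    sent = sock.sendmsg(buffers)
    if sent != total:
        raise RuntimeError(f"partial send: {sent} of {total} bytes")
    return sent
```

The frames are never concatenated in Python, so large buffers are not copied before reaching the kernel.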

@pitrou
Member

pitrou commented Jan 21, 2021

I do think grouping communication a bit more closely together may generally be helpful regardless of what we do next.

Agreed.

Another thing to consider is that when we send a serialized message currently we do a few sends.

Does IOStream buffer writes internally? I don't remember.

@jakirkham
Member

jakirkham commented Jan 21, 2021

Sorry, I responded too quickly. I think we are doing the buffering with BatchedSend. Not sure if Tornado does some buffering as well, though I must admit that's a part of the code I don't understand that well 😅

Forgot to add that Scheduler messages themselves usually have a simpler form and so don't necessarily need to be serialized as if they were something more complex (like a DataFrame or Array). Maybe we can exploit this when sending? Some thoughts on that in issue ( #4376 ).
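To illustrate the kind of buffering attributed to BatchedSend above, here is a simplified, hypothetical batcher. It is not BatchedSend's actual API or implementation: send() only queues a message, and a background task waits a short interval for more messages, then hands the whole list to a single write call, so N small messages cost one write instead of N. The 2 ms interval is an arbitrary choice.

```python
import asyncio
from typing import Any, Awaitable, Callable


class Batcher:
    """Hypothetical message batcher: collect messages, flush them as one list."""

    def __init__(self, write: Callable[[list[Any]], Awaitable[None]], interval: float = 0.002):
        self._write = write          # coroutine that sends one batch (one write call)
        self._interval = interval    # seconds to wait for more messages to accumulate
        self._buffer: list[Any] = []
        self._wakeup = asyncio.Event()
        self._closed = False

    def send(self, msg: Any) -> None:
        """Queue a message without doing any I/O."""
        self._buffer.append(msg)
        self._wakeup.set()

    async def run(self) -> None:
        # Keep going until closed AND everything queued has been flushed.
        while not (self._closed and not self._buffer):
            await self._wakeup.wait()
            self._wakeup.clear()
            await asyncio.sleep(self._interval)
            if self._buffer:
                batch, self._buffer = self._buffer, []
                await self._write(batch)

    def close(self) -> None:
        self._closed = True
        self._wakeup.set()


async def main() -> list[list[str]]:
    batches: list[list[str]] = []

    async def write(batch: list[str]) -> None:
        batches.append(batch)  # stand-in for a single comm/socket write

    batcher = Batcher(write)  # created inside the running loop
    task = asyncio.create_task(batcher.run())
    for i in range(5):
        batcher.send(f"task-finished-{i}")
    batcher.close()
    await task
    return batches


batches = asyncio.run(main())
print(batches)  # five messages, delivered in a single batch
```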

@mrocklin
Member Author

Results from running with your ref_trans2 branch. No strong difference or information here, but I did start adding some more detail in the higher quantiles.

--------------------
send : 15056 events
total 4.61 s
average 305.99 us

0.01 %:  0.95 us
25.0 %:  26.70 us
50.0 %:  37.19 us
75.0 %:  50.07 us
90.0 %:  196.58 us
99.0 %:  6.74 ms
99.9 %:  19.10 ms

Right, so it might make sense to start looking into avoiding calling socket.send so many times. However, what I'm really learning from this exercise is that socket.send isn't the problem; rather, something else is happening on another thread that we need to identify. I can get my laptop to spend 50% of its time in socket.send. I think that this means that some other activity that we don't understand is taking up 50% of our scheduling time.

I've turned off offloading to a separate thread for this. Something else is still going on.

@jakirkham
Member

Did that include Cythonizing it, or did you merely run it as Python?

@mrocklin
Member Author

Just Python for now

@jakirkham
Member

It's also worth noting that we would no longer need to transmit the length of the frames since that is part of this message format. What I'm still trying to understand is if we might be able to get rid of transmitting the number of frames, at which point we really just have one send/recv occurring.

@mrocklin
Member Author

mrocklin commented Feb 12, 2021 via email

@pitrou
Member

pitrou commented Feb 12, 2021

It's also worth noting that we would no longer need to transmit the length of the frames since that is part of this message format.

Why do you say that? You definitely need to know the lengths of the frames in order to presize the buffers you give to recvmsg_into.

@jakirkham
Member

jakirkham commented Feb 12, 2021

What if we use recvmsg?

Edit: I would add that the example in the recvmsg_into docs seems to suggest that we don't need to line up identically sized buffers on both sides. In other words, we could allocate one big buffer and cut it up afterwards.
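The two points above can be reconciled: the receiver still needs the frame lengths, but only to size one buffer and to know where to cut it, not to allocate one buffer per frame. A minimal sketch, assuming a hypothetical header made of a frame count followed by each frame's length, all as little-endian unsigned 64-bit integers (not Dask's actual wire format), using socket.recv_into and zero-copy memoryview slices:

```python
import socket
import struct


def recv_exactly_into(sock: socket.socket, view: memoryview) -> None:
    """Fill the whole view, looping because recv_into may return short reads."""
    while view:
        n = sock.recv_into(view)
        if n == 0:
            raise ConnectionError("socket closed mid-message")
        view = view[n:]


def recv_frames(sock: socket.socket) -> list[memoryview]:
    """Read the header, then all frames into ONE buffer, then cut it into frames."""
    count_buf = bytearray(8)
    recv_exactly_into(sock, memoryview(count_buf))
    (count,) = struct.unpack("<Q", count_buf)

    lengths_buf = bytearray(8 * count)
    recv_exactly_into(sock, memoryview(lengths_buf))
    lengths = struct.unpack(f"<{count}Q", lengths_buf)

    # A single allocation for all frames, filled with as few recv calls as
    # the kernel allows.
    payload = bytearray(sum(lengths))
    view = memoryview(payload)
    recv_exactly_into(sock, view)

    frames, offset = [], 0
    for length in lengths:
        frames.append(view[offset:offset + length])  # zero-copy slice
        offset += length
    return frames
```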

@jakirkham
Member

It looks like someone started to implement this for Tornado ( tornadoweb/tornado#2734 )

@jakirkham
Member

jakirkham commented Feb 17, 2021

One other thing worth considering here is that Dask uses multiple send and recv calls to transmit several buffers.

At least for changes in Distributed, PR ( #4506 ) gets this down to one IOStream write call for sending and two read calls for receiving (one for the message size and another for the message itself).

Under-the-hood the writes may still translate to multiple sends, but I don't think we can do too much there without PR ( tornadoweb/tornado#2734 ) or similar. However the reads should translate pretty closely to an equivalent number of recv calls (with a small delta based on what may already be buffered).

@mrocklin
Member Author

An updated run. I left Dask profiling on, which exacerbated the socket.send issue.

(screenshot of the scheduler profile not preserved)

@mrocklin
Member Author

Some observations

  • extract_serialize and transition remain our largest single issues to resolve.
  • I'm curious about the time spent in handle_stream that isn't spent as part of another call
  • the msgpack/loads/socket.recv_into/... networking stack probably comes next. This is spread around into many functions, but together they add up to something. I think that we should consider things here top-down before micro-optimizing individual parts
  • However, please recall that the socket.send issue is larger in this run than typical, both because I left Dask profiling on, and because I'm over-saturating my machine (I suspect that process switching is coming up here). Some of the networking stack probably comes up hotter here than is appropriate. It would be good to run this on a proper cluster and without profiling.

@mrocklin
Member Author

Here is another run, this time without Dask profiling turned on (also, this run covered less time, so the numbers are smaller; check the grey bars overlaying the numbers to get a sense of scope).

(screenshot of the scheduler profile not preserved)

@jakirkham
Member

This is still using a sampling window where only a small part of the run is kept, right?

For contrast, I would take a look at this info from the Scheduler performance report ( quasiben/dask-scheduler-performance#109 (comment) ).

@jakirkham
Member

jakirkham commented Feb 19, 2021

Spotted transition_memory_released in that list. Found a few easy fixes there.

Also saw a pattern in transition_memory_released where a method, TaskState.get_nbytes(), was getting called repeatedly in a loop. Previously we had run into issues with get_nbytes and so optimized it to pretty efficient C code. However, it still shouldn't be called in a loop when we can call it once beforehand. I fixed this occurrence, and also found several others like it and fixed them as well. This should cut down some overhead (even in the pure Python case).

These are all in PR ( #4516 )
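The fix described above is loop-invariant hoisting. A minimal sketch with a hypothetical stand-in class (the real TaskState and transition code are more involved; the call counter exists only to make the difference visible):

```python
class Task:
    """Hypothetical stand-in for a scheduler task with a comparatively costly getter."""

    calls = 0  # counts get_nbytes invocations across all instances

    def __init__(self, nbytes: int):
        self._nbytes = nbytes

    def get_nbytes(self) -> int:
        Task.calls += 1
        return self._nbytes


def release_slow(ts: Task, workers: list[dict]) -> None:
    for ws in workers:
        ws["nbytes"] -= ts.get_nbytes()  # same value recomputed every iteration


def release_fast(ts: Task, workers: list[dict]) -> None:
    nbytes = ts.get_nbytes()  # hoisted: called once before the loop
    for ws in workers:
        ws["nbytes"] -= nbytes


workers = [{"nbytes": 1000} for _ in range(4)]
release_slow(Task(100), workers)
slow_calls, Task.calls = Task.calls, 0
release_fast(Task(100), workers)
print(slow_calls, Task.calls)  # 4 1
```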

@mrocklin
Member Author

Running a rechunk operation

I find that I'm not able to saturate my local machine's scheduler, but the profile is still interesting. It's somewhat different (also, this is with the PR that removes extract_serialize).

(screenshot of the scheduler profile not preserved)

@jakirkham
Member

This is starting to look like the task completion thrashing that @quasiben mentioned on Friday we might run into

@mrocklin
Member Author

I think that @quasiben mentioned that because the workers have been given very very short tasks that they might overwhelm the scheduler. Is this what you are referring to?

If so, then that is intentional. We want the workers to pester the scheduler as much as they can so that we're able to simulate the traffic on a larger cluster while testing on a very small machine.

Or in summary, all of this is about thrashing. Thrashing is the goal of these benchmarks :)

@jakirkham
Member

What I mean is there is a bunch of work happening due to task completion. At least when we came into this work, the problem was the Scheduler was taking a long time dispatching the initial chunk of work. It doesn't seem we are seeing that any more (at least not for this use case).

@mrocklin
Member Author

My method of profiling is only catching the tail end of the computation, so unfortunately I'm blind to the update_graph section. This is again a case where different methods are bad in different ways :/

@jakirkham
Member

Yep no worries.

FWIW if you are looking to try more things, PR ( #4526 ) batches more communications through things like task completion (amongst other things), which weren't handled before.

@jakirkham
Member

Under-the-hood the writes may still translate to multiple sends, but I don't think we can do too much there without PR ( tornadoweb/tornado#2734 ) or similar. However the reads should translate pretty closely to an equivalent number of recv calls (with a small delta based on what may already be buffered).

FWIW asyncio has a writelines API, which could be used for sending multiple buffers. It does not appear to use sendmsg currently though ( https://bugs.python.org/issue40007 ). There doesn't seem to be a place in the asyncio API for recvmsg to be used with Transport & Protocols (though please correct me if I'm missing something).

Under-the-hood libuv has the ability to use sendmsg and recvmsg, but uvloop doesn't seem to make use of these currently. I raised an issue ( MagicStack/uvloop#404 ) on handling multiple buffers in uvloop.
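For reference, a minimal sketch of the asyncio API mentioned above: StreamWriter.writelines accepts several buffers in one call. Whether they reach the kernel as one vectored sendmsg call or are joined and written as one buffer depends on the Python version and event loop, as discussed in this thread. The local server, the frame-count header and the buffer contents are illustrative only.

```python
import asyncio


async def main() -> bytes:
    received = bytearray()
    done = asyncio.Event()

    async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        received.extend(await reader.read())  # read until the client closes
        writer.close()
        await writer.wait_closed()
        done.set()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    header = (3).to_bytes(8, "little")  # hypothetical frame-count header
    frames = [b"alpha", b"beta", b"gamma"]
    writer.writelines([header, *frames])  # several buffers handed over in one call
    await writer.drain()
    writer.close()
    await writer.wait_closed()

    await done.wait()
    server.close()
    await server.wait_closed()
    return bytes(received)


data = asyncio.run(main())
print(len(data))  # 22
```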

@jakirkham
Member

jakirkham commented Sep 26, 2022

Under-the-hood libuv has the ability to use sendmsg and recvmsg, but uvloop doesn't seem to make use of these currently. Raised issue ( MagicStack/uvloop#404 ) on handling multiple buffers in uvloop

This is partially resolved, in that sending uses vectorized IO in uvloop 0.17.0. Receiving remains an open question.

Edit: Python 3.12 will include a similar change ( python/cpython#31871 )

@kumaraditya303

kumaraditya303 commented Jan 3, 2023

Hi, CPython core dev here; @jakirkham I suggest you file issues upstream for asyncio performance bottlenecks. In 3.12+, with python/cpython#31871, GIL contention should be reduced, as asyncio now uses sendmsg, so the GIL is released only once.

@jcrist
Member

jcrist commented Jan 3, 2023

Thanks for the update. When 3.12 comes out we'll want to revisit our workaround for doing zero-copy writes with asyncio here: the _ZeroCopyWriter class.

@mrocklin
Member Author

mrocklin commented Jan 3, 2023

That's really exciting. Thank you @kumaraditya303 for the work and for keeping us informed.

@jjerphan
Contributor

jjerphan commented Jan 3, 2023

Note that Python 3.12 experimentally supports perf(1), combining CPython and C profiling, which might help with inspecting GIL contention. Kudos to @pablogsal for this feature!


7 participants