
Add capability to log spilling #442

Merged: 12 commits merged into rapidsai:branch-0.19 from the spill-logging branch on Apr 5, 2021

Conversation

pentschev
Member

This PR allows enabling logs for spilling operations, as requested in #438. This is definitely not the cleanest solution, but it allows us to test without changing anything in Distributed or Zict; those changes could be made in the two projects in the future, which would let us avoid creating subclasses here. We might also want a specific configuration option for logging; once we're confident about the best way to handle that, we will need to decide whether it should be a Distributed or a Dask-CUDA configuration.

Here's an example of what the logs look like:

distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 0) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 1) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 2) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 3) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 4) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 5) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 6) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 7) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 8) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 9) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 10) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 11) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 12) from Device to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 0) from Host to Disk
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 4) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 7) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 6) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 8) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 0) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 0) from Disk to Host
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 2) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 3) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 1) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 12) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 5) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 11) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 10) from Host to Device
distributed.worker - INFO - Worker at <tcp://127.0.0.1:39753>: Spilling key ('random_sample-587bb130aacf2dae8cd3ff7b4309027e', 9) from Host to Device
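
For reference, here is a rough sketch of the kind of script that produces logs like the above, assuming spill logging is enabled through a LocalCUDACluster keyword; the log_spilling name and the memory limits are assumptions for illustration, not the confirmed API:

import cupy
import dask.array as da
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # Small limits are chosen only to force Device -> Host (and Host -> Disk) spilling.
    cluster = LocalCUDACluster(
        n_workers=1,
        device_memory_limit="1GB",   # assumed value for illustration
        memory_limit="2GB",          # assumed host limit for illustration
        log_spilling=True,           # assumed keyword enabling the INFO messages above
    )
    client = Client(cluster)

    # Allocate more device data than fits under the device memory limit.
    rs = da.random.RandomState(RandomState=cupy.random.RandomState)
    x = rs.random_sample((20000, 20000), chunks=(2000, 2000)).persist()
    print(x.sum().compute())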

The LoggedBuffer class is a zict.Buffer subclass that simply adds logging
capability while keeping the original functionality unchanged. It is
complemented by two classes, LoggedNanny and LoggedWorker, subclasses of
Nanny and Worker, respectively, which are required to set the worker's
address on the DeviceHostFile object once the worker has been started.
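
For illustration only (this is not the code from the PR): a minimal sketch of the subclassing idea described above, assuming zict.Buffer exposes fast_to_slow/slow_to_fast methods for moving keys between the fast and slow mappings; those method names, and everything else here, are assumptions.

import logging
import time

from zict import Buffer

logger = logging.getLogger("distributed.worker")


class LoggedBuffer(Buffer):
    """zict.Buffer subclass that only adds logging around spill movements."""

    def __init__(self, *args, fast_name="Device", slow_name="Host", **kwargs):
        self.addr = None          # worker address, filled in after the worker starts
        self.fast_name = fast_name
        self.slow_name = slow_name
        super().__init__(*args, **kwargs)

    def set_address(self, addr):
        # Called by the (hypothetical) LoggedWorker/LoggedNanny once the address is known.
        self.addr = addr

    def fast_to_slow(self, key, value):
        # Assumed hook name: move a key from the fast to the slow mapping, then log it.
        start = time.monotonic()
        ret = super().fast_to_slow(key, value)
        logger.info(
            "Worker at <%s>: Spilling key %s from %s to %s in %f seconds",
            self.addr, key, self.fast_name, self.slow_name, time.monotonic() - start,
        )
        return ret

    def slow_to_fast(self, key):
        # Assumed hook name: move a key back from the slow to the fast mapping, then log it.
        start = time.monotonic()
        value = super().slow_to_fast(key)
        logger.info(
            "Worker at <%s>: Spilling key %s from %s to %s in %f seconds",
            self.addr, key, self.slow_name, self.fast_name, time.monotonic() - start,
        )
        return value
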
@pentschev pentschev requested a review from a team as a code owner November 12, 2020 16:46
@codecov-io

codecov-io commented Nov 12, 2020

Codecov Report

Merging #442 (a8f9e5b) into branch-0.19 (cb9bf35) will increase coverage by 29.37%.
The diff coverage is 35.59%.

Impacted file tree graph

@@               Coverage Diff                @@
##           branch-0.19     #442       +/-   ##
================================================
+ Coverage        61.06%   90.43%   +29.37%     
================================================
  Files               22       16        -6     
  Lines             2571     1663      -908     
================================================
- Hits              1570     1504       -66     
+ Misses            1001      159      -842     
Impacted Files Coverage Δ
dask_cuda/device_host_file.py 69.11% <30.61%> (-19.52%) ⬇️
dask_cuda/local_cuda_cluster.py 78.94% <60.00%> (-2.45%) ⬇️
dask_cuda/benchmarks/local_cupy_map_overlap.py
dask_cuda/benchmarks/local_cudf_merge.py
dask_cuda/_version.py
dask_cuda/benchmarks/local_cupy.py
dask_cuda/benchmarks/local_cudf_shuffle.py
dask_cuda/benchmarks/utils.py
dask_cuda/explicit_comms/dataframe/shuffle.py 97.94% <0.00%> (+0.68%) ⬆️
dask_cuda/utils.py 90.90% <0.00%> (+1.06%) ⬆️
... and 7 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update cb9bf35...a8f9e5b. Read the comment docs.

@quasiben
Member

cc @EvenOldridge

@quasiben
Member

quasiben commented Nov 12, 2020

I don't disagree that parts are a bit hacky, but it's cool to see this functionality. I think it also helps us explore what we want to deliver for users, so we can have a more complete idea before upstreaming.

Should this also include a CLI option? Currently, this can only be configured with LocalCUDACluster, correct?

@pentschev
Member Author

Currently, this can only be configured with LocalCUDACluster, correct?

Yes, I intended to write that in the description, but ended up forgetting about it. It would be easier to extend that to dask-cuda-worker, but I want some confirmation from @EvenOldridge that this actually works for the use case he needs.

@pentschev
Member Author

@EvenOldridge friendly reminder here: is this in line with what you want for #438?

@EvenOldridge

Looks like a good start. Is there a way to track how much is being spilled and the time spent spilling? And possibly a summary at the end of the total time spent spilling?

@pentschev
Member Author

The latest changes add the size of the spilled value and the time taken; the output now looks like the following:

distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 8) with 16000000 bytes from Host to Device in 0.0028045177459716797 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 1) with 16000000 bytes from Host to Device in 0.00684046745300293 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 10) with 16000000 bytes from Host to Device in 0.0036554336547851562 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 12) with 16000000 bytes from Host to Device in 0.0027742385864257812 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 3) with 16000000 bytes from Host to Device in 0.002573251724243164 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 9) with 16000000 bytes from Host to Device in 0.002595186233520508 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 6) with 16000000 bytes from Host to Device in 0.003015756607055664 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 11) with 16000000 bytes from Host to Device in 0.0034143924713134766 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 7) with 16000000 bytes from Host to Device in 0.0031862258911132812 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 0) with 16000000 bytes from Host to Device in 0.007324934005737305 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 4) with 16000000 bytes from Host to Device in 0.0026671886444091797 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 2) with 16000000 bytes from Host to Device in 0.0031845569610595703 seconds
distributed.worker - INFO - Worker at <tcp://127.0.0.1:36875>: Spilled key ('random_sample-27ab5ab3c37729bce9f7dceb52abe9fc', 5) with 16000000 bytes from Host to Device in 0.0035958290100097656 seconds

@EvenOldridge is that what you had in mind?

@EvenOldridge

EvenOldridge commented Dec 1, 2020

Yeah, I like that. Is it possible to get the total spill time summary at the end?

It might also be useful to set a logging threshold to cut down on filling up the logs.

@pentschev pentschev changed the base branch from branch-0.17 to branch-0.18 December 4, 2020 15:50
@pentschev pentschev added the 2 - In Progress (Currently a work in progress), feature request (New feature or request), and non-breaking (Non-breaking change) labels on Dec 4, 2020
@pentschev
Member Author

With the latest commits users can query the total spilling time and print it:

spilling_time = await client.run(lambda dask_worker: dask_worker.data.get_total_spilling_time())
print(spilling_time)

The output looks like the following:

{'tcp://127.0.0.1:35603': {'Total spilling time from Device to Host': 0.14711833000183105, 'Total spilling time from Host to Device': 0.21700596809387207, 'Total spilling time from Host to Disk': 0.13082504272460938, 'Total spilling time from Disk to Host': 0.16785812377929688}}
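
To get a single total per worker (the summary requested above), the per-direction values can simply be summed on the client side; a small illustrative snippet, not part of this PR:

# `spilling_time` is the dictionary returned by the client.run call above.
totals = {worker: sum(times.values()) for worker, times in spilling_time.items()}
print(totals)
# e.g. {'tcp://127.0.0.1:35603': 0.66...} for the worker above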

As for the threshold, I'm not confident in a metric for that. However, if it's really necessary, I think we might want to do it in a different manner, as per #438 (comment).

In the PeriodicCallback case, instead of logging to stdout we could provide a function, similar to get_total_spilling_time above, that returns all the not-yet-retrieved spilling log messages; the user would then be able to parse and filter them as best fits the application. Do you have any ideas on that @EvenOldridge ?
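
A purely hypothetical sketch of that idea (none of these names exist in this PR): the buffer keeps its spill messages in memory and the user drains them on demand, for example from a tornado PeriodicCallback on the client side.

from collections import deque


class SpillLogStore:
    """Hypothetical store of not-yet-retrieved spill log messages."""

    def __init__(self, maxlen=10_000):
        self._messages = deque(maxlen=maxlen)

    def append(self, msg):
        self._messages.append(msg)

    def drain(self):
        # Return all pending messages and clear the store.
        msgs = list(self._messages)
        self._messages.clear()
        return msgs


# Client side, the user could then poll and filter as best fits the application:
# new_msgs = await client.run(lambda dask_worker: dask_worker.data.spill_log.drain())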

Member

@madsbk madsbk left a comment


LGTM, nice work @pentschev

@pentschev
Member Author

Thanks @madsbk for reviewing.

@EvenOldridge could you check with the customers who have raised this issue whether this is good for their use case?

@pentschev pentschev added the 3 - Ready for Review (Ready for review by team) and 5 - Ready to Merge (Testing and reviews complete, ready to merge) labels and removed the 2 - In Progress (Currently a work in progress) and 3 - Ready for Review (Ready for review by team) labels on Jan 6, 2021
@pentschev pentschev changed the base branch from branch-0.18 to branch-0.19 February 10, 2021 20:48
@pentschev
Member Author

@EvenOldridge friendly reminder here, could you check with customers if this covers their use case? :)

@EvenOldridge

@rjzamora @benfred What do you think about this implementation? The goal here is to highlight to our users of NVTabular when spilling is happening.

@rjzamora
Member

rjzamora commented Mar 5, 2021

@rjzamora @benfred What do you think about this implementation? The goal here is to highlight to our users of NVTabular when spilling is happening.

Sorry for missing this. This is great @pentschev - Thanks for the work here! I think this is a reliable way for both developers and users to directly measure the spilling overhead for a given workflow. In the long run, it would be nice for users to have the option to visualize spilling behavior with the dask dashboard, but I think this is even more generally useful than that. It seems easy enough to document the necessary steps for cluster creation and then NVT should be able to automatically query spilling metrics during/after computation (if the user is executing a workflow with a dask-cuda client).

@jakirkham
Member

cc @jacobtomlinson (in case you have thoughts on how we might visualize spilling 🙂)

@github-actions github-actions bot added the python (python code needed) label on Mar 8, 2021
@pentschev
Member Author

@rjzamora thanks for checking this PR. From your comment, my understanding is that this PR is useful for the use case you mentioned, but that long-term it would also be nice to have this in the Dask dashboard, am I correct? If so, I will go ahead and merge this, and the dashboard could be addressed later on, probably with the help of @jacobtomlinson.

@pentschev
Member Author

@gpucibot merge

@rapids-bot rapids-bot bot merged commit 7a6be90 into rapidsai:branch-0.19 Apr 5, 2021
@pentschev pentschev deleted the spill-logging branch April 23, 2021 21:51
@pentschev pentschev mentioned this pull request Feb 16, 2022
Labels
5 - Ready to Merge (Testing and reviews complete, ready to merge), feature request (New feature or request), non-breaking (Non-breaking change), python (python code needed)
7 participants