Multi-threaded shuffle writer for RapidsShuffleManager [databricks] #6052
Conversation
Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
Force-pushed from a20efdb to c7688ae
*/
class ThreadSafeShuffleWriteMetricsReporter(wrapped: ShuffleWriteMetricsReporter)
    extends ShuffleWriteMetrics {
  override private[spark] def incBytesWritten(v: Long): Unit = synchronized {
I think most of these are updated in batches; just curious whether this synchronization costs us much waiting time? Might be something to look at later if you haven't.
I haven't looked at this. I ran into it pretty late, when I found that the tests from Spark were actually testing the metrics. Yes, we could absolutely do better than this, but I figure that could be a punt.
+1, especially when we consider the impact of synchronized blocks on cache performance. Thread-safe accumulators could be a good option to explore later.
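For reference, a rough, untested sketch of what a lock-free variant could look like using java.util.concurrent.atomic.LongAdder. The class and method names here are made up for illustration, and it assumes the class would still live under an org.apache.spark package (like the current reporter) so the private[spark] increment methods remain accessible:

import java.util.concurrent.atomic.LongAdder

import org.apache.spark.shuffle.ShuffleWriteMetricsReporter

// Illustrative only: accumulate per-thread updates without a lock, then fold them
// into the wrapped reporter once, e.g. when the map task finishes.
// Assumes this lives under an org.apache.spark package so the private[spark]
// increment methods on the wrapped reporter are visible.
class LockFreeWriteMetrics(wrapped: ShuffleWriteMetricsReporter) {
  private val bytesWritten = new LongAdder()
  private val recordsWritten = new LongAdder()

  // Called concurrently from the writer threads; LongAdder avoids contention on a
  // single lock or a hot CAS location.
  def addBytesWritten(v: Long): Unit = bytesWritten.add(v)
  def addRecordsWritten(v: Long): Unit = recordsWritten.add(v)

  // Called once from the task thread after all writer threads have finished.
  def flush(): Unit = {
    wrapped.incBytesWritten(bytesWritten.sumThenReset())
    wrapped.incRecordsWritten(recordsWritten.sumThenReset())
  }
}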
conf("spark.rapids.shuffle.multiThreaded.writer.threads") | ||
.doc("The number of threads to use for writing shuffle blocks per executor.") | ||
.integerConf | ||
.createWithDefault(20) |
This is going to need a similar follow-up issue to try to automatically tune this, like the one we are working on for the multi-threaded input readers. 20 may be a very poor choice on an executor with many configured cores.
I think we can fold this into: #5039.
I suggest using the ForkJoinPool class in this case. We can scale the number of threads dynamically using the maximum pool size API. getStealCount can be used to evaluate how busy the threads in the pool are; if that value gets too high, we can increase the number of workers.
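A rough sketch of that idea, for discussion only: sample getStealCount periodically and use the steal rate as a proxy for how contended the writer pool is. The threshold, pool sizes, and the swap-the-pool approach are all assumptions rather than anything in this PR; note also that a standard ForkJoinPool does not let you raise its parallelism in place, so "growing" here means replacing the pool.

import java.util.concurrent.ForkJoinPool

// Illustrative only: grow the writer pool when the steal rate suggests it is
// undersized. Thresholds and sizes are hypothetical.
object AdaptiveWriterPool {
  @volatile private var pool = new ForkJoinPool(4)
  private var lastSteals = 0L

  def execute(task: Runnable): Unit = pool.execute(task)

  // Call periodically, e.g. from a monitoring thread.
  def maybeGrow(maxThreads: Int): Unit = synchronized {
    val steals = pool.getStealCount
    val delta = steals - lastSteals
    lastSteals = steals
    // A high steal count since the last check means workers keep running out of
    // local work and stealing from each other, i.e. the pool is busy.
    if (delta > 1000 && pool.getParallelism < maxThreads) {
      val old = pool
      pool = new ForkJoinPool(math.min(pool.getParallelism * 2, maxThreads))
      lastSteals = 0L        // the new pool starts its steal count from zero
      old.shutdown()         // let tasks already submitted to the old pool finish
    }
  }
}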
The last build failed due to #6054.
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver)
  with ShuffleManager
nit: new line at the end of the file.
I am not sure we are following a rule about adding a newline at the end of each file, if I understand you correctly @amahussein. I know this is a nit, but I wanted to make sure I am not missing something.
OK, I added these.
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver)
  with ShuffleManager
nit: new line at the end of the file.
myMapStatus = Some(MapStatus(blockManager.shuffleServerId, partLengths, mapId))
} catch {
  // taken directly from BypassMergeSortShuffleWriter
  case e: Exception =>
Should we check for specific exception classes instead of the generic Exception? I wonder if catching the generic exception would hide bugs and crashes that we were not expecting.
I am also changing this code because of the comment @jlowe had. I'll push in a second.
Oh wait, this code is logging the exception, aborting the writes, and re-throwing. It was taken from Apache Spark verbatim, so I'd like to keep it as is.
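For context, a minimal sketch of the log/abort/re-throw pattern being described; logError (which assumes Spark's Logging trait is mixed in) and the abort helper are hypothetical stand-ins, not the exact code in this PR:

try {
  myMapStatus = Some(MapStatus(blockManager.shuffleServerId, partLengths, mapId))
} catch {
  case e: Exception =>
    // Log so the failure shows up in the executor logs...
    logError("Error while writing shuffle blocks", e)
    // ...clean up whatever partial output was produced (hypothetical helper)...
    abortWrites(e)
    // ...and re-throw so Spark marks the task attempt as failed and can retry it.
    throw e
}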
@amahussein @jlowe should be ready for another look.
This adds an experimental feature for RapidsShuffleManager where it can be configured to write shuffle blocks using multiple threads.
You would do this by:
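The configuration snippet itself did not survive here; the following is a plausible reconstruction based only on the configs named in this PR (the shuffle manager class-name pattern comes from the linked docs, and the [spark version] and N placeholders are explained just below):

--conf spark.shuffle.manager=com.nvidia.spark.rapids.[spark version].RapidsShuffleManager
--conf spark.rapids.shuffle.manager.mode=MULTI_THREADED
--conf spark.rapids.shuffle.multiThreaded.writer.threads=N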
Where [spark version] is something like spark321 (see other class names here: https://github.com/NVIDIA/spark-rapids/blob/branch-22.08/docs/additional-functionality/rapids-shuffle.md#spark-app-configuration), and N is the number of threads you want to use (defaults to 20). I removed an internal config spark.rapids.shuffle.transport.enabled, which would have allowed you to run the cache-only shuffle (for testing, and to have a local-mode app), and added spark.rapids.shuffle.manager.mode, which can be UCX (default), CACHE_ONLY (for testing), or the experimental MULTI_THREADED.

Note there is no flow control here. The shuffle writer is going to get an iterator and will pull on it until it is done, no matter what the consequences are. This is one of the reasons why this is experimental.
This PR copies/adapts some tests from Spark as well, and most of the writer follows Spark's implementation, except this one is in Scala and uses a thread pool.
I've tested this in Apache Spark 3.1.2 and 3.2.1. I haven't tested it in other environments yet; posting here for comments as I work my way through the other Spark versions that need testing.
Closes #6060