Create a PrioritySemaphore to back the GpuSemaphore #11376
Conversation
Signed-off-by: Zach Puller <zpuller@nvidia.com>
Just some nits
(Inline review comments on sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala and tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala, since resolved.)
Looks great, I just had a question about a test that is a non blocker.
(Inline comment on tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala, since resolved.)
while (!canAcquire(numPermits)) {
  waitingQueue.enqueue(ThreadInfo(priority, condition))
  condition.await()
}
Don't we have an issue here with spurious wakeups? Seems like we'll enqueue multiple ThreadInfo instances. Seems like we should do something like this to protect against it:
var queued = false
while (!canAcquire(numPermits)) {
  if (!queued) {
    waitingQueue.enqueue(ThreadInfo(priority, condition))
    queued = true
  }
  condition.await()
}
When you say spurious wakeups, do you mean something triggered by another thread releasing, or by another means? In the former case, the original ThreadInfo is dequeued. In the latter, I think I agree with you, just not sure how that can happen exactly.
Spurious wakeups can theoretically occur anytime one waits on a condition variable. It's often OS dependent, but it's documented that it can happen and that programmers need to account for it. See the documentation for java.util.concurrent.locks.Condition.awaitNanos (which Condition.await points to) which mentions spurious wakeup as a possibility.
Ack, I can push a fix for that
Nice catch @jlowe
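For anyone following the thread, here is a minimal self-contained sketch (not the plugin's actual code) of one way to structure the guarded wait so that a spurious wakeup cannot enqueue a duplicate entry: the waiter enqueues itself once, stays in the queue until it can actually proceed, and simply re-checks after every wakeup. Names like ThreadInfo, waitingQueue, and canAcquire mirror the snippet under review; everything else is assumed.

import java.util.concurrent.locks.{Condition, ReentrantLock}
import scala.collection.mutable

// Illustrative sketch only; the real PrioritySemaphore may be structured differently.
class SketchPrioritySemaphore(maxPermits: Int) {
  private case class ThreadInfo(priority: Long, condition: Condition)

  private val lock = new ReentrantLock()
  private var occupied = 0
  // Max-heap: the waiter with the largest priority value is the head.
  private val waitingQueue =
    mutable.PriorityQueue.empty[ThreadInfo](Ordering.by[ThreadInfo, Long](_.priority))

  private def canAcquire(numPermits: Int): Boolean =
    occupied + numPermits <= maxPermits

  def acquire(numPermits: Int, priority: Long): Unit = {
    lock.lock()
    try {
      val me = ThreadInfo(priority, lock.newCondition())
      waitingQueue.enqueue(me)
      // Wait until permits are available and we are the highest-priority waiter.
      // A spurious wakeup just re-evaluates this condition; our single queue
      // entry stays in place, so nothing is enqueued twice.
      while (!canAcquire(numPermits) || (waitingQueue.head ne me)) {
        me.condition.await()
      }
      waitingQueue.dequeue() // removes our own entry, since we are the head
      occupied += numPermits
      // Give the next waiter a chance in case permits remain.
      if (waitingQueue.nonEmpty) waitingQueue.head.condition.signal()
    } finally {
      lock.unlock()
    }
  }

  def release(numPermits: Int): Unit = {
    lock.lock()
    try {
      occupied -= numPermits
      // Wake only the highest-priority waiter; it hands off to the next one.
      if (waitingQueue.nonEmpty) waitingQueue.head.condition.signal()
    } finally {
      lock.unlock()
    }
  }
}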
PrioritySemaphore is a great idea, I like it. I am just wondering which threads should have the higher priority: the most recent one to have held the semaphore, or the most ancient one? Currently the choice is the most recent one, right? I compared the two approaches and found that choosing the most ancient one is better both in terms of the spill problem and in terms of end-to-end time. I don't know why yet, but I think it's worth investigating. It's also worth mentioning that I deliberately limited spillStorageSize to aggravate spilling. The scripts I'm using, first work_0909_compare_semaphore_priority.sh:
#!/bin/zsh
# run the 2410 latest jar three times
for i in {1..3}
do
echo "reproduce spill problem, at 2410 latest" && bin/spark-shell \
--master 'local[8]' --driver-memory 20g --conf spark.rapids.sql.concurrentGpuTasks=2 \
--conf spark.celeborn.client.shuffle.compression.codec=zstd --conf spark.io.compression.codec=zstd \
--conf spark.rapids.memory.pinnedPool.size=10G --conf spark.rapids.memory.host.spillStorageSize=10G \
--conf spark.sql.files.maxPartitionBytes=2g \
--conf spark.driver.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=true \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.sql.metrics.level='DEBUG' \
--conf spark.eventLog.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
--conf spark.celeborn.master.endpoints=10.19.129.151:9097 \
--jars /home/hongbin/develop/spark-3.2.1-bin-hadoop2.7/rapids_jars/2410_by0909.jar -i temp.scala 2>&1 | tee spill_`date +'%Y-%m-%d-%H-%M-%S'`.output
done
# run the modified 2410 latest jar (with priority modified to favor ancient) three times
for i in {1..3}
do
echo "reproduce spill problem, at 2410 latest + favor ancient" && bin/spark-shell \
--master 'local[8]' --driver-memory 20g --conf spark.rapids.sql.concurrentGpuTasks=2 \
--conf spark.celeborn.client.shuffle.compression.codec=zstd --conf spark.io.compression.codec=zstd \
--conf spark.rapids.memory.pinnedPool.size=10G --conf spark.rapids.memory.host.spillStorageSize=10G \
--conf spark.sql.files.maxPartitionBytes=2g \
--conf spark.driver.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=true \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.sql.metrics.level='DEBUG' \
--conf spark.eventLog.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
--conf spark.celeborn.master.endpoints=10.19.129.151:9097 \
--jars /home/hongbin/develop/spark-3.2.1-bin-hadoop2.7/rapids_jars/2410_fresh.jar -i temp.scala 2>&1 | tee spill_`date +'%Y-%m-%d-%H-%M-%S'`.output
done

with temp.scala being:
spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.set("spark.rapids.sql.agg.singlePassPartialSortEnabled", false)
spark.time(spark.range(0, 3000000000L, 1, 8).selectExpr("cast(CAST(rand(0) * 1000000000 AS LONG) DIV 1 as string) as id", "id % 2 as data").groupBy("id").agg(count(lit(1)), avg(col("data"))).orderBy("id").show())
System.exit(0)

duration time snapshot: [attached in the original comment]
my code changes: [attached in the original comment]
The original idea was to have a consistent priority for all task threads. The spill framework uses task-id followed by thread-id to give priority to different threads: https://github.com/NVIDIA/spark-rapids-jni/blob/bb696ae944f286b3fe5eb5774a9abe921c1425a4/src/main/cpp/src/SparkResourceAdaptorJni.cpp#L135-L190 The idea was to do the same for tasks here, as the semaphore only lets a task in or not, not individual threads. The code here uses the timestamp of the last access as the priority, but I don't know how much work was done to see which would be the ideal choice. Could you also try running your tests using the task-id for the priority instead, so that it is consistent with the spill code? I just want to get a few more data points.
I didn't consider the case where a task has multiple threads, so in my previous description I was using "task priority" and "thread priority" interchangeably. Sure, I can do the test; I'll update later. Just to make sure we're on the same page: "the task-id for the priority" is very similar to my "favor ancient", right?
@binmahone yes, the task-id as the priority should be very similar to "favor ancient". Spark should hand out task ids in ascending order, so in theory the oldest tasks should have the lowest ids, but because of I/O on reads, etc., a task is not guaranteed to actually end up as the first one that runs on the GPU. The main reason I would like this is that it is a super simple way to keep all operations that might want a priority consistent. I.e., if we want to pick a buffer to spill, we can avoid spilling from tasks that would be first in line to get the GPU; or if we want to pick a task to pause to avoid spilling more memory, pausing a task that is going to be last in line for the GPU, and is therefore more likely to have its data spilled, feels like something we would want to do. I just want to know what the impact to performance would be for your use case.
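As a purely illustrative aside, the two orderings being compared could be expressed roughly like this in Scala, assuming a semaphore that wakes the waiter whose key sorts highest; the TaskPriority fields are hypothetical stand-ins, not the plugin's actual types.

case class TaskPriority(taskId: Long, lastAcquiredNanos: Long)

// "Most recent holder first": larger last-acquire timestamps sort higher.
val favorMostRecent: Ordering[TaskPriority] =
  Ordering.by[TaskPriority, Long](_.lastAcquiredNanos)

// "Task-id as the priority", consistent with the spill framework:
// older (smaller) task ids sort higher, so reverse the natural order.
val favorLowestTaskId: Ordering[TaskPriority] =
  Ordering.by[TaskPriority, Long](_.taskId).reverse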
@revans2 I have tested the version using task-id as the priority (please check the code at https://github.com/binmahone/spark-rapids/tree/241008_taskid_as_prioriry; hope I understood your intention correctly).

It's surprising that "task-id as the priority" has the worst performance of the three. I haven't investigated why. Please let me know if you need anything else.
Note that even though "favor ancient" outperforms the other two implementations in my single test, I'm still not sure whether it's a better choice in general. What kind of scenarios should we consider before we can confidently make it the default?
The cases we care about all revolve around situations where tasks end up processing more than a single batch of data. We often call it the round robin problem because a task will get on the GPU; do some processing, but not finish completely; release the GPU so other tasks can get on the GPU, leaving partial data in the GPU's memory; do some I/O; and then try to get back onto the GPU again. Here are a few cases that I have seen that would be good to try and cover. It would be nice if we could have a defined, repeatable set of tests/benchmarks, because there are other related changes that we want to be able to try out. Ideally we can use the metrics from the history files to see how much data was spilled as a part of this too.
Those are the ones that come to my mind. I think we could come up with others, but the general idea is that we want a few different patterns of processing.
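To make the round-robin cadence concrete, here is a tiny runnable sketch using a plain java.util.concurrent.Semaphore and println placeholders; none of these names are the plugin's own, it only shows the acquire / process / release / I/O loop that leaves partial results resident between batches.

import java.util.concurrent.Semaphore

object RoundRobinSketch {
  // Stand-in for the GPU semaphore, e.g. concurrentGpuTasks = 2.
  private val gpuSlots = new Semaphore(2)

  def runTask(taskId: Int, numBatches: Int): Unit = {
    for (batch <- 1 to numBatches) {
      gpuSlots.acquire()                                  // may queue behind other tasks
      println(s"task $taskId: GPU work on batch $batch")  // partial results stay in GPU memory
      gpuSlots.release()                                  // other tasks get on; our data may now be spilled
      println(s"task $taskId: shuffle/scan I/O before the next batch")
    }
  }
}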
This is quite a comprehensive list, thanks! We'll update you when we have progress on this benchmark.
Closes #8301
This PR adds a PrioritySemaphore, which has a similar interface to the Java Semaphore but also uses a priority queue to determine the order in which to wake blocked threads. It changes the existing GpuSemaphore, which was using a regular Semaphore underneath, to use this new version, specifying a priority based on when a given task most recently held the semaphore. This has the effect of reducing spill, as tasks with data already loaded on the GPU are more likely to resume processing that data before it is evicted by a new task.
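As a rough sketch of how that priority policy could be wired up, reusing the SketchPrioritySemaphore from the earlier comment; this is illustrative only, and the actual GpuSemaphore code differs.

import java.util.concurrent.ConcurrentHashMap

class GpuSemaphoreSketch(sem: SketchPrioritySemaphore) {
  // taskId -> nanoTime of the last time the task held the semaphore.
  private val lastHeld = new ConcurrentHashMap[Long, Long]()

  def acquire(taskId: Long, numPermits: Int): Unit = {
    // Tasks that held the semaphore more recently get a larger priority value,
    // so they are woken first and can finish with the data they already have
    // on the GPU. Tasks that never held it start at the lowest priority here.
    val priority = lastHeld.getOrDefault(taskId, Long.MinValue)
    sem.acquire(numPermits, priority)
    lastHeld.put(taskId, System.nanoTime())
  }

  def release(taskId: Long, numPermits: Int): Unit = sem.release(numPermits)
}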