-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
prevent duplicate queueing in the prio semaphore #11389
Conversation
Signed-off-by: Zach Puller <zpuller@nvidia.com>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
Signed-off-by: Zach Puller <zpuller@nvidia.com>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't understand the last part exactly
Sorry, I should have been more clear. The idea is to add a boolean to ThreadInfo that is constructed as false but is set to true by the thread that is going to signal the wakeup, that way the thread waking up can distinguish between a spurious wakeup and an explicit one. A spurious wakeup should not leave the while loop, even if there are permits, because that can cause a priority inversion. Consider the case where a spurious wakeup on a low priority thread occurs at the same time the high priority thread is signaled. If the low priority thread wakeup races ahead of the high priority signal, it can end up grabbing the semaphore ahead of the high priority thread which is incorrect.
So the logic would look something like this:
private case class ThreadInfo(priority: T, condition: Condition, numPermits: Int) {
var signaled: Boolean = false
}
def acquire(numPermits: Int, priority: T): Unit = {
lock.lock()
try {
if (canAcquire(numPermits)) {
commitAcquire(numPermits)
} else {
val info = ThreadInfo(priority, condition, numPermits)
waitingQueue.enqueue(info)
while (!info.signaled) {
info.condition.await()
}
}
} finally {
lock.unlock()
}
}
def release(numPermits: Int): Unit = {
lock.lock()
try {
occupiedSlots -= numPermits
// acquire and wakeup for all threads that now have enough permits
var done = false
while (!done && waitingQueue.nonEmpty) {
val nextThread = waitingQueue.head
if (canAcquire(nextThread.numPermits)) {
val popped = waitingQueue.dequeue()
assert(popped eq nextThread)
commitAcquire(nextThread.numPermits)
nextThread.signaled = true
nextThread.condition.signal()
} else {
done = true
}
}
} finally {
lock.unlock()
}
}
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
@jlowe in your code if there is an IterruptedException thrown when the acquire is waiting on the condition, then we need to make sure we are removed from the queue or the assumption that everything in the queue has a thread that is waiting on the condition is not true. |
Nice catch! That's why I said, "it would look something like this." 😆 However it is important to note that even in the original code, your comment applies. If we leave acquire exceptionally, we need to pull ourselves out of the queue since we're no longer waiting to acquire. |
Signed-off-by: Zach Puller <zpuller@nvidia.com>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala
Show resolved
Hide resolved
Signed-off-by: Zach Puller <zpuller@nvidia.com>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala
Outdated
Show resolved
Hide resolved
Signed-off-by: Zach Puller <zpuller@nvidia.com>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
Signed-off-by: Zach Puller <zpuller@nvidia.com>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala
Outdated
Show resolved
Hide resolved
Signed-off-by: Zach Puller <zpuller@nvidia.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all the work on this, @zpuller!
build |
Thank you for the meticulous review! |
Follow up to #11376 incorporating a fix to prevent duplicate queueing in the event of spurious wakeups