Add SpillableHostColumnarBatch #9098
Conversation
Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
build
I don't see regressions in NDS (not that they were expected).
try {
  val channel = fos.getChannel
  val fileOffset = channel.position
  iter.foreach { bb =>
Should we close bb when we are done?
These ByteBuffer instances are views on top of hostBuffer. It looks like the jdk will instantiate a DirectByteBuffer with a null cleaner in this case. We call env->NewDirectByteBuffer(addr, len).
As such, I think these views are going to stay around in the heap as it is. We could move this implementation to cuDF so we can write directly from the HostMemoryBuffer to a file, rather than having to work around limitations in the jdk's ByteBuffer impl with this iterator.
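For illustration only (not the code in this change), here is a rough sketch of writing chunked HostMemoryBuffer views to a FileChannel; asByteBuffer hands back a DirectByteBuffer view with no cleaner attached, and the chunk size is arbitrary:

import java.io.FileOutputStream
import ai.rapids.cudf.HostMemoryBuffer

// Illustrative only: write a HostMemoryBuffer to a file in chunks, where each
// ByteBuffer is just a view over a slice of the host buffer (no cleaner attached),
// so "closing" a view would be a no-op anyway.
def writeHostBufferToFile(hostBuffer: HostMemoryBuffer, fos: FileOutputStream): Unit = {
  val channel = fos.getChannel
  val chunkSize = 128L * 1024 * 1024 // arbitrary chunk size for the example
  var offset = 0L
  while (offset < hostBuffer.getLength) {
    val len = math.min(chunkSize, hostBuffer.getLength - offset).toInt
    val bb = hostBuffer.asByteBuffer(offset, len)
    while (bb.hasRemaining) {
      channel.write(bb)
    }
    offset += len
  }
}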
I know that closing them really ends up being a NOOP and the heap cleans them up. The only reason I mention it is that if bb changes in some way to not be backed by a HostMemoryBuffer, we would then potentially leak memory. It is just defensive, and a nit.
Adding @jlowe because it would be great if he could take a look at this change as well.
sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala
def getHostColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = {
  throw new IllegalStateException(s"$this does not support host columnar batches.")
Nit: Similar comment here. It's nice that this doesn't have the sentinel value, but I'd rather see a trait that defines the ability to provide a HostColumnarBatch. Code that needs it on the underlying RAPIDS buffer would then pattern match to downcast the buffer type and get access to it, rather than call a method that explodes if you don't carefully know what you're doing. Not a must-fix for me.
Putting these in a trait is easy; I can do that.
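A minimal sketch of what that trait could look like (HostBatchProvider and the helper below are illustrative names, not the plugin's actual types):

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical marker trait: only buffers that can actually materialize a
// host-backed batch mix this in, so there is no throwing default method.
trait HostBatchProvider {
  def getHostColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch
}

// Callers pattern match to downcast rather than calling a method that throws.
def hostBatchIfSupported(buffer: AnyRef, sparkTypes: Array[DataType]): Option[ColumnarBatch] =
  buffer match {
    case p: HostBatchProvider => Some(p.getHostColumnarBatch(sparkTypes))
    case _ => None
  }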
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala
}

override def getHostColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = {
  columnSpillability.clear()
How is this going to interact with another thread that's trying to call onColumnSpillable at the same time? Could we sometimes end up in a state where we built the host batch but still think this is spillable? I think the answer is yes, but maybe we don't care because of the refcounts? In light of that, it seems prudent to first get the refcounts in place before marking this as spillable; otherwise I think another thread could end up calling releaseResources and closing hostCb on this (now spillable) buffer before we get around to locking down hostCb.
getHostColumnarBatch, like getColumnarBatch, must be called after acquiring the RapidsBuffer via catalog.acquireBuffer. It will already have dealt with the RapidsBuffer reference count, so we are guaranteed that the batch produced isn't pointing to a closed batch.
And yes, I see the race you mention. I'll think about that more, to see if we need to lock things further.
One thing we don't do is mark something as not spillable if it was acquired. It is only non-spillable when the batch or buffer is obtained.
Perhaps we should add a second level of unspillable. While a buffer has RapidsBuffer.refCount > 1, we could mark the buffer as non-spillable. When the RapidsBuffer.refCount goes back to 1, we could return to the cuDF-driven spillability checks.
BTW this is an issue with device buffers and batches as well
While a buffer has RapidsBuffer.refCount > 1, we could mark the buffer as non spillable.
This seems desirable in general, because it's not worth spilling something if we cannot free the memory associated with it after the spill copy. Not something to tackle in this PR.
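A hypothetical sketch of that two-level gating (the names below are placeholders, not the real RapidsBuffer API):

// Illustrative only: spillability is forced off while anyone besides the store
// holds a reference, and reverts to the normal checks once refCount drops back to 1.
class RefCountedSpillable {
  private var refCount = 1          // the store's own reference
  private var spillable = true

  def acquire(): Unit = synchronized {
    refCount += 1
    spillable = false               // an active user means we must not spill
  }

  def release(): Unit = synchronized {
    refCount -= 1
    if (refCount == 1) {
      spillable = true              // back to the usual (e.g. cuDF-driven) checks
    }
  }

  def isSpillable: Boolean = synchronized { spillable }
}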
// the columns (the caller is holding onto a ColumnarBatch that will be closed
// after instantiation, triggering onClosed callbacks)
// This hash set contains the columns that are currently spillable.
private val columnSpillability = new ConcurrentHashMap[HostColumnVector, Boolean]()
Nit: This could be done more efficiently as a BitSet (especially since most column batches will have fewer than 64 columns), although that would require synchronization when accessing it. We would simply need to track the column index in the on-close event handler (we're generating a new handler per column anyway) and have onColumnSpillable take a column index instead.
Since this one is a nit, would you mind if I do this as a separate issue? This code is really a copy from the device counterpart. I would like to fix both if we are going to change this to a bitset.
Separate issue is fine.
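For reference, a rough sketch of the BitSet variant (illustrative only, not this PR's code); access is synchronized because java.util.BitSet is not thread safe:

import java.util.BitSet

class ColumnSpillabilityTracker(numColumns: Int) {
  private val spillable = new BitSet(numColumns)

  // called from the per-column onClosed handler with the column's index
  def onColumnSpillable(columnIx: Int): Unit = synchronized {
    spillable.set(columnIx)
  }

  // e.g. when handing the batch out via getHostColumnarBatch
  def markAllNonSpillable(): Unit = synchronized {
    spillable.clear()
  }

  // the batch is spillable only once every column has reported back
  def allColumnsSpillable: Boolean = synchronized {
    spillable.cardinality() == numColumns
  }
}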
sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java
Would like to hear from others too though.
build
Closes #8882
This is ready for another pair of eyes to take a look at the approach, especially around serialization/deserialization of host-backed ColumnarBatch.
Note that this doesn't support "unspill", as the path to get a memory buffer is not supported for a batch. We need follow-on work after this, such as unspill and how to deal with GDS being configured. We are also using JCudfSerialization over a JVM stream to write to and read from disk; there are likely faster ways of doing this we can look into, but they should stay under the hood.
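Roughly, the write side could look like the sketch below (not the exact code in this change; it assumes the HostColumnVector[] overload of JCudfSerialization.writeToStream):

import java.io.{BufferedOutputStream, FileOutputStream}
import ai.rapids.cudf.{HostColumnVector, JCudfSerialization}

// Illustrative only: serialize host-backed columns to a local file through a JVM stream.
def spillHostColumnsToDisk(
    columns: Array[HostColumnVector],
    numRows: Long,
    path: String): Unit = {
  val out = new BufferedOutputStream(new FileOutputStream(path))
  try {
    // writes the serialized header followed by the packed column data
    JCudfSerialization.writeToStream(columns, out, 0, numRows)
  } finally {
    out.close()
  }
}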
I am going to run this through NDS just in case, so I'll mark it as a draft for now.