Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SpillableHostColumnarBatch #9098

Merged
merged 2 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
jlowe marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package com.nvidia.spark.rapids;

import ai.rapids.cudf.HostColumnVector;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.HashSet;

/**
* A GPU accelerated version of the Spark ColumnVector.
* Most of the standard Spark APIs should never be called, as they assume that the data
Expand Down Expand Up @@ -57,6 +60,25 @@ public static RapidsHostColumnVector[] extractColumns(ColumnarBatch batch) {
return vectors;
}

/**
 * Increment the reference count of every RapidsHostColumnVector in the batch,
 * then hand the same batch back for convenient chaining.
 */
public static ColumnarBatch incRefCounts(ColumnarBatch batch) {
  RapidsHostColumnVector[] columns = extractColumns(batch);
  for (int i = 0; i < columns.length; i++) {
    columns[i].incRefCount();
  }
  return batch;
}

public static long getTotalHostMemoryUsed(ColumnarBatch batch) {
long sum = 0;
if (batch.numCols() > 0) {
HashSet<RapidsHostColumnVector> found = new HashSet<>();
for (RapidsHostColumnVector rapidsHostCv: extractColumns(batch)) {
if (found.add(rapidsHostCv)) {
abellina marked this conversation as resolved.
Show resolved Hide resolved
sum += rapidsHostCv.getHostMemoryUsed();
}
}
}
return sum;
}

private final ai.rapids.cudf.HostColumnVector cudfCv;

Expand All @@ -75,6 +97,10 @@ public final RapidsHostColumnVector incRefCount() {
return this;
}

/** Host memory, in bytes, used by the wrapped cudf host column vector. */
public final long getHostMemoryUsed() {
return cudfCv.getHostMemorySize();
}

public final ai.rapids.cudf.HostColumnVector getBase() {
return cudfCv;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.nvidia.spark.rapids

import java.io.File
import java.io.OutputStream

import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -175,7 +176,6 @@ class RapidsBufferCopyIterator(buffer: RapidsBuffer)
} else {
None
}

// True when a chunked packer was created for this buffer, i.e. the copy will
// be produced in multiple chunks rather than in a single shot.
def isChunked: Boolean = chunkedPacker.isDefined

// this is used for the single shot case to flag when `next` is called
Expand Down Expand Up @@ -245,12 +245,24 @@ trait RapidsBuffer extends AutoCloseable {
def getCopyIterator: RapidsBufferCopyIterator =
new RapidsBufferCopyIterator(this)

/**
 * Serialize this buffer's contents to the given stream. Used at spill time when
 * the destination tier needs a stream to write to (the `RapidsHostColumnarBatch`
 * case). The default implementation rejects the call; only buffers that actually
 * support serialization override it.
 *
 * @param outputStream stream that the `RapidsBuffer` will serialize itself to
 * @throws IllegalStateException if this buffer type cannot serialize itself
 */
def serializeToStream(outputStream: OutputStream): Unit =
  throw new IllegalStateException(s"Buffer $this does not support serializeToStream")

/** Descriptor for how the memory buffer is formatted */
def meta: TableMeta

/** The storage tier for this buffer */
val storageTier: StorageTier

/** A RapidsBuffer that needs to be serialized/deserialized at spill/materialization time */
val needsSerialization: Boolean = false

/**
* Get the columnar batch within this buffer. The caller must have
* successfully acquired the buffer beforehand.
Expand All @@ -263,6 +275,21 @@ trait RapidsBuffer extends AutoCloseable {
*/
def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch

/**
 * Get the host-backed columnar batch from this buffer. The caller must have
 * successfully acquired the buffer beforehand.
 *
 * The default implementation throws: buffers originally added to the device
 * tier, or plain buffers that are not batches, cannot produce a host batch.
 * Only host-batch-capable buffer types override this.
 *
 * @param sparkTypes the spark data types the batch should have
 * @see [[addReference]]
 * @note It is the responsibility of the caller to close the batch.
 * @throws IllegalStateException if this buffer cannot provide a host batch
 */
def getHostColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch =
  throw new IllegalStateException(s"$this does not support host columnar batches.")

/**
* Get the underlying memory buffer. This may be either a HostMemoryBuffer or a DeviceMemoryBuffer
* depending on where the buffer currently resides.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,15 @@ class RapidsBufferCatalog(
batch: ColumnarBatch,
initialSpillPriority: Long,
needsSync: Boolean = true): RapidsBufferHandle = {
closeOnExcept(GpuColumnVector.from(batch)) { table =>
addTable(table, initialSpillPriority, needsSync)
require(batch.numCols() > 0,
"Cannot call addBatch with a batch that doesn't have columns")
batch.column(0) match {
case _: RapidsHostColumnVector =>
addHostBatch(batch, initialSpillPriority, needsSync)
case _ =>
closeOnExcept(GpuColumnVector.from(batch)) { table =>
addTable(table, initialSpillPriority, needsSync)
}
}
}

Expand Down Expand Up @@ -381,6 +388,25 @@ class RapidsBufferCatalog(
makeNewHandle(id, initialSpillPriority)
}


/**
 * Add a host-backed ColumnarBatch to the catalog. Invoked only from `addBatch`,
 * once it has determined that the batch's columns are host-backed.
 *
 * @param hostCb host-backed batch to track
 * @param initialSpillPriority starting spill priority for the new buffer
 * @param needsSync whether the store should synchronize before completing
 * @return handle that callers use to reference the tracked buffer
 */
private def addHostBatch(
    hostCb: ColumnarBatch,
    initialSpillPriority: Long,
    needsSync: Boolean): RapidsBufferHandle = {
  val bufferId = TempSpillBufferId()
  val hostBuffer =
    hostStorage.addBatch(bufferId, hostCb, initialSpillPriority, needsSync)
  registerNewBuffer(hostBuffer)
  makeNewHandle(bufferId, initialSpillPriority)
}

/**
* Register a degenerate RapidsBufferId given a TableMeta
* @note this is called from the shuffle catalogs only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@

package com.nvidia.spark.rapids

import java.io.{File, FileOutputStream}
import java.io.{File, FileInputStream, FileOutputStream}
import java.nio.channels.FileChannel.MapMode
import java.util.concurrent.ConcurrentHashMap

import ai.rapids.cudf.{Cuda, HostMemoryBuffer, MemoryBuffer}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.sql.rapids.RapidsDiskBlockManager
import org.apache.spark.sql.rapids.execution.SerializedHostTableUtils
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

/** A buffer store using files on the local disks. */
class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
Expand All @@ -36,6 +39,49 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
incoming: RapidsBuffer,
stream: Cuda.Stream): RapidsBufferBase = {
// assuming that the disk store gets contiguous buffers
val id = incoming.id
val path = if (id.canShareDiskPaths) {
sharedBufferFiles.computeIfAbsent(id, _ => id.getDiskPath(diskBlockManager))
} else {
id.getDiskPath(diskBlockManager)
}

val (fileOffset, diskLength) = if (id.canShareDiskPaths) {
// only one writer at a time for now when using shared files
path.synchronized {
if (incoming.needsSerialization) {
// only shuffle buffers share paths, and adding host-backed ColumnarBatch is
// not the shuffle case, so this is not supported and is never exercised.
throw new IllegalStateException(
s"Attempted spilling to disk a RapidsBuffer $incoming that needs serialization " +
s"while sharing spill paths.")
} else {
copyBufferToPath(incoming, path, append = true)
}
}
} else {
if (incoming.needsSerialization) {
serializeBufferToStream(incoming, path)
} else {
copyBufferToPath(incoming, path, append = false)
}
}

logDebug(s"Spilled to $path $fileOffset:$diskLength")
new RapidsDiskBuffer(
id,
fileOffset,
diskLength,
incoming.meta,
incoming.getSpillPriority,
incoming.needsSerialization)
}

/** Copy a host buffer to a file, returning the file offset at which the data was written. */
private def copyBufferToPath(
incoming: RapidsBuffer,
path: File,
append: Boolean): (Long, Long) = {
val incomingBuffer =
withResource(incoming.getCopyIterator) { incomingCopyIterator =>
incomingCopyIterator.next()
Expand All @@ -45,58 +91,44 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
case h: HostMemoryBuffer => h
case _ => throw new UnsupportedOperationException("buffer without host memory")
}
val id = incoming.id
val path = if (id.canShareDiskPaths) {
sharedBufferFiles.computeIfAbsent(id, _ => id.getDiskPath(diskBlockManager))
} else {
id.getDiskPath(diskBlockManager)
}
val fileOffset = if (id.canShareDiskPaths) {
// only one writer at a time for now when using shared files
path.synchronized {
copyBufferToPath(hostBuffer, path, append = true)
val iter = new HostByteBufferIterator(hostBuffer)
val fos = new FileOutputStream(path, append)
try {
val channel = fos.getChannel
val fileOffset = channel.position
iter.foreach { bb =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we close bb when we are done?

Copy link
Collaborator Author

@abellina abellina Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These ByteBuffer instances are views on top of hostBuffer. It looks like the jdk will instantiate a DirectByteBuffer with a null cleaner in this case. We call env->NewDirectByteBuffer(addr, len).

As such I think these views are going to stay around in the heap as it is. We could move this implementation to cuDF so we can write directly from the HostMemoryBuffer to a file, rather than having to work around limitations in the jdk's ByteBuffer impl with this iterator.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that closing them really ends up being a NOOP and the heap cleans them up. The only reason I mention it, is because if bb changes in some way to not be backed by a HostMemoryBuffer we would then potentially leak memory. It is just defensive. And a nit.

while (bb.hasRemaining) {
channel.write(bb)
}
}
} else {
copyBufferToPath(hostBuffer, path, append = false)
(fileOffset, channel.position())
} finally {
fos.close()
}
logDebug(s"Spilled to $path $fileOffset:${incomingBuffer.getLength}")
new RapidsDiskBuffer(
id,
fileOffset,
incomingBuffer.getLength,
incoming.meta,
incoming.getSpillPriority)
}
}

/** Copy a host buffer to a file, returning the file offset at which the data was written. */
private def copyBufferToPath(
buffer: HostMemoryBuffer,
path: File,
append: Boolean): Long = {
val iter = new HostByteBufferIterator(buffer)
val fos = new FileOutputStream(path, append)
try {
val channel = fos.getChannel
val fileOffset = channel.position
iter.foreach { bb =>
while (bb.hasRemaining) {
channel.write(bb)
}
/**
 * Ask the buffer to serialize itself into a freshly truncated file, returning
 * the (startOffset, bytesWritten) pair. Written size is measured via the file
 * channel's position before and after the write. Appending is not supported
 * for serialized buffers, so the file is always opened at offset zero.
 */
private def serializeBufferToStream(
    incoming: RapidsBuffer,
    path: File): (Long, Long) = {
  withResource(new FileOutputStream(path, false /*append not supported*/)) { fos =>
    withResource(fos.getChannel) { channel =>
      val startOffset = channel.position()
      incoming.serializeToStream(fos)
      val bytesWritten = channel.position() - startOffset
      (startOffset, bytesWritten)
    }
  }
}


class RapidsDiskBuffer(
id: RapidsBufferId,
fileOffset: Long,
size: Long,
meta: TableMeta,
spillPriority: Long)
spillPriority: Long,
override val needsSerialization: Boolean = false)
extends RapidsBufferBase(
id, meta, spillPriority) {
private[this] var hostBuffer: Option[HostMemoryBuffer] = None
Expand All @@ -106,6 +138,8 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
override val storageTier: StorageTier = StorageTier.DISK

override def getMemoryBuffer: MemoryBuffer = synchronized {
require(!needsSerialization,
"Called getMemoryBuffer on a disk buffer that needs deserialization")
if (hostBuffer.isEmpty) {
val path = id.getDiskPath(diskBlockManager)
val mappedBuffer = HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE,
Expand All @@ -117,6 +151,22 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
hostBuffer.get
}

/**
 * Rebuild a host-backed ColumnarBatch from the serialized table on disk.
 * Only valid for buffers that were spilled via serialization, and only when
 * this buffer owns the whole file (offset 0), since shared spill files would
 * need an offset-aware reader.
 */
override def getHostColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = {
  require(needsSerialization,
    "Disk buffer was not serialized yet getHostColumnarBatch is being invoked")
  require(fileOffset == 0,
    "Attempted to obtain a HostColumnarBatch from a spilled RapidsBuffer that is sharing " +
      "paths on disk")
  val diskPath = id.getDiskPath(diskBlockManager)
  withResource(new FileInputStream(diskPath)) { inputStream =>
    val (header, dataBuffer) =
      SerializedHostTableUtils.readTableHeaderAndBuffer(inputStream)
    // if column construction fails, make sure the host buffer is released
    val hostColumns = closeOnExcept(dataBuffer) { _ =>
      SerializedHostTableUtils.buildHostColumns(header, dataBuffer, sparkTypes)
    }
    new ColumnarBatch(hostColumns.toArray, header.getNumRows)
  }
}

override def close(): Unit = synchronized {
if (refcount == 1) {
// free the memory mapping since this is the last active reader
Expand Down
Loading