Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix host memory leak for R2C #9196

Merged
merged 1 commit into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ private class GpuRowToColumnConverter(schema: StructType) extends Serializable {
*/
final def convertBatch(rows: Array[InternalRow], schema: StructType): ColumnarBatch = {
val numRows = rows.length
val builders = new GpuColumnarBatchBuilder(schema, numRows)
rows.foreach(convert(_, builders))
builders.build(numRows)
withResource(new GpuColumnarBatchBuilder(schema, numRows)) { builders =>
rows.foreach(convert(_, builders))
builders.build(numRows)
}
}
}

Expand Down Expand Up @@ -625,8 +626,7 @@ class RowToColumnarIterator(
}
}

val builders = new GpuColumnarBatchBuilder(localSchema, targetRows)
try {
withResource(new GpuColumnarBatchBuilder(localSchema, targetRows)) { builders =>
var rowCount = 0
// Double because validity can be < 1 byte, and this is just an estimate anyways
var byteCount: Double = 0
Expand Down Expand Up @@ -672,8 +672,6 @@ class RowToColumnarIterator(

// The returned batch will be closed by the consumer of it
ret
} finally {
builders.close()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,12 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
// The input batch from CPU must NOT be closed, because the columns inside it
// will be reused, and Spark expects the producer to close its batches.
val numRows = batch.numRows()
val gcbBuilder = new GpuColumnarBatchBuilder(structSchema, numRows)
for (i <- 0 until batch.numCols()) {
gcbBuilder.copyColumnar(batch.column(i), i, numRows)
withResource(new GpuColumnarBatchBuilder(structSchema, numRows)) { gcbBuilder =>
for (i <- 0 until batch.numCols()) {
gcbBuilder.copyColumnar(batch.column(i), i, numRows)
}
gcbBuilder.build(numRows)
}
gcbBuilder.build(numRows)
} else {
batch
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,7 @@ private class ExternalRowToColumnarIterator(
}
}

val builders = new GpuColumnarBatchBuilder(localSchema, targetRows)
try {
Arm.withResource(new GpuColumnarBatchBuilder(localSchema, targetRows)) { builders =>
var rowCount = 0
// Double because validity can be < 1 byte, and this is just an estimate anyways
var byteCount: Double = 0
Expand Down Expand Up @@ -623,8 +622,6 @@ private class ExternalRowToColumnarIterator(

// The returned batch will be closed by the consumer of it
ret
} finally {
builders.close()
}
}
}
Expand Down