Skip to content

Commit

Permalink
Fix host memory leak for R2C (#9196)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Sep 7, 2023
1 parent bdd7f68 commit 4e06922
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ private class GpuRowToColumnConverter(schema: StructType) extends Serializable {
*/
final def convertBatch(rows: Array[InternalRow], schema: StructType): ColumnarBatch = {
val numRows = rows.length
- val builders = new GpuColumnarBatchBuilder(schema, numRows)
- rows.foreach(convert(_, builders))
- builders.build(numRows)
+ withResource(new GpuColumnarBatchBuilder(schema, numRows)) { builders =>
+ rows.foreach(convert(_, builders))
+ builders.build(numRows)
+ }
}
}

Expand Down Expand Up @@ -625,8 +626,7 @@ class RowToColumnarIterator(
}
}

- val builders = new GpuColumnarBatchBuilder(localSchema, targetRows)
- try {
+ withResource(new GpuColumnarBatchBuilder(localSchema, targetRows)) { builders =>
var rowCount = 0
// Double because validity can be < 1 byte, and this is just an estimate anyways
var byteCount: Double = 0
Expand Down Expand Up @@ -672,8 +672,6 @@ class RowToColumnarIterator(

// The returned batch will be closed by the consumer of it
ret
- } finally {
- builders.close()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,12 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
// The input batch from CPU must NOT be closed, because the columns inside it
// will be reused, and Spark expects the producer to close its batches.
val numRows = batch.numRows()
- val gcbBuilder = new GpuColumnarBatchBuilder(structSchema, numRows)
- for (i <- 0 until batch.numCols()) {
- gcbBuilder.copyColumnar(batch.column(i), i, numRows)
+ withResource(new GpuColumnarBatchBuilder(structSchema, numRows)) { gcbBuilder =>
+ for (i <- 0 until batch.numCols()) {
+ gcbBuilder.copyColumnar(batch.column(i), i, numRows)
+ }
+ gcbBuilder.build(numRows)
}
- gcbBuilder.build(numRows)
} else {
batch
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,7 @@ private class ExternalRowToColumnarIterator(
}
}

- val builders = new GpuColumnarBatchBuilder(localSchema, targetRows)
- try {
+ Arm.withResource(new GpuColumnarBatchBuilder(localSchema, targetRows)) { builders =>
var rowCount = 0
// Double because validity can be < 1 byte, and this is just an estimate anyways
var byteCount: Double = 0
Expand Down Expand Up @@ -623,8 +622,6 @@ private class ExternalRowToColumnarIterator(

// The returned batch will be closed by the consumer of it
ret
- } finally {
- builders.close()
}
}
}
Expand Down

0 comments on commit 4e06922

Please sign in to comment.