diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala index 6a34b882c..0e65abd2b 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.{CodegenSupport, ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.types._ +import org.apache.spark.TaskContext import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -146,13 +147,21 @@ case class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransit var rowId = 0 val row = new UnsafeRow(batch.numCols()) var closed = false + + TaskContext.get().addTaskCompletionListener[Unit](_ => { + if (!closed) { + jniWrapper.nativeClose(info.instanceID) + closed = true + } + }) + override def hasNext: Boolean = { val result = rowId < batch.numRows() if (!result && !closed) { jniWrapper.nativeClose(info.instanceID) closed = true } - return result + result } override def next: UnsafeRow = {