diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala
index c60d3f32e..28d66cd6b 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowColumnarToRowExec.scala
@@ -21,10 +21,9 @@ import com.intel.oap.expression.ConverterUtils
 import com.intel.oap.vectorized.{ArrowColumnarToRowJniWrapper, ArrowWritableColumnVector}
 import org.apache.arrow.vector.BaseVariableWidthVector
 import org.apache.arrow.vector.types.pojo.{Field, Schema}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
@@ -95,6 +94,15 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
         if (batch.numRows == 0) {
           logInfo(s"Skip ColumnarBatch of ${batch.numRows} rows, ${batch.numCols} cols")
           Iterator.empty
+        } else if (this.output.size == 0 || (batch.numCols() > 0 &&
+          !batch.column(0).isInstanceOf[ArrowWritableColumnVector])) {
+          // Fallback to ColumnarToRow
+          val localOutput = this.output
+          numInputBatches += 1
+          numOutputRows += batch.numRows()
+
+          val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
+          batch.rowIterator().asScala.map(toUnsafe)
+        } else {
           val bufAddrs = new ListBuffer[Long]()
           val bufSizes = new ListBuffer[Long]()
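
For context (this note is not part of the patch): the new branch falls back to Spark's row-based conversion when the plan has no output attributes or the batch's columns are not Arrow-backed, by projecting each `InternalRow` to an `UnsafeRow` with `UnsafeProjection.create(localOutput, localOutput)`. Below is a minimal, self-contained sketch of that technique using only stock Spark classes; the object name, the two-column schema, and the sample rows are illustrative assumptions, not code from this repository.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical driver illustrating the fallback path in the diff.
object UnsafeProjectionFallbackSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for `this.output`: the plan's output attributes.
    val output = Seq(
      AttributeReference("id", IntegerType, nullable = false)(),
      AttributeReference("name", StringType, nullable = true)())

    // Stand-in for `batch.rowIterator().asScala`: generic InternalRows
    // that are not yet in UnsafeRow format.
    val rows: Iterator[InternalRow] = Iterator(
      InternalRow(1, UTF8String.fromString("a")),
      InternalRow(2, UTF8String.fromString("b")))

    // Same call shape as the diff: projecting the attributes onto
    // themselves converts any InternalRow with this schema to an UnsafeRow.
    val toUnsafe = UnsafeProjection.create(output, output)

    // Like the diff, this maps lazily; note that an UnsafeProjection reuses
    // its output buffer, so each row must be consumed (or copied) before
    // the next one is produced.
    rows.map(r => toUnsafe(r)).foreach { r =>
      println(s"${r.getInt(0)} ${r.getUTF8String(1)}")
    }
  }
}
```

Projecting `localOutput` onto itself is effectively an identity projection whose only purpose is the physical re-encoding into Spark's unsafe row format, which is what downstream row-based operators expect from a `ColumnarToRow` boundary.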