Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Fix failing unit test (fall back to row-based ColumnarToRow when the batch's columns are not ArrowWritableColumnVector or the output is empty)
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Sep 1, 2021
1 parent 30baa5b commit 549434b
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import com.intel.oap.expression.ConverterUtils
import com.intel.oap.vectorized.{ArrowColumnarToRowJniWrapper, ArrowWritableColumnVector}
import org.apache.arrow.vector.BaseVariableWidthVector
import org.apache.arrow.vector.types.pojo.{Field, Schema}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
Expand Down Expand Up @@ -95,6 +94,15 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
if (batch.numRows == 0) {
logInfo(s"Skip ColumnarBatch of ${batch.numRows} rows, ${batch.numCols} cols")
Iterator.empty
} else if (this.output.size == 0 || (batch.numCols() > 0 &&
!batch.column(0).isInstanceOf[ArrowWritableColumnVector])) {
// Fallback to ColumnarToRow
val localOutput = this.output
numInputBatches += 1
numOutputRows += batch.numRows()

val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
batch.rowIterator().asScala.map(toUnsafe)
} else {
val bufAddrs = new ListBuffer[Long]()
val bufSizes = new ListBuffer[Long]()
Expand Down

0 comments on commit 549434b

Please sign in to comment.