[NSE-90] Refactor HashAggregateExec and CPP kernels #91

Merged 3 commits on Feb 8, 2021
@@ -49,8 +49,10 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
-import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.KVIterator
+import scala.collection.JavaConverters._

 import scala.collection.Iterator
@@ -106,34 +108,115 @@ case class ColumnarHashAggregateExec(
   buildCheck()

   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    child.executeColumnar().mapPartitionsWithIndex { (partIndex, iter) =>
+    var eval_elapse: Long = 0
+    child.executeColumnar().mapPartitions { iter =>
       ExecutorManager.tryTaskSet(numaBindingInfo)
-      val hasInput = iter.hasNext
-      val res = if (!hasInput) {
-        // This is a grouped aggregate and the input iterator is empty,
-        // so return an empty iterator.
-        Iterator.empty
-      } else {
-        var aggregation = ColumnarAggregation.create(
-          partIndex,
-          groupingExpressions,
-          child.output,
-          aggregateExpressions,
-          aggregateAttributes,
-          resultExpressions,
-          output,
-          numInputBatches,
-          numOutputBatches,
-          numOutputRows,
-          aggTime,
-          totalTime,
-          sparkConf)
-        SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
-          aggregation.close()
-        })
-        new CloseableColumnBatchIterator(aggregation.createIterator(iter))
+      val native_function = TreeBuilder.makeFunction(
+        s"standalone",
+        Lists.newArrayList(getKernelFunction),
+        new ArrowType.Int(32, true))
+      val hash_aggr_expr =
+        TreeBuilder
+          .makeExpression(native_function, Field.nullable("result", new ArrowType.Int(32, true)))
+      val hash_aggr_input_schema = ConverterUtils.toArrowSchema(child.output)
+      val hash_aggr_out_schema = ConverterUtils.toArrowSchema(output)
+      val resultStructType = ArrowUtils.fromArrowSchema(hash_aggr_out_schema)
+      val nativeKernel = new ExpressionEvaluator()
+      nativeKernel
+        .build(
+          hash_aggr_input_schema,
+          Lists.newArrayList(hash_aggr_expr),
+          hash_aggr_out_schema,
+          true)
+      val nativeIterator = nativeKernel.finishByIterator()
+
+      def close = {
+        // Convert the accumulated nanoseconds to milliseconds for the SQL metrics.
+        aggTime += (eval_elapse / 1000000)
+        totalTime += (eval_elapse / 1000000)
+        nativeKernel.close
+        nativeIterator.close
+      }
+
+      var numRowsInput = 0
+      // Build the iterator handed back to whole-stage codegen: it drains the
+      // child input first, then serves the aggregated output batches.
+      val res = new Iterator[ColumnarBatch] {
+        var processed = false
+        var skip_native = false
+        var count_num_row = 0
+        def process: Unit = {
+          while (iter.hasNext) {
+            val cb = iter.next()
+            numInputBatches += 1
+            if (cb.numRows != 0) {
+              numRowsInput += cb.numRows
+              val beforeEval = System.nanoTime()
+              if (hash_aggr_input_schema.getFields.size == 0) {
+                // Special case: the aggregate is a count over literals only,
+                // so tally row counts here and skip the native kernel.
+                count_num_row += cb.numRows
+                skip_native = true
+              } else {
+                val input_rb =
+                  ConverterUtils.createArrowRecordBatch(cb)
+                nativeIterator.processAndCacheOne(hash_aggr_input_schema, input_rb)
+                ConverterUtils.releaseArrowRecordBatch(input_rb)
+              }
+              eval_elapse += System.nanoTime() - beforeEval
+            }
+          }
+          processed = true
+        }
+        override def hasNext: Boolean = {
+          if (!processed) process
+          if (skip_native) {
+            count_num_row > 0
+          } else {
+            nativeIterator.hasNext
+          }
+        }
+
+        override def next(): ColumnarBatch = {
+          if (!processed) process
+          val beforeEval = System.nanoTime()
+          if (skip_native) {
+            // Count-literal shortcut: emit a single-row batch carrying the
+            // accumulated row count in each result column.
+            val out_res = count_num_row
+            count_num_row = 0
+            val resultColumnVectors =
+              ArrowWritableColumnVector.allocateColumns(0, resultStructType).toArray
+            resultColumnVectors.foreach { v =>
+              val numRows = v.dataType match {
+                case t: IntegerType =>
+                  out_res.asInstanceOf[Number].intValue
+                case t: LongType =>
+                  out_res.asInstanceOf[Number].longValue
+              }
+              v.put(0, numRows)
+            }
+            return new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), 1)
+          } else {
+            val output_rb = nativeIterator.next
+            if (output_rb == null) {
+              // The native kernel produced no output: return an empty batch.
+              eval_elapse += System.nanoTime() - beforeEval
+              val resultColumnVectors =
+                ArrowWritableColumnVector.allocateColumns(0, resultStructType).toArray
+              return new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), 0)
+            }
+            val outputNumRows = output_rb.getLength
+            val output = ConverterUtils.fromArrowRecordBatch(hash_aggr_out_schema, output_rb)
+            ConverterUtils.releaseArrowRecordBatch(output_rb)
+            eval_elapse += System.nanoTime() - beforeEval
+            numOutputRows += outputNumRows
+            numOutputBatches += 1
+            new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]), outputNumRows)
+          }
+        }
       }
-      res
+      SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
+        close
+      })
+      new CloseableColumnBatchIterator(res)
     }
   }
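The core of the refactor is visible above: the operator hands whole-stage codegen an Iterator[ColumnarBatch] that drains its entire input inside process before emitting anything, since a hash aggregate can produce no output until every input row has been seen. A minimal, self-contained sketch of that drain-then-emit pattern, with a plain running sum standing in for the native kernel (all names below are illustrative, not from the PR):

// Drain-then-emit iterator: consume all input on the first pull, then
// serve the result. A running sum stands in for the native hash kernel.
class DrainThenEmit(input: Iterator[Int]) extends Iterator[Int] {
  private var processed = false
  private var emitted = false
  private var sum = 0

  private def process(): Unit = {
    while (input.hasNext) sum += input.next() // feed every "batch" up front
    processed = true
  }

  override def hasNext: Boolean = {
    if (!processed) process()
    !emitted
  }

  override def next(): Int = {
    if (!processed) process()
    emitted = true
    sum // the single aggregated "output batch"
  }
}

// e.g. new DrainThenEmit(Iterator(1, 2, 3)).toList == List(6)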

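When the aggregate has no input columns at all (a count over a literal, e.g. SELECT count(1)), skip_native bypasses the kernel and only the total row count survives; next() then writes that count at the width the result column's Spark SQL type demands. A hedged sketch of just that width dispatch (the helper name is hypothetical):

import org.apache.spark.sql.types.{DataType, IntegerType, LongType}

// Hypothetical helper mirroring the dataType match in next(): pick the
// numeric width the result column expects before writing the count.
def countLiteralValue(resultType: DataType, rowCount: Long): AnyVal =
  resultType match {
    case IntegerType => rowCount.toInt // column declared as INT32
    case LongType    => rowCount       // column declared as INT64
    case other =>
      throw new UnsupportedOperationException(s"$other not expected here")
  }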
@@ -153,7 +236,7 @@ case class ColumnarHashAggregateExec(
       try {
         ConverterUtils.checkIfTypeSupported(expr.dataType)
       } catch {
-        case e : UnsupportedOperationException =>
+        case e: UnsupportedOperationException =>
           throw new UnsupportedOperationException(
             s"${expr.dataType} is not supported in ColumnarAggregation")
       }
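buildCheck probes every aggregate input and output type through ConverterUtils.checkIfTypeSupported and rethrows with operator context, so unsupported plans fail at planning time rather than inside the native kernel. A sketch of what such a guard looks like, assuming an illustrative accepted-type list rather than the project's exact one:

import org.apache.spark.sql.types._

// Hypothetical checkIfTypeSupported-style guard: accept the types the
// columnar kernels can handle, reject everything else before execution.
def checkIfTypeSupportedSketch(dt: DataType): Unit = dt match {
  case BooleanType | IntegerType | LongType | FloatType | DoubleType |
      DateType | StringType => ()
  case _: DecimalType => ()
  case other =>
    throw new UnsupportedOperationException(s"$other is not supported")
}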
@@ -228,7 +311,7 @@ case class ColumnarHashAggregateExec(
   // override def canEqual(that: Any): Boolean = false

   def getKernelFunction: TreeNode = {
-    ColumnarHashAggregationWithCodegen.prepareKernelFunction(
+    ColumnarHashAggregation.prepareKernelFunction(
       groupingExpressions,
       child.output,
       aggregateExpressions,
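getKernelFunction produces the gandiva TreeNode describing the whole aggregation, and doExecuteColumnar wraps it in a "standalone" function node bound to a placeholder INT32 field before building the ExpressionEvaluator. A condensed sketch of that wrapping step, assuming Arrow's Java gandiva TreeBuilder API and Guava's Lists (the helper name is ours):

import com.google.common.collect.Lists
import org.apache.arrow.gandiva.expression.{ExpressionTree, TreeBuilder, TreeNode}
import org.apache.arrow.vector.types.pojo.{ArrowType, Field}

// Wrap a kernel TreeNode in a "standalone" function node and bind it to a
// nullable INT32 placeholder field, mirroring hash_aggr_expr above.
def wrapStandalone(kernel: TreeNode): ExpressionTree = {
  val fn = TreeBuilder.makeFunction(
    "standalone",
    Lists.newArrayList(kernel),
    new ArrowType.Int(32, true))
  TreeBuilder.makeExpression(fn, Field.nullable("result", new ArrowType.Int(32, true)))
}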
@@ -474,19 +474,14 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
         def process: Unit = {
           while (iter.hasNext) {
             val cb = iter.next()
-            if (cb.numRows == 0) {
-              val resultColumnVectors =
-                ArrowWritableColumnVector.allocateColumns(0, resultStructType).toArray
-              return new ColumnarBatch(
-                resultColumnVectors.map(_.asInstanceOf[ColumnVector]),
-                0)
+            if (cb.numRows != 0) {
+              val beforeEval = System.nanoTime()
+              val input_rb =
+                ConverterUtils.createArrowRecordBatch(cb)
+              nativeIterator.processAndCacheOne(resCtx.inputSchema, input_rb)
+              ConverterUtils.releaseArrowRecordBatch(input_rb)
+              eval_elapse += System.nanoTime() - beforeEval
             }
-            val beforeEval = System.nanoTime()
-            val input_rb =
-              ConverterUtils.createArrowRecordBatch(cb)
-            nativeIterator.processAndCacheOne(resCtx.inputSchema, input_rb)
-            ConverterUtils.releaseArrowRecordBatch(input_rb)
-            eval_elapse += System.nanoTime() - beforeEval
           }
           processed = true
         }
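In ColumnarWholeStageCodegenExec, the drain loop previously returned out of process on the first empty batch, cutting off every batch behind it; the new guard simply skips empty batches and keeps feeding the native iterator. A tiny sketch of the corrected control flow (feed stands in for processAndCacheOne):

// Skip empty batches instead of bailing out of the drain loop: an empty
// batch mid-stream must not cut off the batches that follow it.
def drainNonEmpty[A](batches: Iterator[Seq[A]])(feed: Seq[A] => Unit): Unit =
  batches.foreach { cb =>
    if (cb.nonEmpty) feed(cb) // mirror of the cb.numRows != 0 guard
  }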

This file was deleted.
