From b6c267a858d8cdf366db9c8797df3a543eabb900 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Sat, 8 May 2021 17:33:02 +0800 Subject: [PATCH] [NSE-285] ColumnarWindow: Support Date input in MAX/MIN (#286) Closes #285 --- arrow-data-source/pom.xml | 56 +-- native-sql-engine/core/pom.xml | 28 +- .../com/intel/oap/ColumnarGuardRule.scala | 2 +- .../scala/com/intel/oap/ColumnarPlugin.scala | 39 +- .../oap/execution/ColumnarWindowExec.scala | 347 ++++++++++++------ .../com/intel/oap/tpc/ds/TPCDSSuite.scala | 10 +- .../codegen/arrow_compute/ext/actions_impl.cc | 10 + .../codegen/arrow_compute/ext/kernels_ext.h | 4 + .../arrow_compute/ext/window_kernel.cc | 7 + pom.xml | 71 ++++ 10 files changed, 350 insertions(+), 224 deletions(-) diff --git a/arrow-data-source/pom.xml b/arrow-data-source/pom.xml index c3cafb0f4..f6c1f368e 100644 --- a/arrow-data-source/pom.xml +++ b/arrow-data-source/pom.xml @@ -100,26 +100,6 @@ ${scala.version} provided - - org.apache.spark - spark-sql_2.12 - ${spark.version} - - - org.apache.arrow - arrow-vector - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - provided - junit junit @@ -128,44 +108,24 @@ org.apache.spark - spark-core_2.12 - ${spark.version} + spark-sql_${scala.binary.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} test-jar test - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - org.apache.spark - spark-catalyst_2.12 - ${spark.version} - - - org.apache.arrow - arrow-vector - - + spark-catalyst_${scala.binary.version} test-jar test org.apache.spark - spark-sql_2.12 - ${spark.version} - - - org.apache.arrow - arrow-vector - - + spark-sql_${scala.binary.version} test-jar test diff --git a/native-sql-engine/core/pom.xml b/native-sql-engine/core/pom.xml index 15b0eff21..a6cbb89f5 100644 --- a/native-sql-engine/core/pom.xml +++ b/native-sql-engine/core/pom.xml @@ -48,50 +48,24 @@ org.apache.spark - spark-core_${scala.binary.version} - ${spark.version} + spark-sql_${scala.binary.version} provided - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - org.apache.spark spark-core_${scala.binary.version} - ${spark.version} test-jar test org.apache.spark spark-catalyst_${scala.binary.version} - ${spark.version} - provided - - - org.apache.spark - spark-catalyst_${scala.binary.version} - ${spark.version} test-jar test org.apache.spark spark-sql_${scala.binary.version} - ${spark.version} - provided - - - org.apache.spark - spark-sql_${scala.binary.version} - ${spark.version} test-jar test diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala index 39bd2465c..e8877acc7 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala @@ -170,7 +170,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] { plan.isSkewJoin) case plan: WindowExec => if (!enableColumnarWindow) return false - val window = ColumnarWindowExec.create( + val window = ColumnarWindowExec.createWithOptimizations( plan.windowExpression, plan.partitionSpec, plan.orderSpec, diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala index 34bcd55e2..e0734e987 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala @@ 
-221,36 +221,17 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] { } case plan: WindowExec => - if (columnarConf.enableColumnarWindow) { - val sortRemoved = plan.child match { - case sort: SortExec => // remove ordering requirements - replaceWithColumnarPlan(sort.child) - case _ => - replaceWithColumnarPlan(plan.child) - } - // disable CoalesceBatchesExec to reduce Netty direct memory usage - val coalesceBatchRemoved = sortRemoved match { - case s: CoalesceBatchesExec => - s.child - case _ => sortRemoved - } - logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - try { - val window = ColumnarWindowExec.create( - plan.windowExpression, - plan.partitionSpec, - plan.orderSpec, - coalesceBatchRemoved) - return window - } catch { - case _: Throwable => - logInfo("Columnar Window: Falling back to regular Window...") - } + try { + ColumnarWindowExec.createWithOptimizations( + plan.windowExpression, + plan.partitionSpec, + plan.orderSpec, + replaceWithColumnarPlan(plan.child)) + } catch { + case _: Throwable => + logInfo("Columnar Window: Falling back to regular Window...") + plan } - logDebug(s"Columnar Processing for ${plan.getClass} is not currently supported.") - val children = plan.children.map(replaceWithColumnarPlan) - plan.withNewChildren(children) - case p => val children = plan.children.map(replaceWithColumnarPlan) logDebug(s"Columnar Processing for ${p.getClass} is currently not supported.") diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala index 79907245a..8c23f0e50 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala @@ -21,41 +21,60 @@ import java.util.concurrent.TimeUnit import com.google.flatbuffers.FlatBufferBuilder import com.intel.oap.ColumnarPluginConfig -import com.intel.oap.expression.{CodeGeneration, ColumnarLiteral, ConverterUtils} +import com.intel.oap.expression.{CodeGeneration, ConverterUtils} import com.intel.oap.vectorized.{ArrowWritableColumnVector, CloseableColumnBatchIterator, ExpressionEvaluator} import org.apache.arrow.gandiva.expression.TreeBuilder -import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, Literal, MakeDecimal, NamedExpression, Rank, SortOrder, UnscaledValue, WindowExpression, WindowFunction, WindowSpecDefinition} -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Average, Count, Max, Min, Sum} -import org.apache.spark.sql.execution.window.WindowExec -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, Literal, MakeDecimal, NamedExpression, PredicateHelper, Rank, SortOrder, UnscaledValue, WindowExpression, WindowFunction, WindowSpecDefinition} +import 
org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.{SortExec, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.window.WindowExecBase import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, BooleanType, DataType, DecimalType, DoubleType, FloatType, IntegerType, LongType} +import org.apache.spark.sql.types.{DataType, DateType, DecimalType, DoubleType, IntegerType, LongType, StringType, TimestampType} import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ExecutorManager import scala.collection.JavaConverters._ +import scala.collection.immutable.Stream.Empty import scala.collection.mutable.ListBuffer import scala.util.Random -class ColumnarWindowExec(windowExpression: Seq[NamedExpression], +case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], - child: SparkPlan) extends WindowExec(windowExpression, + child: SparkPlan) extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) { - override def supportsColumnar = true + override def supportsColumnar: Boolean = true override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute) + override def requiredChildDistribution: Seq[Distribution] = { + if (partitionSpec.isEmpty) { + // Only show warning when the number of bytes is larger than 100 MiB? + logWarning("No Partition Defined for Window operation! 
Moving all data to a single " + + "partition, this can cause serious performance degradation.") + AllTuples :: Nil + } else ClusteredDistribution(partitionSpec) :: Nil + } + // We no longer require for sorted input for columnar window override def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil) + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"), @@ -82,75 +101,78 @@ class ColumnarWindowExec(windowExpression: Seq[NamedExpression], // leave it empty for now } - val windowFunctions: Seq[(String, Expression)] = windowExpression - .map(e => e.asInstanceOf[Alias]) - .map(a => a.child.asInstanceOf[WindowExpression]) - .map(w => (w, w.windowFunction)) - .map { - case (expr, func) => - (expr, func match { - case a: AggregateExpression => a.aggregateFunction - case b: WindowFunction => b - case f => - throw new UnsupportedOperationException("unsupported window function type: " + - f) - }) - } - .map { - case (expr, func) => - val name = func match { - case _: Sum => - checkAggFunctionSpec(expr.windowSpec) - "sum" - case _: Average => - checkAggFunctionSpec(expr.windowSpec) - "avg" - case _: Min => - checkAggFunctionSpec(expr.windowSpec) - "min" - case _: Max => - checkAggFunctionSpec(expr.windowSpec) - "max" - case c: Count => - checkAggFunctionSpec(expr.windowSpec) - if (c.children.exists(_.isInstanceOf[Literal])) { - "count_literal" - } else { - "count" - } - case _: Rank => - checkRankSpec(expr.windowSpec) - val desc: Option[Boolean] = orderSpec.foldLeft[Option[Boolean]](None) { - (desc, s) => - val currentDesc = s.direction match { - case Ascending => false - case Descending => true - case _ => throw new IllegalStateException - } - if (desc.isEmpty) { - Some(currentDesc) - } else if (currentDesc == desc.get) { - Some(currentDesc) - } else { - throw new UnsupportedOperationException("Rank: clashed rank order found") - } - } - desc match { - case Some(true) => "rank_desc" - case Some(false) => "rank_asc" - case None => "rank_asc" - } - case f => throw new UnsupportedOperationException("unsupported window function: " + f) - } - (name, func) - } - - if (windowFunctions.isEmpty) { - throw new UnsupportedOperationException("zero window functions" + - "specified in window") + def validateWindowFunctions(): Seq[(String, Expression)] = { + val windowFunctions = windowExpression + .map(e => e.asInstanceOf[Alias]) + .map(a => a.child.asInstanceOf[WindowExpression]) + .map(w => (w, w.windowFunction)) + .map { + case (expr, func) => + (expr, func match { + case a: AggregateExpression => a.aggregateFunction + case b: WindowFunction => b + case f => + throw new UnsupportedOperationException("unsupported window function type: " + + f) + }) + } + .map { + case (expr, func) => + val name = func match { + case _: Sum => + checkAggFunctionSpec(expr.windowSpec) + "sum" + case _: Average => + checkAggFunctionSpec(expr.windowSpec) + "avg" + case _: Min => + checkAggFunctionSpec(expr.windowSpec) + "min" + case _: Max => + checkAggFunctionSpec(expr.windowSpec) + "max" + case c: Count => + checkAggFunctionSpec(expr.windowSpec) + if (c.children.exists(_.isInstanceOf[Literal])) { + "count_literal" + } else { + "count" + } + case _: Rank => + checkRankSpec(expr.windowSpec) + val desc: Option[Boolean] = 
orderSpec.foldLeft[Option[Boolean]](None) { + (desc, s) => + val currentDesc = s.direction match { + case Ascending => false + case Descending => true + case _ => throw new IllegalStateException + } + if (desc.isEmpty) { + Some(currentDesc) + } else if (currentDesc == desc.get) { + Some(currentDesc) + } else { + throw new UnsupportedOperationException("Rank: clashed rank order found") + } + } + desc match { + case Some(true) => "rank_desc" + case Some(false) => "rank_asc" + case None => "rank_asc" + } + case f => throw new UnsupportedOperationException("unsupported window function: " + f) + } + (name, func) + } + if (windowFunctions.isEmpty) { + throw new UnsupportedOperationException("zero window functions " + + "specified in window") + } + windowFunctions } override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val windowFunctions = validateWindowFunctions() child.executeColumnar().mapPartitionsWithIndex { (partIndex, iter) => ExecutorManager.tryTaskSet(numaBindingInfo) if (!iter.hasNext) {
@@ -228,24 +250,27 @@ class ColumnarWindowExec(windowExpression: Seq[NamedExpression], SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => evaluator.close()) val windowFinishCost = System.nanoTime() - prev3 totalTime += TimeUnit.NANOSECONDS.toMillis(windowFinishCost) - val itr = batches.zipWithIndex.map { case (recordBatch, i) => { - val prev4 = System.nanoTime() - val length = recordBatch.getLength - val vectors = try { - ArrowWritableColumnVector.loadColumns(length, resultSchema, recordBatch) - } finally { - recordBatch.close() - } - val correspondingInputBatch = inputCache(i) - val batch = new ColumnarBatch( - (0 until correspondingInputBatch.numCols()).map(i => correspondingInputBatch.column(i)).toArray - ++ vectors, correspondingInputBatch.numRows()) - val emitCost = System.nanoTime() - prev4 - totalTime += TimeUnit.NANOSECONDS.toMillis(emitCost) - numOutputRows += batch.numRows() - numOutputBatches += 1 - batch - }}.toIterator + val itr = batches.zipWithIndex.map { + case (recordBatch, i) => + val prev4 = System.nanoTime() + val length = recordBatch.getLength + val vectors = try { + ArrowWritableColumnVector.loadColumns(length, resultSchema, recordBatch) + } finally { + recordBatch.close() + } + val correspondingInputBatch = inputCache(i) + val batch = new ColumnarBatch( + (0 until correspondingInputBatch.numCols()) + .map(i => correspondingInputBatch.column(i)) + .toArray + ++ vectors, correspondingInputBatch.numRows()) + val emitCost = System.nanoTime() - prev4 + totalTime += TimeUnit.NANOSECONDS.toMillis(emitCost) + numOutputRows += batch.numRows() + numOutputBatches += 1 + batch + }.toIterator new CloseableColumnBatchIterator(itr) } }
@@ -284,26 +309,27 @@ class ColumnarWindowExec(windowExpression: Seq[NamedExpression], override def isComplex: Boolean = false } -} -object ColumnarWindowExec { + override protected def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException() + } +} - def createWithProjection( - windowExpression: Seq[NamedExpression], - partitionSpec: Seq[Expression], - orderSpec: Seq[SortOrder], - child: SparkPlan): SparkPlan = { +object ColumnarWindowExec extends Logging { + object AddProjectionsAroundWindow extends Rule[SparkPlan] with PredicateHelper { def makeInputProject(ex: Expression, inputProjects: ListBuffer[NamedExpression]): Expression = { ex match { - case ae: AggregateExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects))) - case ae: WindowExpression =>
ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects))) + case ae: AggregateExpression => ae.withNewChildren( + ae.children.map(makeInputProject(_, inputProjects))) + case ae: WindowExpression => ae.withNewChildren( + ae.children.map(makeInputProject(_, inputProjects))) case func @ (_: AggregateFunction | _: WindowFunction) => val params = func.children // rewrite val rewritten = func match { case _: Average => - // rewrite params for AVG + // rewrite params for AVG params.map { param => param.dataType match { @@ -338,7 +364,8 @@ object ColumnarWindowExec { DataType.equalsStructurally(from, to) } - def makeOutputProject(ex: Expression, windows: ListBuffer[NamedExpression], inputProjects: ListBuffer[NamedExpression]): Expression = { + def makeOutputProject(ex: Expression, windows: ListBuffer[NamedExpression], + inputProjects: ListBuffer[NamedExpression]): Expression = { val out = ex match { case we: WindowExpression => val aliasName = "__alias_%d__".format(Random.nextLong()) @@ -357,34 +384,118 @@ object ColumnarWindowExec { } } catch { case t: Throwable => + // scalastyle:off println System.err.println("Warning: " + t.getMessage) Cast(out, ex.dataType) + // scalastyle:on println } casted } - val windows = ListBuffer[NamedExpression]() - val inProjectExpressions = ListBuffer[NamedExpression]() - val outProjectExpressions = windowExpression.map(e => e.asInstanceOf[Alias]) - .map { a => - a.withNewChildren(List(makeOutputProject(a.child, windows, inProjectExpressions))) + override def apply(plan: SparkPlan): SparkPlan = plan transformUp { + case p @ ColumnarWindowExec(windowExpression, partitionSpec, orderSpec, child) => + val windows = ListBuffer[NamedExpression]() + val inProjectExpressions = ListBuffer[NamedExpression]() + val outProjectExpressions = windowExpression.map(e => e.asInstanceOf[Alias]) + .map { a => + a.withNewChildren(List(makeOutputProject(a.child, windows, inProjectExpressions))) .asInstanceOf[NamedExpression] - } + } + val inputProject = ColumnarConditionProjectExec(null, + child.output ++ inProjectExpressions, child) + val window = new ColumnarWindowExec(windows, partitionSpec, orderSpec, inputProject) + val outputProject = ColumnarConditionProjectExec(null, + child.output ++ outProjectExpressions, window) + outputProject + } + } + + object RemoveSort extends Rule[SparkPlan] with PredicateHelper { + override def apply(plan: SparkPlan): SparkPlan = plan transform { + case p1 @ ColumnarWindowExec(_, _, _, p2 @ (_: SortExec | _: ColumnarSortExec)) => + p1.withNewChildren(p2.children) + } + } - val inputProject = ColumnarConditionProjectExec(null, child.output ++ inProjectExpressions, child) + object RemoveCoalesceBatches extends Rule[SparkPlan] with PredicateHelper { + override def apply(plan: SparkPlan): SparkPlan = plan transform { + case p1 @ ColumnarWindowExec(_, _, _, p2: CoalesceBatchesExec) => + p1.withNewChildren(p2.children) + } + } + + /** + * FIXME casting solution for timestamp/date32 support + */ + object CastMutableTypes extends Rule[SparkPlan] with PredicateHelper { + override def apply(plan: SparkPlan): SparkPlan = plan transform { + case p: ColumnarWindowExec => p.transformExpressionsDown { + case we @ WindowExpression(ae @ AggregateExpression(af, _, _, _, _), _) => af match { + case Min(e) => e.dataType match { + case t @ (_: TimestampType) => + Cast(we.copy( + windowFunction = + ae.copy(aggregateFunction = Min(Cast(e, LongType)))), TimestampType) + case t @ (_: DateType) => + Cast( + Cast(we.copy( + windowFunction = + 
ae.copy(aggregateFunction = Min(Cast(Cast(e, TimestampType, + Some(DateTimeUtils.TimeZoneUTC.getID)), LongType)))), + TimestampType), DateType, Some(DateTimeUtils.TimeZoneUTC.getID)) + case _ => we + } + case Max(e) => e.dataType match { + case t @ (_: TimestampType) => + Cast(we.copy( + windowFunction = + ae.copy(aggregateFunction = Max(Cast(e, LongType)))), TimestampType) + case t @ (_: DateType) => + Cast( + Cast(we.copy( + windowFunction = + ae.copy(aggregateFunction = Max(Cast(Cast(e, TimestampType, + Some(DateTimeUtils.TimeZoneUTC.getID)), LongType)))), + TimestampType), DateType, Some(DateTimeUtils.TimeZoneUTC.getID)) + case _ => we + } + case _ => we + } + } + } + } - val window = new ColumnarWindowExec(windows, partitionSpec, orderSpec, inputProject) + object Validate extends Rule[SparkPlan] with PredicateHelper { + override def apply(plan: SparkPlan): SparkPlan = plan transform { + case w: ColumnarWindowExec => + w.validateWindowFunctions() + w + } + } - val outputProject = ColumnarConditionProjectExec(null, child.output ++ outProjectExpressions, window) + object ColumnarWindowOptimizations extends RuleExecutor[SparkPlan] { + override protected def batches: Seq[ColumnarWindowOptimizations.Batch] = + Batch("Remove Sort", FixedPoint(10), RemoveSort) :: + Batch("Remove Coalesce Batches", FixedPoint(10), RemoveCoalesceBatches) :: +// Batch("Cast Mutable Types", Once, CastMutableTypes) :: + Batch("Add Projections", FixedPoint(1), AddProjectionsAroundWindow) :: + Batch("Validate", Once, Validate) :: + Nil + } - outputProject + def optimize(plan: ColumnarWindowExec): SparkPlan = { + ColumnarWindowOptimizations.execute(plan) } - def create( - windowExpression: Seq[NamedExpression], + def createWithOptimizations(windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], child: SparkPlan): SparkPlan = { - createWithProjection(windowExpression, partitionSpec, orderSpec, child) + val columnar = new ColumnarWindowExec( + windowExpression, + partitionSpec, + orderSpec, + child) + ColumnarWindowExec.optimize(columnar) } } diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala index e4ef15cfd..128f68bb7 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala @@ -105,7 +105,15 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { df.show() } - test("window function with decimal input 2") { + test("window function with date input") { + val df = spark.sql("SELECT MAX(cc_rec_end_date) OVER (PARTITION BY cc_company)," + + "MIN(cc_rec_end_date) OVER (PARTITION BY cc_company)" + + "FROM call_center LIMIT 100") + df.explain() + df.show() + } + + ignore("window function with decimal input 2") { val df = spark.sql("SELECT i_item_sk, i_class_id, RANK()" + " OVER (PARTITION BY i_class_id ORDER BY i_current_price) FROM item LIMIT 1000") df.explain() diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc index c930d5cce..43f530cb2 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/actions_impl.cc @@ -3696,6 +3696,11 @@ arrow::Status MakeMinAction(arrow::compute::ExecContext* ctx, *out = std::dynamic_pointer_cast(action_ptr); \ } break; 
PROCESS_SUPPORTED_TYPES(PROCESS) + case arrow::Date32Type::type_id: { + using CType = typename arrow::TypeTraits::CType; + auto action_ptr = std::make_shared>(ctx, type); + *out = std::dynamic_pointer_cast(action_ptr); + } break; case arrow::Decimal128Type::type_id: { auto action_ptr = std::make_shared>(ctx, @@ -3721,6 +3726,11 @@ arrow::Status MakeMaxAction(arrow::compute::ExecContext* ctx, *out = std::dynamic_pointer_cast(action_ptr); \ } break; PROCESS_SUPPORTED_TYPES(PROCESS) + case arrow::Date32Type::type_id: { + using CType = typename arrow::TypeTraits::CType; + auto action_ptr = std::make_shared>(ctx, type); + *out = std::dynamic_pointer_cast(action_ptr); + } break; case arrow::Decimal128Type::type_id: { auto action_ptr = std::make_shared>(ctx, diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h index 4921141c6..734d8c727 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/kernels_ext.h @@ -143,6 +143,10 @@ class WindowAggregateFunctionKernel : public KernalBase { arrow::Result>> createBuilder(std::shared_ptr data_type); + template + typename arrow::enable_if_date>> + createBuilder(std::shared_ptr data_type); + template typename arrow::enable_if_number>> createBuilder(std::shared_ptr data_type); diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc index 003c22f63..ce01fc870 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/window_kernel.cc @@ -153,6 +153,7 @@ arrow::Status WindowAggregateFunctionKernel::Evaluate(const ArrayList& in) { PROC(arrow::Int64Type, arrow::Int64Builder, arrow::Int64Array) \ PROC(arrow::FloatType, arrow::FloatBuilder, arrow::FloatArray) \ PROC(arrow::DoubleType, arrow::DoubleBuilder, arrow::DoubleArray) \ + PROC(arrow::Date32Type, arrow::Date32Builder, arrow::Date32Array) \ PROC(arrow::Decimal128Type, arrow::Decimal128Builder, arrow::Decimal128Array) arrow::Status WindowAggregateFunctionKernel::Finish(ArrayList* out) { @@ -211,6 +212,12 @@ WindowAggregateFunctionKernel::createBuilder(std::shared_ptr da return std::make_shared(data_type, ctx_->memory_pool()); } +template +typename arrow::enable_if_date>> +WindowAggregateFunctionKernel::createBuilder(std::shared_ptr data_type) { + return std::make_shared(ctx_->memory_pool()); +} + template typename arrow::enable_if_number>> WindowAggregateFunctionKernel::createBuilder(std::shared_ptr data_type) { diff --git a/pom.xml b/pom.xml index f2ff9888d..10cd1bdbb 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,77 @@ native-sql-engine/core + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + org.apache.arrow + arrow-vector + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + test-jar + test + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + + + org.apache.arrow + * + + + test-jar + test + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + + + org.apache.arrow + * + + + test-jar + test + + + + 
hadoop-3.2
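
Editor's note (not part of the patch): after this change, DateType columns can be used directly as MAX/MIN inputs in window expressions handled by ColumnarWindowExec. The sketch below mirrors the new TPCDSSuite case added above; it assumes a SparkSession with the columnar plugin enabled and the TPC-DS call_center table registered (as done in the suite's setup), and the alias names are illustrative only.

    // Minimal usage sketch, assuming the TPC-DS call_center table is available.
    val df = spark.sql(
      "SELECT cc_company, " +
        "MAX(cc_rec_end_date) OVER (PARTITION BY cc_company) AS max_rec_end_date, " +
        "MIN(cc_rec_end_date) OVER (PARTITION BY cc_company) AS min_rec_end_date " +
        "FROM call_center LIMIT 100")
    df.explain()  // with the plugin enabled, the plan should contain ColumnarWindowExec
    df.show()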