Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Replace SortAggregate at columnar override
Browse files Browse the repository at this point in the history
  • Loading branch information
PHILO-HE committed Jun 2, 2022
1 parent 07c1c95 commit 931cdaf
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 271 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,14 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.ShufflePartitionSpec
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{ShuffleStageInfo, _}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf

import org.apache.spark.util.ShufflePartitionUtils

import scala.collection.mutable
Expand Down Expand Up @@ -122,6 +121,17 @@ case class ColumnarPreOverrides(session: SparkSession) extends Rule[SparkPlan] {
plan.initialInputBufferOffset,
plan.resultExpressions,
child)
case plan: SortAggregateExec if (columnarConf.enableHashAggForStringType) =>
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarHashAggregateExec(
plan.requiredChildDistributionExpressions,
plan.groupingExpressions,
plan.aggregateExpressions,
plan.aggregateAttributes,
plan.initialInputBufferOffset,
plan.resultExpressions,
child)
case plan: UnionExec =>
val children = plan.children.map(replaceWithColumnarPlan)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package org.apache.spark.sql.execution

import java.util.Properties

import com.intel.oap.GazellePluginConfig

import scala.collection.mutable
import scala.util.{Random, Try}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -109,8 +107,7 @@ class UnsafeFixedWidthAggregationMapSuite
StructType(StructField("x", DecimalType.USER_DEFAULT) :: Nil)))
assert(supportsAggregationBufferSchema(
StructType(StructField("x", DecimalType.SYSTEM_DEFAULT) :: Nil)))
// Force to use hash agg for string type.
assert(supportsAggregationBufferSchema(StructType(StructField("x", StringType) :: Nil)))
assert(!supportsAggregationBufferSchema(StructType(StructField("x", StringType) :: Nil)))
assert(
!supportsAggregationBufferSchema(StructType(StructField("x", ArrayType(IntegerType)) :: Nil)))
}
Expand Down

0 comments on commit 931cdaf

Please sign in to comment.