
Commit

Revert "Implement SumUnboundedToUnboundedFixer (NVIDIA#8934)"
This reverts commit 8927411.
andygrove committed Aug 17, 2023
1 parent d586195 commit 6da7e65
Showing 7 changed files with 5 additions and 167 deletions.
1 change: 0 additions & 1 deletion docs/additional-functionality/advanced_configs.md
@@ -149,7 +149,6 @@ Name | Description | Default Value | Applicable at
<a name="sql.window.range.int.enabled"></a>spark.rapids.sql.window.range.int.enabled|When the order-by column of a range based window is int type and the range boundary calculated for a value has overflow, CPU and GPU will get the different results. When set to false disables the range window acceleration for the int type order-by column|true|Runtime
<a name="sql.window.range.long.enabled"></a>spark.rapids.sql.window.range.long.enabled|When the order-by column of a range based window is long type and the range boundary calculated for a value has overflow, CPU and GPU will get the different results. When set to false disables the range window acceleration for the long type order-by column|true|Runtime
<a name="sql.window.range.short.enabled"></a>spark.rapids.sql.window.range.short.enabled|When the order-by column of a range based window is short type and the range boundary calculated for a value has overflow, CPU and GPU will get the different results. When set to false disables the range window acceleration for the short type order-by column|false|Runtime
<a name="sql.window.unboundedFloatAggFixerEnabled"></a>spark.rapids.sql.window.unboundedFloatAggFixerEnabled|When set to false, this disables an optimization on window aggregate functions that operate on floating point inputs with unbounded preceding and unbounded following. When enabled, it is possible that results may differ from run to run due to the order of operations changing|true|Runtime

## Supported GPU Operators and Fine Tuning
_The RAPIDS Accelerator for Apache Spark_ can be configured to enable or disable specific
21 changes: 0 additions & 21 deletions integration_tests/src/main/python/window_function_test.py
@@ -151,12 +151,6 @@
['child_string', StringGen()]
])]

numeric_gens = [byte_gen, short_gen, int_gen, long_gen,
FloatGen(no_nans=False, special_cases=[]),
DoubleGen(no_nans=False, special_cases=[]),
DecimalGen(precision=18, scale=1),
DecimalGen(precision=38, scale=1)]

_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
'spark.rapids.sql.castStringToFloat.enabled': 'true'
}
@@ -246,21 +240,6 @@ def test_decimal_running_sum_window_no_part(data_gen):
'from window_agg_table',
conf = {'spark.rapids.sql.batchSizeBytes': '100'})

@ignore_order
@approximate_float
@pytest.mark.parametrize('data_gen', numeric_gens, ids=idfn)
def test_numeric_running_sum_window_no_part_unbounded(data_gen):
assert_gpu_and_cpu_are_equal_sql(
lambda spark: two_col_df(spark, UniqueLongGen(), data_gen),
'window_agg_table',
'select '
' sum(b) over '
' (order by a asc '
' rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_b_asc '
'from window_agg_table',
conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
'spark.rapids.sql.batchSizeBytes': '100'})

@pytest.mark.xfail(reason="[UNSUPPORTED] Ranges over order by byte column overflow "
"(https://github.com/NVIDIA/spark-rapids/pull/2020#issuecomment-838127070)")
@ignore_order
@@ -34,8 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Attribut
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{GpuAggregateExpression, GpuBasicSum}
import org.apache.spark.sql.rapids.GpuAggregateExpression
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.unsafe.types.CalendarInterval
@@ -292,7 +291,7 @@ case class BatchedOps(running: Seq[NamedExpression],
def hasDoublePass: Boolean = unboundedToUnbounded.nonEmpty
}

object GpuWindowExec {
object GpuWindowExec {
/**
* As a part of `splitAndDedup` the dedup part adds a layer of indirection. This attempts to
* remove that layer of indirection.
@@ -1763,34 +1762,13 @@ class GpuCachedDoublePassWindowIterator(
}
}

private lazy val conf: RapidsConf = new RapidsConf(SQLConf.get)

private lazy val fixerIndexMap: Map[Int, FixerPair] =
boundWindowOps.zipWithIndex.flatMap {
case (GpuAlias(GpuWindowExpression(func, _), _), index) =>

val okToFix = func match {
// we may want to make this check more generic when we add unbounded fixers for other
// aggregate functions that operate on floating-point inputs
case f: GpuBasicSum =>
f.child.dataType match {
case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType |
DataTypes.LongType | _: DecimalType =>
true
case DataTypes.FloatType | DataTypes.DoubleType =>
conf.isUnboundedFloatOptimizationEnabled
case _ =>
false
}
case _ =>
true
}

func match {
case f: GpuUnboundToUnboundWindowWithFixer if okToFix =>
case f: GpuUnboundToUnboundWindowWithFixer =>
Some((index, new FixerPair(f)))
case GpuAggregateExpression(f: GpuUnboundToUnboundWindowWithFixer, _, _, _, _)
if okToFix =>
case GpuAggregateExpression(f: GpuUnboundToUnboundWindowWithFixer, _, _, _, _) =>
Some((index, new FixerPair(f)))
case _ => None
}
@@ -28,9 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.rapids.{AddOverflowChecks, GpuAggregateExpression, GpuCount, GpuCreateNamedStruct, GpuDivide, GpuSubtract}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@@ -1314,99 +1312,6 @@ class SumBinaryFixer(toType: DataType, isAnsi: Boolean)
}
}

/**
* Fixes up a sum operation for unbounded preceding to unbounded following
* @param resultType the result data type of the sum
* @param failOnError whether to throw an exception when an overflow happens
*/
class SumUnboundedToUnboundedFixer(resultType: DataType, failOnError: Boolean)
extends BatchedUnboundedToUnboundedWindowFixer {

private var previousValue: Option[Scalar] = None

private val numeric = TypeUtils.getNumeric(resultType, failOnError)

override def updateState(scalar: Scalar): Unit = {
if (scalar.isValid) {
previousValue match {
case None =>
previousValue = Some(scalar.incRefCount())
case Some(prev) =>
withResource(prev) { _ =>
previousValue = None
prev.getType.getTypeId match {
case DType.DTypeEnum.INT64 =>
if (failOnError) {
previousValue = Some(Scalar.fromLong(Math.addExact(
scalar.getLong, prev.getLong)))
} else {
previousValue = Some(Scalar.fromLong(scalar.getLong + prev.getLong))
}
case DType.DTypeEnum.FLOAT32 =>
previousValue = Some(Scalar.fromFloat(scalar.getFloat + prev.getFloat))
case DType.DTypeEnum.FLOAT64 =>
previousValue = Some(Scalar.fromDouble(scalar.getDouble + prev.getDouble))
case DType.DTypeEnum.DECIMAL32 | DType.DTypeEnum.DECIMAL64 |
DType.DTypeEnum.DECIMAL128 =>
val decimal = numeric.plus(Decimal(prev.getBigDecimal),
Decimal(scalar.getBigDecimal)).asInstanceOf[Decimal]
val dt = resultType.asInstanceOf[DecimalType]
previousValue = Option(TrampolineUtil.checkDecimalOverflow(
decimal, dt.precision, dt.scale, failOnError))
.map(n => Scalar.fromDecimal(n.toJavaBigDecimal))
case other =>
throw new IllegalStateException(s"unhandled type: $other")
}
}
}
}
}

override def fixUp(
samePartitionMask: Either[ColumnVector, Boolean],
column: ColumnVector): ColumnVector = {

def makeScalar(): Scalar = previousValue match {
case Some(value) =>
value
case _ => resultType match {
case DataTypes.LongType => Scalar.fromNull(DType.INT64)
case DataTypes.FloatType => Scalar.fromNull(DType.FLOAT32)
case DataTypes.DoubleType => Scalar.fromNull(DType.FLOAT64)
case d: DecimalType =>
val dt = if (d.precision > DType.DECIMAL64_MAX_PRECISION) {
DType.DTypeEnum.DECIMAL128.getNativeId
} else if (d.precision > DType.DECIMAL32_MAX_PRECISION) {
DType.DTypeEnum.DECIMAL64.getNativeId
} else {
DType.DTypeEnum.DECIMAL32.getNativeId
}
Scalar.fromNull(DType.fromNative(dt, d.scale))
case other =>
throw new IllegalStateException(s"unhandled type: $other")
}
}

samePartitionMask match {
case scala.Left(cv) =>
cv.ifElse(makeScalar(), column)
case scala.Right(true) =>
ColumnVector.fromScalar(makeScalar(), column.getRowCount.toInt)
case _ =>
column.incRefCount()
}
}

override def close(): Unit = {
reset()
}

override def reset(): Unit = {
previousValue.foreach(_.close())
previousValue = None
}
}
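The core assumption behind this fixer is that an unbounded-preceding-to-unbounded-following frame gives every row in a partition the same aggregate value, so per-batch partial sums can be combined into one scalar and broadcast back over each batch. A conceptual sketch using plain Scala collections, not plugin code:

```scala
// Each inner array stands in for one batch of a single partition.
val batches: Seq[Array[Long]] = Seq(Array(1L, 2L, 3L), Array(4L, 5L))

// Accumulate one running total across batches (what updateState does with scalars).
val partitionTotal: Long = batches.map(_.sum).sum // 15

// Replace every row of every batch with that total (what fixUp does per column).
val fixedUp: Seq[Array[Long]] = batches.map(b => Array.fill(b.length)(partitionTotal))
```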


/**
* Rank is more complicated than DenseRank to fix. This is because there are gaps in the
rank values. The rank value of each group is the row number of the first row in the group.
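For instance, over the sorted keys below, rank repeats within a group and then jumps to the row number of the next group's first row, while dense_rank stays contiguous. An illustrative sketch, not plugin code:

```scala
val keys = Seq(10, 10, 20, 30, 30)
// rank: each group takes the row number of its first row, so gaps appear.
val rank = keys.map(k => keys.indexOf(k) + 1)               // 1, 1, 3, 4, 4
// dense_rank: groups are numbered consecutively, so there are no gaps.
val denseRank = keys.map(k => keys.distinct.indexOf(k) + 1) // 1, 1, 2, 3, 3
```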
11 changes: 0 additions & 11 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1306,15 +1306,6 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

val ENABLE_UNBOUNDED_OPTIMIZATION_FLOAT: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.window.unboundedFloatAggFixerEnabled")
.doc("When set to false, this disables an optimization on window aggregate functions " +
"that operate on floating point inputs with unbounded preceding and unbounded following. " +
"When enabled, it is possible that results may differ from run to run due to the " +
"order of operations changing")
.booleanConf
.createWithDefault(true)

val ENABLE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.agg.singlePassPartialSortEnabled")
.doc("Enable or disable a single pass partial sort optimization where if a heuristic " +
@@ -2654,8 +2645,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isRangeWindowDecimalEnabled: Boolean = get(ENABLE_RANGE_WINDOW_DECIMAL)

lazy val isUnboundedFloatOptimizationEnabled: Boolean = get(ENABLE_UNBOUNDED_OPTIMIZATION_FLOAT)

lazy val allowSinglePassPartialSortAgg: Boolean = get(ENABLE_SINGLE_PASS_PARTIAL_SORT_AGG)

lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG)
@@ -21,7 +21,6 @@ import ai.rapids.cudf.{Aggregation128Utils, BinaryOp, ColumnVector, DType, Group
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression
import com.nvidia.spark.rapids.SumUnboundedToUnboundedFixer
import com.nvidia.spark.rapids.shims.{GpuDeterministicFirstLastCollectShim, ShimExpression, ShimUnaryExpression, TypeUtilsShims}

import org.apache.spark.sql.catalyst.InternalRow
@@ -1137,7 +1136,6 @@ abstract class GpuSum(
extends GpuAggregateFunction
with ImplicitCastInputTypes
with GpuBatchedRunningWindowWithFixer
with GpuUnboundToUnboundWindowWithFixer
with GpuAggregateWindowFunction
with GpuRunningWindowFunction
with Serializable {
@@ -1207,10 +1205,6 @@ abstract class GpuSum(
override def scanCombine(isRunningBatched: Boolean, cols: Seq[ColumnVector]): ColumnVector = {
cols.head.incRefCount()
}

override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer = {
new SumUnboundedToUnboundedFixer(resultType, failOnErrorOverride)
}
}

/** Sum aggregation for non-decimal types */
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim
import org.apache.spark.sql.rapids.shims.SparkUpgradeExceptionShims
import org.apache.spark.sql.types.{DataType, Decimal, StructType}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{ShutdownHookManager, Utils}

@@ -172,10 +172,4 @@ object TrampolineUtil {
def getSparkConf(spark: SparkSession): SQLConf = {
spark.sqlContext.conf
}

def checkDecimalOverflow(value: Decimal, precision: Int, scale: Int,
failOnError: Boolean): Decimal = {
value.toPrecision(precision, scale, roundMode = Decimal.ROUND_HALF_UP,
nullOnOverflow = !failOnError)
}
}
