From 5be4bd5cd11cba034f370d5fdba6e18230193e66 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 19 Jul 2024 22:18:08 -0500 Subject: [PATCH] Multi-get_json_object [databricks] (#11200) --- .../rapids/GpuOptimisticTransactionBase.scala | 4 +- .../rapids/GpuOptimisticTransactionBase.scala | 4 +- .../spark/rapids/GpuAggregateExec.scala | 41 ++- .../spark/rapids/GpuBoundAttribute.scala | 43 ++- .../nvidia/spark/rapids/GpuExpandExec.scala | 9 +- .../nvidia/spark/rapids/GpuGenerateExec.scala | 1 - .../spark/rapids/GpuGetJsonObject.scala | 252 ++++++++++++++++-- .../nvidia/spark/rapids/GpuOverrides.scala | 2 +- .../rapids/GpuShuffledHashJoinExec.scala | 2 +- .../spark/rapids/GpuSortMergeJoinMeta.scala | 2 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 16 ++ .../spark/rapids/basicPhysicalOperators.scala | 18 +- .../rapids/window/GpuWindowExecMeta.scala | 6 +- .../GpuEquivalentExpressions.scala | 101 ++++++- .../GpuBroadcastNestedLoopJoinExecBase.scala | 2 +- .../sql/rapids/GpuFileFormatWriter.scala | 2 +- .../GpuBroadcastNestedLoopJoinExec.scala | 8 +- .../GpuBroadcastNestedLoopJoinExec.scala | 6 +- .../sql/rapids/GpuFileFormatWriter.scala | 2 +- .../GpuBroadcastNestedLoopJoinExec.scala | 6 +- .../rapids/HashAggregateRetrySuite.scala | 6 +- .../rapids/NonDeterministicRetrySuite.scala | 11 +- .../spark/rapids/ProjectExprSuite.scala | 3 +- 23 files changed, 434 insertions(+), 113 deletions(-) diff --git a/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala b/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala index 8d5abac35b6..b6e9e11946d 100644 --- a/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala +++ b/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala * in the Delta Lake project at https://github.com/delta-io/delta. @@ -100,7 +100,7 @@ abstract class GpuOptimisticTransactionBase GpuAlias(GpuEmpty2Null(p), p.name)() case attr => attr } - if (needConvert) GpuProjectExec(projectList.toList, plan)() else plan + if (needConvert) GpuProjectExec(projectList.toList, plan) else plan } /** diff --git a/delta-lake/common/src/main/delta-io/scala/org/apache/spark/sql/delta/rapids/GpuOptimisticTransactionBase.scala b/delta-lake/common/src/main/delta-io/scala/org/apache/spark/sql/delta/rapids/GpuOptimisticTransactionBase.scala index 1d5b5a1f72c..afa5ecbc6ee 100644 --- a/delta-lake/common/src/main/delta-io/scala/org/apache/spark/sql/delta/rapids/GpuOptimisticTransactionBase.scala +++ b/delta-lake/common/src/main/delta-io/scala/org/apache/spark/sql/delta/rapids/GpuOptimisticTransactionBase.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -80,7 +80,7 @@ abstract class GpuOptimisticTransactionBase( GpuAlias(GpuEmpty2Null(p), p.name)() case attr => attr } - if (needConvert) GpuProjectExec(projectList.toList, plan)() else plan + if (needConvert) GpuProjectExec(projectList.toList, plan) else plan } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index b35e687d185..b5360a62f94 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -45,6 +45,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan} import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.aggregate.{CpuToGpuAggregateBufferConverter, CudfAggregate, GpuAggregateExpression, GpuToCpuAggregateBufferConverter} import org.apache.spark.sql.rapids.execution.{GpuShuffleMeta, TrampolineUtil} import org.apache.spark.sql.types._ @@ -171,15 +172,16 @@ object AggregateModeInfo { * the merge steps for each aggregate function * @param isSorted if the batch is sorted this is set to true and is passed to cuDF * as an optimization hint - * @param useTieredProject if true, used tiered project for input projections + * @param conf A configuration used to control TieredProject operations in an + * aggregation. */ class AggHelper( inputAttributes: Seq[Attribute], groupingExpressions: Seq[NamedExpression], aggregateExpressions: Seq[GpuAggregateExpression], forceMerge: Boolean, - isSorted: Boolean = false, - useTieredProject: Boolean = true) extends Serializable { + conf: SQLConf, + isSorted: Boolean = false) extends Serializable { private var doSortAgg = isSorted @@ -261,11 +263,11 @@ class AggHelper( inputAttributes } val preStepBound = GpuBindReferences.bindGpuReferencesTiered(preStep.toList, - preStepAttributes.toList, useTieredProject) + preStepAttributes.toList, conf) // a bound expression that is applied after the cuDF aggregate private val postStepBound = GpuBindReferences.bindGpuReferencesTiered(postStep.toList, - postStepAttr.toList, useTieredProject) + postStepAttr.toList, conf) /** * Apply the "pre" step: preMerge for merge, or pass-through in the update case @@ -723,7 +725,7 @@ class GpuMergeAggregateIterator( modeInfo: AggregateModeInfo, metrics: GpuHashAggregateMetrics, configuredTargetBatchSize: Long, - useTieredProject: Boolean, + conf: SQLConf, allowNonFullyAggregatedOutput: Boolean, skipAggPassReductionRatio: Double, localInputRowsCount: LocalGpuMetric) @@ -937,7 +939,7 @@ class GpuMergeAggregateIterator( private lazy val concatAndMergeHelper = new AggHelper(inputAttributes, groupingExpressions, aggregateExpressions, - forceMerge = true, useTieredProject = useTieredProject) + forceMerge = true, conf = conf) /** * Concatenate batches together and perform a merge aggregation on the result. 
The input batches @@ -1017,7 +1019,7 @@ class GpuMergeAggregateIterator( private val mergeSortedHelper = new AggHelper(inputAttributes, groupingExpressions, aggregateExpressions, - forceMerge = true, isSorted = true, useTieredProject = useTieredProject) + forceMerge = true, conf, isSorted = true) override def next(): ColumnarBatch = { // batches coming out of the sort need to be merged @@ -1265,7 +1267,6 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( aggregateExpressions.map(_.convertToGpu().asInstanceOf[GpuAggregateExpression]) val gpuGroupingExpressions = groupingExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]) - val useTiered = conf.isTieredProjectEnabled // Sorting before we do an aggregation helps if the size of the input data is // smaller than the size of the output data. But it is an aggregation how can @@ -1291,7 +1292,7 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( gpuChild.output, inputAggBufferAttributes) val preProcessAggHelper = new AggHelper( inputAttrs, gpuGroupingExpressions, gpuAggregateExpressions, - forceMerge = false, useTieredProject = useTiered) + forceMerge = false, conf = agg.conf) // We are going to estimate the growth by looking at the estimated size the output could // be compared to the estimated size of the input (both based off of the schemas). @@ -1327,7 +1328,6 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( resultExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), gpuChild, conf.gpuTargetBatchSizeBytes, - useTiered, estimatedPreProcessGrowth, conf.forceSinglePassPartialSortAgg, allowSinglePassAgg, @@ -1414,12 +1414,11 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega retExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), childPlans.head.convertIfNeeded(), conf.gpuTargetBatchSizeBytes, - conf.isTieredProjectEnabled, // For now we are just going to go with the original hash aggregation 1.0, - false, - false, - false, + forceSinglePassAgg = false, + allowSinglePassAgg = false, + allowNonFullyAggregatedOutput = false, 1) } else { super.convertToGpu() @@ -1782,7 +1781,6 @@ case class GpuHashAggregateExec( resultExpressions: Seq[NamedExpression], child: SparkPlan, configuredTargetBatchSize: Long, - configuredTieredProjectEnabled: Boolean, estimatedPreProcessGrowth: Double, forceSinglePassAgg: Boolean, allowSinglePassAgg: Boolean, @@ -1853,7 +1851,6 @@ case class GpuHashAggregateExec( val resultExprs = resultExpressions val modeInfo = AggregateModeInfo(uniqueModes) val targetBatchSize = configuredTargetBatchSize - val useTieredProject = configuredTieredProjectEnabled val rdd = child.executeColumnar() @@ -1864,7 +1861,7 @@ case class GpuHashAggregateExec( expectedOrdering) && expectedOrdering.nonEmpty val localEstimatedPreProcessGrowth = estimatedPreProcessGrowth - val boundGroupExprs = GpuBindReferences.bindGpuReferencesTiered(groupingExprs, inputAttrs, true) + val boundGroupExprs = GpuBindReferences.bindGpuReferencesTiered(groupingExprs, inputAttrs, conf) rdd.mapPartitions { cbIter => val postBoundReferences = GpuAggFinalPassIterator.setupReferences(groupingExprs, @@ -1873,7 +1870,7 @@ case class GpuHashAggregateExec( new DynamicGpuPartialSortAggregateIterator(cbIter, inputAttrs, groupingExprs, boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo, localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering, - postBoundReferences, targetBatchSize, aggMetrics, useTieredProject, + postBoundReferences, targetBatchSize, aggMetrics, 
conf, localForcePre, localAllowPre, allowNonFullyAggregatedOutput, skipAggPassReductionRatio) } } @@ -1990,7 +1987,7 @@ class DynamicGpuPartialSortAggregateIterator( postBoundReferences: BoundExpressionsModeAggregates, configuredTargetBatchSize: Long, metrics: GpuHashAggregateMetrics, - useTiered: Boolean, + conf: SQLConf, forceSinglePassAgg: Boolean, allowSinglePassAgg: Boolean, allowNonFullyAggregatedOutput: Boolean, @@ -2092,7 +2089,7 @@ modeInfo, metrics, configuredTargetBatchSize, - useTiered, + conf, allowNonFullyAggregatedOutput, skipAggPassReductionRatio, localInputRowsMetrics) @@ -2104,7 +2101,7 @@ if (aggIter.isEmpty) { val preProcessAggHelper = new AggHelper( inputAttrs, groupingExprs, aggregateExprs, - forceMerge = false, isSorted = true, useTieredProject = useTiered) + forceMerge = false, isSorted = true, conf = conf) val (inputIter, doSinglePassAgg) = if (allowSinglePassAgg) { if (forceSinglePassAgg || alreadySorted) { (cbIter, true) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala index af50d87bd36..76adcfa11a6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import com.nvidia.spark.rapids.shims.ShimExpression import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SortOrder} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.catalyst.expressions.GpuEquivalentExpressions import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -130,18 +131,23 @@ object GpuBindReferences extends Logging { /** * A helper function to bind given expressions to an input schema where the expressions are - * to be processed on the GPU, and the result type indicates this. If runTiered is true - Common sub-expressions will be factored out where possible to reduce the runtime and memory. - If set to false a GpuTieredProject object will still be returned, but no common - sub-expressions will be factored out. + * to be processed on the GPU, and the result type indicates this. + * Expressions that can be combined into a single kernel call may be merged, and common + * sub-expressions may be factored out, where possible, to reduce runtime and memory usage. + * Both optimizations are controlled by the configuration passed in.
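+ * + * An illustrative usage sketch (hypothetical names; assumes the caller supplies the GPU + * expressions, the child plan's output attributes, and a columnar batch): + * {{{ + * val tiered = GpuBindReferences.bindGpuReferencesTiered(exprs, childOutput, conf) + * val projected = tiered.project(batch) + * }}}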
*/ def bindGpuReferencesTiered[A <: Expression]( expressions: Seq[A], input: AttributeSeq, - runTiered: Boolean): GpuTieredProject = { + conf: SQLConf): GpuTieredProject = { - if (runTiered) { - val exprTiers = GpuEquivalentExpressions.getExprTiers(expressions) + if (RapidsConf.ENABLE_TIERED_PROJECT.get(conf)) { + val replaced = if (RapidsConf.ENABLE_COMBINED_EXPRESSIONS.get(conf)) { + GpuEquivalentExpressions.replaceMultiExpressions(expressions, conf) + } else { + expressions + } + val exprTiers = GpuEquivalentExpressions.getExprTiers(replaced) val inputTiers = GpuEquivalentExpressions.getInputTiers(exprTiers, input) // Update ExprTiers to include the columns that are pass through and drop unneeded columns val newExprTiers = exprTiers.zipWithIndex.map { @@ -158,10 +164,27 @@ exprTier } } - GpuTieredProject(newExprTiers.zip(inputTiers).map { + val tiered = newExprTiers.zip(inputTiers).map { case (es: Seq[Expression], is: AttributeSeq) => es.map(GpuBindReferences.bindGpuReference(_, is)).toList - }) + } + logTrace { + "INPUT:\n" + + expressions.zipWithIndex.map { + case (expr, idx) => + s"\t$idx:\t$expr" + }.mkString("\n") + + "\nOUTPUT:\n" + + tiered.zipWithIndex.map { + case (exprs, tier) => + s"\tTIER $tier\n" + + exprs.zipWithIndex.map { + case (expr, idx) => + s"\t\t$idx:\t$expr" + }.mkString("\n") + }.mkString("\n") + } + GpuTieredProject(tiered) } else { GpuTieredProject(Seq(GpuBindReferences.bindGpuReferences(expressions, input))) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala index 0fc7defd063..e13d680a31d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala @@ -53,7 +53,6 @@ class GpuExpandExecMeta( override def convertToGpu(): GpuExec = { val projections = gpuProjections.map(_.map(_.convertToGpu())) GpuExpandExec(projections, expand.output, childPlans.head.convertIfNeeded())( - useTieredProject = conf.isTieredProjectEnabled, preprojectEnabled = conf.isExpandPreprojectEnabled) } } @@ -71,11 +70,9 @@ case class GpuExpandExec( projections: Seq[Seq[Expression]], output: Seq[Attribute], child: SparkPlan)( - useTieredProject: Boolean = false, preprojectEnabled: Boolean = false) extends ShimUnaryExecNode with GpuExec { override def otherCopyArgs: Seq[AnyRef] = Seq[AnyRef]( - useTieredProject.asInstanceOf[java.lang.Boolean], preprojectEnabled.asInstanceOf[java.lang.Boolean]) private val PRE_PROJECT_TIME = "preprojectTime" @@ -102,10 +99,10 @@ case class GpuExpandExec( var projectionsForBind = projections var attributesForBind = child.output var preprojectIter = identity[Iterator[ColumnarBatch]] _ - if (useTieredProject && preprojectEnabled) { - // Tiered projection is enabled, check if pre-projection is needed. + if (preprojectEnabled) { + // Pre-projection is enabled, check whether it is actually needed. val boundPreprojections = GpuBindReferences.bindGpuReferencesTiered( - preprojectionList, child.output, useTieredProject) + preprojectionList, child.output, conf) if (boundPreprojections.exprTiers.size > 1) { logDebug("GPU expanding with pre-projection.") // We got some nested expressions, so pre-projection is good to enable.
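+ // (A single tier would mean there are no shared sub-expressions for a + // pre-projection to factor out, so it would add no value.)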
@@ -123,7 +120,7 @@ case class GpuExpandExec( } val boundProjections = projectionsForBind.map { pl => - GpuBindReferences.bindGpuReferencesTiered(pl, attributesForBind, useTieredProject) + GpuBindReferences.bindGpuReferencesTiered(pl, attributesForBind, conf) } child.executeColumnar().mapPartitions { it => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala index 1fa49951848..cf83c5b1264 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala @@ -79,7 +79,6 @@ class GpuGenerateExecSparkPlanMeta( } val output: Seq[Attribute] = gen.requiredChildOutput ++ gen.generatorOutput.take(numFields) GpuExpandExec(projections, output, childPlans.head.convertIfNeeded())( - useTieredProject = conf.isTieredProjectEnabled, preprojectEnabled = conf.isExpandPreprojectEnabled) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGetJsonObject.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGetJsonObject.scala index 75fb300b7a2..73b5aeb2902 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGetJsonObject.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGetJsonObject.scala @@ -16,16 +16,23 @@ package com.nvidia.spark.rapids +import scala.collection.mutable import scala.util.parsing.combinator.RegexParsers import ai.rapids.cudf.{ColumnVector, GetJsonObjectOptions, Scalar} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.jni.JSONUtils +import com.nvidia.spark.rapids.shims.ShimExpression import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, GetJsonObject} +import org.apache.spark.sql.catalyst.expressions.{Alias, ExpectsInputTypes, Expression, GetJsonObject} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.GpuGetStructField +import org.apache.spark.sql.rapids.catalyst.expressions.{GpuCombinable, GpuExpressionCombiner, GpuExpressionEquals} import org.apache.spark.sql.rapids.test.CpuGetJsonObject -import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.SerializableConfiguration @@ -172,42 +179,192 @@ class GpuGetJsonObjectMeta( override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = { if (!conf.isLegacyGetJsonObjectEnabled) { - GpuGetJsonObject(lhs, rhs, + GpuGetJsonObject(lhs, rhs)( conf.testGetJsonObjectSavePath, conf.testGetJsonObjectSaveRows) } else { - GpuGetJsonObjectLegacy(lhs, rhs, + GpuGetJsonObjectLegacy(lhs, rhs)( conf.testGetJsonObjectSavePath, conf.testGetJsonObjectSaveRows) } } } -case class GpuGetJsonObject( - json: Expression, - path: Expression, - savePathForVerify: Option[String], - saveRowsForVerify: Int) - extends GpuBinaryExpressionArgsAnyScalar - with ExpectsInputTypes { - // Get a Hadoop conf for the JSON Object - val hconf: Option[SerializableConfiguration] = savePathForVerify.map { _ => - val spark = SparkSession.active - new SerializableConfiguration(spark.sparkContext.hadoopConfiguration) +case class GpuMultiGetJsonObject(json: Expression, + paths: Seq[Option[List[PathInstruction]]], + 
output: StructType)(targetBatchSize: Long, + parallel: Option[Int]) + extends GpuExpression with ShimExpression { + + override def otherCopyArgs: Seq[AnyRef] = + targetBatchSize.asInstanceOf[java.lang.Long] :: + parallel :: + Nil + + override def dataType: DataType = output + + override def nullable: Boolean = false + + override def prettyName: String = "multi_get_json_object" + + lazy private val jniInstructions = paths.map { p => + p.map(JsonPathParser.convertToJniObject) } - val seed = System.nanoTime() - override def left: Expression = json - override def right: Expression = path - override def dataType: DataType = StringType - override def inputTypes: Seq[DataType] = Seq(StringType, StringType) - override def nullable: Boolean = true - override def prettyName: String = "get_json_object" + override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + val nullIndexes = jniInstructions.zipWithIndex.filter { + case (None, _) => true + case _ => false + }.map(_._2) - private var cachedInstructions: - Option[Option[List[PathInstruction]]] = None + val validPathsWithIndexes = jniInstructions.zipWithIndex.flatMap { + case (Some(arr), idx) => Some((java.util.Arrays.asList(arr: _*), idx)) + case _ => None + } + + val validPaths = validPathsWithIndexes.map(_._1) + withResource(new Array[ColumnVector](validPaths.length)) { validPathColumns => + var validPathsIndex = 0 + withResource(json.columnarEval(batch)) { input => + // The get_json_object implementation will allocate an output that is as large + // as the input for each path being processed. This can cause memory to grow + // by a lot. We want to avoid this, but still try to run as many in parallel + // as we can + val p = parallel.getOrElse { + val inputSize = input.getBase.getDeviceMemorySize + // Our memory budget is 4x the target batch size. This is technically going + // to go over that, but in practice it is okay with the default settings + Math.max(Math.ceil((targetBatchSize * 4.0) / inputSize).toInt, 1) + } + validPaths.grouped(p).foreach { validPathChunk => + withResource(JSONUtils.getJsonObjectMultiplePaths(input.getBase, + java.util.Arrays.asList(validPathChunk: _*))) { chunkedResult => + chunkedResult.foreach { cr => + validPathColumns(validPathsIndex) = cr.incRefCount() + validPathsIndex += 1 + } + } + } + withResource(new Array[ColumnVector](paths.length)) { columns => + if (nullIndexes.nonEmpty) { + val nullCol = withResource(GpuScalar.from(null, StringType)) { s => + ColumnVector.fromScalar(s, batch.numRows()) + } + withResource(nullCol) { _ => + nullIndexes.foreach { idx => + columns(idx) = nullCol.incRefCount() + } + } + } + + validPathsWithIndexes.map(_._2).zipWithIndex.foreach { + case (toIndex, fromIndex) => + columns(toIndex) = validPathColumns(fromIndex).incRefCount() + } + GpuColumnVector.from(ColumnVector.makeStruct(batch.numRows(), columns: _*), dataType) + } + } + } + } + + override def children: Seq[Expression] = Seq(json) +} + +class GetJsonObjectCombiner(private val exp: GpuGetJsonObject) extends GpuExpressionCombiner { + private var outputLocation = 0 + /** + * A mapping between an expression and where in the output struct of + * the MultiGetJsonObject will the output be. + */ + private val toCombine = mutable.HashMap.empty[GpuExpressionEquals, Int] + addExpression(exp) + + override def toString: String = s"GetJsonObjCombiner $toCombine" + + override def hashCode: Int = { + // We already know that we are GetJsonObject, and what we can combine is based + // on the json column being the same. 
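+ // Combiners for get_json_object over the same json column must hash (and + // compare) equal so that all candidate expressions land in the same bucket + // and get merged; mixing in the class name keeps other combinable expression + // types from colliding.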
+ "GetJsonObject".hashCode + (exp.json.semanticHash() * 17) + } + + override def equals(o: Any): Boolean = o match { + case other: GetJsonObjectCombiner => + exp.json.semanticEquals(other.exp.json) && + // We don't support multi-get with the save path for verify yet + exp.savePathForVerify.isEmpty && + other.exp.savePathForVerify.isEmpty + case _ => false + } + + override def addExpression(e: Expression): Unit = { + val localOutputLocation = outputLocation + outputLocation += 1 + val key = GpuExpressionEquals(e) + if (!toCombine.contains(key)) { + toCombine.put(key, localOutputLocation) + } + } + override def useCount: Int = toCombine.size + + private def fieldName(id: Int): String = + s"_mgjo_$id" + + @scala.annotation.tailrec + private def extractLit(exp: Expression): Option[GpuLiteral] = exp match { + case l: GpuLiteral => Some(l) + case a: Alias => extractLit(a.child) + case _ => None + } + + private lazy val multiGet: GpuMultiGetJsonObject = { + val json = toCombine.head._1.e.asInstanceOf[GpuGetJsonObject].json + val fieldsNPaths = toCombine.toSeq.map { + case (k, id) => + (id, k.e) + }.sortBy(_._1).map { + case (id, e: GpuGetJsonObject) => + val parsedPath = extractLit(e.path).flatMap { s => + import GpuGetJsonObject._ + val str = s.value match { + case u: UTF8String => u.toString + case _ => null.asInstanceOf[String] + } + val pathInstructions = parseJsonPath(str) + if (hasSeparateWildcard(pathInstructions)) { + // If has separate wildcard path, should return all nulls + None + } else { + // Filter out the unneeded instructions before we cache it + pathInstructions.map(JsonPathParser.filterInstructionsForJni) + } + } + (StructField(fieldName(id), e.dataType, e.nullable), parsedPath) + } + val dt = StructType(fieldsNPaths.map(_._1)) + val conf = SQLConf.get + val targetBatchSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf) + val tmp = conf.getConfString("spark.sql.test.multiget.parallel", null) + val parallel = Option(tmp).map(_.toInt) + GpuMultiGetJsonObject(json, fieldsNPaths.map(_._2), dt)(targetBatchSize, parallel) + } + + override def getReplacementExpression(e: Expression): Expression = { + val localId = toCombine(GpuExpressionEquals(e)) + GpuGetStructField(multiGet, localId, Some(fieldName(localId))) + } +} + +object GpuGetJsonObject { def parseJsonPath(path: GpuScalar): Option[List[PathInstruction]] = { if (path.isValid) { - val pathStr = path.getValue.toString() + val pathStr = path.getValue.toString + JsonPathParser.parse(pathStr) + } else { + None + } + } + + def parseJsonPath(pathStr: String): Option[List[PathInstruction]] = { + if (pathStr != null) { JsonPathParser.parse(pathStr) } else { None @@ -222,7 +379,7 @@ case class GpuGetJsonObject( * @param instructions query path instructions * @return true if has separated `Wildcard`, false otherwise. 
*/ - private def hasSeparateWildcard(instructions: Option[List[PathInstruction]]): Boolean = { + def hasSeparateWildcard(instructions: Option[List[PathInstruction]]): Boolean = { import PathInstruction._ def hasSeparate(ins: List[PathInstruction], idx: Int): Boolean = { if (idx == 0) { @@ -247,6 +404,37 @@ list.indices.exists { idx => hasSeparate(list, idx) } } } +} + +case class GpuGetJsonObject( + json: Expression, + path: Expression)( + val savePathForVerify: Option[String], + val saveRowsForVerify: Int) + extends GpuBinaryExpressionArgsAnyScalar + with ExpectsInputTypes + with GpuCombinable { + import GpuGetJsonObject._ + + // Get a Hadoop conf for the JSON Object + val hconf: Option[SerializableConfiguration] = savePathForVerify.map { _ => + val spark = SparkSession.active + new SerializableConfiguration(spark.sparkContext.hadoopConfiguration) + } + val seed = System.nanoTime() + + override def otherCopyArgs: Seq[AnyRef] = Seq(savePathForVerify, + saveRowsForVerify.asInstanceOf[java.lang.Integer]) + + override def left: Expression = json + override def right: Expression = path + override def dataType: DataType = StringType + override def inputTypes: Seq[DataType] = Seq(StringType, StringType) + override def nullable: Boolean = true + override def prettyName: String = "get_json_object" + + private var cachedInstructions: + Option[Option[List[PathInstruction]]] = None override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = { val fromGpu = cachedInstructions.getOrElse { @@ -290,11 +478,16 @@ case class GpuGetJsonObject( doColumnar(expandedLhs, rhs) } } + + @transient + private lazy val combiner = new GetJsonObjectCombiner(this) + + override def getCombiner(): GpuExpressionCombiner = combiner } case class GpuGetJsonObjectLegacy( json: Expression, - path: Expression, + path: Expression)( savePathForVerify: Option[String], saveRowsForVerify: Int) extends GpuBinaryExpressionArgsAnyScalar @@ -306,6 +499,9 @@ case class GpuGetJsonObjectLegacy( } val seed = System.nanoTime() + override def otherCopyArgs: Seq[AnyRef] = Seq(savePathForVerify, + saveRowsForVerify.asInstanceOf[java.lang.Integer]) + override def left: Expression = json override def right: Expression = path override def dataType: DataType = StringType @@ -359,4 +555,4 @@ case class GpuGetJsonObjectLegacy( doColumnar(expandedLhs, rhs) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index e02ca838e4f..7fa18b2b782 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -4194,7 +4194,7 @@ object GpuOverrides extends Logging { // The GPU does not yet support conditional joins, so conditions are implemented // as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), - joinExec)(useTieredProject = this.conf.isTieredProjectEnabled)).getOrElse(joinExec) + joinExec)()).getOrElse(joinExec) } }), exec[HashAggregateExec]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index 67c7433aed6..aec84c18fee 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -100,7 +100,7 @@ class GpuShuffledHashJoinMeta( // For inner joins we can apply a post-join condition for any conditions that cannot be // evaluated directly in a mixed join that leverages a cudf AST expression filterCondition.map(c => GpuFilterExec(c, - joinExec)(useTieredProject = conf.isTieredProjectEnabled)).getOrElse(joinExec) + joinExec)()).getOrElse(joinExec) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala index d4b87f0b094..15ff19ae8c9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala @@ -112,6 +112,6 @@ class GpuSortMergeJoinMeta( // For inner joins we can apply a post-join condition for any conditions that cannot be // evaluated directly in a mixed join that leverages a cudf AST expression filterCondition.map(c => GpuFilterExec(c, - joinExec)(useTieredProject = conf.isTieredProjectEnabled)).getOrElse(joinExec) + joinExec)()).getOrElse(joinExec) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 4b95ab4b6a6..c529ced0ab0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -995,6 +995,20 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .booleanConf .createWithDefault(true) + val ENABLE_COMBINED_EXPR_PREFIX = "spark.rapids.sql.expression.combined." + + val ENABLE_COMBINED_EXPRESSIONS = conf("spark.rapids.sql.combined.expressions.enabled") + .doc("For some expressions it can be much more efficient to combine multiple " + + "expressions together into a single kernel call. This enables or disables that " + + s"feature. Note that this requires that $ENABLE_TIERED_PROJECT is turned on or " + + "else there is no performance improvement. You can also disable this feature for " + + "expressions that support it. 
Each config is expression specific and starts with " + s"$ENABLE_COMBINED_EXPR_PREFIX followed by the name of the GPU expression class, " + s"similar to how individual expressions are enabled for conversion to the GPU " + s"(for example, ${ENABLE_COMBINED_EXPR_PREFIX}GpuGetJsonObject).") .internal() .booleanConf .createWithDefault(true) + val ENABLE_RLIKE_REGEX_REWRITE = conf("spark.rapids.sql.rLikeRegexRewrite.enabled") .doc("Enable the optimization to rewrite rlike regex to contains in some cases.") .internal() .booleanConf .createWithDefault(true) @@ -2812,6 +2826,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isTieredProjectEnabled: Boolean = get(ENABLE_TIERED_PROJECT) + lazy val isCombinedExpressionsEnabled: Boolean = get(ENABLE_COMBINED_EXPRESSIONS) + lazy val isRlikeRegexRewriteEnabled: Boolean = get(ENABLE_RLIKE_REGEX_REWRITE) lazy val isLegacyGetJsonObjectEnabled: Boolean = get(ENABLE_GETJSONOBJECT_LEGACY) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index b1d81e91a8c..487611add08 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -65,7 +65,7 @@ class GpuProjectExecMeta( } } } - GpuProjectExec(gpuExprs, gpuChild)(useTieredProject = conf.isTieredProjectEnabled) + GpuProjectExec(gpuExprs, gpuChild) } } @@ -357,12 +357,8 @@ case class GpuProjectExec( // serde: https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/ // immutable/List.scala#L516 projectList: List[NamedExpression], - child: SparkPlan)( - useTieredProject : Boolean = false - ) extends GpuProjectExecLike { + child: SparkPlan) extends GpuProjectExecLike { - override def otherCopyArgs: Seq[AnyRef] = - Seq[AnyRef](useTieredProject.asInstanceOf[java.lang.Boolean]) override def output: Seq[Attribute] = projectList.map(_.toAttribute) override lazy val additionalMetrics: Map[String, GpuMetric] = Map( @@ -373,7 +369,7 @@ case class GpuProjectExec( val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES) val opTime = gpuLongMetric(OP_TIME) val boundProjectList = GpuBindReferences.bindGpuReferencesTiered(projectList, child.output, - useTieredProject) + conf) val rdd = child.executeColumnar() rdd.map { cb => @@ -788,20 +784,18 @@ case class GpuFilterExecMeta( ) extends SparkPlanMeta[FilterExec](filter, conf, parentMetaOpt, rule) { override def convertToGpu(): GpuExec = { GpuFilterExec(childExprs.head.convertToGpu(), - childPlans.head.convertIfNeeded())(useTieredProject = this.conf.isTieredProjectEnabled) + childPlans.head.convertIfNeeded())() } } case class GpuFilterExec( condition: Expression, child: SparkPlan)( - useTieredProject : Boolean = false, override val coalesceAfter: Boolean = true) extends ShimUnaryExecNode with ShimPredicateHelper with GpuExec { override def otherCopyArgs: Seq[AnyRef] = - Seq[AnyRef](useTieredProject.asInstanceOf[java.lang.Boolean], - coalesceAfter.asInstanceOf[java.lang.Boolean]) + Seq[AnyRef](coalesceAfter.asInstanceOf[java.lang.Boolean]) override lazy val additionalMetrics: Map[String, GpuMetric] = Map( OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME)) @@ -841,7 +835,7 @@ case class GpuFilterExec( val opTime = gpuLongMetric(OP_TIME) val rdd = child.executeColumnar() val boundCondition = GpuBindReferences.bindGpuReferencesTiered(Seq(condition), child.output, - useTieredProject) + conf) rdd.flatMap { batch => GpuFilter.filterAndClose(batch, boundCondition, numOutputRows,
numOutputBatches, opTime) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala index 861f530915a..94d83300942 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala @@ -142,7 +142,7 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W } val input = if (isPreNeeded) { - GpuProjectExec(pre.toList, childPlans.head.convertIfNeeded())() + GpuProjectExec(pre.toList, childPlans.head.convertIfNeeded()) } else { childPlans.head.convertIfNeeded() } @@ -165,9 +165,9 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W } if (isPostNeeded) { - GpuProjectExec(post.toList, windowExpr)() + GpuProjectExec(post.toList, windowExpr) } else if (windowExpr.output != windowExec.output) { - GpuProjectExec(windowExec.output.toList, windowExpr)() + GpuProjectExec(windowExec.output.toList, windowExpr) } else { windowExpr } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala index 139a0ecc009..b8592f326d4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala @@ -15,18 +15,19 @@ */ /* Note: This is derived from EquivalentExpressions in Apache Spark - * with changes to adapt it for GPU. + * with a lot of changes to adapt it for GPU. */ package org.apache.spark.sql.rapids.catalyst.expressions import scala.annotation.tailrec import scala.collection.mutable -import com.nvidia.spark.rapids.{GpuAlias, GpuCaseWhen, GpuCoalesce, GpuExpression, GpuIf, GpuLeafExpression, GpuUnevaluable} +import com.nvidia.spark.rapids.{GpuAlias, GpuCaseWhen, GpuCoalesce, GpuExpression, GpuIf, GpuLeafExpression, GpuUnevaluable, RapidsConf} import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, LeafExpression, PlanExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.internal.SQLConf /** * This class is used to compute equality of (sub)expression trees. Expressions can be added @@ -55,7 +56,7 @@ class GpuEquivalentExpressions { stats.useCount += 1 true case _ => - map.put(wrapper, GpuExpressionStats(expr)()) + map.put(wrapper, new GpuExpressionStats(expr)) false } } else { @@ -374,6 +375,56 @@ } } + /** + * Takes a set of expressions, finds the groups of combinable expressions within it, and + * replaces each group with a single multi-expression. + */ + def replaceMultiExpressions(exprs: Seq[Expression], conf: SQLConf): Seq[Expression] = { + // GpuEquivalentExpressions really finds all expressions that will execute + // unconditionally (or at least it eventually will). This lets us know that + // combining expressions into multi-expressions will not accidentally trigger + // side effects that should have been hidden by a conditional expression. It + // also guarantees that the dedupe code will combine the multi-expressions so + // we don't run them multiple times.
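+ // For example (illustrative): get_json_object(js, '$.a') and + // get_json_object(js, '$.b') over the same column js can be combined into a + // single multi_get_json_object(js, ['$.a', '$.b']) returning a struct, with + // each original call replaced by a field access (_mgjo_0, _mgjo_1) on that + // struct. See GetJsonObjectCombiner.getReplacementExpression.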
+ val equivalentExpressions = new GpuEquivalentExpressions + exprs.foreach(equivalentExpressions.addExprTree(_)) + val combinableMap = mutable.HashMap.empty[GpuExpressionCombiner, GpuExpressionCombiner] + val enabled = mutable.HashMap.empty[Class[_], Boolean] + def isEnabled(clazz: Class[_]): Boolean = { + enabled.getOrElse(clazz, { + val confKey = RapidsConf.ENABLE_COMBINED_EXPR_PREFIX + clazz.getSimpleName + val isEnabled = conf.getConfString(confKey, "true").trim.toBoolean + enabled.put(clazz, isEnabled) + isEnabled + }) + } + equivalentExpressions.equivalenceMap.values.map(_.expr).foreach { + case e: GpuCombinable if isEnabled(e.getClass) => + val key = e.getCombiner() + combinableMap.get(key).map { c => + c.addExpression(e) + }.getOrElse { + combinableMap.put(key, key) + } + case _ => //Noop + } + val filtered = combinableMap.filter { + case (_, v) => v.useCount > 1 + } + + // We now have a set of values that should be replaced + if (filtered.isEmpty) { + exprs + } else { + exprs.map { expr => + expr.transform { + case c: GpuCombinable if filtered.contains(c.getCombiner()) => + filtered(c.getCombiner()).getReplacementExpression(c) + } + } + } + } + def getExprTiers(expressions: Seq[Expression]): Seq[Seq[Expression]] = { // Get tiers of common expressions val expressionTiers = recurseCommonExpressions(expressions, Seq(expressions)) @@ -413,6 +464,45 @@ object GpuEquivalentExpressions { } } +trait GpuExpressionCombiner { + /** + * Get a hash code that can be used to find other ExpressionCombiner instances + * that could be combined together. + */ + def hashCode: Int + + /** + * Check to see if this ExpressionCombiner could be combined with another one. + */ + def equals(o: Any): Boolean + + /** + * Add another expression to this combiner. Note that equals must have returned true + * already. + */ + def addExpression(e: Expression): Unit + + /** + * For a specific expression return a multi-expression that can be used to replace it. + * Note that deduplication of these will happen as a part of tiered project. + * @param e the expression to be replaced + * @return the replacement expression + */ + def getReplacementExpression(e: Expression): Expression + + /** + * Get the number of expressions that can be combined (excluding duplicates) + */ + def useCount: Int +} + +trait GpuCombinable extends GpuExpression { + /** + * Get a combiner that can be used to find candidates to combine + */ + def getCombiner(): GpuExpressionCombiner +} + /** * Wrapper around an Expression that provides semantic equality. */ @@ -432,10 +522,11 @@ case class GpuExpressionEquals(e: Expression) { * Instead of appending to a mutable list/buffer of Expressions, just update the "flattened" * useCount in this wrapper in-place. */ -case class GpuExpressionStats(expr: Expression)(var useCount: Int = 1) { +class GpuExpressionStats(val expr: Expression) { + var useCount: Int = 1 // This is used to do a fast pre-check for child-parent relationship. For example, expr1 can // only be a parent of expr2 if expr1.height is larger than expr2.height. 
- lazy val height = getHeight(expr) + lazy val height: Int = getHeight(expr) private def getHeight(tree: Expression): Int = { tree.children.map(getHeight).reduceOption(_ max _).getOrElse(0) + 1 diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala index 5c5cbbf65e1..47bcec60674 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala @@ -537,7 +537,7 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( // internalDoExecuteColumnar. This is to work around the special handling needed to // build the broadcast batch. val proj = GpuBindReferences.bindGpuReferencesTiered( - postBuildCondition, p.child.output, true) + postBuildCondition, p.child.output, conf) withResource(makeBuiltBatchInternal(relation, buildTime, buildDataSize)) { cb => proj.project(cb) } diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala index 549ac204454..71d2892e50e 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala @@ -131,7 +131,7 @@ object GpuFileFormatWriter extends Logging { plan } else { val projectList = GpuV1WriteUtils.convertGpuEmptyToNull(plan.output, partitionSet) - if (projectList.nonEmpty) GpuProjectExec(projectList, plan)() else plan + if (projectList.nonEmpty) GpuProjectExec(projectList, plan) else plan } val writerBucketSpec = BucketingUtilsShim.getWriterBucketSpec(bucketSpec, dataColumns, diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index 901dc83d576..0fd990133d3 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -58,16 +58,16 @@ class GpuBroadcastNestedLoopJoinMeta( // If ast-able, try to split if needed. Otherwise, do post-filter val isAstCondition = canJoinCondAstAble() - if(isAstCondition){ + if (isAstCondition) { // Try to extract non-ast-able conditions from join conditions val (remains, leftExpr, rightExpr) = AstUtil.extractNonAstFromJoinCond( conditionMeta, left.output, right.output, true) // Reconstruct the children with wrapped project node if needed.
val leftChild = - if (!leftExpr.isEmpty) GpuProjectExec(leftExpr ++ left.output, left)(true) else left + if (!leftExpr.isEmpty) GpuProjectExec(leftExpr ++ left.output, left) else left val rightChild = - if (!rightExpr.isEmpty) GpuProjectExec(rightExpr ++ right.output, right)(true) else right + if (!rightExpr.isEmpty) GpuProjectExec(rightExpr ++ right.output, right) else right val postBuildCondition = if (gpuBuildSide == GpuBuildLeft) leftExpr ++ left.output else rightExpr ++ right.output @@ -89,7 +89,7 @@ class GpuBroadcastNestedLoopJoinMeta( GpuProjectExec( GpuBroadcastNestedLoopJoinExecBase.output( join.joinType, left.output, right.output).toList, - joinExec)(false) + joinExec) } } else { join.joinType match { diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index 34d873f26e0..9c9784fc31c 100644 --- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -63,9 +63,9 @@ class GpuBroadcastNestedLoopJoinMeta( // Reconstruct the child with wrapped project node if needed. val leftChild = - if (!leftExpr.isEmpty) GpuProjectExec(leftExpr ++ left.output, left)(true) else left + if (!leftExpr.isEmpty) GpuProjectExec(leftExpr ++ left.output, left) else left val rightChild = - if (!rightExpr.isEmpty) GpuProjectExec(rightExpr ++ right.output, right)(true) else right + if (!rightExpr.isEmpty) GpuProjectExec(rightExpr ++ right.output, right) else right val postBuildCondition = if (gpuBuildSide == GpuBuildLeft) leftExpr ++ left.output else rightExpr ++ right.output @@ -88,7 +88,7 @@ class GpuBroadcastNestedLoopJoinMeta( GpuProjectExec( GpuBroadcastNestedLoopJoinExecBase.output( join.joinType, left.output, right.output).toList, - joinExec)(false) + joinExec) } } else { val condition = conditionMeta.map(_.convertToGpu()) diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala index 2241d6771f8..d5e8637c6d4 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala +++ b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala @@ -228,7 +228,7 @@ object GpuFileFormatWriter extends Logging { plan } else { val projectList = GpuV1WriteUtils.convertGpuEmptyToNull(plan.output, partitionSet) - if (projectList.nonEmpty) GpuProjectExec(projectList, plan)() else plan + if (projectList.nonEmpty) GpuProjectExec(projectList, plan) else plan } writeAndCommit(job, description, committer) { diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index b6ae72a8493..a196cc73149 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -59,9 +59,9 @@ class GpuBroadcastNestedLoopJoinMeta( // Reconstruct the child with wrapped project node if needed. 
val leftChild = - if (!leftExpr.isEmpty) GpuProjectExec(leftExpr ++ left.output, left)(true) else left + if (!leftExpr.isEmpty) GpuProjectExec(leftExpr ++ left.output, left) else left val rightChild = - if (!rightExpr.isEmpty) GpuProjectExec(rightExpr ++ right.output, right)(true) else right + if (!rightExpr.isEmpty) GpuProjectExec(rightExpr ++ right.output, right) else right val postBuildCondition = if (gpuBuildSide == GpuBuildLeft) leftExpr ++ left.output else rightExpr ++ right.output @@ -83,7 +83,7 @@ class GpuBroadcastNestedLoopJoinMeta( GpuProjectExec( GpuBroadcastNestedLoopJoinExecBase.output( join.joinType, left.output, right.output).toList, - joinExec)(false) + joinExec) } } else { val condition = conditionMeta.map(_.convertToGpu()) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregateRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregateRetrySuite.scala index d58c7a7df7c..58608ed132c 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregateRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregateRetrySuite.scala @@ -24,6 +24,7 @@ import com.nvidia.spark.rapids.jni.RmmSpark import org.mockito.Mockito._ import org.scalatestplus.mockito.MockitoSugar +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.aggregate.{CudfAggregate, CudfSum} import org.apache.spark.sql.types.{DataType, IntegerType, LongType} @@ -58,7 +59,7 @@ class HashAggregateRetrySuite when(mockMetrics.numAggOps).thenReturn(NoopMetric) val aggHelper = spy(new AggHelper( Seq.empty, Seq.empty, Seq.empty, - forceMerge = false, isSorted = false)) + forceMerge = false, new SQLConf(), isSorted = false)) // mock out a reduction on the first column val aggs = new ArrayBuffer[CudfAggregate]() @@ -78,7 +79,8 @@ class HashAggregateRetrySuite def makeGroupByAggHelper(forceMerge: Boolean): AggHelper = { val aggHelper = spy(new AggHelper( Seq.empty, Seq.empty, Seq.empty, - forceMerge = forceMerge, isSorted = false)) + forceMerge = forceMerge, new SQLConf(), + isSorted = false)) // mock out a group by with the first column as key, and second column // as a group by sum diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/NonDeterministicRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/NonDeterministicRetrySuite.scala index f0c71c87fea..d018726ef35 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/NonDeterministicRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/NonDeterministicRetrySuite.scala @@ -22,6 +22,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq import com.nvidia.spark.rapids.jni.RmmSpark import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuGreaterThan import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand import org.apache.spark.sql.types.{DoubleType, IntegerType} @@ -68,14 +69,16 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase { GpuAlias(GpuRand(GpuLiteral(RAND_SEED)), "rand")()) Seq(true, false).foreach { useTieredProject => + val conf = new SQLConf() + conf.setConfString(RapidsConf.ENABLE_TIERED_PROJECT.key, useTieredProject.toString) // expression should be retryable val boundProjectRand = GpuBindReferences.bindGpuReferencesTiered(projectRand(), - batchAttrs, useTieredProject) + batchAttrs, conf) assert(boundProjectRand.areAllRetryable) // project with and without retry val batches = Seq(true, 
false).safeMap { forceRetry => val boundProjectList = GpuBindReferences.bindGpuReferencesTiered( - projectRand() ++ batchAttrs, batchAttrs, useTieredProject) + projectRand() ++ batchAttrs, batchAttrs, conf) assert(boundProjectList.areAllRetryable) val sb = closeOnExcept(buildBatch()) { cb => @@ -110,10 +113,12 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase { GpuLiteral.create(0.1d, DoubleType))) Seq(true, false).foreach { useTieredProject => + val conf = new SQLConf() + conf.setConfString(RapidsConf.ENABLE_TIERED_PROJECT.key, useTieredProject.toString) // filter with and without retry val tables = Seq(true, false).safeMap { forceRetry => val boundCondition = GpuBindReferences.bindGpuReferencesTiered(filterRand(), - batchAttrs, useTieredProject) + batchAttrs, conf) assert(boundCondition.areAllRetryable) val cb = buildBatch() diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala index 5039032a738..acc2e98c2b9 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal, NamedExpression} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuAdd import org.apache.spark.sql.types._ @@ -102,7 +103,7 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite { val b = AttributeReference("b", LongType)() val simpleAdd = GpuAdd(a, b, false) val fullAdd = GpuAlias(GpuAdd(simpleAdd, simpleAdd, false), "ret")() - val tp = GpuBindReferences.bindGpuReferencesTiered(Seq(fullAdd), Seq(a, b), true) + val tp = GpuBindReferences.bindGpuReferencesTiered(Seq(fullAdd), Seq(a, b), new SQLConf()) val sb = buildProjectBatch() RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,