Multi-get_json_object [databricks] (#11200)
revans2 authored Jul 20, 2024
1 parent daeaa3c commit 5be4bd5
Showing 23 changed files with 434 additions and 113 deletions.
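The changes below thread a SQLConf through the aggregate, expand, and reference-binding paths so that RapidsConf.ENABLE_COMBINED_EXPRESSIONS can drive GpuEquivalentExpressions.replaceMultiExpressions, which appears to be the mechanism behind the commit title: several get_json_object calls over the same input column can be handled as one combined expression. As a rough illustration of the query shape this targets, here is a minimal Spark sketch; the table, column names, and JSON paths are made up, and the combining itself happens inside the plugin, not in user code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, get_json_object}

object MultiGetJsonObjectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("multi get_json_object sketch")
      .getOrCreate()
    import spark.implicits._

    // One JSON payload column queried with several paths.
    val df = Seq(
      """{"a": 1, "b": "x", "c": true}""",
      """{"a": 2, "b": "y", "c": false}"""
    ).toDF("payload")

    // Three extractions over the same column: the pattern that can be recognized
    // and evaluated together when the relevant RapidsConf settings are enabled.
    val out = df.select(
      get_json_object(col("payload"), "$.a").as("a"),
      get_json_object(col("payload"), "$.b").as("b"),
      get_json_object(col("payload"), "$.c").as("c"))

    out.show(truncate = false)
    spark.stop()
  }
}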
-------- changed file --------
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
@@ -100,7 +100,7 @@ abstract class GpuOptimisticTransactionBase
GpuAlias(GpuEmpty2Null(p), p.name)()
case attr => attr
}
if (needConvert) GpuProjectExec(projectList.toList, plan)() else plan
if (needConvert) GpuProjectExec(projectList.toList, plan) else plan
}

/**
-------- changed file --------
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -80,7 +80,7 @@ abstract class GpuOptimisticTransactionBase(
GpuAlias(GpuEmpty2Null(p), p.name)()
case attr => attr
}
if (needConvert) GpuProjectExec(projectList.toList, plan)() else plan
if (needConvert) GpuProjectExec(projectList.toList, plan) else plan
}

/**
-------- changed file --------
@@ -45,6 +45,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.aggregate.{CpuToGpuAggregateBufferConverter, CudfAggregate, GpuAggregateExpression, GpuToCpuAggregateBufferConverter}
import org.apache.spark.sql.rapids.execution.{GpuShuffleMeta, TrampolineUtil}
import org.apache.spark.sql.types._
@@ -171,15 +172,16 @@ object AggregateModeInfo {
* the merge steps for each aggregate function
* @param isSorted if the batch is sorted this is set to true and is passed to cuDF
* as an optimization hint
* @param useTieredProject if true, used tiered project for input projections
* @param conf A configuration used to control TieredProject operations in an
* aggregation.
*/
class AggHelper(
inputAttributes: Seq[Attribute],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[GpuAggregateExpression],
forceMerge: Boolean,
isSorted: Boolean = false,
useTieredProject: Boolean = true) extends Serializable {
conf: SQLConf,
isSorted: Boolean = false) extends Serializable {

private var doSortAgg = isSorted

@@ -261,11 +263,11 @@ class AggHelper(
inputAttributes
}
val preStepBound = GpuBindReferences.bindGpuReferencesTiered(preStep.toList,
preStepAttributes.toList, useTieredProject)
preStepAttributes.toList, conf)

// a bound expression that is applied after the cuDF aggregate
private val postStepBound = GpuBindReferences.bindGpuReferencesTiered(postStep.toList,
postStepAttr.toList, useTieredProject)
postStepAttr.toList, conf)

/**
* Apply the "pre" step: preMerge for merge, or pass-through in the update case
@@ -723,7 +725,7 @@ class GpuMergeAggregateIterator(
modeInfo: AggregateModeInfo,
metrics: GpuHashAggregateMetrics,
configuredTargetBatchSize: Long,
useTieredProject: Boolean,
conf: SQLConf,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double,
localInputRowsCount: LocalGpuMetric)
@@ -937,7 +939,7 @@ class GpuMergeAggregateIterator(

private lazy val concatAndMergeHelper =
new AggHelper(inputAttributes, groupingExpressions, aggregateExpressions,
forceMerge = true, useTieredProject = useTieredProject)
forceMerge = true, conf = conf)

/**
* Concatenate batches together and perform a merge aggregation on the result. The input batches
@@ -1017,7 +1019,7 @@ class GpuMergeAggregateIterator(

private val mergeSortedHelper =
new AggHelper(inputAttributes, groupingExpressions, aggregateExpressions,
forceMerge = true, isSorted = true, useTieredProject = useTieredProject)
forceMerge = true, conf, isSorted = true)

override def next(): ColumnarBatch = {
// batches coming out of the sort need to be merged
@@ -1265,7 +1267,6 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
aggregateExpressions.map(_.convertToGpu().asInstanceOf[GpuAggregateExpression])
val gpuGroupingExpressions =
groupingExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression])
val useTiered = conf.isTieredProjectEnabled

// Sorting before we do an aggregation helps if the size of the input data is
// smaller than the size of the output data. But it is an aggregation how can
@@ -1291,7 +1292,7 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
gpuChild.output, inputAggBufferAttributes)
val preProcessAggHelper = new AggHelper(
inputAttrs, gpuGroupingExpressions, gpuAggregateExpressions,
forceMerge = false, useTieredProject = useTiered)
forceMerge = false, conf = agg.conf)

// We are going to estimate the growth by looking at the estimated size the output could
// be compared to the estimated size of the input (both based off of the schemas).
@@ -1327,7 +1328,6 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
resultExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]),
gpuChild,
conf.gpuTargetBatchSizeBytes,
useTiered,
estimatedPreProcessGrowth,
conf.forceSinglePassPartialSortAgg,
allowSinglePassAgg,
@@ -1414,12 +1414,11 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega
retExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]),
childPlans.head.convertIfNeeded(),
conf.gpuTargetBatchSizeBytes,
conf.isTieredProjectEnabled,
// For now we are just going to go with the original hash aggregation
1.0,
false,
false,
false,
forceSinglePassAgg = false,
allowSinglePassAgg = false,
allowNonFullyAggregatedOutput = false,
1)
} else {
super.convertToGpu()
@@ -1782,7 +1781,6 @@ case class GpuHashAggregateExec(
resultExpressions: Seq[NamedExpression],
child: SparkPlan,
configuredTargetBatchSize: Long,
configuredTieredProjectEnabled: Boolean,
estimatedPreProcessGrowth: Double,
forceSinglePassAgg: Boolean,
allowSinglePassAgg: Boolean,
@@ -1853,7 +1851,6 @@ case class GpuHashAggregateExec(
val resultExprs = resultExpressions
val modeInfo = AggregateModeInfo(uniqueModes)
val targetBatchSize = configuredTargetBatchSize
val useTieredProject = configuredTieredProjectEnabled

val rdd = child.executeColumnar()

@@ -1864,7 +1861,7 @@ case class GpuHashAggregateExec(
expectedOrdering) && expectedOrdering.nonEmpty
val localEstimatedPreProcessGrowth = estimatedPreProcessGrowth

val boundGroupExprs = GpuBindReferences.bindGpuReferencesTiered(groupingExprs, inputAttrs, true)
val boundGroupExprs = GpuBindReferences.bindGpuReferencesTiered(groupingExprs, inputAttrs, conf)

rdd.mapPartitions { cbIter =>
val postBoundReferences = GpuAggFinalPassIterator.setupReferences(groupingExprs,
Expand All @@ -1873,7 +1870,7 @@ case class GpuHashAggregateExec(
new DynamicGpuPartialSortAggregateIterator(cbIter, inputAttrs, groupingExprs,
boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo,
localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering,
postBoundReferences, targetBatchSize, aggMetrics, useTieredProject,
postBoundReferences, targetBatchSize, aggMetrics, conf,
localForcePre, localAllowPre, allowNonFullyAggregatedOutput, skipAggPassReductionRatio)
}
}
@@ -1990,7 +1987,7 @@ class DynamicGpuPartialSortAggregateIterator(
postBoundReferences: BoundExpressionsModeAggregates,
configuredTargetBatchSize: Long,
metrics: GpuHashAggregateMetrics,
useTiered: Boolean,
conf: SQLConf,
forceSinglePassAgg: Boolean,
allowSinglePassAgg: Boolean,
allowNonFullyAggregatedOutput: Boolean,
@@ -2092,7 +2089,7 @@ class DynamicGpuPartialSortAggregateIterator(
modeInfo,
metrics,
configuredTargetBatchSize,
useTiered,
conf,
allowNonFullyAggregatedOutput,
skipAggPassReductionRatio,
localInputRowsMetrics)
@@ -2104,7 +2101,7 @@ class DynamicGpuPartialSortAggregateIterator(
if (aggIter.isEmpty) {
val preProcessAggHelper = new AggHelper(
inputAttrs, groupingExprs, aggregateExprs,
forceMerge = false, isSorted = true, useTieredProject = useTiered)
forceMerge = false, isSorted = true, conf = conf)
val (inputIter, doSinglePassAgg) = if (allowSinglePassAgg) {
if (forceSinglePassAgg || alreadySorted) {
(cbIter, true)
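Taken together, the aggregate changes above replace the pre-resolved useTieredProject flag with the SQLConf itself: AggHelper, GpuMergeAggregateIterator, GpuHashAggregateExec, and DynamicGpuPartialSortAggregateIterator all carry the conf and let the binding code decide what to enable. A minimal sketch of the new construction pattern, assuming the classes above are in scope; the inputs and the SQLConf.get default are placeholders for whatever the real caller has at hand.

import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.aggregate.GpuAggregateExpression

// Pre-process helper for the update (non-merge) step: the helper now reads the
// tiered-project and combined-expression settings from the conf on its own.
def buildPreProcessHelper(
    inputAttrs: Seq[Attribute],
    groupingExprs: Seq[NamedExpression],
    aggExprs: Seq[GpuAggregateExpression],
    conf: SQLConf = SQLConf.get): AggHelper =
  new AggHelper(inputAttrs, groupingExprs, aggExprs, forceMerge = false, conf = conf)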
-------- changed file --------
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SortOrder}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.catalyst.expressions.GpuEquivalentExpressions
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -130,18 +131,23 @@ object GpuBindReferences extends Logging {

/**
* A helper function to bind given expressions to an input schema where the expressions are
* to be processed on the GPU, and the result type indicates this. If runTiered is true
* Common sub-expressions will be factored out where possible to reduce the runtime and memory.
* If set to false a GpuTieredProject object will still be returned, but no common
* sub-expressions will be factored out.
* to be processed on the GPU, and the result type indicates this.
* Some expressions that can be combined into a single expression call as well as
* common sub-expressions may be factored out where possible to reduce the runtime and memory.
* All of these can be controlled by the configuration passed in.
*/
def bindGpuReferencesTiered[A <: Expression](
expressions: Seq[A],
input: AttributeSeq,
runTiered: Boolean): GpuTieredProject = {
conf: SQLConf): GpuTieredProject = {

if (runTiered) {
val exprTiers = GpuEquivalentExpressions.getExprTiers(expressions)
if (RapidsConf.ENABLE_TIERED_PROJECT.get(conf)) {
val replaced = if (RapidsConf.ENABLE_COMBINED_EXPRESSIONS.get(conf)) {
GpuEquivalentExpressions.replaceMultiExpressions(expressions, conf)
} else {
expressions
}
val exprTiers = GpuEquivalentExpressions.getExprTiers(replaced)
val inputTiers = GpuEquivalentExpressions.getInputTiers(exprTiers, input)
// Update ExprTiers to include the columns that are pass through and drop unneeded columns
val newExprTiers = exprTiers.zipWithIndex.map {
@@ -158,10 +164,27 @@ object GpuBindReferences extends Logging {
exprTier
}
}
GpuTieredProject(newExprTiers.zip(inputTiers).map {
val tiered = newExprTiers.zip(inputTiers).map {
case (es: Seq[Expression], is: AttributeSeq) =>
es.map(GpuBindReferences.bindGpuReference(_, is)).toList
})
}
logTrace {
"INPUT:\n" +
expressions.zipWithIndex.map {
case (expr, idx) =>
s"\t$idx:\t$expr"
}.mkString("\n") +
"\nOUTPUT:\n" +
tiered.zipWithIndex.map {
case (exprs, tier) =>
s"\tTIER $tier\n" +
exprs.zipWithIndex.map {
case (expr, idx) =>
s"\t\t$idx:\t$expr"
}.mkString("\n")
}.mkString("\n")
}
GpuTieredProject(tiered)
} else {
GpuTieredProject(Seq(GpuBindReferences.bindGpuReferences(expressions, input)))
}
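bindGpuReferencesTiered is now the single place where both optimizations are gated. A sketch of what the one call covers under that reading; exprs, input, and conf are placeholders, and the behavior summary paraphrases the branches above.

import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, Expression}
import org.apache.spark.sql.internal.SQLConf

// The same call selects the behavior purely from the conf:
//  - tiered project disabled           -> a single tier, expressions bound as-is
//  - tiered project enabled            -> shared sub-expressions pulled into earlier tiers
//  - combined expressions also enabled -> replaceMultiExpressions first fuses compatible
//                                         calls (per the commit title, multiple
//                                         get_json_object paths on one column) before tiering
def bind(exprs: Seq[Expression], input: AttributeSeq, conf: SQLConf): GpuTieredProject =
  GpuBindReferences.bindGpuReferencesTiered(exprs, input, conf)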
-------- changed file --------
@@ -53,7 +53,6 @@ class GpuExpandExecMeta(
override def convertToGpu(): GpuExec = {
val projections = gpuProjections.map(_.map(_.convertToGpu()))
GpuExpandExec(projections, expand.output, childPlans.head.convertIfNeeded())(
useTieredProject = conf.isTieredProjectEnabled,
preprojectEnabled = conf.isExpandPreprojectEnabled)
}
}
@@ -71,11 +70,9 @@ case class GpuExpandExec(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: SparkPlan)(
useTieredProject: Boolean = false,
preprojectEnabled: Boolean = false) extends ShimUnaryExecNode with GpuExec {

override def otherCopyArgs: Seq[AnyRef] = Seq[AnyRef](
useTieredProject.asInstanceOf[java.lang.Boolean],
preprojectEnabled.asInstanceOf[java.lang.Boolean])

private val PRE_PROJECT_TIME = "preprojectTime"
@@ -102,10 +99,10 @@ case class GpuExpandExec(
var projectionsForBind = projections
var attributesForBind = child.output
var preprojectIter = identity[Iterator[ColumnarBatch]] _
if (useTieredProject && preprojectEnabled) {
if (preprojectEnabled) {
// Tiered projection is enabled, check if pre-projection is needed.
val boundPreprojections = GpuBindReferences.bindGpuReferencesTiered(
preprojectionList, child.output, useTieredProject)
preprojectionList, child.output, conf)
if (boundPreprojections.exprTiers.size > 1) {
logDebug("GPU expanding with pre-projection.")
// We got some nested expressions, so pre-projection is good to enable.
@@ -123,7 +120,7 @@ case class GpuExpandExec(
}

val boundProjections = projectionsForBind.map { pl =>
GpuBindReferences.bindGpuReferencesTiered(pl, attributesForBind, useTieredProject)
GpuBindReferences.bindGpuReferencesTiered(pl, attributesForBind, conf)
}

child.executeColumnar().mapPartitions { it =>
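In GpuExpandExec the pre-projection decision no longer consults a separate tiered-project flag: the candidate pre-projection list is bound with the conf, and the extra projection is inserted only when the binding actually produced more than one tier, meaning there are nested expressions worth evaluating once up front. A condensed sketch of that decision, assuming it sits inside GpuExpandExec where preprojectionList, child, and conf are available:

// Pre-project only when tiered binding split the expressions into several tiers.
private def shouldPreproject(): Boolean = {
  val boundPreprojections =
    GpuBindReferences.bindGpuReferencesTiered(preprojectionList, child.output, conf)
  boundPreprojections.exprTiers.size > 1
}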
-------- changed file --------
@@ -79,7 +79,6 @@ class GpuGenerateExecSparkPlanMeta(
}
val output: Seq[Attribute] = gen.requiredChildOutput ++ gen.generatorOutput.take(numFields)
GpuExpandExec(projections, output, childPlans.head.convertIfNeeded())(
useTieredProject = conf.isTieredProjectEnabled,
preprojectEnabled = conf.isExpandPreprojectEnabled)
}

(Remaining changed files in this commit were not loaded in this view.)
