Support GpuSubqueryBroadcast for DPP [databricks] #4150
Conversation
@pytest.mark.parametrize('store_format', ['parquet', 'orc'], ids=idfn)
@pytest.mark.parametrize('s_index', list(range(len(_statements))), ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Only in Spark 3.2.0+ AQE and DPP can be both enabled")
-def test_dpp_reuse_broadcast_exchange(aqe_on, store_format, s_index, spark_tmp_table_factory):
+def test_dpp_reuse_broadcast_exchange_aqe_on(store_format, s_index, spark_tmp_table_factory):
Did we miss the corresponding test with AQE off, or is that covered in some other existing test and was not really needed?
I named the test with AQE off as `test_dpp_reuse_broadcast_exchange_aqe_off`, appending the suffix `_aqe_off` to clarify the intention of the tests.
# When AQE is enabled, the broadcast exchange can not currently be reused, because spark-rapids
# will plan GpuBroadcastToCpu for the exchange reuse, while the original broadcast exchange is
# simply replaced by GpuBroadcastExchange. Therefore, the reuse can not work since
# GpuBroadcastToCpu is not semantically equal to GpuBroadcastExchange.
Is this something that we should fix? Should we combine the two classes together so that they are the same thing and it does not matter if you are reading the data on the CPU or the GPU?
I think so. IMO, with the help of the new method `SerializeConcatHostBuffersDeserializeBatch.hostBatches`, we can change the role of `GpuBroadcastToCpu`, making it a wrapper of `GpuBroadcastExchangeExec`. Therefore, we can reuse the GpuBroadcast in terms of serialized host buffers. I tried it in my local environment, and it works. I would like to create a separate PR for this change.
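A minimal sketch of that wrapper shape, assuming `hostBatches` returns the broadcast result as host-memory batches; apart from the identifiers quoted in this thread, the names and node structure here are illustrative, and the row conversion is elided:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.UnaryExecNode

// Sketch: keep the GPU exchange as the child instead of replacing it, so
// Spark's exchange-reuse rule can match the same GpuBroadcastExchangeExec on
// both the join side and the DPP side.
case class GpuBroadcastToCpu(child: GpuBroadcastExchangeExec) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output

  override protected def doExecute(): RDD[InternalRow] = {
    val serBatch =
      child.executeBroadcast[SerializeConcatHostBuffersDeserializeBatch]().value
    // hostBatches exposes the broadcast data in host memory, so a CPU consumer
    // never needs a GPU to read the reused exchange.
    val batches = serBatch.hostBatches
    // ... build InternalRows from the host batches for the CPU parent ...
    ???
  }
}
```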
Sounds like a good plan.
willNotWorkOnGpu("underlying BroadcastExchange can not run on the GPU.") | ||
} | ||
case _ => | ||
willNotWorkOnGpu("no available BroadcastExchange for reuse.") |
I am really confused by this. We cannot run the `SubqueryBroadcastExec` on the GPU because "no available BroadcastExchange for reuse."? Can we have a better explanation? Our end users will read this and get confused. I am also a little concerned that they will think it is an issue that they need to try and fix.
Yes, I refined the reason here.
override val childPlans: Seq[SparkPlanMeta[SparkPlan]] = Nil

override def tagPlanForGpu(): Unit = s.child match {
  case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExecParent) =>
I am more than a little confused. When exactly does this happen? Moving the comments from below up closer to the top would be good.
Done.
// GpuSubqueryBroadcast -> GpuBroadcastExchange -> GpuPlanStack...
override def convertToGpu(): GpuExec = s.child match {
  case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExecParent) =>
    val exMeta = new GpuBroadcastMeta(ex.copy(child = c2r.child), conf, p, r)
nit: we do this twice. Once to tag and once here. It would be nice if we could cache it so we are not wasting work in the common case.
Refined.
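For reference, a sketch of one way to cache the shared match, using the names from the snippets above (the surrounding meta class is abbreviated, and this is not the merged code):

```scala
// Compute the wrapped exchange meta once; tagPlanForGpu and convertToGpu can
// then share it instead of repeating the pattern match.
private lazy val broadcastMeta: Option[GpuBroadcastMeta] = s.child match {
  case ex @ BroadcastExchangeExec(_, c2r: GpuColumnarToRowExecParent) =>
    Some(new GpuBroadcastMeta(ex.copy(child = c2r.child), conf, p, r))
  case _ => None
}

override def tagPlanForGpu(): Unit = broadcastMeta match {
  case Some(meta) =>
    meta.tagForGpu()
    if (!meta.canThisBeReplaced) {
      willNotWorkOnGpu("the underlying BroadcastExchange can not run on the GPU")
    }
  case None =>
    willNotWorkOnGpu("there is no reusable BroadcastExchange backing this subquery")
}
```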
SQLExecution.withExecutionId(sparkSession, executionId) {
  withResource(new NvtxWithMetrics("broadcast collect", NvtxColor.GREEN,
      collectTime)) { _ =>
    val serBatch = child.executeBroadcast[SerializeConcatHostBuffersDeserializeBatch]().value
This is running on the driver, but assumes that it has access to a GPU. It does not. We have to do any/all transformation on the CPU.
Hi @revans2, I overlooked the GPU access requirement the first time. I refactored the implementation; for now, the GpuSubqueryBroadcast runs entirely on the host.
Could we please try to test this on the YARN cluster or some place where there is no GPU and ideally no CUDA on the nodes that are running the driver? I just want to be sure that we don't accidentally initialize the CUDA context when we try to touch the HostColumnVectors. I think in the other places we only touched buffers.
Hi @revans2, I ran a Spark-on-YARN test with a driver image that contained neither the NVIDIA driver nor CUDA. The `GpuSubqueryBroadcast` didn't throw any exception.
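A rough sketch of that host-only collection path, assuming `hostBatches` yields batches whose columns live in host memory; `collectBroadcastRows` and `buildRow` are hypothetical names for illustration, not the actual code:

```scala
// Runs on the driver. Only host memory is touched: executeBroadcast hands back
// serialized host buffers, and hostBatches exposes them without allocating any
// device memory, so no NVIDIA driver or CUDA context is required.
private def collectBroadcastRows(): Array[InternalRow] = {
  val serBatch =
    child.executeBroadcast[SerializeConcatHostBuffersDeserializeBatch]().value
  serBatch.hostBatches.flatMap { batch =>
    // buildRow (hypothetical) copies one row out of the host column vectors.
    (0 until batch.numRows()).map(i => buildRow(batch, i))
  }
}
```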
I think this is better than we have today, but because this is related to DPP and AQE I really would like more eyes on this. @jlowe and @andygrove could you also take a look?
Sorry, I missed this notification. I am going to review this today.
LGTM. I would like to review the follow-on PR related to refactoring how we handle `GpuBroadcastToCpu` as well.
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Closes #4027
Current PR is to support reusing the broadcast exchange for SubqueryBroadcast (which is inserted by DPP) on the GPU. To achieve this goal, the following steps are essential:

1. `FileSourceScanExec`: captures, tags and converts `SubqueryBroadcastExec` inside `DynamicPruningExpression`. We shall build an independent RapidsMeta for `SubqueryBroadcastExec` inside dynamic partition filters rather than adding them as the children of the scan meta, because it is possible that the FileSourceScan is on the CPU while the dynamic partitionFilters are on the GPU, and vice versa.
2. Converts `SubqueryBroadcastExec` and the underlying exchange to the GPU if possible. The rule `PlanDynamicPruningFilters` will insert `SubqueryBroadcastExec` if there exists an available broadcast exchange for reuse. The plan stack looks like: `SubqueryBroadcast -> BroadcastExchange -> executedPlan`. Since the GPU overrides rule has already been applied on the executedPlan, if the wrapped subquery can run on the GPU, the plan stack becomes: `SubqueryBroadcast -> BroadcastExchange -> GpuColumnarToRow -> GpuPlanStack...`. To reuse the BroadcastExchange on the GPU, we shall transform the above pattern into: `GpuSubqueryBroadcast -> GpuBroadcastExchange -> GpuPlanStack...` (see the sketch after this list).
3. Adds `GpuSubqueryBroadcastExec`, which is similar to `GpuBroadcastToCpuExec`. The major difference is whether to reuse the existing `GpuBroadcastExec`.

In addition, the current PR can only reuse the GpuBroadcast when AQE is off. We need to modify `GpuBroadcastToCpuExec` to reuse the GpuBroadcast with AQE on.