-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support existence join type for broadcast nested loop join #5301
Conversation
Signed-off-by: Chong Gao <res_life@163.com>
build |
JoinGatherer for existence join
This is to add a new JoinGatherer to handle this existence join. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took a quick review, and got some comments.
|
||
|
||
@ignore_order | ||
@pytest.mark.parametrize('aqeEnabled', [pytest.param(True, id='aqe:on'), pytest.param(False, id='aqe:off')]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pytest.mark.parametrize('aqeEnabled', [pytest.param(True, id='aqe:on'), pytest.param(False, id='aqe:off')]) | |
@pytest.mark.parametrize('aqeEnabled', [True, False], ids=['AQE_ON', 'AQE_OFF']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a consistent id scheme across the pytest code base regardless of the choice made
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
outOfBoundsPolicy: OutOfBoundsPolicy): JoinGatherer = | ||
new JoinGathererImpl(gatherMap, inputData, outOfBoundsPolicy) | ||
outOfBoundsPolicy: OutOfBoundsPolicy, isExistenceJoin: Boolean = false): JoinGatherer = { | ||
if (!isExistenceJoin) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT:
if (isExistenceJoin) {
new JoinGathererForExistenceJoin(gatherMap, inputData, outOfBoundsPolicy)
} else {
new JoinGathererImpl(gatherMap, inputData, outOfBoundsPolicy)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
* </code> | ||
*/ | ||
class JoinGathererForExistenceJoin( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT
class JoinGathererForExistenceJoin( | |
class ExistenceJoinGathererImpl( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need now.
// `exists.numRows` == `batch.numRows`, | ||
// with true or false in it indicating if the row is gathered | ||
withResource(genExistsColumn(gatherView, batch.numRows())) { exists => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I perfer to generate the column of existence according to the subTableCbTmp
, not the whole batch. Then there is no need to split it next. Instead you can append the generated 'existence' column to the sub table directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just as @firestarman mentioned, this may lead to a lot of duplicated runs of full-scale scatter. IIUC, if we split the gather map as left table, we need to transform the global offset to the local offset before scattering. Perhaps there is another approach, just cache the exist column for reuse over mini batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Emit the batch directly as the exists
boolean column is relatively small.
It's the same as GpuHashJoin
does, no need to split now.
build |
sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/JoinGatherer.scala
Outdated
Show resolved
Hide resolved
build |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor nits on the class names used but otherwise looking good.
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala
Outdated
Show resolved
Hide resolved
...in/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
Outdated
Show resolved
Hide resolved
Signed-off-by: Chong Gao <res_life@163.com>
build |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, some nits
@@ -601,7 +572,7 @@ class ExistenceJoinIterator( | |||
* the value "false", scattering "true" into column FC will produce the "exists" | |||
* column of ExistenceJoin | |||
*/ | |||
private def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { | |||
def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { | |
override def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
use(condition) | ||
|
||
def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { | |
override def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
use(condition) | ||
|
||
def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { | |
override def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -601,7 +572,7 @@ class ExistenceJoinIterator( | |||
* the value "false", scattering "true" into column FC will produce the "exists" | |||
* column of ExistenceJoin | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the comment should be moved to the abstract parent class method declaration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -601,7 +572,7 @@ class ExistenceJoinIterator( | |||
* the value "false", scattering "true" into column FC will produce the "exists" | |||
* column of ExistenceJoin | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the comment should be moved to the abstract parent class method declaration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
build |
build |
@@ -601,7 +572,7 @@ class ExistenceJoinIterator( | |||
* the value "false", scattering "true" into column FC will produce the "exists" | |||
* column of ExistenceJoin | |||
*/ | |||
private def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { | |||
def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { | |
override def existsScatterMap(leftColumnarBatch: ColumnarBatch): GatherMap = { |
Closes #5034
Support existence join type for broadcast nested loop join
Signed-off-by: Chong Gao res_life@163.com