-
Notifications
You must be signed in to change notification settings - Fork 169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support Broadcast HashJoin #211
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is based on #194.
c3ed3ee
to
1b50512
Compare
// Release the previous batch. | ||
// If it is not released, when closing the reader, arrow library will complain about | ||
// memory leak. | ||
if (currentBatch != null) { | ||
currentBatch.close() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to release the batch before loading next batch. Because ArrowStreamReader
loads data into same vectors of root internally. After loading next batch, close
will release the just loaded batch instead of previous batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this related to the memory leak we saw?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because ArrowStreamReader loads data into same vectors of root internally. After loading next batch, close will release the just loaded batch instead of previous batch.
This sounds like a data corruption problem. If the just loaded batch is closed/released, the just loaded ColumnarBatch would be corrupted? But it seems like that the CI passes without any issue previously.
When working on #206, I also found out it might be inconvenient to use Arrow Java's memory API. It requires extra caution to allocate and release ArrowBuf correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this related to the memory leak we saw?
It's not, although I suspected it before too. For shuffle, a channel only contains one batch, so ArrowReaderIterator
doesn't hit this issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds like a data corruption problem. If the just loaded batch is closed/released, the just loaded ColumnarBatch would be corrupted? But it seems like that the CI passes without any issue previously.
When working on #206, I also found out it might be inconvenient to use Arrow Java's memory API. It requires extra caution to allocate and release ArrowBuf correctly.
Due to #211 (comment), this issue is not exposed before.
I feel that Arrow Java API is hard to use and somehow counter-intuitive, especially compared with arrow-rs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I feel the same pain when using Java Arrow. I think in the long term we'd better to switch away from it. It should be relatively easy except the Java Arrow Flight feature.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #211 +/- ##
============================================
+ Coverage 33.32% 33.38% +0.06%
- Complexity 769 776 +7
============================================
Files 107 108 +1
Lines 37037 37099 +62
Branches 8106 8129 +23
============================================
+ Hits 12342 12386 +44
- Misses 22098 22099 +1
- Partials 2597 2614 +17 ☔ View full report in Codecov by Sentry. |
1b50512
to
772588b
Compare
50895b1
to
187ba36
Compare
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
Show resolved
Hide resolved
cc @sunchao |
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
Outdated
Show resolved
Hide resolved
// Release the previous batch. | ||
// If it is not released, when closing the reader, arrow library will complain about | ||
// memory leak. | ||
if (currentBatch != null) { | ||
currentBatch.close() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this related to the memory leak we saw?
var rowCount = 0 | ||
|
||
batches.foreach { batch => | ||
val (fieldVectors, batchProviderOpt) = getBatchFieldVectors(batch) | ||
val root = schemaRoot.getOrElse(new VectorSchemaRoot(fieldVectors.asJava)) | ||
val root = new VectorSchemaRoot(fieldVectors.asJava) | ||
val provider = batchProviderOpt.getOrElse(dictionaryProvider) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One related question what if incoming batches have different dictionary provider?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the batch has its provider, it should be returned in batchProviderOpt
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the writer is reused. Once the writer is created, new dictionary provider(if different from previous one) from new batches is never used/ written?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see. I suppose that the dictionary provider is same across batches. This seems to be the reason why there is dictionary provider, i.e. to store dictionary values for arrays/batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. It seems getBatchFieldVectors
only checks same dictionary provider across arrays but not batches. Maybe we should add that too? Anyway, it's kind of out of this PR's scope. Maybe in a separate issue to track that.
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
Outdated
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
Show resolved
Hide resolved
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
var rowCount = 0 | ||
|
||
batches.foreach { batch => | ||
val (fieldVectors, batchProviderOpt) = getBatchFieldVectors(batch) | ||
val root = schemaRoot.getOrElse(new VectorSchemaRoot(fieldVectors.asJava)) | ||
val root = new VectorSchemaRoot(fieldVectors.asJava) | ||
val provider = batchProviderOpt.getOrElse(dictionaryProvider) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. It seems getBatchFieldVectors
only checks same dictionary provider across arrays but not batches. Maybe we should add that too? Anyway, it's kind of out of this PR's scope. Maybe in a separate issue to track that.
@sunchao any more comments? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Can we merge #210 first and trigger another CI run?
Merged. Thanks. |
* feat: Support HashJoin * Add comment * Clean up test * Fix join filter * Fix clippy * Use consistent function with sort merge join * Add note about left semi and left anti joins * feat: Support BroadcastHashJoin * Move tests * Remove unused import * Add function to parse join parameters * Remove duplicate code * For review
* feat: Support HashJoin * Add comment * Clean up test * Fix join filter * Fix clippy * Use consistent function with sort merge join * Add note about left semi and left anti joins * feat: Support BroadcastHashJoin * Move tests * Remove unused import * Add function to parse join parameters * Remove duplicate code * For review
Which issue does this PR close?
Closes #202.
Rationale for this change
What changes are included in this PR?
How are these changes tested?