-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-49808][SQL] Fix a deadlock in subquery execution due to lazy vals #48391
base: master
Are you sure you want to change the base?
Conversation
7b1b392
to
25644b8
Compare
* the parent object. | ||
* c) If thread 1 waits for thread 2 to join, a deadlock occurs. | ||
*/ | ||
@SerialVersionUID(7964587975756091988L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generated by:
(spark_dev_312) ➜ spark git:(query_plan_lazy_ref) /usr/bin/serialver -classpath /Users/ruifeng.zheng/Dev/spark/core/target/scala-2.13/spark-core_2.13-4.0.0-SNAPSHOT.jar:/Users/ruifeng.zheng/Dev/spark/build/scala-2.12.18/lib/scala-library.jar org.apache.spark.util.Lazy
org.apache.spark.util.Lazy: private static final long serialVersionUID = 7964587975756091988L;
(spark_dev_312) ➜ spark git:(query_plan_lazy_ref) /usr/bin/serialver -classpath /Users/ruifeng.zheng/Dev/spark/core/target/scala-2.13/spark-core_2.13-4.0.0-SNAPSHOT.jar:/Users/ruifeng.zheng/Dev/spark/build/scala-2.13.12/lib/scala-library.jar org.apache.spark.util.Lazy
org.apache.spark.util.Lazy: private static final long serialVersionUID = 7964587975756091988L;
(spark_dev_312) ➜ spark git:(query_plan_lazy_ref) /usr/bin/serialver -classpath /Users/ruifeng.zheng/Dev/spark/core/target/scala-2.13/spark-core_2.13-4.0.0-SNAPSHOT.jar:/Users/ruifeng.zheng/Dev/spark/build/scala-2.13.13/lib/scala-library.jar org.apache.spark.util.Lazy
org.apache.spark.util.Lazy: private static final long serialVersionUID = 7964587975756091988L;
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why doesn't LazyTry
need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this SerialVersionUID
stems indirectly from the unit test case
test("Lazy val serializes the value, even if dereference was never called")
which looks like it is testing for some serialization behaviors related to whether or not we serialize the initializer
closure.
In this PR's current implementation, the stream.defaultWriteObject()
after field initialization means that we actually have strict / eager semantics when serializing these fields and that we won't serialize the initializer
.
To see that more clearly, we can use a tool like cfr-decompiler
to decompile the generated code, from which we see that the initializer
reference gets cleared after its use in the lazy val evaluation
private Function0<T> initializer;
private volatile boolean bitmap$0;
private T value$lzycompute() {
Lazy lazy = this;
synchronized (lazy) {
if (!this.bitmap$0) {
this.value = this.initializer.apply();
this.bitmap$0 = true;
}
}
this.initializer = null;
return this.value;
}
private T value() {
if (!this.bitmap$0) {
return this.value$lzycompute();
}
return this.value;
}
There's a bit of a trade-off space here:
- Eager evaluation upon serialization may reduce the size of the serialized data in case the
initializer
lambda/closure is large in comparison to the value it produces and can avoid problems from non-serializable lambda/closures, or compatibility problems in case wire protocol bidirectional compatibility is needed (e.g. if you are somehow persisting aLazy[T]
and deserializing it with a different classpath than the one that generated it, because it then changes the compatibility surface to include compatibility of the lambda / closure serialization rather than the value). - But this comes at the cost of potentially forcing more evaluations than otherwise would have happened.
In our context of use here, I think that by default it's better to forego the SerialVersionUID
and "eager serialization" semantics completely and instead just do what LazyTry
did, as the "eager evaluation upon serialization" semantic may be unwanted in this context of use or may address a set of problems that we don't have in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good, let me remove the SerialVersionUID
and "eager serialization"
(base) ➜ spark-trunk git:(branch-3.4) ✗ serialver -classpath core/target/spark-core_2.12-3.4.4-SNAPSHOT.jar:build/scala-2.12.17/lib/scala-library.jar org.apache.spark.util.Lazy
org.apache.spark.util.Lazy: private static final long serialVersionUID = 7964587975756091988L; |
@panbingkun thanks a lot for confirming the SerialVersionUID! |
969a661
to
d791409
Compare
d791409
to
80f26cb
Compare
cc @JoshRosen would you mind taking another look? thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, @zhengruifeng and all.
@@ -94,10 +95,11 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] | |||
* All Attributes that appear in expressions from this operator. Note that this set does not | |||
* include attributes that are implicitly referenced by being passed through to the output tuple. | |||
*/ | |||
@transient |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for only spotting this now, but:
Previously we would recompute these attribute references if they were requested on a QueryPlan that had been serialized and shipped to executors, whereas after this PR's change we'll use attributes sent from the driver.
Are there any potentially adverse side effects of this, other than serialized size? I vaguely recall that there was some attribute JVM ID reference stuff somewhere, but I'm unsure offhand if that's a factor here.
I mention this because the @transient
was added in 4483331 and I'm wondering if it was somehow load bearing or whether it's just a perf. optimization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point!
according to the description of #24866 , it mainly affect the planning time.
Maybe we can add @transient
to _references
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we mark Lazy#value
as @transient
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making Lazy#value
a @transient
field is an interesting proposal and seems promising.
If we do this, I think we should consider changing the name to TransientLazy
or LazyTransient
for the sake of making the transient aspect more salient to a reader / reviewer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for TransientLazy
convert to draft for now to avoid merge by mistake |
What changes were proposed in this pull request?
1, Introduce a helper class
Lazy
to replace the lazy vals2, Fix a deadlock in subquery execution
Why are the changes needed?
we observed a deadlock between
QueryPlan.canonicalized
andQueryPlan.references
:The main thread
TakeOrderedAndProject.doExecute
is trying to computeoutputOrdering
, it top-down traverse the tree, and requires the lock ofQueryPlan.canonicalized
in the path.In this deadlock, it successfully obtained the lock of
WholeStageCodegenExec
and requires the lock ofHashAggregateExec
;Concurrently, a subquery execution thread is performing code generation and bottom-up traverses the tree via
def consume
, which checksWholeStageCodegenExec.usedInputs
and refererences a lazy valQueryPlan.references
. It requires the lock ofQueryPlan.references
in the path.In this deadlock, it successfully obtained the lock of
HashAggregateExec
and requires the lock ofWholeStageCodegenExec
;This is due to Scala's lazy val internally calls this.synchronized on the instance that contains the val. This creates a potential for deadlocks.
Does this PR introduce any user-facing change?
no
How was this patch tested?
manually test:
before the fix, the deadlock happened twice in first 20 runs;
after the fix, the deadlock didn't happen in consecutive 100+ runs
Was this patch authored or co-authored using generative AI tooling?
no