[SPARK-22032][PySpark] Speed up StructType conversion #19249
Conversation
Test build #81827 has finished for PR 19249 at commit
Test build #81828 has finished for PR 19249 at commit
Test build #81829 has finished for PR 19249 at commit
Do you have some benchmarks and numbers for this?
To be honest, it looks so trivial that I didn't bother.
I was checking this with my production code. I'll try to create a benchmark for this.
Did it save 6~7% of the total execution time?
Yep, in real-world scenarios.
Okay, then let's go ahead. Let's add some numbers to the PR description.
python/pyspark/sql/types.py
Outdated
@@ -619,7 +621,8 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
-            values = [f.fromInternal(v) for f, v in zip(self.fields, obj)]
+            values = [f.fromInternal(v) if n else v
Ah, I see. This can be recursive and per-record, and we avoid that here by pre-computing. That makes sense.
Let's describe this in more detail and add some numbers (in your other PRs too).
I added a benchmark for this code.
Could you mark [PySpark] in the title? cc @ueshin
python/pyspark/sql/types.py
Outdated
@@ -619,7 +621,8 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
-            values = [f.fromInternal(v) for f, v in zip(self.fields, obj)]
+            values = [f.fromInternal(v) if n else v
Could we run a benchmark for the worst case, when all columns need to be converted? I think here we pay an extra `if` and an extra element in the zip, basically to prevent the function call. This one looks okay practically, but I guess we should also identify the downside.
Also, let's add a comment here describing what we are doing, and add some links to this PR so others can refer to the benchmarks.
I checked this worst-case scenario.
Test
df = spark.range(1000000).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
Before
31166064 function calls (31163984 primitive calls) in 150.882 seconds
After
31166064 function calls (31163984 primitive calls) in 153.220 seconds
So it's a little slower (~2%), but I think this scenario is almost impossible with real-world data.
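To make the trade-off discussed above concrete, here is a minimal, self-contained sketch (with stand-in names, not Spark's actual classes): in the worst case, when every field needs conversion, the new form pays an extra `if` test and a third zip element per field while producing the same result.

```python
import timeit

fields = list(range(10))      # stand-ins for 10 StructFields
row = list(range(10))         # stand-ins for one row's values
need = [True] * 10            # worst case: every field needs conversion

def conv(v):                  # dummy stand-in for f.fromInternal(v)
    return v + 1

def old_style():
    # previous code: always call the converter for every field
    return [conv(v) for f, v in zip(fields, row)]

def new_style():
    # new code: extra `if` test and zip element per field
    return [conv(v) if n else v for f, v, n in zip(fields, row, need)]

assert old_style() == new_style()
t_old = timeit.timeit(old_style, number=50_000)
t_new = timeit.timeit(new_style, number=50_000)
print(f"old: {t_old:.3f}s  new: {t_new:.3f}s")
```

Timing the two forms this way shows the per-field overhead is a small constant; the win comes whenever some flags are False and the call is skipped entirely.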
Thanks for testing this out. Let's add a comment here to explain what we are doing and move the worst case benchmark into the PR description.
python/pyspark/sql/types.py
Outdated
@@ -483,7 +483,8 @@ def __init__(self, fields=None):
         self.names = [f.name for f in fields]
         assert all(isinstance(f, StructField) for f in fields),\
             "fields should be a list of StructField"
-        self._needSerializeAnyField = any(f.needConversion() for f in self)
+        self._needConversion = [f.needConversion() for f in self]
I'd rename this to something else, for example _needConversions (or similar, if there are others), and leave a comment here explaining why we do this.
python/pyspark/sql/types.py
Outdated
@@ -619,7 +621,8 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
-            values = [f.fromInternal(v) for f, v in zip(self.fields, obj)]
+            values = [f.fromInternal(v) if n else v
Minimal change and practically significant improvement. LGTM. @ueshin, do you maybe have some comments on this?
python/pyspark/sql/types.py
Outdated
@@ -619,7 +621,8 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
-            values = [f.fromInternal(v) for f, v in zip(self.fields, obj)]
+            values = [f.fromInternal(v) if n else v
Can we use a similar trick on toInternal?
Yeah, looks like we could too.
Done.
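The same trick in the serialization direction can be sketched in a few lines (stand-in names, not Spark's actual StructField API): skip the toInternal call for fields whose precomputed flag is False.

```python
def to_internal(v):
    return v - 1                   # dummy stand-in for f.toInternal(v)

fields = ["a", "b", "c"]           # stand-ins for StructFields
need = [True, False, True]         # precomputed needConversion() flags
row = (10, 20, 30)

# only fields whose flag is True pay the converter call
out = tuple(to_internal(v) if n else v
            for f, v, n in zip(fields, row, need))
print(out)  # (9, 20, 29)
```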
Test build #81852 has finished for PR 19249 at commit
LGTM
LGTM too, but hey @maver1ck, could you add some comments around the code and move the worst-case benchmarks into the PR description? I guess this wouldn't be too demanding.
Done.
Test build #81853 has finished for PR 19249 at commit
Test build #81854 has finished for PR 19249 at commit
Merged to master.
A late LGTM. Btw, can we use the same idea for
Thanks for double-checking @ueshin. Yes, I noticed that too while reviewing. I decided to merge it as is because I am quite sure about this one: StructType is the root type, this case looks quite common, and it looks like a first contribution. Even though this change has a downside, the practical improvement looked better. I am also fine with doing this for other types too (I am +0 on them).
@ueshin
We can split two
What changes were proposed in this pull request?
StructType.fromInternal calls f.fromInternal(v) for every field.
We can use precalculated type information to limit the number of function calls (it's calculated once per StructType and used in per-record conversions).
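A simplified sketch of the idea (MockField and MockStructType are illustrative stand-ins, not Spark's actual classes): the needConversion() results are computed once in the constructor and reused for every record in fromInternal, so fields that need no conversion never pay a function call.

```python
class MockField:
    def __init__(self, needs_conversion):
        self._needs = needs_conversion

    def needConversion(self):
        return self._needs

    def fromInternal(self, v):
        return ("converted", v)    # dummy per-field conversion

class MockStructType:
    def __init__(self, fields):
        self.fields = fields
        # computed once per type, reused for every record
        self._needConversion = [f.needConversion() for f in fields]
        self._needSerializeAnyField = any(self._needConversion)

    def fromInternal(self, obj):
        if self._needSerializeAnyField:
            # only fields whose flag is set pay the function call
            return [f.fromInternal(v) if n else v
                    for f, v, n in zip(self.fields, obj,
                                       self._needConversion)]
        return list(obj)

st = MockStructType([MockField(True), MockField(False)])
print(st.fromInternal([1, 2]))  # [('converted', 1), 2]
```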
Benchmarks (Python profiler)
Before
After
The main difference is types.py:619() and types.py:88(fromInternal), which are removed in After.
There are 100 million fewer function calls, and performance is 20% better.
Benchmark (worst-case scenario)
Test
Before
After
IMPORTANT:
The benchmark was done on top of #19246.
Without #19246 the performance improvement would be even greater.
How was this patch tested?
Existing tests.
Performance benchmark.