[SPARK-13353] [SQL] fast serialization for collecting DataFrame/Dataset #11664
Conversation
Test build #52962 has finished for PR 11664 at commit
```scala
}
// Collect the byte arrays back to driver, then decode them as UnsafeRows.
val nFields = schema.length
byteArrayRdd.collect().flatMap { bytes =>
```
i think this block would be more readable if we just write it imperatively, e.g.

```scala
val results = new ArrayBuffer[UnsafeRow]
byteArrayRdd.collect().foreach { bytes =>
  // Wrap this partition's packed bytes so we can read length-prefixed rows.
  val buffer = ByteBuffer.wrap(bytes)
  var sizeOfNextRow = buffer.getInt()
  // A size of -1 marks the end of the rows in this byte array.
  while (sizeOfNextRow >= 0) {
    val row = new UnsafeRow(nFields)
    row.pointTo(buffer.array(), Platform.BYTE_ARRAY_OFFSET + buffer.position(), sizeOfNextRow)
    buffer.position(buffer.position() + sizeOfNextRow)
    results += row
    sizeOfNextRow = buffer.getInt()
  }
}
results.toArray
```
Test build #53106 has finished for PR 11664 at commit
Test build #53109 has finished for PR 11664 at commit
LGTM (pending Jenkins)
Test build #2635 has finished for PR 11664 at commit
Test build #2637 has finished for PR 11664 at commit
Test build #53156 has finished for PR 11664 at commit
Merging into master.
```scala
}
out.writeInt(-1)
out.flush()
out.close()
```
Should this be wrapped in a finally block?
All of these operations happen in memory, so no exception is expected here, and no resource will leak.
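For context, here is a minimal self-contained sketch of the write side that this diff closes out, assuming each row is framed as a size prefix followed by its bytes, with -1 as the end marker (`packRows` and `rowBytes` are illustrative names, not the PR's exact code):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Pack length-prefixed rows into a single byte array (one per partition).
// `rowBytes` stands in for an UnsafeRow's underlying bytes.
def packRows(rows: Iterator[Array[Byte]]): Array[Byte] = {
  val bos = new ByteArrayOutputStream() // purely in-memory, nothing to leak
  val out = new DataOutputStream(bos)
  rows.foreach { rowBytes =>
    out.writeInt(rowBytes.length) // size prefix consumed by the decode loop
    out.write(rowBytes)
  }
  out.writeInt(-1) // sentinel: no more rows in this array
  out.flush()
  out.close()
  bos.toByteArray
}
```

Since the underlying stream is a ByteArrayOutputStream, close() holds no OS resources, which is consistent with the point above that a finally block buys little here.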
## What changes were proposed in this pull request?

JIRA: https://issues.apache.org/jira/browse/SPARK-13930

Recently, fast serialization was introduced for collecting DataFrame/Dataset (#11664). The same technique can be used for the collect limit operator too.

## How was this patch tested?

Added a benchmark for collect limit to `BenchmarkWholeStageCodegen`.

Without this patch:

```
model name : Westmere E56xx/L56xx/X56xx (Nehalem-C)
collect limit:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
collect limit 1 million                  3413 / 3768          0.3        3255.0       1.0X
collect limit 2 millions                 9728 / 10440         0.1        9277.3       0.4X
```

With this patch:

```
model name : Westmere E56xx/L56xx/X56xx (Nehalem-C)
collect limit:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
collect limit 1 million                   833 / 1284          1.3         794.4       1.0X
collect limit 2 millions                 3348 / 4005          0.3        3193.3       0.2X
```

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #11759 from viirya/execute-take.
## What changes were proposed in this pull request?
When we call DataFrame/Dataset.collect(), the Java serializer (or Kryo serializer) is used to serialize the UnsafeRows on the executors and then deserialize them back into UnsafeRows on the driver. The Java serializer (and the Kryo serializer) are slow on millions of rows, because they try to detect duplicate objects, but there are usually no duplicate rows.
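For illustration, the cost being described is reference tracking: the serializer compares each object against those it has already written so that shared references can be deduplicated. In standalone Kryo usage (a hedged sketch, not Spark's internal serializer wiring) it looks like this:

```scala
import com.esotericsoftware.kryo.Kryo

val kryo = new Kryo()
// With reference tracking on, Kryo checks every object against the ones it
// has already written; rows are almost never shared, so this work is wasted.
kryo.setReferences(false)
```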
This PR serializes the UnsafeRows as a single byte array by packing them together; the Java serializer (or Kryo serializer) can then serialize the bytes very fast (there are fewer blocks, and byte arrays are not compared by content).
Since the UnsafeRow format is highly compressible, the serialized bytes are also compressed (configurable via spark.io.compression.codec).
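As a usage note, the codec for these byte streams is picked up from the Spark configuration; a small sketch (the codec value shown is just an example):

```scala
import org.apache.spark.SparkConf

// Choose the codec used for spark.io compression ("lz4", "lzf", or "snappy").
val conf = new SparkConf()
  .setAppName("collect-demo")
  .set("spark.io.compression.codec", "lz4")
```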
## How was this patch tested?
Existing unit tests.
A benchmark for collect was also added. Before this patch:

```
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
collect:                            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
collect 1 million                        3991 / 4311          0.3        3805.7       1.0X
collect 2 millions                      10083 / 10637         0.1        9616.0       0.4X
collect 4 millions                      29551 / 30072         0.0       28182.3       0.1X
```

After this patch:

```
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
collect:                            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
collect 1 million                         775 / 1170          1.4         738.9       1.0X
collect 2 millions                       1153 / 1758          0.9        1099.3       0.7X
collect 4 millions                       4451 / 5124          0.2        4244.9       0.2X
```
We can see about a 5-7X speedup.

Author: Davies Liu <davies@databricks.com>

Closes #11664 from davies/serialize_row.
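As a closing aside, a rough way to eyeball the collect cost outside the harness is a spark-shell timing loop (a hypothetical sketch, not the actual BenchmarkWholeStageCodegen case; `sqlContext` is the shell-provided one):

```scala
// Time a simple collect of one million rows.
val df = sqlContext.range(1000000L)
df.collect() // warm up once
val start = System.nanoTime()
val rows = df.collect()
val ms = (System.nanoTime() - start) / 1e6
println(s"collected ${rows.length} rows in $ms ms")
```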