[SPARK-7713] [SQL] Use shared broadcast hadoop conf for partitioned table scan. #6252
Conversation
Test build #33045 has started for PR 6252.
```diff
@@ -118,7 +120,7 @@ private[sql] class ParquetRelation2(
     private val maybeDataSchema: Option[StructType],
     private val maybePartitionSpec: Option[PartitionSpec],
     parameters: Map[String, String])(
-    val sqlContext: SQLContext)
+    @transient val sqlContext: SQLContext)
```
Why do we want to have `@transient` here? `ParquetRelation2` is not serializable and shouldn't be serialized.
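(For context, a minimal standalone sketch of what `@transient` does during Java serialization; the class names here are hypothetical, not Spark code. The annotated field is skipped when the object is written, so a non-serializable reference doesn't break serialization of the enclosing object.)

```scala
import java.io._

// Hypothetical classes, not Spark code.
class Context { override def toString = "heavy, non-serializable state" }

class Relation(@transient val ctx: Context) extends Serializable {
  def describe(): String =
    Option(ctx).map(_.toString).getOrElse("ctx was not serialized")
}

object TransientDemo extends App {
  val buf = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(buf)
  // Context is not Serializable; without @transient this write would throw
  // NotSerializableException. With it, the field is simply skipped.
  oos.writeObject(new Relation(new Context))
  oos.flush()

  val copy = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    .readObject().asInstanceOf[Relation]

  println(copy.describe()) // "ctx was not serialized": the field came back null
}
```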
Test build #33052 has started for PR 6252.
Test build #33045 has finished for PR 6252. Test FAILed.
Test build #33052 has finished for PR 6252. Test PASSed.
```scala
if (initLocalJobFuncOpt.isDefined) {
  sc.clean(initLocalJobFuncOpt.get)
}
```
This is a potential performance optimization: since we provide the closure internally, we don't actually have to clean it. If we create many of these RDDs, the cleaning time might add up. This could buy us a few seconds (same reasoning as in SPARK-7718, or #6256).
By the way, this is definitely not critical for the release. We can fix this separately if you prefer.
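(A hedged sketch of that suggestion; the helper below is hypothetical, not the actual `SqlNewHadoopRDD` code. The idea is to pay the reflection-heavy cleaning cost only for user-supplied closures, since internally constructed ones are already known to be safe to ship.)

```scala
// Hypothetical sketch, not the PR's code.
object ClosureCleaningSketch {
  def chooseJobFunc[T](
      userFunc: Option[T => Unit],       // may capture outer state, needs cleaning
      internalFunc: T => Unit,           // built internally, known to be clean
      clean: (T => Unit) => (T => Unit)  // stand-in for SparkContext.clean
  ): T => Unit =
    userFunc.map(clean).getOrElse(internalFunc)
}
```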
Test build #33098 has started for PR 6252.
Test build #33098 has finished for PR 6252. Test PASSed.
```scala
 * @param valueClass Class of the value associated with the inputFormatClass.
 * @param conf The Hadoop configuration.
 */
@DeveloperApi
```
Minor nit: might want to remove `@DeveloperApi` and all of the documentation from this, and just mention that it's a clone + modify of `NewHadoopRDD` and that its functionality will probably be folded into core in a future release.
Yeah, unless there's a need, I would prefer to make this private (since it's basically a copy of another class).
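(For reference, a minimal sketch of the package-qualified visibility being suggested; the package path and class name are abbreviated for illustration.)

```scala
package org.apache.spark.sql.execution

// `private[sql]` keeps the class usable anywhere under org.apache.spark.sql
// (and its subpackages) while hiding it from the public API surface.
private[sql] class SqlNewHadoopRDDSketch
```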
General comment: the broadcasted object will be shared by all tasks, so if the `initJob` function modifies the conf then you might run into trouble. Just wanted to make sure that you were aware of the shared state here in case you're mutating it in certain ways.
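(A hedged sketch of the hazard being described; the function and property key below are illustrative, not the PR's code. Every task on an executor sees the same broadcast `Configuration` instance, so a task that needs to change a setting should mutate a private copy.)

```scala
import org.apache.hadoop.conf.Configuration

// Illustrative only: copy-before-mutate for a shared broadcast conf.
def confForTask(sharedConf: Configuration, inputPath: String): Configuration = {
  // new Configuration(other) copies the properties of `other`, so the
  // shared broadcast value itself stays untouched.
  val taskConf = new Configuration(sharedConf)
  taskConf.set("mapreduce.input.fileinputformat.inputdir", inputPath)
  taskConf
}
```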
Added a comment to explain that the broadcasted conf is shared and should not be mutated.
Test build #33124 has started for PR 6252.
Test build #33124 has finished for PR 6252. Test PASSed.
Thanks for reviewing! I am merging it to master and branch 1.4.
[SPARK-7713] [SQL] Use shared broadcast hadoop conf for partitioned table scan.

https://issues.apache.org/jira/browse/SPARK-7713

I tested the performance with the following code:

```scala
import sqlContext._
import sqlContext.implicits._

(1 to 5000).foreach { i =>
  val df = (1 to 1000).map(j => (j, s"str$j")).toDF("a", "b").save(s"/tmp/partitioned/i=$i")
}

sqlContext.sql("""
CREATE TEMPORARY TABLE partitionedParquet
USING org.apache.spark.sql.parquet
OPTIONS (
  path '/tmp/partitioned'
)""")

table("partitionedParquet").explain(true)
```

In our master, `explain` takes 40s on my laptop. With this PR, `explain` takes 14s.

Author: Yin Huai <yhuai@databricks.com>

Closes #6252 from yhuai/broadcastHadoopConf and squashes the following commits:

6fa73df [Yin Huai] Address comments of Josh and Andrew.
807fbf9 [Yin Huai] Make the new buildScan and SqlNewHadoopRDD private sql.
e393555 [Yin Huai] Cheng's comments.
2eb53bb [Yin Huai] Use a shared broadcast Hadoop Configuration for partitioned HadoopFsRelations.

(cherry picked from commit b631bf7)
Signed-off-by: Yin Huai <yhuai@databricks.com>
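(Stepping back, a hedged sketch of the PR's core idea, simplified and not the actual `HadoopFsRelation`/`SqlNewHadoopRDD` code; it assumes Spark 1.x's `SerializableWritable` wrapper. The Hadoop `Configuration` is broadcast once per table scan, and each per-partition RDD reuses that broadcast instead of serializing its own copy of the conf.)

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}

// Simplified sketch, not the actual Spark SQL code.
class SharedConfScan(sc: SparkContext, conf: Configuration) {
  // One broadcast for the whole scan. Configuration is not java-serializable
  // on its own, so it travels wrapped in SerializableWritable.
  private val broadcastedConf = sc.broadcast(new SerializableWritable(conf))

  // Every partition-level RDD reads the same shared broadcast value,
  // rather than shipping a fresh Configuration per partition.
  def confForPartition(): Configuration = broadcastedConf.value.value
}
```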