-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1121: Include avro for yarn-alpha builds #49
Conversation
Merged build triggered. |
Merged build started. |
Merged build triggered. |
Merged build finished. |
All automated tests passed. |
I'll give it a try. Any reason we don't just tie this to the yarn-alpha profile? Or does it not apply to the hadoop 2.0.2 type builds? |
Merged build triggered. |
Merged build started. |
Your suggestion of @tgravescs tying this yarn-alpha I think is strictly better. Just updated the patch. |
This lets us explicitly include Avro based on a profile for 0.23.X builds. It makes me sad how convoluted it is to express this logic in Maven.
Merged build finished. |
One or more automated tests failed |
Merged build triggered. |
Merged build started. |
Merged build finished. |
One or more automated tests failed |
Jenkins, test this please. |
Merged build triggered. |
Merged build started. |
Merged build finished. |
All automated tests passed. |
Hey guys, I tried building for YARN with 2.2.0 and 0.23.9 and seems like it's working. I'd like to get this in as a build fix, but please re-open SPARK-1121 if you have any issues and we can patch it up.
|
Fix Chill serialization of Range objects It used to write out each element one by one, creating very large objects. (cherry picked from commit 320418f) Signed-off-by: Reynold Xin <rxin@apache.org>
This lets us explicitly include Avro based on a profile for 0.23.X builds. It makes me sad how convoluted it is to express this logic in Maven. @tgraves and @sryza curious if this works for you. I'm also considering just reverting to how it was before. The only real problem was that Spark advertised a dependency on Avro even though it only really depends transitively on Avro through other deps. Author: Patrick Wendell <pwendell@gmail.com> Closes apache#49 from pwendell/avro-build-fix and squashes the following commits: 8d6ee92 [Patrick Wendell] SPARK-1121: Add avro to yarn-alpha profile
use release path for uploading jar, not current
## What changes were proposed in this pull request? This PR adds the byte code parser for the closure translation optimization. It translates Java closure like MapFunction, or FilterFunction to an intermediate format:`Node` tree. For example, closure `(v: Int) => { v > 0 }` will be translated to `Node` tree: ``` Arithmetic[Z](>) Argument[I] Constant[I](0) ``` ## How was this patch tested? Unit test. Author: Sean Zhong <seanzhong@databricks.com> Closes apache#49 from clockfly/closure_parser_part1.
…it jars (apache#30) * Revamp ports and service setup for the driver. - Expose the driver-submission service on NodePort and contact that as opposed to going through the API server proxy - Restrict the ports that are exposed on the service to only the driver submission service when uploading content and then only the Spark UI after the job has started * Move service creation down and more thorough error handling * Fix missed merge conflict * Add braces * Fix bad merge * Address comments and refactor run() more. Method nesting was getting confusing so pulled out the inner class and removed the extra method indirection from createDriverPod() * Remove unused method * Support SSL configuration for the driver application submission (apache#49) * Support SSL when setting up the driver. The user can provide a keyStore to load onto the driver pod and the driver pod will use that keyStore to set up SSL on its server. * Clean up SSL secrets after finishing submission. We don't need to persist these after the pod has them mounted and is running already. * Fix compilation error * Revert image change * Address comments * Programmatically generate certificates for integration tests. * Address comments * Resolve merge conflicts * Fix bad merge * Remove unnecessary braces * Fix compiler error
…it jars (apache#30) * Revamp ports and service setup for the driver. - Expose the driver-submission service on NodePort and contact that as opposed to going through the API server proxy - Restrict the ports that are exposed on the service to only the driver submission service when uploading content and then only the Spark UI after the job has started * Move service creation down and more thorough error handling * Fix missed merge conflict * Add braces * Fix bad merge * Address comments and refactor run() more. Method nesting was getting confusing so pulled out the inner class and removed the extra method indirection from createDriverPod() * Remove unused method * Support SSL configuration for the driver application submission (apache#49) * Support SSL when setting up the driver. The user can provide a keyStore to load onto the driver pod and the driver pod will use that keyStore to set up SSL on its server. * Clean up SSL secrets after finishing submission. We don't need to persist these after the pod has them mounted and is running already. * Fix compilation error * Revert image change * Address comments * Programmatically generate certificates for integration tests. * Address comments * Resolve merge conflicts * Fix bad merge * Remove unnecessary braces * Fix compiler error
…it jars (apache#30) * Revamp ports and service setup for the driver. - Expose the driver-submission service on NodePort and contact that as opposed to going through the API server proxy - Restrict the ports that are exposed on the service to only the driver submission service when uploading content and then only the Spark UI after the job has started * Move service creation down and more thorough error handling * Fix missed merge conflict * Add braces * Fix bad merge * Address comments and refactor run() more. Method nesting was getting confusing so pulled out the inner class and removed the extra method indirection from createDriverPod() * Remove unused method * Support SSL configuration for the driver application submission (apache#49) * Support SSL when setting up the driver. The user can provide a keyStore to load onto the driver pod and the driver pod will use that keyStore to set up SSL on its server. * Clean up SSL secrets after finishing submission. We don't need to persist these after the pod has them mounted and is running already. * Fix compilation error * Revert image change * Address comments * Programmatically generate certificates for integration tests. * Address comments * Resolve merge conflicts * Fix bad merge * Remove unnecessary braces * Fix compiler error
fix bash error
…pyspark-kernel to netflix/2.1.1-unstable Squashed commit of the following: commit 831d8d8699ab96e3bc25bdf23ef60a24536b454a Author: Vinitha Gankidi <vgankidi@netflix.com> Date: Thu Jul 25 11:49:07 2019 -0700 Move pyspark kernel.py into spark run.py. sparklyr kernel will be directly called in as `sparklyr` command
Set environment variable OS_USE_OCTAVIA=true in LBaaS job to make terraform knowing the deployment is LBaaS + Octavia. Fixes: apache#49
…tions ### What changes were proposed in this pull request? In order to avoid frequently changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions`, we usually set `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution, which causes some bucket map join lose efficacy and add more `ShuffleExchange`. How to reproduce: ```scala val bucketedTableName = "bucketed_table" spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName) val bucketedTable = spark.table(bucketedTableName) val df = spark.range(8) spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) // Spark 2.4. spark.sql.adaptive.enabled=false // We set spark.sql.shuffle.partitions <= 500 every time based on our data in this case. spark.conf.set("spark.sql.shuffle.partitions", 500) bucketedTable.join(df, "id").explain() // Since 3.0. We enabled adaptive execution and set spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger values to fit more cases. spark.conf.set("spark.sql.adaptive.enabled", true) spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000) bucketedTable.join(df, "id").explain() ``` ``` scala> bucketedTable.join(df, "id").explain() == Physical Plan == *(4) Project [id#5L] +- *(4) SortMergeJoin [id#5L], [id#7L], Inner :- *(1) Sort [id#5L ASC NULLS FIRST], false, 0 : +- *(1) Project [id#5L] : +- *(1) Filter isnotnull(id#5L) : +- *(1) ColumnarToRow : +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500 +- *(3) Sort [id#7L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#7L, 500), true, [id=#49] +- *(2) Range (0, 8, step=1, splits=16) ``` vs ``` scala> bucketedTable.join(df, "id").explain() == Physical Plan == AdaptiveSparkPlan(isFinalPlan=false) +- Project [id#5L] +- SortMergeJoin [id#5L], [id#7L], Inner :- Sort [id#5L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#5L, 1000), true, [id=#93] : +- Project [id#5L] : +- Filter isnotnull(id#5L) : +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500 +- Sort [id#7L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#7L, 1000), true, [id=#92] +- Range (0, 8, step=1, splits=16) ``` This PR makes read bucketed tables always obeys `spark.sql.shuffle.partitions` even enabling adaptive execution and set `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` to avoid add more `ShuffleExchange`. ### Why are the changes needed? Do not degrade performance after enabling adaptive execution. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Unit test. Closes #26409 from wangyum/SPARK-29655. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… more scenarios such as PartitioningCollection ### What changes were proposed in this pull request? This PR proposes to improve `EnsureRquirement.reorderJoinKeys` to handle the following scenarios: 1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`. 2. Handle `PartitioningCollection`, which may contain `HashPartitioning` ### Why are the changes needed? 1. For the scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`. This means that if both sides are `HashPartitioning`, it will try to match only the left side. The following will not consider the right-side `HashPartitioning`: ``` val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1") val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2") df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2") val t1 = spark.table("t1") val t2 = spark.table("t2") val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2")) join.explain == Physical Plan == *(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner :- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69] : +- *(1) Project [i1#26, j1#27] : +- *(1) Filter isnotnull(i1#26) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4 +- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0. +- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79]. <===== This can be removed +- *(3) Project [i2#30, j2#31] +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30)) +- *(3) ColumnarToRow +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4 ``` 2. For the scenario 2), the current behavior does not handle `PartitioningCollection`: ``` val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1") val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2") val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3") val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3")) join2.explain == Physical Plan == *(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner :- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0. <===== This can be removed : +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58] <===== This can be removed : +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner : :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45] : : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8] : : +- *(1) LocalTableScan [_1#2, _2#3] : +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51] : +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19] : +- *(3) LocalTableScan [_1#13, _2#14] +- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64] +- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30] +- *(7) LocalTableScan [_1#24, _2#25] ``` ### Does this PR introduce _any_ user-facing change? Yes, now from the above examples, the shuffle/sort nodes pointed by `This can be removed` are now removed: 1. Senario 1): ``` == Physical Plan == *(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner :- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67] : +- *(1) Project [i1#26, j1#27] : +- *(1) Filter isnotnull(i1#26) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4 +- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0 +- *(3) Project [i2#30, j2#31] +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30)) +- *(3) ColumnarToRow +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4 ``` 2. Scenario 2): ``` == Physical Plan == *(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner :- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner : :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43] : : +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8] : : +- *(1) LocalTableScan [_1#2, _2#3] : +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49] : +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19] : +- *(3) LocalTableScan [_1#13, _2#14] +- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58] +- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30] +- *(6) LocalTableScan [_1#24, _2#25] ``` ### How was this patch tested? Added tests. Closes #29074 from imback82/reorder_keys. Authored-by: Terry Kim <yuminkim@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? Purge pip cache in dockerfile ### Why are the changes needed? to save 4~5G disk space: before https://github.com/zhengruifeng/spark/actions/runs/7541725028/job/20530432798 ``` #45 [39/39] RUN df -h #45 0.090 Filesystem Size Used Avail Use% Mounted on #45 0.090 overlay 84G 70G 15G 83% / #45 0.090 tmpfs 64M 0 64M 0% /dev #45 0.090 shm 64M 0 64M 0% /dev/shm #45 0.090 /dev/root 84G 70G 15G 83% /etc/resolv.conf #45 0.090 tmpfs 7.9G 0 7.9G 0% /proc/acpi #45 0.090 tmpfs 7.9G 0 7.9G 0% /sys/firmware #45 0.090 tmpfs 7.9G 0 7.9G 0% /proc/scsi #45 DONE 2.0s ``` after https://github.com/zhengruifeng/spark/actions/runs/7549204209/job/20552796796 ``` #48 [42/43] RUN python3.12 -m pip cache purge #48 0.670 Files removed: 392 #48 DONE 0.7s #49 [43/43] RUN df -h #49 0.075 Filesystem Size Used Avail Use% Mounted on #49 0.075 overlay 84G 65G 19G 79% / #49 0.075 tmpfs 64M 0 64M 0% /dev #49 0.075 shm 64M 0 64M 0% /dev/shm #49 0.075 /dev/root 84G 65G 19G 79% /etc/resolv.conf #49 0.075 tmpfs 7.9G 0 7.9G 0% /proc/acpi #49 0.075 tmpfs 7.9G 0 7.9G 0% /sys/firmware #49 0.075 tmpfs 7.9G 0 7.9G 0% /proc/scsi ``` ### Does this PR introduce _any_ user-facing change? no, infra-only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #44768 from zhengruifeng/infra_docker_cleanup. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This lets us explicitly include Avro based on a profile for 0.23.X
builds. It makes me sad how convoluted it is to express this logic
in Maven. @tgraves and @sryza curious if this works for you.
I'm also considering just reverting to how it was before. The only
real problem was that Spark advertised a dependency on Avro
even though it only really depends transitively on Avro through
other deps.