Skip to content

Commit

Permalink
[CELEBORN] Add config to control celeborn fallback for CI (#6230)
Browse files Browse the repository at this point in the history
  • Loading branch information
kerwin-zk authored Jun 27, 2024
1 parent 32808dd commit 3a42e8f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.spark.shuffle.gluten.celeborn;

import org.apache.gluten.GlutenConfig;
import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.exception.GlutenException;

Expand Down Expand Up @@ -194,9 +195,14 @@ public <K, V, C> ShuffleHandle registerShuffle(
if (dependency instanceof ColumnarShuffleDependency) {
if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
logger.warn("Fallback to ColumnarShuffleManager!");
columnarShuffleIds.add(shuffleId);
return columnarShuffleManager().registerShuffle(shuffleId, dependency);
if (GlutenConfig.getConf().enableCelebornFallback()) {
logger.warn("Fallback to ColumnarShuffleManager!");
columnarShuffleIds.add(shuffleId);
return columnarShuffleManager().registerShuffle(shuffleId, dependency);
} else {
throw new GlutenException(
"The Celeborn service(Master: " + celebornConf.masterHost() + ") is unavailable");
}
} else {
return registerCelebornShuffleHandle(shuffleId, dependency);
}
Expand Down
10 changes: 10 additions & 0 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)

def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)

def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
}

object GlutenConfig {
Expand Down Expand Up @@ -2049,4 +2051,12 @@ object GlutenConfig {
.doubleConf
.checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must between [0, 1]")
.createWithDefault(0.6)

val CELEBORN_FALLBACK_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled")
.internal()
.doc("If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable." +
"Otherwise, throw an exception.")
.booleanConf
.createWithDefault(true)
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object Constants {

val VELOX_WITH_CELEBORN_CONF: SparkConf = new SparkConf(false)
.set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
.set("spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled", "false")
.set("spark.sql.parquet.enableVectorizedReader", "true")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set(
Expand Down

0 comments on commit 3a42e8f

Please sign in to comment.