[GLUTEN-7164][VL] Disable background IO threads by default (apache#7165)
zhztheplayer committed Nov 12, 2024
1 parent 56ba51e commit 7a7a335
Showing 11 changed files with 44 additions and 17 deletions.
@@ -54,6 +54,7 @@ public MetricRegistry metricRegistry() {
public SparkConf conf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY(), "1g");
return conf;
}
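
The same test configuration can be written against the plain string keys. The sketch below is not part of the commit; it assumes GLUTEN_OFFHEAP_SIZE_KEY resolves to "spark.memory.offHeap.size", which this diff does not show.

import org.apache.spark.SparkConf

// Plain-string equivalent of the Java conf() above. After this commit,
// IOThreads already defaults to 0, so the first setting only documents intent.
val conf = new SparkConf()
  .set("spark.gluten.sql.columnar.backend.velox.IOThreads", "0")
  .set("spark.memory.offHeap.size", "1g") // assumed key behind GLUTEN_OFFHEAP_SIZE_KEY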

4 changes: 4 additions & 0 deletions cpp/velox/compute/VeloxBackend.cc
@@ -232,6 +232,10 @@ void VeloxBackend::initConnector() {
ioThreads >= 0,
kVeloxIOThreads + " was set to negative number " + std::to_string(ioThreads) + ", this should not happen.");
if (ioThreads > 0) {
LOG(WARNING)
<< "Velox background IO threads are enabled. This is currently not recommended, since it may cause"
<< " unexpected issues such as query crashes or hangs. Please turn this option off if you are"
<< " unsure about it.";
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
}
velox::connector::registerConnector(std::make_shared<velox::connector::hive::HiveConnector>(
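
The guard above is three-way: a negative value fails the check, zero (the new default) skips creating the executor entirely, and a positive value builds a folly::IOThreadPoolExecutor behind a loud warning. A hypothetical JVM-side sketch of the same logic, with java.util.concurrent standing in for folly:

import java.util.concurrent.{ExecutorService, Executors}

// Hypothetical mirror of the C++ guard in VeloxBackend::initConnector():
// negative => fail fast, 0 => no background IO, positive => pool plus warning.
def initIoExecutor(ioThreads: Int): Option[ExecutorService] = {
  require(ioThreads >= 0, s"IOThreads was set to a negative number $ioThreads, this should not happen.")
  if (ioThreads > 0) {
    // Same caveat as the LOG(WARNING) above: experimental, may crash or hang queries.
    System.err.println("WARNING: Velox background IO threads are enabled.")
    Some(Executors.newFixedThreadPool(ioThreads))
  } else {
    None // ioThreads == 0: background IO stays disabled
  }
}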
@@ -866,6 +866,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should work")
@@ -882,6 +884,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
@@ -48,7 +48,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
@@ -357,7 +357,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
}

@ExtendedSQLTest
class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -445,7 +445,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
}

@ExtendedSQLTest
class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -677,6 +677,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should work")
@@ -695,6 +697,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
@@ -48,7 +48,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
@@ -328,7 +328,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
}

@ExtendedSQLTest
class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -416,7 +416,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
}

@ExtendedSQLTest
class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -658,6 +658,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
.exclude("filter pushdown - StringPredicate")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should work")
@@ -676,6 +678,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
.exclude("filter pushdown - StringPredicate")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
@@ -48,7 +48,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
@@ -329,7 +329,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
}

@ExtendedSQLTest
class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -417,7 +417,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
}

@ExtendedSQLTest
class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -663,6 +663,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
.exclude("filter pushdown - StringPredicate")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should work")
@@ -681,6 +683,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
.exclude("filter pushdown - StringPredicate")
// https://github.com/apache/incubator-gluten/issues/7174
.excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
@@ -47,7 +47,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
@@ -328,7 +328,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
}

@ExtendedSQLTest
class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -416,7 +416,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
}

@ExtendedSQLTest
class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are workable.
override def sparkConf: SparkConf =
super.sparkConf
16 changes: 11 additions & 5 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -274,7 +274,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def veloxSsdODirectEnabled: Boolean = conf.getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)

def veloxConnectorIOThreads: Int = {
conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS)
}

def veloxSplitPreloadPerDriver: Integer = conf.getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
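
This getter change is the behavioral core of the commit: previously an unset option fell back to the executor's task-slot count, silently enabling background IO; now the stored value (default 0) is used as-is. A standalone before/after sketch, with names assumed from the surrounding code:

// Old behavior: an unset conf inherited the task-slot count, so IO threads
// were effectively on by default.
def oldResolution(configured: Option[Int], numTaskSlotsPerExecutor: Int): Int =
  configured.getOrElse(numTaskSlotsPerExecutor)

// New behavior: the conf carries its own default of 0, so background IO is
// off unless explicitly requested.
def newResolution(configured: Option[Int]): Int =
  configured.getOrElse(0)

assert(oldResolution(None, numTaskSlotsPerExecutor = 4) == 4) // implicitly enabled
assert(newResolution(None) == 0) // disabled by default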
@@ -710,7 +710,7 @@ object GlutenConfig {
(AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
(
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.defaultValueString),
(COLUMNAR_SHUFFLE_CODEC.key, ""),
(COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
("spark.hadoop.input.connect.timeout", "180000"),
@@ -1331,13 +1331,19 @@ object GlutenConfig {
.booleanConf
.createWithDefault(false)

// FIXME: May cause issues when toggled on. Examples:
// https://github.com/apache/incubator-gluten/issues/7161
// https://github.com/facebookincubator/velox/issues/10173
val COLUMNAR_VELOX_CONNECTOR_IO_THREADS =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.IOThreads")
.internal()
.doc("The Size of the IO thread pool in the Connector. This thread pool is used for split" +
" preloading and DirectBufferedInput.")
.doc(
"Experimental: The size of the IO thread pool in the connector." +
" This thread pool is used for split preloading and DirectBufferedInput." +
" The option is experimental; enabling it (setting a non-zero value) may cause" +
" unexpected issues when the application reaches certain conditions.")
.intConf
.createOptional
.createWithDefault(0)
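
Because the option is declared through buildStaticConf, it cannot be toggled on a running session; anyone who still wants the experimental IO threads must set it before startup. A hedged opt-in sketch (the thread count 8 is arbitrary):

import org.apache.spark.SparkConf

// Deliberately re-enabling the experimental IO threads despite the default
// of 0. Expect the native-side warning from VeloxBackend.cc at startup.
val optIn = new SparkConf()
  .set("spark.gluten.sql.columnar.backend.velox.IOThreads", "8")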

val COLUMNAR_VELOX_ASYNC_TIMEOUT =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping")
