Automatically adjust spark.rapids.sql.multiThreadedRead.numThreads to the same as spark.executor.cores #6016

Merged: 15 commits into NVIDIA:branch-22.08 on Jul 27, 2022

Conversation

@sinkinben (Contributor) commented Jul 19, 2022:

Close #5524

If spark.rapids.sql.multiThreadedRead.numThreads is not set explicitly, or is explicitly set to 0, then we derive it from other settings. Otherwise, we keep the user's setting.

One choice is to derive it from spark.executor.cores: if spark.executor.cores is set by the user, we try to assign spark.rapids.sql.multiThreadedRead.numThreads the value max(20, spark.executor.cores), as sketched below.
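
A minimal sketch of the intended derivation logic, assuming a plain SparkConf on the driver (names like deriveNumThreads are illustrative, not the actual plugin code):

import org.apache.spark.SparkConf

object NumThreadsDerivation {
  val NumThreadsKey = "spark.rapids.sql.multiThreadedRead.numThreads"
  val ExecutorCoresKey = "spark.executor.cores"
  val DefaultNumThreads = 20

  def deriveNumThreads(conf: SparkConf): Unit = {
    // Only derive when the user did not set the key, or set it to 0.
    val notSetByUser = !conf.contains(NumThreadsKey) || conf.get(NumThreadsKey).toInt == 0
    // spark.executor.cores is not set by all cluster managers, so check it first.
    if (notSetByUser && conf.contains(ExecutorCoresKey)) {
      val cores = conf.get(ExecutorCoresKey).toInt
      conf.set(NumThreadsKey, math.max(DefaultNumThreads, cores).toString)
    }
  }
}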


// We let spark.rapids.sql.multiThreadedRead.numThreads be same as spark.executor.cores.
// See: https://github.com/NVIDIA/spark-rapids/issues/5524
conf.set(RapidsConf.MULTITHREAD_READ_NUM_THREADS.key,
@GaryShen2008 (Collaborator) commented:

I think we should only set the default value to max(20, executor_cores) when the user doesn't set it explicitly. So you have to check whether it's set first.

@abellina (Collaborator) commented Jul 19, 2022:

Agree with @GaryShen2008. I think we want RapidsConf.MULTITHREAD_READ_NUM_THREADS to have a default of 0 (or auto). When it is set to 0, we try to derive it from the other configs.

That said, if the user sets it to something > 0, I'd expect us to honor that config, therefore we wouldn't overwrite it in this case.

@sinkinben (Contributor, author) commented Jul 19, 2022:

> Agree with @GaryShen2008. I think we want RapidsConf.MULTITHREAD_READ_NUM_THREADS to have a default of 0 (or auto). When it is set to 0, we try to derive it from the other configs.
>
> That said, if the user sets it to something > 0, I'd expect us to honor that config, therefore we wouldn't overwrite it in this case.

@abellina That means: if we set the default value of RapidsConf.MULTITHREAD_READ_NUM_THREADS to 0, then we must assign it a value derived from the user's configs?

Collaborator commented:

Yes, you should get MULTITHREAD_READ_NUM_THREADS first. If it's 0, which means the default value, we'll use max(20, executor-cores); otherwise the user set it explicitly, and we don't need to change it.

@sinkinben (Contributor, author) commented:

I think the default value of MULTITHREAD_READ_NUM_THREADS should stay 20.

If the user sets neither spark.executor.cores nor spark.rapids.sql.multiThreadedRead.numThreads, MULTITHREAD_READ_NUM_THREADS still has a valid value.

Collaborator commented:

The 0 helps you understand whether the user set the value or not.
If we keep 20, how do we know whether it came from the user's setting or from the default value?

@sinkinben (Contributor, author) commented Jul 22, 2022:

> If we keep 20, how do we know it's from user's setting or from the default value.

We can check this via conf.contains("spark.rapids.sql.multiThreadedRead.numThreads"). If the user sets spark.rapids.sql.multiThreadedRead.numThreads = 20 explicitly (the same as the default value), conf.contains("spark.rapids.sql.multiThreadedRead.numThreads") will still be true.
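
For illustration (hypothetical values, assuming nothing else populated the conf):

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
conf.contains("spark.rapids.sql.multiThreadedRead.numThreads")  // false: not set by the user
conf.set("spark.rapids.sql.multiThreadedRead.numThreads", "20") // user sets it to the default value
conf.contains("spark.rapids.sql.multiThreadedRead.numThreads")  // true: explicitly set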

@GaryShen2008 (Collaborator) commented:

BTW, you'd better change the title of this PR to remove "[FEA] "


@sinkinben changed the PR title to remove "[FEA]" on Jul 19, 2022
@tgravescs (Collaborator) commented:

It would be nice to have more description here. The issue with executor.cores is that it is not set properly on all resource managers. Standalone in particular defaults to all the cores on the box, but I think spark.executor.cores is set to 1. This may be OK here since we are taking the max, but it should at least be documented in the PR, and I would also like to see a comment in the code.

How was this tested?

@sinkinben (Contributor, author) commented Jul 20, 2022:

> How was this tested?

@tgravescs

I tested it locally by running spark-rapids in standalone mode.

spark-shell --conf spark.executor.cores=26 --jars ${RAPIDS_JAR}

scala> val key = "spark.rapids.sql.multiThreadedRead.numThreads"
scala> spark.conf.get(key)
res0: String = 26

In standalone mode, spark.executor.cores can be set via --conf spark.executor.cores in the command line.

In yarn mode, spark.executor.cores is set via the config file /etc/spark/conf/spark-default.conf.

In both cases, the config items are stored in the variable pluginContext.conf (see the code modification).
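
For context, the driver plugin receives these settings through Spark's plugin API; a sketch using Spark's real PluginContext.conf, though the plugin's actual hook point may differ:

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}

class DriverPluginSketch extends DriverPlugin {
  override def init(sc: SparkContext, ctx: PluginContext): java.util.Map[String, String] = {
    // Both --conf settings and spark-defaults.conf entries are visible here.
    val conf = ctx.conf()
    // ... derive spark.rapids.sql.multiThreadedRead.numThreads from conf ...
    java.util.Collections.emptyMap()
  }
}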

@abellina (Collaborator) commented:

> I tested it locally by running spark-rapids in standalone mode.
>
> spark-shell --conf spark.executor.cores=26 --jars ${RAPIDS_JAR}
>
> scala> val key = "spark.rapids.sql.multiThreadedRead.numThreads"
> scala> spark.conf.get(key)
> res0: String = 26

nit: this is local mode, not standalone mode. If you want to try this out in standalone mode, a --master spark://[host]:7077 argument needs to be added. Additionally, the master/worker need to be started ahead of time:

$SPARK_HOME/sbin/start-master.sh 
$SPARK_HOME/sbin/start-worker.sh --master spark://[host]:7077

So when I try this without setting --conf spark.executor.cores=[any number], in yarn, local or standalone mode, the configuration is not present in the driver:

scala> spark.conf.get("spark.executor.cores")
java.util.NoSuchElementException: spark.executor.cores
  at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError(QueryExecutionErrors.scala:1494)
  at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4128)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.internal.SQLConf.getConfString(SQLConf.scala:4128)
  at org.apache.spark.sql.RuntimeConfig.get(RuntimeConfig.scala:72)
  ... 47 elided

So I am not sure this is set unless the user forces it. @tgravescs, that seems different from what you got; what am I doing wrong?

@abellina (Collaborator) commented:

Tested a bit more: If I set spark.executor.cores in yarn, standalone, or local mode, we can get that value in the driver.

If I start local mode with --master local[N], the value is not present (so this is one case where Spark is giving you N cores and spark.executor.cores isn't representing what I'd hope).

And just to recap, any time the conf isn't explicitly set, it's not present in the driver.

docs/configs.md (outdated)
@@ -118,7 +118,7 @@ Name | Description | Default Value
<a name="sql.json.read.float.enabled"></a>spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true
<a name="sql.metrics.level"></a>spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu
<a name="sql.multiThreadedRead.numThreads"></a>spark.rapids.sql.multiThreadedRead.numThreads|The maximum number of threads on each executor to use for reading small files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED readers, see spark.rapids.sql.format.parquet.reader.type, spark.rapids.sql.format.orc.reader.type, or spark.rapids.sql.format.avro.reader.type for a discussion of reader types.|20
<a name="sql.multiThreadedRead.numThreads"></a>spark.rapids.sql.multiThreadedRead.numThreads|The maximum number of threads on each executor to use for reading small files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED readers, see spark.rapids.sql.format.parquet.reader.type, spark.rapids.sql.format.orc.reader.type, or spark.rapids.sql.format.avro.reader.type for a discussion of reader types. If it is not set explicitly, it will be tried to assign value of `max(20, spark.executor.cores)`.|20
Collaborator commented:

maybe add another bit that says some cluster manager configurations do not set spark.executor.cores

@sinkinben (Contributor, author) commented Jul 21, 2022:

OK, configs.md will be auto-synced with the description of MULTITHREAD_READ_NUM_THREADS (see RapidsConf.scala), so I removed this.

Collaborator commented:

Yes, I realized this; I meant it as a generic statement. You should update RapidsConf, which in turn will generate this file, and this file should be in the PR. The premerge checks should fail if it is not.

@tgravescs (Collaborator) commented:

Sorry, I was thinking of the internal Spark API for getting this:

> sparkConf.get(EXECUTOR_CORES)

This API returns the default value of 1:

private[spark] val EXECUTOR_CORES = ConfigBuilder(SparkLauncher.EXECUTOR_CORES)
    .version("1.0.0")
    .intConf
    .createWithDefault(1)

That is a private API, though.

In this particular case it doesn't really matter. If the user doesn't explicitly set it, we are going to default to 20, so if someone starts a standalone cluster and uses the default configuration where the executor gets all the cores, we won't change the number of threads. I think this is fine as long as we document it.

Side note for the standalone cluster: if you explicitly set SPARK_WORKER_CORES, I know dynamic allocation is able to work properly without the default of 1, so I would assume that sets spark.executor.cores, but you would have to test to be sure.

@sinkinben (Contributor, author) commented:

@abellina Only if spark.executor.cores is set explicitly can it be retrieved via spark.conf.get("spark.executor.cores"). In Spark, its default value is 1.

@sinkinben (Contributor, author) commented Jul 21, 2022:

@tgravescs
I have tested this in standalone mode and local mode according to getting-started-on-prem.md (later I will test it in yarn mode). It seems OK. Is there anything else I need to update or test? Many thanks.

if (!conf.contains(numThreadsKey) || conf.get(numThreadsKey).toInt == 0) {
// Derive it from spark.executor.cores
// In standalone mode, users set spark.executor.cores via `--conf spark.executor.cores=10`.
// In yarn mode, users set spark.executor.cores via a config file 'spark-default.conf',
Collaborator commented:

This is actually not completely true; you can set spark.executor.cores via the command line as well. I think neither of these comments is needed, so please remove them. If we were to say anything here, I would say that spark.executor.cores is not set on all cluster managers by default.

@sinkinben (Contributor, author) commented:

I have updated the comments.

@tgravescs (Collaborator) commented:

You have to add configs.md to your PR. Please run mvn verify and then commit it. That is why this build is failing.

@tgravescs (Collaborator) commented:

build

// Derive it from spark.executor.cores. Since spark.executor.cores is not set on all cluster
// managers by default, we should check whether it's set explicitly.
if (conf.contains(EXECUTOR_CORES_KEY)) {
  conf.set(numThreadsKey, Math.max(20, conf.get(EXECUTOR_CORES_KEY).toInt).toString)
}
Collaborator commented:

It might be nice to log a message saying what we are setting this to.

Collaborator commented:

it would also be nice to make a global variable (maybe something like MULTITHREAD_READ_NUM_THREADS_DEFAULT) for the 20 so that we don't have different values.
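
A sketch of what that might look like (illustrative only; the constant that eventually landed is shown in the diff near the end of this thread):

object RapidsConf {
  // Single source of truth for the default thread count.
  val MULTITHREAD_READ_NUM_THREADS_DEFAULT = 20
}

// At the derivation site, instead of the literal 20:
conf.set(numThreadsKey,
  Math.max(RapidsConf.MULTITHREAD_READ_NUM_THREADS_DEFAULT,
    conf.get(EXECUTOR_CORES_KEY).toInt).toString)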

// explicitly as value 0, then we derive it from other settings. Otherwise, we keep the
// users' setting.
val numThreadsKey = RapidsConf.MULTITHREAD_READ_NUM_THREADS.key
if (!conf.contains(numThreadsKey) || conf.get(numThreadsKey).toInt == 0) {
@abellina (Collaborator) commented Jul 22, 2022:

My comment about setting it to 0 seems to have been addressed by checking conf.contains, but now checking for 0 here is weird and raises an issue.

If the user sets this value to 0 and EXECUTOR_CORES_KEY isn't set, what happens? Do we crash or do we hang?

At this rate, if at line 161 of this file numThreadsKey is 0, we should probably throw if we are not doing so already when creating the multi-threaded reader.

Collaborator commented:

There's a constraint that can be added to the config entry (RapidsConf), checkValue, that would handle this:

  conf("spark.rapids.sql.multiThreadedRead.numThreads")
      .checkValue(v => v > 0, "numThreads must be greater than 0")
      // + what you have now      

thoughts @tgravescs ?

@tgravescs (Collaborator) commented:

Yeah, that sounds like a better approach.

* If spark.rapids.sql.multiThreadedRead.numThreads is explicitly set to 0 and cannot be derived from other settings, then we throw an exception.
* Add a global default value MULTITHREAD_READ_NUM_THREADS_DEFAULT = 20.

@sinkinben (Contributor, author) commented Jul 26, 2022:

@tgravescs @abellina

It seems that there is a little bug in RapidsConf.

For example,

  val MULTITHREAD_READ_NUM_THREADS = conf("spark.rapids.sql.multiThreadedRead.numThreads")
      .doc("The maximum number of threads on each executor to use for reading small " +
        "files in parallel. This can not be changed at runtime after the executor has " +
        "started. Used with COALESCING and MULTITHREADED readers, see " +
        "spark.rapids.sql.format.parquet.reader.type, " +
        "spark.rapids.sql.format.orc.reader.type, or " +
        "spark.rapids.sql.format.avro.reader.type for a discussion of reader types. " +
        "If it is not set explicitly and spark.executor.cores is set, it will be tried to " +
        "assign value of `max(MULTITHREAD_READ_NUM_THREADS_DEFAULT, spark.executor.cores)`, " +
        s"where MULTITHREAD_READ_NUM_THREADS_DEFAULT = $MULTITHREAD_READ_NUM_THREADS_DEFAULT" +
        ".")
      .integerConf
      .checkValue(v => v > 0, "The thread count must be greater than zero.")
      .createWithDefault(0)

even if we set createWithDefault(0), no IllegalArgumentException is thrown, since checkValue is executed before createWithDefault.

And I ran spark-rapids locally:

spark-shell --jars ${RAPIDS_JAR} \
--conf spark.sql.adaptive.enabled=false \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.sql.multiThreadedRead.numThreads=-1

scala> val key = "spark.rapids.sql.multiThreadedRead.numThreads"
key: String = spark.rapids.sql.multiThreadedRead.numThreads

scala> spark.conf.get(key)
res0: String = -1

scala> :quit

checkValue is not executed in this case.

I am working on the detailed reason.


I created an issue to describe this: see #6086.
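
To make the failure mode concrete, here is a hypothetical, heavily simplified builder (not the actual RapidsConf internals; #6086 tracks the real cause) showing how a default can slip past a checkValue-style validator when validation only runs on the explicit-set path:

final class ConfBuilder(key: String) {
  private var validator: Int => Unit = _ => ()

  def checkValue(check: Int => Boolean, msg: String): ConfBuilder = {
    validator = v => require(check(v), s"$key: $msg")
    this
  }

  def createWithDefault(default: Int): ConfEntry =
    new ConfEntry(key, default, validator) // the default never passes through `validator`
}

final class ConfEntry(val key: String, default: Int, validator: Int => Unit) {
  def get(settings: Map[String, String]): Int =
    settings.get(key).map(_.toInt).fold(default) { v => validator(v); v }
}

object CheckValueDemo extends App {
  val entry = new ConfBuilder("spark.rapids.sql.multiThreadedRead.numThreads")
    .checkValue(_ > 0, "The thread count must be greater than zero.")
    .createWithDefault(0)

  println(entry.get(Map.empty)) // prints 0: the invalid default was never validated
}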

@tgravescs (Collaborator) commented:

OK, thanks for testing that; we can fix it under the issue you filed.

@tgravescs (Collaborator) commented:

build

@abellina (Collaborator) left a review comment:

I have a nit; it can probably be fixed later.

@@ -293,6 +293,7 @@ object RapidsReaderType extends Enumeration {
}

object RapidsConf {
  val MULTITHREAD_READ_NUM_THREADS_DEFAULT = 20
@abellina (Collaborator) commented:

nit: can we move this to be right above the MULTITHREAD_READ_NUM_THREADS definition below?

@abellina (Collaborator) left a review comment:

The config nit I pointed out can be fixed later; approving this now.

@sinkinben merged commit 9b345ab into NVIDIA:branch-22.08 on Jul 27, 2022
@sameerz added the "ease of use" label (Makes the product simpler to use or configure) on Jul 29, 2022