From 2b515c8945d1da64e33c64edc7f6d00433ca7b44 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Tue, 31 Mar 2020 14:55:39 -0500
Subject: [PATCH] cleanup

---
 .../spark/api/python/PythonRunner.scala       |  1 -
 .../spark/resource/ResourceProfile.scala      |  1 -
 .../apache/spark/scheduler/DAGScheduler.scala | 26 ++++++++++++-------
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 04591e87a72e4..7cf214865280a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -129,7 +129,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       envVars.put("SPARK_REUSE_WORKER", "1")
     }
     val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_PROPERTY)).map(_.toLong)
-    logInfo(s"task context pyspark memory is: $memoryMb")
     val workerMemoryMb = getWorkerMemoryMb(memoryMb)
     if (workerMemoryMb.isDefined) {
       envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", workerMemoryMb.get.toString)
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 5e041293340da..1739f1c127603 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -29,7 +29,6 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
-import org.apache.spark.util.Utils
 
 /**
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0aefa82f78ba4..8e015f84f1120 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1137,6 +1137,22 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * PythonRunner needs to know what the pyspark memory setting is for the profile being run.
+   * Pass it in the local properties of the task if it's set for the stage profile.
+   */
+  private def addPysparkMemToProperties(stage: Stage, properties: Properties): Unit = {
+    val pysparkMem = if (stage.resourceProfileId == DEFAULT_RESOURCE_PROFILE_ID) {
+      logDebug("Using the default pyspark executor memory")
+      sc.conf.get(PYSPARK_EXECUTOR_MEMORY)
+    } else {
+      val rp = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId)
+      logDebug(s"Using profile ${stage.resourceProfileId} pyspark executor memory")
+      rp.getPysparkMemory
+    }
+    pysparkMem.map(m => properties.setProperty(PYSPARK_MEMORY_PROPERTY, m.toString))
+  }
+
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
     logDebug("submitMissingTasks(" + stage + ")")
@@ -1156,15 +1172,7 @@ private[spark] class DAGScheduler(
     // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
     // with this Stage
     val properties = jobIdToActiveJob(jobId).properties
-    val pysparkMem = if (stage.resourceProfileId == DEFAULT_RESOURCE_PROFILE_ID) {
-      logDebug("Using the default pyspark executor memory")
-      sc.conf.get(PYSPARK_EXECUTOR_MEMORY)
-    } else {
-      val rp = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId)
-      logDebug(s"Using profile ${stage.resourceProfileId} pyspark executor memory")
-      rp.getPysparkMemory
-    }
-    pysparkMem.map(m => properties.setProperty(PYSPARK_MEMORY_PROPERTY, m.toString))
+    addPysparkMemToProperties(stage, properties)
 
     runningStages += stage
     // SparkListenerStageSubmitted should be posted before testing whether tasks are
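
Illustration (not part of the patch above): the cleanup moves the pyspark memory lookup into the new DAGScheduler helper addPysparkMemToProperties, which stores the value in the task's local properties under PYSPARK_MEMORY_PROPERTY so that BasePythonRunner can read it back via context.getLocalProperty. Below is a minimal, self-contained Scala sketch of that hand-off using plain java.util.Properties; the object name, helper names, and the property key string are assumptions made for the example, not the actual Spark constants.

import java.util.Properties

object PysparkMemoryHandoffSketch {
  // Assumed key string for this example only; Spark refers to it through the
  // PYSPARK_MEMORY_PROPERTY constant seen in the patch.
  val PysparkMemoryKey = "resource.pyspark.memory"

  // Scheduler side: set the property only when the stage's profile defines a pyspark memory.
  def addPysparkMemToProperties(pysparkMem: Option[Long], properties: Properties): Unit =
    pysparkMem.foreach(m => properties.setProperty(PysparkMemoryKey, m.toString))

  // Runner side: an absent property yields None, mirroring the Option(...) handling
  // around context.getLocalProperty in BasePythonRunner.
  def readPysparkMemoryMb(properties: Properties): Option[Long] =
    Option(properties.getProperty(PysparkMemoryKey)).map(_.toLong)

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    addPysparkMemToProperties(Some(512L), props)   // profile configured with 512 MB
    println(readPysparkMemoryMb(props))            // Some(512)
    println(readPysparkMemoryMb(new Properties())) // None: nothing was configured
  }
}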