Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-29641][PYTHON][CORE] Stage Level Sched: Add python api's and tests #28085

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
acdf6e8
Stage level scheduling python api support
tgravescs Mar 27, 2020
24c1a96
revert pom changes
tgravescs Mar 27, 2020
5647535
Fix log messages
tgravescs Mar 30, 2020
af69e4b
Try changing way we pass pyspark memory
tgravescs Mar 30, 2020
4a7f39a
Change to use local property to pass pyspark memory
tgravescs Mar 31, 2020
27e1a10
add missing api to get java map
tgravescs Mar 31, 2020
af602b6
Add java api test
tgravescs Mar 31, 2020
2b515c8
cleanup
tgravescs Mar 31, 2020
a052427
fix indentation
tgravescs Mar 31, 2020
6a90fbe
fix newline around markup in python
tgravescs Mar 31, 2020
2d754f7
Update the version added for rdd api's
tgravescs Apr 1, 2020
1071f40
make java return values immutable
tgravescs Apr 2, 2020
e81a480
Try reverting java api suite
tgravescs Apr 6, 2020
ae8e312
Fix minor review comments
tgravescs Apr 6, 2020
0d7c79f
Update to pass the executor cores into PythonRunner
tgravescs Apr 6, 2020
956dc84
move python files to resources module and misc fixes
tgravescs Apr 7, 2020
32bca95
Fix how we pass the pyspark memory and cores
tgravescs Apr 7, 2020
e494c05
Fix python imports
tgravescs Apr 8, 2020
3e15ed9
Fix java api suite test from hanging
tgravescs Apr 8, 2020
c3c885a
change test to not rely on being able to set pyspark memory after
tgravescs Apr 8, 2020
3562539
Add in pyspark.resource module
tgravescs Apr 8, 2020
544119e
review comments
tgravescs Apr 13, 2020
8469038
Changes to allow using resource apis without SparkContext
tgravescs Apr 15, 2020
a0b9137
More changes to call without SparkContext
tgravescs Apr 15, 2020
2235654
cleanup
tgravescs Apr 15, 2020
bf1a215
fix style issues
tgravescs Apr 15, 2020
62cb02c
Change getResourceProfile to return None to match scala side
tgravescs Apr 15, 2020
a6e9ac2
Change to make python versions do same thing as the scala versions as
tgravescs Apr 16, 2020
528094c
add pyspark resource module to testing module
tgravescs Apr 20, 2020
89be02e
Update names of function/variable
tgravescs Apr 22, 2020
354fb0c
Other variable name changes
tgravescs Apr 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

Expand All @@ -49,6 +50,20 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
*/
def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel))

/**
 * Specify a ResourceProfile to use when calculating this RDD. This is only supported on
 * certain cluster managers and currently requires dynamic allocation to be enabled.
 * It will result in new executors with the resources specified being acquired to
 * calculate the RDD.
 *
 * @param prof the ResourceProfile describing the executor/task resources to use
 * @return this RDD, wrapped, with the profile attached
 */
def withResources(prof: ResourceProfile): JavaRDD[T] = wrapRDD(rdd.withResources(prof))
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
HyukjinKwon marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Get the ResourceProfile specified with this RDD or null if it wasn't specified.
 * (This is the Java API, so null — not None — is returned when no profile was set.)
 * @return the user specified ResourceProfile or null if none was specified
 */
def getResourceProfile(): ResourceProfile = rdd.getResourceProfile()

/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
* This method blocks until all blocks are deleted.
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES}
import org.apache.spark.internal.config.Python._
import org.apache.spark.resource.ResourceProfile.PYSPARK_MEMORY_PROPERTY
import org.apache.spark.security.SocketAuthHelper
import org.apache.spark.util._

Expand Down Expand Up @@ -85,9 +86,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private val conf = SparkEnv.get.conf
protected val bufferSize: Int = conf.get(BUFFER_SIZE)
private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
// each python worker gets an equal part of the allocation. the worker pool will grow to the
// number of concurrent tasks, which is determined by the number of cores in this executor.
private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / conf.get(EXECUTOR_CORES))

// All the Python functions should have the same exec, version and envvars.
protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars
Expand All @@ -106,6 +104,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
// Authentication helper used when serving method calls via socket from Python side.
private lazy val authHelper = new SocketAuthHelper(conf)

// Each python worker gets an equal share of the pyspark memory allocation: the worker
// pool grows up to the number of concurrent tasks, which is determined by the number
// of cores on this executor.
private def getWorkerMemoryMb(mem: Option[Long]): Option[Long] = {
  val executorCores = conf.get(EXECUTOR_CORES)
  mem.map(total => total / executorCores)
}

def compute(
inputIterator: Iterator[IN],
partitionIndex: Int,
Expand All @@ -124,8 +128,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
if (reuseWorker) {
envVars.put("SPARK_REUSE_WORKER", "1")
}
if (memoryMb.isDefined) {
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString)
val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_PROPERTY)).map(_.toLong)
val workerMemoryMb = getWorkerMemoryMb(memoryMb)
if (workerMemoryMb.isDefined) {
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", workerMemoryMb.get.toString)
}
envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.resource

import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
Expand All @@ -38,6 +39,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {

def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap

def requestsJMap: JMap[String, ExecutorResourceRequest] = _executorResources.asScala.asJava
tgravescs marked this conversation as resolved.
Show resolved Hide resolved

/**
* Specify heap memory. The value specified will be converted to MiB.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
import org.apache.spark.util.Utils

/**
* Resource profile to associate with an RDD. A ResourceProfile allows the user to
Expand Down Expand Up @@ -76,6 +75,10 @@ class ResourceProfile(
taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
}

// The pyspark memory requested for executors in this profile, in MiB, if any was set.
private[spark] def getPysparkMemory: Option[Long] = {
  for (req <- executorResources.get(ResourceProfile.PYSPARK_MEM)) yield req.amount.toLong
}

/*
* This function takes into account fractional amounts for the task resource requirement.
* Spark only supports fractional amounts < 1 to basically allow for multiple tasks
Expand Down Expand Up @@ -325,4 +328,6 @@ object ResourceProfile extends Logging {
// Task cpus from the profile when specified, otherwise the cluster-wide
// spark.task.cpus configuration value.
private[spark] def getTaskCpusOrDefaultForProfile(rp: ResourceProfile, conf: SparkConf): Int = {
  rp.getTaskCpus match {
    case Some(cpus) => cpus
    case None => conf.get(CPUS_PER_TASK)
  }
}

private[spark] val PYSPARK_MEMORY_PROPERTY = "resource.pyspark.memory"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.resource

import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
Expand All @@ -37,6 +38,8 @@ private[spark] class TaskResourceRequests() extends Serializable {

def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap

def requestsJMap: JMap[String, TaskResourceRequest] = _taskResources.asScala.asJava
tgravescs marked this conversation as resolved.
Show resolved Hide resolved

/**
* Specify number of cpus per Task.
*
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, PYSPARK_MEMORY_PROPERTY}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
Expand Down Expand Up @@ -1135,6 +1137,22 @@ private[spark] class DAGScheduler(
}
}

/**
 * PythonRunner needs to know what the pyspark memory setting is for the profile being run.
 * Pass it in the local properties of the task if it's set for the stage profile.
 *
 * For the default profile the value comes from spark.executor.pyspark.memory; for a
 * custom profile it comes from the profile's pyspark.memory executor resource request.
 * When neither is set, no property is added.
 *
 * @param stage the stage about to be submitted
 * @param properties the task local properties to add the setting to
 */
private def addPysparkMemToProperties(stage: Stage, properties: Properties): Unit = {
  val pysparkMem = if (stage.resourceProfileId == DEFAULT_RESOURCE_PROFILE_ID) {
    logDebug("Using the default pyspark executor memory")
    sc.conf.get(PYSPARK_EXECUTOR_MEMORY)
  } else {
    val rp = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId)
    logDebug(s"Using profile ${stage.resourceProfileId} pyspark executor memory")
    rp.getPysparkMemory
  }
  // foreach, not map: setProperty is invoked purely for its side effect and the
  // resulting Option would otherwise be silently discarded.
  pysparkMem.foreach(mem => properties.setProperty(PYSPARK_MEMORY_PROPERTY, mem.toString))
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
Expand All @@ -1154,6 +1172,7 @@ private[spark] class DAGScheduler(
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties
addPysparkMemToProperties(stage, properties)

runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
Expand Down
16 changes: 16 additions & 0 deletions core/src/test/java/test/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@
import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;
import org.apache.spark.rdd.RDD;
import org.apache.spark.resource.ExecutorResourceRequests;
import org.apache.spark.resource.ResourceProfile;
import org.apache.spark.resource.ResourceProfileBuilder;
import org.apache.spark.resource.TaskResourceRequests;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;
Expand Down Expand Up @@ -897,6 +901,18 @@ public void persist() {
assertEquals(1, rdd.first().intValue());
}

@Test
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
public void withResources() {
ExecutorResourceRequests ereqs = new ExecutorResourceRequests().cores(4);
TaskResourceRequests treqs = new TaskResourceRequests().cpus(1);
ResourceProfile rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build();
List<String> expected = Arrays.asList("1", "2", "3", "4");
JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
in1.withResources(rp1);
assertEquals(in1.getResourceProfile(), rp1);
assertEquals(expected, in1.collect());
}
tgravescs marked this conversation as resolved.
Show resolved Hide resolved

@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
Expand Down
8 changes: 8 additions & 0 deletions python/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@
from pyspark.accumulators import Accumulator, AccumulatorParam
from pyspark.broadcast import Broadcast
from pyspark.resourceinformation import ResourceInformation
from pyspark.taskresourcerequest import TaskResourceRequest
from pyspark.executorresourcerequest import ExecutorResourceRequest
from pyspark.taskresourcerequests import TaskResourceRequests
from pyspark.executorresourcerequests import ExecutorResourceRequests
from pyspark.resourceprofilebuilder import ResourceProfileBuilder
from pyspark.resourceprofile import ResourceProfile
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
from pyspark.serializers import MarshalSerializer, PickleSerializer
from pyspark.status import *
from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
Expand Down Expand Up @@ -120,4 +126,6 @@ def wrapper(self, *args, **kwargs):
"Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer",
"StatusTracker", "SparkJobInfo", "SparkStageInfo", "Profiler", "BasicProfiler", "TaskContext",
"RDDBarrier", "BarrierTaskContext", "BarrierTaskInfo", "ResourceInformation",
"TaskResourceRequest", "TaskResourceRequests", "ExecutorResourceRequest",
"ExecutorResourceRequests", "ResourceProfile"
HyukjinKwon marked this conversation as resolved.
Show resolved Hide resolved
]
73 changes: 73 additions & 0 deletions python/pyspark/executorresourcerequest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


class ExecutorResourceRequest(object):
    """
    .. note:: Evolving

    An Executor resource request. This is used in conjunction with the ResourceProfile to
    programmatically specify the resources needed for an RDD that will be applied at the
    stage level.

    This is used to specify what the resource requirements are for an Executor and how
    Spark can find out specific details about those resources. Not all the parameters are
    required for every resource type. Resources like GPUs are supported and have same limitations
    as using the global spark configs spark.executor.resource.gpu.*. The amount, discoveryScript,
    and vendor parameters for resources are all the same parameters a user would specify through the
    configs: spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}.

    For instance, a user wants to allocate an Executor with GPU resources on YARN. The user has
    to specify the resource name (gpu), the amount or number of GPUs per Executor,
    the discovery script would be specified so that when the Executor starts up it can
    discover what GPU addresses are available for it to use because YARN doesn't tell
    Spark that, then vendor would not be used because it is specific to Kubernetes.

    See the configuration and cluster specific docs for more details.

    Use ExecutorResourceRequests class as a convenience API.

    :param resourceName: Name of the resource
    :param amount: Amount requesting
    :param discoveryScript: Optional script used to discover the resources. This is required on some
        cluster managers that don't tell Spark the addresses of the resources
        allocated. The script runs on Executors startup to discover the addresses
        of the resources available.
    :param vendor: Vendor, required for some cluster managers
    """

    def __init__(self, resourceName, amount, discoveryScript="", vendor=""):
        """Create a new ExecutorResourceRequest that wraps the underlying JVM object."""
        # Deferred import to avoid a circular dependency with pyspark.context.
        from pyspark.context import SparkContext
        self._jExecRequest = SparkContext._jvm.org.apache.spark.resource.ExecutorResourceRequest(
            resourceName, amount, discoveryScript, vendor)

    @property
    def resourceName(self):
        """Name of the resource, read from the wrapped JVM object."""
        return self._jExecRequest.resourceName()

    @property
    def amount(self):
        """Amount of the resource requested, read from the wrapped JVM object."""
        return self._jExecRequest.amount()

    @property
    def discoveryScript(self):
        """Discovery script for the resource; empty string when not provided."""
        return self._jExecRequest.discoveryScript()

    @property
    def vendor(self):
        """Vendor of the resource; empty string when not provided."""
        return self._jExecRequest.vendor()
65 changes: 65 additions & 0 deletions python/pyspark/executorresourcerequests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark.executorresourcerequest import ExecutorResourceRequest


class ExecutorResourceRequests(object):

    """
    .. note:: Evolving

    A set of Executor resource requests. This is used in conjunction with the
    ResourceProfileBuilder to programmatically specify the resources needed for an RDD
    that will be applied at the stage level.

    All the builder-style methods delegate to the underlying JVM
    ExecutorResourceRequests object and return ``self`` so calls can be chained.
    """

    def __init__(self):
        """Create a new ExecutorResourceRequests that wraps the underlying JVM object."""
        # Deferred import to avoid a circular dependency with the pyspark package.
        from pyspark import SparkContext
        self._javaExecutorResourceRequests \
            = SparkContext._jvm.org.apache.spark.resource.ExecutorResourceRequests()

    def memory(self, amount):
        """Specify executor memory; delegates to the JVM object. Returns self."""
        self._javaExecutorResourceRequests.memory(amount)
        return self

    def memoryOverhead(self, amount):
        """Specify executor memory overhead; delegates to the JVM object. Returns self."""
        self._javaExecutorResourceRequests.memoryOverhead(amount)
        return self

    def pysparkMemory(self, amount):
        """Specify pyspark memory; delegates to the JVM object. Returns self."""
        self._javaExecutorResourceRequests.pysparkMemory(amount)
        return self

    def cores(self, amount):
        """Specify the number of executor cores; delegates to the JVM object. Returns self."""
        self._javaExecutorResourceRequests.cores(amount)
        return self

    def resource(self, resourceName, amount, discoveryScript="", vendor=""):
        """Specify a custom resource request (e.g. gpu); delegates to the JVM object.
        Returns self."""
        self._javaExecutorResourceRequests.resource(resourceName, amount, discoveryScript, vendor)
        return self

    @property
    def requests(self):
        """The current requests as a dict of resource name to
        :class:`ExecutorResourceRequest`, converted back from the JVM map."""
        execRes = self._javaExecutorResourceRequests.requestsJMap()
        result = {}
        # convert back to python ExecutorResourceRequest
        for k, v in execRes.items():
            result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
                                                v.discoveryScript(), v.vendor())
        return result
1 change: 1 addition & 0 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def killChild():
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.ml.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.resource.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
Expand Down
Loading