[SPARK-23926][SQL] Merging current master to the feature branch.
mn-mikke authored and committed on Apr 16, 2018
2 parents aca511c + 6931022 commit 460fea6
Showing 93 changed files with 3,643 additions and 2,646 deletions.
@@ -186,7 +186,17 @@ class HadoopMapReduceCommitProtocol(
logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
for (part <- partitionPaths) {
val finalPartPath = new Path(path, part)
- fs.delete(finalPartPath, true)
+ if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+ // According to the official Hadoop FileSystem API spec, the delete op should assume
+ // the destination is no longer present regardless of the return value, so we do not
+ // need to double-check that finalPartPath exists before the rename.
+ // Also, in our case, based on the spec, delete returns false only when finalPartPath
+ // does not exist. When this happens, we need to take action if the parent of finalPartPath
+ // also does not exist (e.g. the scenario described in SPARK-23815), because the FileSystem
+ // API spec for the rename op says the rename destination (finalPartPath) must have a parent
+ // that exists, otherwise we may get an unexpected result from the rename.
+ fs.mkdirs(finalPartPath.getParent)
+ }
fs.rename(new Path(stagingDir, part), finalPartPath)
}
}
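As an aside, the overwrite sequence in the hunk above can be read as a small standalone routine against the Hadoop FileSystem API. The following Scala sketch only illustrates that pattern; the object name, method name, and paths are hypothetical and are not part of this commit.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch: delete the final partition path, recreate its parent only
// when delete reports "nothing to delete" and the parent is also missing (the
// SPARK-23815 scenario), then rename the staged data into place. Per the
// FileSystem spec, rename requires the destination's parent directory to exist.
object OverwritePartitionSketch {
  def overwritePartition(fs: FileSystem, stagedPart: Path, finalPartPath: Path): Boolean = {
    if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
      fs.mkdirs(finalPartPath.getParent)
    }
    fs.rename(stagedPart, finalPartPath)
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val renamed = overwritePartition(fs, new Path("/tmp/staging/p=1"), new Path("/tmp/table/p=1"))
    println(s"rename succeeded: $renamed")
  }
}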
@@ -166,7 +166,7 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
val previous = new java.util.Date(prevLastReportTimestamp)
- logWarning(s"Dropped $droppedEvents events from $name since $previous.")
+ logWarning(s"Dropped $droppedCount events from $name since $previous.")
}
}
}
@@ -85,7 +85,7 @@ object KolmogorovSmirnovTest {
dataset: Dataset[_],
sampleCol: String,
cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
- test(dataset, sampleCol, (x: Double) => cdf.call(x))
+ test(dataset, sampleCol, (x: Double) => cdf.call(x).toDouble)
}

/**
9 changes: 7 additions & 2 deletions mllib/src/test/java/org/apache/spark/SharedSparkSession.java
@@ -42,7 +42,12 @@ public void setUp() throws IOException {

@After
public void tearDown() {
- spark.stop();
- spark = null;
+ try {
+ spark.stop();
+ spark = null;
+ } finally {
+ SparkSession.clearDefaultSession();
+ SparkSession.clearActiveSession();
+ }
}
}
15 changes: 15 additions & 0 deletions python/pyspark/sql/functions.py
@@ -2079,6 +2079,21 @@ def size(col):
return Column(sc._jvm.functions.size(_to_java_column(col)))


@since(2.4)
def array_max(col):
"""
Collection function: returns the maximum value of the array.

:param col: name of column or expression

>>> df = spark.createDataFrame([([2, 1, 3],), ([None, 10, -1],)], ['data'])
>>> df.select(array_max(df.data).alias('max')).collect()
[Row(max=3), Row(max=10)]
"""
sc = SparkContext._active_spark_context
return Column(sc._jvm.functions.array_max(_to_java_column(col)))


@since(1.5)
def sort_array(col, asc=True):
"""
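For reference, a minimal Scala sketch of the equivalent DataFrame call for the array_max function added above, assuming Spark 2.4+ where org.apache.spark.sql.functions.array_max is also available in the Scala API; the object name and the local-mode session are illustrative only.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.array_max

object ArrayMaxSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("array_max sketch").getOrCreate()
    import spark.implicits._

    // Mirrors the Python doctest above, minus the null element.
    val df = Seq(Seq(2, 1, 3), Seq(10, -1, 0)).toDF("data")
    df.select(array_max($"data").alias("max")).show()
    // +---+
    // |max|
    // +---+
    // |  3|
    // | 10|
    // +---+

    spark.stop()
  }
}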
87 changes: 77 additions & 10 deletions python/pyspark/sql/tests.py
@@ -186,16 +186,12 @@ def __init__(self, key, value):
self.value = value


- class ReusedSQLTestCase(ReusedPySparkTestCase):
- @classmethod
- def setUpClass(cls):
- ReusedPySparkTestCase.setUpClass()
- cls.spark = SparkSession(cls.sc)
-
- @classmethod
- def tearDownClass(cls):
- ReusedPySparkTestCase.tearDownClass()
- cls.spark.stop()
+ class SQLTestUtils(object):
+ """
+ This util assumes the instance using it has a 'spark' attribute holding a SparkSession.
+ It is usually used with the 'ReusedSQLTestCase' class, but it can be used by any class
+ whose implementation provides a 'spark' attribute.
+ """

@contextmanager
def sql_conf(self, pairs):
@@ -204,6 +200,7 @@ def sql_conf(self, pairs):
`value` to the configuration `key` and then restores it back when it exits.
"""
assert isinstance(pairs, dict), "pairs should be a dictionary."
+ assert hasattr(self, "spark"), "it should have a 'spark' attribute holding a SparkSession."

keys = pairs.keys()
new_values = pairs.values()
@@ -219,6 +216,18 @@
else:
self.spark.conf.set(key, old_value)


+ class ReusedSQLTestCase(ReusedPySparkTestCase, SQLTestUtils):
+ @classmethod
+ def setUpClass(cls):
+ ReusedPySparkTestCase.setUpClass()
+ cls.spark = SparkSession(cls.sc)
+
+ @classmethod
+ def tearDownClass(cls):
+ ReusedPySparkTestCase.tearDownClass()
+ cls.spark.stop()

def assertPandasEqual(self, expected, result):
msg = ("DataFrames are not equal: " +
"\n\nExpected:\n%s\n%s" % (expected, expected.dtypes) +
@@ -3066,6 +3075,64 @@ def test_sparksession_with_stopped_sparkcontext(self):
sc.stop()


class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
# These tests are separate because they use 'spark.sql.queryExecutionListeners', which is
# static and immutable. It cannot be set or unset, for example, via `spark.conf`.

@classmethod
def setUpClass(cls):
import glob
from pyspark.find_spark_home import _find_spark_home

SPARK_HOME = _find_spark_home()
filename_pattern = (
"sql/core/target/scala-*/test-classes/org/apache/spark/sql/"
"TestQueryExecutionListener.class")
if not glob.glob(os.path.join(SPARK_HOME, filename_pattern)):
raise unittest.SkipTest(
"'org.apache.spark.sql.TestQueryExecutionListener' is not "
"available. Will skip the related tests.")

# Note that 'spark.sql.queryExecutionListeners' is a static immutable configuration.
cls.spark = SparkSession.builder \
.master("local[4]") \
.appName(cls.__name__) \
.config(
"spark.sql.queryExecutionListeners",
"org.apache.spark.sql.TestQueryExecutionListener") \
.getOrCreate()

@classmethod
def tearDownClass(cls):
cls.spark.stop()

def tearDown(self):
self.spark._jvm.OnSuccessCall.clear()

def test_query_execution_listener_on_collect(self):
self.assertFalse(
self.spark._jvm.OnSuccessCall.isCalled(),
"The callback from the query execution listener should not be called before 'collect'")
self.spark.sql("SELECT * FROM range(1)").collect()
self.assertTrue(
self.spark._jvm.OnSuccessCall.isCalled(),
"The callback from the query execution listener should be called after 'collect'")

@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
_pandas_requirement_message or _pyarrow_requirement_message)
def test_query_execution_listener_on_collect_with_arrow(self):
with self.sql_conf({"spark.sql.execution.arrow.enabled": True}):
self.assertFalse(
self.spark._jvm.OnSuccessCall.isCalled(),
"The callback from the query execution listener should not be "
"called before 'toPandas'")
self.spark.sql("SELECT * FROM range(1)").toPandas()
self.assertTrue(
self.spark._jvm.OnSuccessCall.isCalled(),
"The callback from the query execution listener should be called after 'toPandas'")


class SparkSessionTests(PySparkTestCase):

# This test is separate because it's closely related with session's start and stop.
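As background for the QueryExecutionListenerTests above, here is a minimal Scala sketch of a QueryExecutionListener. Unlike the static spark.sql.queryExecutionListeners configuration used by the test (which must be set before the session is created), this sketch registers the listener at runtime through listenerManager; the class and object names are hypothetical, and depending on the Spark version the callbacks may be delivered asynchronously.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical listener that counts successful query executions.
class CountingListener extends QueryExecutionListener {
  @volatile var successCount = 0

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    successCount += 1
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

object ListenerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("listener sketch").getOrCreate()
    val listener = new CountingListener
    spark.listenerManager.register(listener)

    // Triggering an action such as collect() should invoke onSuccess, as the PySpark test asserts.
    spark.sql("SELECT * FROM range(1)").collect()
    println(s"onSuccess calls observed: ${listener.successCount}")

    spark.stop()
  }
}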
@@ -167,5 +167,5 @@ private[spark] object Config extends Logging {
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."

- val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
+ val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
}
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
import org.apache.spark.internal.config.ConfigEntry

private[spark] sealed trait KubernetesRoleSpecificConf

/*
* Structure containing metadata for Kubernetes logic that builds a Spark driver.
*/
private[spark] case class KubernetesDriverSpecificConf(
mainAppResource: Option[MainAppResource],
mainClass: String,
appName: String,
appArgs: Seq[String]) extends KubernetesRoleSpecificConf

/*
* Structure containing metadata for Kubernetes logic that builds a Spark executor.
*/
private[spark] case class KubernetesExecutorSpecificConf(
executorId: String,
driverPod: Pod)
extends KubernetesRoleSpecificConf

/**
* Structure containing metadata for Kubernetes logic to build Spark pods.
*/
private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
sparkConf: SparkConf,
roleSpecificConf: T,
appResourceNamePrefix: String,
appId: String,
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleEnvs: Map[String, String]) {

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

def sparkJars(): Seq[String] = sparkConf
.getOption("spark.jars")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])

def sparkFiles(): Seq[String] = sparkConf
.getOption("spark.files")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

def imagePullSecrets(): Seq[LocalObjectReference] = {
sparkConf
.get(IMAGE_PULL_SECRETS)
.map(_.split(","))
.getOrElse(Array.empty[String])
.map(_.trim)
.map { secret =>
new LocalObjectReferenceBuilder().withName(secret).build()
}
}

def nodeSelector(): Map[String, String] =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)

def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)

def get(conf: String): String = sparkConf.get(conf)

def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)

def getOption(key: String): Option[String] = sparkConf.getOption(key)
}

private[spark] object KubernetesConf {
def createDriverConf(
sparkConf: SparkConf,
appName: String,
appResourceNamePrefix: String,
appId: String,
mainAppResource: Option[MainAppResource],
mainClass: String,
appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val sparkConfWithMainAppJar = sparkConf.clone()
mainAppResource.foreach {
case JavaMainAppResource(res) =>
val previousJars = sparkConf
.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty)
if (!previousJars.contains(res)) {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
}

val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations.")
require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations.")
val driverLabels = driverCustomLabels ++ Map(
SPARK_APP_ID_LABEL -> appId,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)

KubernetesConf(
sparkConfWithMainAppJar,
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
appResourceNamePrefix,
appId,
driverLabels,
driverAnnotations,
driverSecretNamesToMountPaths,
driverEnvs)
}

def createExecutorConf(
sparkConf: SparkConf,
executorId: String,
appId: String,
driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = {
val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
require(
!executorCustomLabels.contains(SPARK_APP_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
require(
!executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
" Spark.")
require(
!executorCustomLabels.contains(SPARK_ROLE_LABEL),
s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
val executorLabels = Map(
SPARK_EXECUTOR_ID_LABEL -> executorId,
SPARK_APP_ID_LABEL -> appId,
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
executorCustomLabels
val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
val executorSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val executorEnv = sparkConf.getExecutorEnv.toMap

KubernetesConf(
sparkConf.clone(),
KubernetesExecutorSpecificConf(executorId, driverPod),
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
appId,
executorLabels,
executorAnnotations,
executorSecrets,
executorEnv)
}
}
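To close, a small Scala sketch of the prefix-based configuration parsing that the KUBERNETES_*_PREFIX constants and KubernetesUtils.parsePrefixedKeyValuePairs rely on (that helper's implementation is not part of this diff). It simply strips a key prefix from matching SparkConf entries; the object name and the property values are made-up examples.

import org.apache.spark.SparkConf

object PrefixedConfSketch {
  // Every "<prefix><name> -> value" entry in the conf becomes "name -> value".
  def parsePrefixed(conf: SparkConf, prefix: String): Map[String, String] =
    conf.getAll.collect {
      case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
    }.toMap

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.kubernetes.driverEnv.JAVA_OPTS", "-Xmx2g")
      .set("spark.kubernetes.driverEnv.TZ", "UTC")
      .set("spark.app.name", "prefix-sketch")

    // Driver environment variables, keyed as in KUBERNETES_DRIVER_ENV_PREFIX above.
    println(parsePrefixed(conf, "spark.kubernetes.driverEnv."))
    // Map(JAVA_OPTS -> -Xmx2g, TZ -> UTC)
  }
}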