
Commit 91e6ea5
Merge remote-tracking branch 'apache/branch-2.1' into branch-2.1
yhuai committed Dec 2, 2016
2 parents: 626f60e + f537632
Showing 62 changed files with 989 additions and 517 deletions.
19 changes: 9 additions & 10 deletions R/pkg/R/mllib.R
@@ -712,7 +712,6 @@ setMethod("predict", signature(object = "KMeansModel"),
#' of L1 and L2. Default is 0.0 which is an L2 penalty.
#' @param maxIter maximum iteration number.
#' @param tol convergence tolerance of iterations.
- #' @param fitIntercept whether to fit an intercept term.
#' @param family the name of family which is a description of the label distribution to be used in the model.
#' Supported options:
#' \itemize{
@@ -747,11 +746,11 @@ setMethod("predict", signature(object = "KMeansModel"),
#' \dontrun{
#' sparkR.session()
#' # binary logistic regression
- #' label <- c(1.0, 1.0, 1.0, 0.0, 0.0)
- #' feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
- #' binary_data <- as.data.frame(cbind(label, feature))
+ #' label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
+ #' features <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
+ #' binary_data <- as.data.frame(cbind(label, features))
  #' binary_df <- createDataFrame(binary_data)
- #' blr_model <- spark.logit(binary_df, label ~ feature, thresholds = 1.0)
+ #' blr_model <- spark.logit(binary_df, label ~ features, thresholds = 1.0)
#' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
#'
#' # summary of binary logistic regression
@@ -783,7 +782,7 @@ setMethod("predict", signature(object = "KMeansModel"),
#' @note spark.logit since 2.1.0
setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100,
-                    tol = 1E-6, fitIntercept = TRUE, family = "auto", standardization = TRUE,
+                    tol = 1E-6, family = "auto", standardization = TRUE,
thresholds = 0.5, weightCol = NULL, aggregationDepth = 2,
probabilityCol = "probability") {
formula <- paste(deparse(formula), collapse = "")
@@ -795,10 +794,10 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit",
data@sdf, formula, as.numeric(regParam),
as.numeric(elasticNetParam), as.integer(maxIter),
-                         as.numeric(tol), as.logical(fitIntercept),
-                         as.character(family), as.logical(standardization),
-                         as.array(thresholds), as.character(weightCol),
-                         as.integer(aggregationDepth), as.character(probabilityCol))
+                         as.numeric(tol), as.character(family),
+                         as.logical(standardization), as.array(thresholds),
+                         as.character(weightCol), as.integer(aggregationDepth),
+                         as.character(probabilityCol))
new("LogisticRegressionModel", jobj = jobj)
})

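The R-side change above mirrors the Scala wrapper (org.apache.spark.ml.r.LogisticRegressionWrapper), which now hands predicted labels back as strings rather than doubles. Below is a minimal Scala sketch of that indexing round-trip on the same toy data, assuming a plain DataFrame rather than the wrapper's actual RFormula plumbing; it is illustrative, not the wrapper's code.

    import org.apache.spark.ml.{Pipeline, PipelineStage}
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorAssembler}
    import org.apache.spark.sql.SparkSession

    object StringLabelSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("sketch").master("local[*]").getOrCreate()
        import spark.implicits._

        // Same toy data as the roxygen example above.
        val df = Seq(
          (0.0, 1.1419053), (0.0, 0.9194079), (0.0, -0.9498666),
          (1.0, -1.1069903), (1.0, 0.2809776)
        ).toDF("label", "feature")

        // Index the label (numeric labels are cast to string first) ...
        val indexer = new StringIndexer()
          .setInputCol("label").setOutputCol("indexedLabel").fit(df)
        val assembler = new VectorAssembler()
          .setInputCols(Array("feature")).setOutputCol("features")
        val lr = new LogisticRegression()
          .setLabelCol("indexedLabel").setFeaturesCol("features")
        // ... then map prediction indices back to the original label strings,
        // which is why SparkR's predict() now yields character values like "0.0".
        val toLabel = new IndexToString()
          .setInputCol("prediction").setOutputCol("predictedLabel")
          .setLabels(indexer.labels)

        val model = new Pipeline()
          .setStages(Array[PipelineStage](indexer, assembler, lr, toLabel)).fit(df)
        model.transform(df).select("predictedLabel").show()
        spark.stop()
      }
    }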
46 changes: 23 additions & 23 deletions R/pkg/inst/tests/testthat/test_mllib.R
@@ -64,16 +64,6 @@ test_that("spark.glm and predict", {
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)

-   # binomial family
-   binomialTraining <- training[training$Species %in% c("versicolor", "virginica"), ]
-   model <- spark.glm(binomialTraining, Species ~ Sepal_Length + Sepal_Width,
-                      family = binomial(link = "logit"))
-   prediction <- predict(model, binomialTraining)
-   expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
-   expected <- c("virginica", "virginica", "virginica", "versicolor", "virginica",
-                 "versicolor", "virginica", "versicolor", "virginica", "versicolor")
-   expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)

# poisson family
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
family = poisson(link = identity))
@@ -138,10 +128,10 @@ test_that("spark.glm summary", {
expect_equal(stats$aic, rStats$aic)

# Test spark.glm works with weighted dataset
-   a1 <- c(0, 1, 2, 3, 4)
-   a2 <- c(5, 2, 1, 3, 2)
-   w <- c(1, 2, 3, 4, 5)
-   b <- c(1, 0, 1, 0, 0)
+   a1 <- c(0, 1, 2, 3)
+   a2 <- c(5, 2, 1, 3)
+   w <- c(1, 2, 3, 4)
+   b <- c(1, 0, 1, 0)
data <- as.data.frame(cbind(a1, a2, w, b))
df <- suppressWarnings(createDataFrame(data))

@@ -168,7 +158,7 @@ test_that("spark.glm summary", {
data <- as.data.frame(cbind(a1, a2, b))
df <- suppressWarnings(createDataFrame(data))
regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0))
-   expect_equal(regStats$aic, 14.00976, tolerance = 1e-4) # 14.00976 is from summary() result
+   expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result

# Test spark.glm works on collinear data
A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2)
@@ -646,30 +636,30 @@ test_that("spark.isotonicRegression", {

test_that("spark.logit", {
# test binary logistic regression
-   label <- c(1.0, 1.0, 1.0, 0.0, 0.0)
+   label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
binary_data <- as.data.frame(cbind(label, feature))
binary_df <- createDataFrame(binary_data)

blr_model <- spark.logit(binary_df, label ~ feature, thresholds = 1.0)
blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
-   expect_equal(blr_predict$prediction, c(0, 0, 0, 0, 0))
+   expect_equal(blr_predict$prediction, c("0.0", "0.0", "0.0", "0.0", "0.0"))
blr_model1 <- spark.logit(binary_df, label ~ feature, thresholds = 0.0)
blr_predict1 <- collect(select(predict(blr_model1, binary_df), "prediction"))
-   expect_equal(blr_predict1$prediction, c(1, 1, 1, 1, 1))
+   expect_equal(blr_predict1$prediction, c("1.0", "1.0", "1.0", "1.0", "1.0"))

# test summary of binary logistic regression
blr_summary <- summary(blr_model)
blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, "threshold", "F-Measure"))
-   expect_equal(blr_fmeasure$threshold, c(0.8221347, 0.7884005, 0.6674709, 0.3785437, 0.3434487),
+   expect_equal(blr_fmeasure$threshold, c(0.6565513, 0.6214563, 0.3325291, 0.2115995, 0.1778653),
tolerance = 1e-4)
-   expect_equal(blr_fmeasure$"F-Measure", c(0.5000000, 0.8000000, 0.6666667, 0.8571429, 0.7500000),
+   expect_equal(blr_fmeasure$"F-Measure", c(0.6666667, 0.5000000, 0.8000000, 0.6666667, 0.5714286),
tolerance = 1e-4)
blr_precision <- collect(select(blr_summary$precisionByThreshold, "threshold", "precision"))
-   expect_equal(blr_precision$precision, c(1.0000000, 1.0000000, 0.6666667, 0.7500000, 0.6000000),
+   expect_equal(blr_precision$precision, c(1.0000000, 0.5000000, 0.6666667, 0.5000000, 0.4000000),
tolerance = 1e-4)
blr_recall <- collect(select(blr_summary$recallByThreshold, "threshold", "recall"))
-   expect_equal(blr_recall$recall, c(0.3333333, 0.6666667, 0.6666667, 1.0000000, 1.0000000),
+   expect_equal(blr_recall$recall, c(0.5000000, 0.5000000, 1.0000000, 1.0000000, 1.0000000),
tolerance = 1e-4)

# test model save and read
@@ -683,6 +673,16 @@ test_that("spark.logit", {
expect_error(summary(blr_model2))
unlink(modelPath)

+   # test prediction label as text
+   training <- suppressWarnings(createDataFrame(iris))
+   binomial_training <- training[training$Species %in% c("versicolor", "virginica"), ]
+   binomial_model <- spark.logit(binomial_training, Species ~ Sepal_Length + Sepal_Width)
+   prediction <- predict(binomial_model, binomial_training)
+   expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
+   expected <- c("virginica", "virginica", "virginica", "versicolor", "virginica",
+                 "versicolor", "virginica", "versicolor", "virginica", "versicolor")
+   expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)

# test multinomial logistic regression
label <- c(0.0, 1.0, 2.0, 0.0, 0.0)
feature1 <- c(4.845940, 5.64480, 7.430381, 6.464263, 5.555667)
@@ -694,7 +694,7 @@ test_that("spark.logit", {

model <- spark.logit(df, label ~., family = "multinomial", thresholds = c(0, 1, 1))
predict1 <- collect(select(predict(model, df), "prediction"))
-   expect_equal(predict1$prediction, c(0, 0, 0, 0, 0))
+   expect_equal(predict1$prediction, c("0.0", "0.0", "0.0", "0.0", "0.0"))
# Summary of multinomial logistic regression is not implemented yet
expect_error(summary(model))
})
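The threshold expectations above follow Spark ML's documented rule for probabilistic classifiers: the predicted class maximizes probability(i) / thresholds(i), and the single binary threshold t expands to the pair (1 - t, t). A zero entry therefore makes its class always win, which is why thresholds = 1.0 yields all "0.0" and c(0, 1, 1) forces class 0. A hedged Scala sketch of that rule, ignoring the probability-0 edge case:

    object ThresholdSketch {
      // Predict the class maximizing probability(i) / thresholds(i);
      // a zero threshold makes a class always win.
      def predict(probability: Array[Double], thresholds: Array[Double]): Int = {
        val scaled = probability.zip(thresholds).map { case (p, t) =>
          if (t == 0.0) Double.PositiveInfinity else p / t
        }
        scaled.indexOf(scaled.max)
      }

      def main(args: Array[String]): Unit = {
        // Multinomial test above: thresholds c(0, 1, 1) force class 0.
        println(predict(Array(0.2, 0.5, 0.3), Array(0.0, 1.0, 1.0))) // 0
        // Binary t expands to (1 - t, t): t = 1.0 forces 0, t = 0.0 forces 1.
        println(predict(Array(0.9, 0.1), Array(0.0, 1.0)))           // 0  (t = 1.0)
        println(predict(Array(0.9, 0.1), Array(1.0, 0.0)))           // 1  (t = 0.0)
      }
    }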
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark

import java.io._
import java.lang.reflect.Constructor
- import java.net.{MalformedURLException, URI}
+ import java.net.URI
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
@@ -90,6 +90,12 @@ object HiveCatalogMetrics extends Source {
*/
val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))

+   /**
+    * Tracks the total number of Spark jobs launched for parallel file listing.
+    */
+   val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
+     MetricRegistry.name("parallelListingJobCount"))

/**
* Resets the values of all metrics to zero. This is useful in tests.
*/
@@ -98,11 +104,13 @@ object HiveCatalogMetrics extends Source {
METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
+     METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
}

// clients can use these to avoid classloader issues with the codahale classes
def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
+   def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n)
}
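The new counter follows the same Dropwizard metrics-core pattern as the existing metrics; note that the reset path decrements by the current count, since Counter has no set-to-zero. A standalone sketch of that pattern, outside Spark's Source plumbing and with illustrative names:

    import com.codahale.metrics.MetricRegistry

    object ListingMetricsSketch {
      val metricRegistry = new MetricRegistry
      val parallelListingJobCount =
        metricRegistry.counter(MetricRegistry.name("parallelListingJobCount"))

      def incrementParallelListingJobCount(n: Int): Unit = parallelListingJobCount.inc(n)

      // Counters cannot be set directly; tests reset by decrementing the current count.
      def reset(): Unit = parallelListingJobCount.dec(parallelListingJobCount.getCount)

      def main(args: Array[String]): Unit = {
        incrementParallelListingJobCount(3)
        println(parallelListingJobCount.getCount) // 3
        reset()
        println(parallelListingJobCount.getCount) // 0
      }
    }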
@@ -22,6 +22,7 @@ import java.io.{InputStream, IOException}
import scala.io.Source

import com.fasterxml.jackson.core.JsonParseException
+ import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
import org.json4s.jackson.JsonMethods._

import org.apache.spark.internal.Logging
@@ -87,6 +88,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
// Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
// It's safe since no place uses them.
logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
+       case e: UnrecognizedPropertyException if e.getMessage != null && e.getMessage.startsWith(
+         "Unrecognized field \"queryStatus\" " +
+           "(class org.apache.spark.sql.streaming.StreamingQueryListener$") =>
+         // Ignore events generated by Structured Streaming in Spark 2.0.2
+         // It's safe since no place uses them.
+         logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
case jpe: JsonParseException =>
// We can only ignore exception from last line of the file that might be truncated
// the last entry may not be the very last line in the event log, but we treat it
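The new case guards on the message prefix because Jackson's UnrecognizedPropertyException embeds the offending field and target class in its message. A minimal, self-contained sketch of the same filter shape, with a hypothetical Event bean standing in for the real listener event types:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
    import scala.beans.BeanProperty

    object ReplayFilterSketch {
      // Hypothetical event bean; the real replay path binds Spark's event classes.
      class Event { @BeanProperty var id: Int = 0 }

      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper() // fails on unknown fields by default
        val line = """{"id": 1, "queryStatus": "RUNNING"}"""
        try {
          mapper.readValue(line, classOf[Event])
        } catch {
          // Same shape as the new case above: match on the message prefix so only
          // the known-incompatible field is skipped; anything else propagates.
          case e: UnrecognizedPropertyException
              if e.getMessage != null &&
                e.getMessage.startsWith("Unrecognized field \"queryStatus\"") =>
            println(s"Dropped incompatible log: $line")
        }
      }
    }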
49 changes: 15 additions & 34 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -70,8 +70,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
// if we find that it's okay.
private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)

-   private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)

private def getLocalitySummaryString(stageData: StageUIData): String = {
val localities = stageData.taskData.values.map(_.taskInfo.taskLocality)
val localityCounts = localities.groupBy(identity).mapValues(_.size)
@@ -252,15 +250,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Getting Result Time</span>
</span>
</li>
-             {if (displayPeakExecutionMemory) {
-               <li>
-                 <span data-toggle="tooltip"
-                       title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
-                   <input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
-                   <span class="additional-metric-title">Peak Execution Memory</span>
-                 </span>
-               </li>
-             }}
+             <li>
+               <span data-toggle="tooltip"
+                     title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
+                 <input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
+                 <span class="additional-metric-title">Peak Execution Memory</span>
+               </span>
+             </li>
</ul>
</div>
</div>
@@ -532,13 +528,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{serializationQuantiles}
</tr>,
<tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
-       if (displayPeakExecutionMemory) {
-         <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-           {peakExecutionMemoryQuantiles}
-         </tr>
-       } else {
-         Nil
-       },
+       <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+         {peakExecutionMemoryQuantiles}
+       </tr>,
if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
if (stageData.hasShuffleRead) {
@@ -1166,9 +1158,6 @@ private[ui] class TaskPagedTable(
desc: Boolean,
executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] {

-   // We only track peak memory used for unsafe operators
-   private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)

override def tableId: String = "task-table"

override def tableCssClass: String =
@@ -1217,14 +1206,8 @@ private[ui] class TaskPagedTable(
("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
("GC Time", ""),
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
{
if (displayPeakExecutionMemory) {
Seq(("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY))
} else {
Nil
}
} ++
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
{if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
{if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
{if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
@@ -1316,11 +1299,9 @@ private[ui] class TaskPagedTable(
<td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
{UIUtils.formatDuration(task.gettingResultTime)}
</td>
-       {if (displayPeakExecutionMemory) {
-         <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-           {Utils.bytesToString(task.peakExecutionMemoryUsed)}
-         </td>
-       }}
+       <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+         {Utils.bytesToString(task.peakExecutionMemoryUsed)}
+       </td>
{if (task.accumulators.nonEmpty) {
<td>{Unparsed(task.accumulators.get)}</td>
}}
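With the spark.sql.unsafe.enabled gate removed, the peak-memory column joins the unconditional headers, and only the data-dependent columns keep the ++/Nil pattern visible above. A small Scala sketch of that pattern, with illustrative names rather than Spark's:

    object HeaderSketch {
      // Fixed columns are listed unconditionally; data-dependent ones are
      // appended with ++ / Nil as in TaskPagedTable above.
      def headers(hasAccumulators: Boolean, hasInput: Boolean): Seq[(String, String)] =
        Seq(
          ("Getting Result Time", "getting_result_time"),
          ("Peak Execution Memory", "peak_execution_memory")) ++
          (if (hasAccumulators) Seq(("Accumulators", "")) else Nil) ++
          (if (hasInput) Seq(("Input Size / Records", "")) else Nil)

      def main(args: Array[String]): Unit =
        println(headers(hasAccumulators = true, hasInput = false).map(_._1).mkString(", "))
    }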
16 changes: 3 additions & 13 deletions core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -35,25 +35,15 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {

private val peakExecutionMemory = 10

test("peak execution memory only displayed if unsafe is enabled") {
val unsafeConf = "spark.sql.unsafe.enabled"
val conf = new SparkConf(false).set(unsafeConf, "true")
test("peak execution memory should displayed") {
val conf = new SparkConf(false)
val html = renderStagePage(conf).toString().toLowerCase
val targetString = "peak execution memory"
assert(html.contains(targetString))
-     // Disable unsafe and make sure it's not there
-     val conf2 = new SparkConf(false).set(unsafeConf, "false")
-     val html2 = renderStagePage(conf2).toString().toLowerCase
-     assert(!html2.contains(targetString))
-     // Avoid setting anything; it should be displayed by default
-     val conf3 = new SparkConf(false)
-     val html3 = renderStagePage(conf3).toString().toLowerCase
-     assert(html3.contains(targetString))
}

test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
-     val unsafeConf = "spark.sql.unsafe.enabled"
-     val conf = new SparkConf(false).set(unsafeConf, "true")
+     val conf = new SparkConf(false)
val html = renderStagePage(conf).toString().toLowerCase
// verify min/25/50/75/max show task value not cumulative values
assert(html.contains(s"<td>$peakExecutionMemory.0 b</td>" * 5))
45 changes: 27 additions & 18 deletions dev/create-release/release-build.sh
@@ -150,6 +150,7 @@ if [[ "$1" == "package" ]]; then
NAME=$1
FLAGS=$2
ZINC_PORT=$3
+   BUILD_PIP_PACKAGE=$4
cp -r spark spark-$SPARK_VERSION-bin-$NAME

cd spark-$SPARK_VERSION-bin-$NAME
@@ -170,24 +171,32 @@ if [[ "$1" == "package" ]]; then
# Get maven home set by MVN
MVN_HOME=`$MVN -version 2>&1 | grep 'Maven home' | awk '{print $NF}'`

echo "Creating distribution"
./dev/make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz --pip $FLAGS \
-DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log
cd ..

echo "Copying and signing python distribution"
PYTHON_DIST_NAME=pyspark-$PYSPARK_VERSION.tar.gz
cp spark-$SPARK_VERSION-bin-$NAME/python/dist/$PYTHON_DIST_NAME .

echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --armour \
--output $PYTHON_DIST_NAME.asc \
--detach-sig $PYTHON_DIST_NAME
echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
MD5 $PYTHON_DIST_NAME > \
$PYTHON_DIST_NAME.md5
echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
SHA512 $PYTHON_DIST_NAME > \
$PYTHON_DIST_NAME.sha
if [ -z "$BUILD_PIP_PACKAGE" ]; then
echo "Creating distribution without PIP package"
./dev/make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz $FLAGS \
-DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log
cd ..
else
echo "Creating distribution with PIP package"
./dev/make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz --pip $FLAGS \
-DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log
cd ..

echo "Copying and signing python distribution"
PYTHON_DIST_NAME=pyspark-$PYSPARK_VERSION.tar.gz
cp spark-$SPARK_VERSION-bin-$NAME/python/dist/$PYTHON_DIST_NAME .

echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --armour \
--output $PYTHON_DIST_NAME.asc \
--detach-sig $PYTHON_DIST_NAME
echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
MD5 $PYTHON_DIST_NAME > \
$PYTHON_DIST_NAME.md5
echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
SHA512 $PYTHON_DIST_NAME > \
$PYTHON_DIST_NAME.sha
fi

echo "Copying and signing regular binary distribution"
cp spark-$SPARK_VERSION-bin-$NAME/spark-$SPARK_VERSION-bin-$NAME.tgz .
@@ -211,7 +220,7 @@ if [[ "$1" == "package" ]]; then
make_binary_release "hadoop2.3" "-Phadoop-2.3 $FLAGS" "3033" &
make_binary_release "hadoop2.4" "-Phadoop-2.4 $FLAGS" "3034" &
make_binary_release "hadoop2.6" "-Phadoop-2.6 $FLAGS" "3035" &
make_binary_release "hadoop2.7" "-Phadoop-2.7 $FLAGS" "3036" &
make_binary_release "hadoop2.7" "-Phadoop-2.7 $FLAGS" "3036" "withpip" &
make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn -Pmesos" "3037" &
make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn -Pmesos" "3038" &
wait
