Merge branch 'master' of https://github.com/apache/spark
phatak-dev committed Apr 24, 2015
2 parents 00bc819 + d3a302d commit 2c997c5
Showing 49 changed files with 2,317 additions and 1,332 deletions.
8 changes: 7 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -790,9 +790,12 @@ setMethod("$", signature(x = "DataFrame"),

setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column")
stopifnot(class(value) == "Column" || is.null(value))
cols <- columns(x)
if (name %in% cols) {
if (is.null(value)) {
cols <- Filter(function(c) { c != name }, cols)
}
cols <- lapply(cols, function(c) {
if (c == name) {
alias(value, name)
@@ -802,6 +805,9 @@ setMethod("$<-", signature(x = "DataFrame"),
})
nx <- select(x, cols)
} else {
if (is.null(value)) {
return(x)
}
nx <- withColumn(x, name, value)
}
x@sdf <- nx@sdf
5 changes: 5 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -449,6 +449,11 @@ test_that("select operators", {
df$age2 <- df$age * 2
expect_equal(columns(df), c("name", "age", "age2"))
expect_equal(count(where(df, df$age2 == df$age * 2)), 2)

df$age2 <- NULL
expect_equal(columns(df), c("name", "age"))
df$age3 <- NULL
expect_equal(columns(df), c("name", "age"))
})

test_that("select with column", {
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

/**
* A zero-argument function that returns an R.
*/
public interface Function0<R> extends Serializable {
public R call() throws Exception;
}
@@ -77,12 +77,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
if (k.startsWith("spark.")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
} else {
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
}
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
}
}
defaultProperties
@@ -97,6 +93,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
ignoreNonSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
loadEnvironmentArguments()

Expand All @@ -117,6 +115,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
}

/**
* Remove keys that don't start with "spark." from `sparkProperties`.
*/
private def ignoreNonSparkProperties(): Unit = {
sparkProperties.foreach { case (k, v) =>
if (!k.startsWith("spark.")) {
sparkProperties -= k
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
}
}
}

/**
* Load arguments from environment variables, Spark properties etc.
*/
@@ -35,7 +35,6 @@ import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.{Logging, SecurityManager, SparkConf}


/**
* A class that provides application history from event logs stored in the file system.
* This provider checks for new finished applications in the background periodically and
@@ -76,6 +75,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()

// List of applications to be deleted by event log cleaner.
private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]

// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
@@ -266,34 +268,40 @@
*/
private def cleanLogs(): Unit = {
try {
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000

val now = System.currentTimeMillis()
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

// Scan all logs from the log directory.
// Only completed applications older than the specified max age will be deleted.
applications.values.foreach { info =>
if (now - info.lastUpdated <= maxAge) {
if (now - info.lastUpdated <= maxAge || !info.completed) {
appsToRetain += (info.id -> info)
} else {
appsToClean += info
}
}

applications = appsToRetain

// Scan all logs from the log directory.
// Only directories older than the specified max age will be deleted
statusList.foreach { dir =>
val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
appsToClean.foreach { info =>
try {
if (now - dir.getModificationTime() > maxAge) {
// if path is a directory and set to true,
// the directory is deleted else throws an exception
fs.delete(dir.getPath, true)
val path = new Path(logDir, info.logPath)
if (fs.exists(path)) {
fs.delete(path, true)
}
} catch {
case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
case e: AccessControlException =>
logInfo(s"No permission to delete ${info.logPath}, ignoring.")
case t: IOException =>
logError(s"IOException in cleaning logs of ${info.logPath}", t)
leftToClean += info
}
}

appsToClean = leftToClean
} catch {
case t: Exception => logError("Exception in cleaning logs", t)
}
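A minimal sketch, in plain Scala, of the retention rule the reworked cleaner applies (AppInfo and shouldRetain are illustrative names, not Spark classes): an application is kept while it is incomplete or while its last update is within the configured max age; everything else is queued in appsToClean and retried on a later pass if deletion fails with an IOException.

// Illustrative sketch only; AppInfo and shouldRetain are made-up names.
case class AppInfo(id: String, lastUpdated: Long, completed: Boolean)

def shouldRetain(info: AppInfo, now: Long, maxAgeMs: Long): Boolean =
  (now - info.lastUpdated) <= maxAgeMs || !info.completed

val now = System.currentTimeMillis()
val maxAgeMs = 7L * 24 * 60 * 60 * 1000  // the "7d" default, as milliseconds

val apps = Seq(
  AppInfo("recent-finished", now - 1000L, completed = true),                      // kept: recent enough
  AppInfo("old-finished", now - 30L * 24 * 60 * 60 * 1000, completed = true),     // queued for deletion
  AppInfo("old-but-running", now - 30L * 24 * 60 * 60 * 1000, completed = false)  // kept: not completed
)

val (retained, toClean) = apps.partition(a => shouldRetain(a, now, maxAgeMs))
// retained -> recent-finished, old-but-running; toClean -> old-finished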
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -220,8 +220,12 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()

for (m <- task.metrics) {
m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
m.setExecutorRunTime(taskFinish - taskStart)
// Deserialization happens in two parts: first, we deserialize a Task object, which
// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
m.setExecutorDeserializeTime(
(taskStart - deserializeStartTime) + task.executorDeserializeTime)
// We need to subtract Task.run()'s deserialization time to avoid double-counting
m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
}
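A small worked example of the corrected bookkeeping, using illustrative numbers rather than real measurements: the deserialization metric now also includes the time Task.run() spends deserializing the RDD and closure (recorded as _executorDeserializeTime in ResultTask and ShuffleMapTask below), and the same amount is subtracted from the run time so it is counted only once.

// Illustrative timestamps in milliseconds; not actual Spark measurements.
val deserializeStartTime = 0L   // executor starts deserializing the Task object
val taskStart = 40L             // Task.run() begins
val taskFinish = 540L           // Task.run() returns
val taskDeserializeTime = 25L   // time Task.run() spent deserializing the RDD/closure

val executorDeserializeTime = (taskStart - deserializeStartTime) + taskDeserializeTime  // 40 + 25 = 65
val executorRunTime = (taskFinish - taskStart) - taskDeserializeTime                    // 500 - 25 = 475

// Previously the 25 ms of in-run deserialization was reported as run time
// (deserialize = 40, run = 500) rather than as deserialization time.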
@@ -53,9 +53,11 @@ private[spark] class ResultTask[T, U](

override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
@@ -56,9 +56,11 @@ private[spark] class ShuffleMapTask(

override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,11 +87,18 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
// initialized when kill() is invoked.
@volatile @transient private var _killed = false

protected var _executorDeserializeTime: Long = 0

/**
* Whether the task has been killed.
*/
def killed: Boolean = _killed

/**
* Returns the amount of time spent deserializing the RDD and function to be run.
*/
def executorDeserializeTime: Long = _executorDeserializeTime

/**
* Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
* code and user code to properly handle the flag. This function should be idempotent so it can
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,7 +24,9 @@ private[spark] object ToolTips {
scheduler delay is large, consider decreasing the size of tasks or decreasing the size
of task results."""

val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor."
val TASK_DESERIALIZATION_TIME =
"""Time spent deserializing the task closure on the executor, including the time to read the
broadcasted task."""

val SHUFFLE_READ_BLOCKED_TIME =
"Time that the task spent blocked waiting for shuffle data to be read from remote machines."
6 changes: 3 additions & 3 deletions dev/change-version-to-2.10.sh
@@ -18,9 +18,9 @@
#

# Note that this will not necessarily work as intended with non-GNU sed (e.g. OS X)

find . -name 'pom.xml' | grep -v target \
BASEDIR=$(dirname $0)/..
find $BASEDIR -name 'pom.xml' | grep -v target \
| xargs -I {} sed -i -e 's/\(artifactId.*\)_2.11/\1_2.10/g' {}

# Also update <scala.binary.version> in parent POM
sed -i -e '0,/<scala\.binary\.version>2.11</s//<scala.binary.version>2.10</' pom.xml
sed -i -e '0,/<scala\.binary\.version>2.11</s//<scala.binary.version>2.10</' $BASEDIR/pom.xml
6 changes: 3 additions & 3 deletions dev/change-version-to-2.11.sh
@@ -18,9 +18,9 @@
#

# Note that this will not necessarily work as intended with non-GNU sed (e.g. OS X)

find . -name 'pom.xml' | grep -v target \
BASEDIR=$(dirname $0)/..
find $BASEDIR -name 'pom.xml' | grep -v target \
| xargs -I {} sed -i -e 's/\(artifactId.*\)_2.10/\1_2.11/g' {}

# Also update <scala.binary.version> in parent POM
sed -i -e '0,/<scala\.binary\.version>2.10</s//<scala.binary.version>2.11</' pom.xml
sed -i -e '0,/<scala\.binary\.version>2.10</s//<scala.binary.version>2.11</' $BASEDIR/pom.xml
2 changes: 1 addition & 1 deletion docs/sql-programming-guide.md
@@ -1364,7 +1364,7 @@ the Data Sources API. The following options are supported:
<tr>
<td><code>driver</code></td>
<td>
The class name of the JDBC driver needed to connect to this URL. This class with be loaded
The class name of the JDBC driver needed to connect to this URL. This class will be loaded
on the master and workers before running any JDBC commands to allow the driver to
register itself with the JDBC subsystem.
</td>
@@ -56,7 +56,7 @@ object MQTTPublisher {
while (true) {
try {
msgtopic.publish(message)
println(s"Published data. topic: {msgtopic.getName()}; Message: {message}")
println(s"Published data. topic: ${msgtopic.getName()}; Message: $message")
} catch {
case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
Thread.sleep(10)
@@ -177,7 +177,7 @@ class LDA private (
def getBeta: Double = getTopicConcentration

/** Alias for [[setTopicConcentration()]] */
def setBeta(beta: Double): this.type = setBeta(beta)
def setBeta(beta: Double): this.type = setTopicConcentration(beta)

/**
* Maximum number of iterations for learning.
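The previous alias called itself, so setBeta would recurse until the stack overflowed; the fix forwards to the underlying setter. A hypothetical class (not the MLlib one) showing the same bug-and-fix pattern; the new "setter alias" test in LDASuite further down exercises this round-trip.

// Hypothetical class illustrating the self-recursive alias and its fix.
class Config {
  private var topicConcentration: Double = 1.0
  def setTopicConcentration(v: Double): this.type = { topicConcentration = v; this }
  def getTopicConcentration: Double = topicConcentration

  // Buggy alias: calls itself forever (StackOverflowError at runtime).
  // def setBeta(beta: Double): this.type = setBeta(beta)

  // Fixed alias: delegate to the real setter, as the commit now does for LDA.
  def setBeta(beta: Double): this.type = setTopicConcentration(beta)
  def getBeta: Double = getTopicConcentration
}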
@@ -171,7 +171,7 @@ object RidgeRegressionWithSGD {
numIterations: Int,
stepSize: Double,
regParam: Double): RidgeRegressionModel = {
train(input, numIterations, stepSize, regParam, 0.01)
train(input, numIterations, stepSize, regParam, 1.0)
}

/**
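A hedged usage sketch of the two overloads involved, assuming (per the MLlib 1.x API) that the fifth parameter of train is the mini-batch fraction; the only change in this commit is the value the four-argument convenience overload forwards for it: 1.0, i.e. the full batch, instead of 0.01.

// Sketch only; assumes the standard MLlib 1.x RidgeRegressionWithSGD API.
import org.apache.spark.mllib.regression.{LabeledPoint, RidgeRegressionWithSGD}
import org.apache.spark.rdd.RDD

def fit(data: RDD[LabeledPoint]) = {
  // Explicit overload: the caller chooses the mini-batch fraction used per SGD step.
  val m1 = RidgeRegressionWithSGD.train(data, 100, 1.0, 0.01, 1.0)
  // Convenience overload: after this change it forwards 1.0 (full batch) instead of 0.01.
  val m2 = RidgeRegressionWithSGD.train(data, 100, 1.0, 0.01)
  (m1, m2)
}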
@@ -123,6 +123,14 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
assert(termVertexIds.map(i => LDA.index2term(i.toLong)) === termIds)
assert(termVertexIds.forall(i => LDA.isTermVertex((i.toLong, 0))))
}

test("setter alias") {
val lda = new LDA().setAlpha(2.0).setBeta(3.0)
assert(lda.getAlpha === 2.0)
assert(lda.getDocConcentration === 2.0)
assert(lda.getBeta === 3.0)
assert(lda.getTopicConcentration === 3.0)
}
}

private[clustering] object LDASuite {
14 changes: 14 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -452,6 +452,20 @@ def columns(self):
"""
return [f.name for f in self.schema.fields]

@ignore_unicode_prefix
def alias(self, alias):
"""Returns a new :class:`DataFrame` with an alias set.
>>> from pyspark.sql.functions import *
>>> df_as1 = df.alias("df_as1")
>>> df_as2 = df.alias("df_as2")
>>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
>>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect()
[Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)]
"""
assert isinstance(alias, basestring), "alias should be a string"
return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx)

@ignore_unicode_prefix
def join(self, other, joinExprs=None, joinType=None):
"""Joins with another :class:`DataFrame`, using the given join expression.
@@ -279,7 +279,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
org.apache.spark.sql.types.UTF8String(${eval.primitiveTerm}.toString)
""".children

case EqualTo(e1: BinaryType, e2: BinaryType) =>
case EqualTo(e1 @ BinaryType(), e2 @ BinaryType()) =>
(e1, e2).evaluateAs (BooleanType) {
case (eval1, eval2) =>
q"""
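In Catalyst, EqualTo holds Expression children while BinaryType is a DataType, so the old typed pattern e1: BinaryType could never match an expression and the case silently fell through; the replacement e1 @ BinaryType() binds the expression and matches on its data type instead (presumably via an unapply defined for the type objects). A simplified, hypothetical model of the two pattern styles, using made-up classes rather than the real Catalyst ones:

// Made-up classes; only meant to contrast the two pattern styles.
sealed abstract class DataType
class BinaryType extends DataType
case class Expr(name: String, dataType: DataType)

case object BinaryType extends BinaryType {
  // Extractor that lets `e @ BinaryType()` test an expression's data type.
  def unapply(e: Expr): Boolean = e.dataType == BinaryType
}

Expr("payload", BinaryType) match {
  // case e: BinaryType => ...   // old style: asks whether the Expr itself is a DataType; it never is
  case e @ BinaryType() => println(s"${e.name} has binary type")   // new style: checks e.dataType
  case _                => println("not binary")
}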

0 comments on commit 2c997c5
