
Commit

Merge remote-tracking branch 'origin/master' into external-sorter-bypass-cleanup

Conflicts:
	core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
	core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
JoshRosen committed May 29, 2015
2 parents 8b216c4 + 1c5b198 commit bf3f3f6
Showing 718 changed files with 4,803 additions and 4,633 deletions.
10 changes: 4 additions & 6 deletions R/pkg/R/DataFrame.R
@@ -1314,9 +1314,8 @@ setMethod("except",
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("write.df",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
signature(df = "DataFrame", path = 'character'),
function(df, path, source = NULL, mode = "append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
@@ -1338,9 +1337,8 @@ setMethod("write.df",
#' @aliases saveDF
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
signature(df = "DataFrame", path = 'character'),
function(df, path, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})
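
With this change only df and path participate in method dispatch; source and mode become ordinary optional arguments, with source falling back to the spark.sql.sources.default setting (and ultimately to the built-in parquet source) and mode defaulting to "append". The Scala sketch below mirrors that fallback; resolveSource is an illustrative helper, not part of this patch.

import org.apache.spark.sql.SQLContext

// Illustrative helper mirroring the R-side fallback above: prefer an explicit
// source, then the spark.sql.sources.default setting, then parquet.
object SourceDefaults {
  def resolveSource(sqlContext: SQLContext, source: Option[String]): String =
    source.getOrElse(
      sqlContext.getConf("spark.sql.sources.default", "org.apache.spark.sql.parquet"))
}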

5 changes: 5 additions & 0 deletions R/pkg/R/SQLContext.R
@@ -457,6 +457,11 @@ read.df <- function(sqlContext, path = NULL, source = NULL, ...) {
if (!is.null(path)) {
options[['path']] <- path
}
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
sdf <- callJMethod(sqlContext, "load", source, options)
dataFrame(sdf)
}
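
With this addition, read.df resolves a missing source the same way write.df does before handing off to the JVM. A rough Scala equivalent of the resulting call path is sketched below; the file path is made up, and the load(source, options) overload used here is the 1.x data-source entry point.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch of the JVM-side calls the R wrapper above ends up making.
object ReadDfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("read-df-sketch"))
    val sqlContext = new SQLContext(sc)
    // Same default resolution as the R code: configured default, else parquet.
    val source = sqlContext.getConf("spark.sql.sources.default", "org.apache.spark.sql.parquet")
    val df = sqlContext.load(source, Map("path" -> "/tmp/people.parquet"))
    df.printSchema()
    sc.stop()
  }
}
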
4 changes: 2 additions & 2 deletions R/pkg/R/generics.R
@@ -482,11 +482,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {

#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })
setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })

#' @rdname write.df
#' @export
setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })

#' @rdname schema
#' @export
7 changes: 7 additions & 0 deletions bagel/pom.xml
@@ -40,6 +40,13 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
4 changes: 2 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.bagel

import org.scalatest.{BeforeAndAfter, FunSuite, Assertions}
import org.scalatest.{BeforeAndAfter, Assertions}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

@@ -27,7 +27,7 @@ import org.apache.spark.storage.StorageLevel
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable

class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
class BagelSuite extends SparkFunSuite with Assertions with BeforeAndAfter with Timeouts {

var sc: SparkContext = _

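BagelSuite now extends SparkFunSuite, Spark's shared test base class (shipped via the spark-core test-jar dependency added to bagel/pom.xml above), rather than ScalaTest's FunSuite directly. Below is a minimal sketch of a suite written against that base class; the suite name and test body are illustrative, and because SparkFunSuite is internal to Spark the suite has to live under an org.apache.spark package with the test-jar on its classpath.

package org.apache.spark.bagel

import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkContext, SparkFunSuite}

// Illustrative suite built on the shared base class.
class ExampleSparkSuite extends SparkFunSuite with BeforeAndAfter {
  var sc: SparkContext = _

  after {
    if (sc != null) {
      sc.stop()
      sc = null
    }
  }

  test("count elements of a small RDD") {
    sc = new SparkContext("local", "example")
    assert(sc.parallelize(1 to 10).count() === 10)
  }
}
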
6 changes: 1 addition & 5 deletions bin/pyspark
@@ -90,11 +90,7 @@ if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
export PYTHONHASHSEED=0
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
exec "$PYSPARK_DRIVER_PYTHON" $1
fi
exec "$PYSPARK_DRIVER_PYTHON" -m $1
exit
fi

14 changes: 13 additions & 1 deletion core/pom.xml
@@ -338,6 +338,12 @@
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-java</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<!-- Added for selenium: -->
@@ -377,9 +383,15 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
<version>4.4</version>
<exclusions>
<exclusion>
<groupId>net.razorvine</groupId>
<artifactId>serpent</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
@@ -100,7 +100,7 @@ sorttable = {
this.removeChild(document.getElementById('sorttable_sortfwdind'));
sortrevind = document.createElement('span');
sortrevind.id = "sorttable_sortrevind";
sortrevind.innerHTML = stIsIE ? '&nbsp<font face="webdings">5</font>' : '&nbsp;&#x25B4;';
sortrevind.innerHTML = stIsIE ? '&nbsp<font face="webdings">5</font>' : '&nbsp;&#x25BE;';
this.appendChild(sortrevind);
return;
}
@@ -113,7 +113,7 @@ sorttable = {
this.removeChild(document.getElementById('sorttable_sortrevind'));
sortfwdind = document.createElement('span');
sortfwdind.id = "sorttable_sortfwdind";
sortfwdind.innerHTML = stIsIE ? '&nbsp<font face="webdings">6</font>' : '&nbsp;&#x25BE;';
sortfwdind.innerHTML = stIsIE ? '&nbsp<font face="webdings">6</font>' : '&nbsp;&#x25B4;';
this.appendChild(sortfwdind);
return;
}
@@ -134,7 +134,7 @@ sorttable = {
this.className += ' sorttable_sorted';
sortfwdind = document.createElement('span');
sortfwdind.id = "sorttable_sortfwdind";
sortfwdind.innerHTML = stIsIE ? '&nbsp<font face="webdings">6</font>' : '&nbsp;&#x25BE;';
sortfwdind.innerHTML = stIsIE ? '&nbsp<font face="webdings">6</font>' : '&nbsp;&#x25B4;';
this.appendChild(sortfwdind);

// build an array to sort. This is a Schwartzian transform thing,
@@ -193,7 +193,7 @@ function renderDagVizForJob(svgContainer) {
// Use the link from the stage table so it also works for the history server
var attemptId = 0
var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
.select("a")
.select("a.name-link")
.attr("href") + "&expandDagViz=true";
container = svgContainer
.append("a")
@@ -46,7 +46,7 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) {
};

$(this).click(function() {
var jobPagePath = $(getSelectorForJobEntry(this)).find("a").attr("href")
var jobPagePath = $(getSelectorForJobEntry(this)).find("a.name-link").attr("href")
window.location.href = jobPagePath
});

@@ -105,7 +105,7 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) {
};

$(this).click(function() {
var stagePagePath = $(getSelectorForStageEntry(this)).find("a").attr("href")
var stagePagePath = $(getSelectorForStageEntry(this)).find("a.name-link").attr("href")
window.location.href = stagePagePath
});

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -228,7 +228,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {
extends Accumulable[T, T](initialValue, param, name) {

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}
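
The Accumulators change is whitespace-only (a space after the comma in Accumulable[T, T]). For orientation, here is a hedged usage sketch of the 1.x accumulator API that class backs; the app name and data are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

// Counts malformed records with a named accumulator; the name appears in the web UI.
object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("accumulator-sketch"))
    val parseErrors = sc.accumulator(0, "parse-errors")
    sc.parallelize(Seq("1", "2", "x", "4")).foreach { s =>
      if (scala.util.Try(s.toInt).isFailure) parseErrors += 1
    }
    println(parseErrors.value) // the value is only read back on the driver
    sc.stop()
  }
}
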
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -45,7 +45,7 @@ case class Aggregator[K, V, C] (
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
val combiners = new AppendOnlyMap[K, C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
@@ -76,7 +76,7 @@ case class Aggregator[K, V, C] (
: Iterator[(K, C)] =
{
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
val combiners = new AppendOnlyMap[K, C]
var kc: Product2[K, C] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
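
Aggregator's createCombiner, mergeValue, and mergeCombiners functions are the same triple a user hands to combineByKey; the hunks above only fix spacing in its internal AppendOnlyMap[K, C] type parameters. A short sketch of that triple computing a per-key average (data and names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Per-key average via combineByKey: the three functions map onto
// Aggregator.createCombiner / mergeValue / mergeCombiners.
object CombineByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("combine-sketch"))
    val scores = sc.parallelize(Seq(("a", 1.0), ("b", 4.0), ("a", 3.0)))
    val sumAndCount = scores.combineByKey(
      (v: Double) => (v, 1),                                                    // createCombiner
      (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),                    // mergeValue
      (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2)) // mergeCombiners
    sumAndCount.mapValues { case (sum, count) => sum / count }.collect().foreach(println)
    sc.stop()
  }
}
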
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -103,7 +103,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
*/
class RangePartitioner[K : Ordering : ClassTag, V](
@transient partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
@transient rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {

@@ -185,7 +185,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}

override def equals(other: Any): Boolean = other match {
case r: RangePartitioner[_,_] =>
case r: RangePartitioner[_, _] =>
r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
case _ =>
false
@@ -249,7 +249,7 @@ private[spark] object RangePartitioner {
* @param sampleSizePerPartition max sample size per partition
* @return (total number of items, an array of (partitionId, number of items, sample))
*/
def sketch[K:ClassTag](
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
val shift = rdd.id
@@ -272,7 +272,7 @@
* @param partitions number of partitions
* @return selected bounds
*/
def determineBounds[K:Ordering:ClassTag](
def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
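
These hunks touch only spacing, but they pass through the two stock partitioners, so a brief usage sketch may help (data and names are illustrative): HashPartitioner buckets by key hash, while RangePartitioner samples the keys to pick ordered, roughly balanced ranges, which is what sortByKey relies on.

import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}

// Repartitions a pair RDD with each of the partitioners shown above.
object PartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("partitioner-sketch"))
    val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)), numSlices = 3)

    val hashed = pairs.partitionBy(new HashPartitioner(2))        // key.hashCode modulo 2
    val ranged = pairs.partitionBy(new RangePartitioner(2, pairs)) // sampled key ranges

    println(s"${hashed.partitioner} ${ranged.partitioner}")
    sc.stop()
  }
}
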
44 changes: 0 additions & 44 deletions core/src/main/scala/org/apache/spark/SizeEstimator.scala

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -481,7 +481,7 @@ private[spark] object SparkConf extends Logging {
"are no longer accepted. To specify the equivalent now, one may use '64k'.")
)

Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
}

/**
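
The only change in this hunk is the spacing around ": _*", Scala's varargs expansion, which passes each element of a sequence as its own argument. A minimal standalone illustration with made-up values:

// Builds a Map from a sequence of pairs by expanding it into varargs.
object VarargsSketch {
  def main(args: Array[String]): Unit = {
    val entries = Seq("spark.app.name" -> "demo", "spark.master" -> "local")
    val asMap = Map(entries: _*) // same as Map("spark.app.name" -> "demo", "spark.master" -> "local")
    println(asMap)
  }
}
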
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -389,7 +389,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

_jars =_conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
_jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
.toSeq.flatten

@@ -438,7 +438,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager,appName, startTime = startTime))
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[FixedLengthBinaryInputFormat],
classOf[LongWritable],
classOf[BytesWritable],
conf=conf)
conf = conf)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
assert(bytes.length == recordLength, "Byte array does not have correct length")
@@ -1267,7 +1267,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
val param = new GrowableAccumulableParam[R, T]
val acc = new Accumulable(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
@@ -1316,7 +1316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val uri = new URI(path)
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
case _ => path
}

val hadoopPath = new Path(schemeCorrectedPath)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -298,7 +298,7 @@ object SparkEnv extends Logging {
}
}

val mapOutputTracker = if (isDriver) {
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
new MapOutputTrackerWorker(conf)
@@ -348,7 +348,7 @@ object SparkEnv extends Logging {
val fileServerPort = conf.getInt("spark.fileserver.port", 0)
val server = new HttpFileServer(conf, securityManager, fileServerPort)
server.initialize()
conf.set("spark.fileserver.uri", server.serverUri)
conf.set("spark.fileserver.uri", server.serverUri)
server
} else {
null
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -50,8 +50,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
private var jID: SerializableWritable[JobID] = null
private var taID: SerializableWritable[TaskAttemptID] = null

@transient private var writer: RecordWriter[AnyRef,AnyRef] = null
@transient private var format: OutputFormat[AnyRef,AnyRef] = null
@transient private var writer: RecordWriter[AnyRef, AnyRef] = null
@transient private var format: OutputFormat[AnyRef, AnyRef] = null
@transient private var committer: OutputCommitter = null
@transient private var jobContext: JobContext = null
@transient private var taskContext: TaskAttemptContext = null
@@ -114,10 +114,10 @@ class SparkHadoopWriter(@transient jobConf: JobConf)

// ********* Private Functions *********

private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
private def getOutputFormat(): OutputFormat[AnyRef, AnyRef] = {
if (format == null) {
format = conf.value.getOutputFormat()
.asInstanceOf[OutputFormat[AnyRef,AnyRef]]
.asInstanceOf[OutputFormat[AnyRef, AnyRef]]
}
format
}
@@ -138,7 +138,7 @@

private def getTaskContext(): TaskAttemptContext = {
if (taskContext == null) {
taskContext = newTaskAttemptContext(conf.value, taID.value)
taskContext = newTaskAttemptContext(conf.value, taID.value)
}
taskContext
}