SPARK-5500. Document that feeding hadoopFile into a shuffle operation will cause problems

Author: Sandy Ryza <sandy@cloudera.com>

Closes apache#4293 from sryza/sandy-spark-5500 and squashes the following commits:

e9ce742 [Sandy Ryza] Change to warning
cc46e52 [Sandy Ryza] Add instructions and extend to NewHadoopRDD
6e1932a [Sandy Ryza] Throw exception on cache
0f6c4eb [Sandy Ryza] SPARK-5500. Document that feeding hadoopFile into a shuffle operation will cause problems
sryza authored and rxin committed Feb 2, 2015
1 parent 842d000 commit 8309349
Showing 3 changed files with 62 additions and 36 deletions.
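
For context, a minimal sketch of the pitfall this commit documents (the path, record types, and variable names below are illustrative assumptions, not part of the change):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// `sc` is an existing SparkContext.
val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input")

// Risky: the RecordReader hands back the same Text instance for every record, so a
// cache or shuffle over `lines` can end up holding many references to one object.
lines.cache()

// Safer, per the updated scaladoc: copy each record out of the reused Writables
// with a map before caching, sorting, or aggregating.
val copied = lines.map { case (offset, line) => (offset.get, line.toString) }
copied.cache()
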
69 changes: 39 additions & 30 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -687,9 +687,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @param minPartitions Minimum number of Hadoop Splits to generate.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopRDD[K, V](
conf: JobConf,
@@ -705,12 +706,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* */
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -741,9 +743,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
@@ -764,9 +767,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
@@ -788,9 +792,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
@@ -810,9 +815,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
@@ -826,9 +832,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def sequenceFile[K, V](path: String,
keyClass: Class[K],
@@ -843,9 +850,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
assertNotStopped()
@@ -869,9 +877,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* allow it to figure out the Writable class to use in the subclass case.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
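
The note above now also calls out sorting and aggregation; here is a hedged sketch of the recommended copy-before-shuffle pattern (the file path, key/value types, and the word count are assumptions for illustration):

import org.apache.hadoop.io.Text

val pairs = sc.sequenceFile("hdfs:///data/events.seq", classOf[Text], classOf[Text])

// Copy the reused Writables into immutable values before the shuffle in reduceByKey;
// aggregating the raw Text objects directly creates many references to the same instance.
val counts = pairs
  .map { case (k, v) => (k.toString, v.toString) }
  .map { case (key, _) => (key, 1L) }
  .reduceByKey(_ + _)
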
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -42,10 +42,11 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.{NextIterator, Utils}
import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
import org.apache.spark.storage.StorageLevel

/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -308,6 +309,15 @@ class HadoopRDD[K, V](
// Do nothing. Hadoop RDD should not be checkpointed.
}

override def persist(storageLevel: StorageLevel): this.type = {
if (storageLevel.deserialized) {
logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
" behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
" Use a map transformation to make copies of the records.")
}
super.persist(storageLevel)
}

def getConf: Configuration = getJobConf()
}

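
The persist override added here keys off StorageLevel.deserialized, so a deserialized level such as MEMORY_ONLY triggers the warning while serialized or disk-only levels do not. A small illustrative check (the println is a sketch, not Spark's actual log output):

import org.apache.spark.storage.StorageLevel

Seq(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_SER, StorageLevel.DISK_ONLY).foreach { level =>
  val warns = level.deserialized  // the same predicate the override checks
  println(s"$level -> warns about Writable reuse: $warns")
}
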
17 changes: 12 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -29,16 +29,13 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.InterruptibleIterator
import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.StorageLevel

private[spark] class NewHadoopPartition(
rddId: Int,
@@ -211,6 +208,16 @@ class NewHadoopRDD[K, V](
locs.getOrElse(split.getLocations.filter(_ != "localhost"))
}

override def persist(storageLevel: StorageLevel): this.type = {
if (storageLevel.deserialized) {
logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
" behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
" Use a map transformation to make copies of the records.")
}
super.persist(storageLevel)
}


def getConf: Configuration = confBroadcast.value.value
}

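
The same copy-before-shuffle advice applies to the new-API reader guarded above; a hedged sketch (the path, types, and word count are assumptions):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val raw = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/logs")

// toString copies each line out of the reused Text object before the shuffle in reduceByKey.
val wordCounts = raw
  .flatMap { case (_, line) => line.toString.split("\\s+") }
  .map(word => (word, 1L))
  .reduceByKey(_ + _)
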
