diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 80490fecd978b..d6a4cd42f7604 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -518,7 +518,6 @@ class SparkContext(config: SparkConf) extends Logging {
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    *
-   * @note Care must be taken to close the files afterwards
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    */
   @DeveloperApi
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 3ff95d41de998..373e6f1d12864 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -263,39 +263,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     JavaPairRDD[String,PortableDataStream] =
     new JavaPairRDD(sc.binaryFiles(path,defaultMinPartitions))
 
-  /**
-   * Read a directory of files as DataInputStream from HDFS,
-   * a local file system (available on all nodes), or any Hadoop-supported file system URI
-   * as a byte array. Each file is read as a single record and returned in a
-   * key-value pair, where the key is the path of each file, the value is the content of each.
-   *
-   * For example, if you have the following files:
-   * {{{
-   *   hdfs://a-hdfs-path/part-00000
-   *   hdfs://a-hdfs-path/part-00001
-   *   ...
-   *   hdfs://a-hdfs-path/part-nnnnn
-   * }}}
-   *
-   * Do
-   * `JavaPairRDD rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
-   *
-   * then `rdd` contains
-   * {{{
-   *   (a-hdfs-path/part-00000, its content)
-   *   (a-hdfs-path/part-00001, its content)
-   *   ...
-   *   (a-hdfs-path/part-nnnnn, its content)
-   * }}}
-   *
-   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
-   *
-   * @param minPartitions A suggestion value of the minimal splitting number for input data.
-   */
-  def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
-  JavaPairRDD[String, Array[Byte]] =
-    new JavaPairRDD(sc.binaryFiles(path,minPartitions).mapValues(_.toArray()))
-
   /**
    * Load data from a flat binary file, assuming each record is a set of numbers
    * with the specified numerical format (see ByteBuffer), and the number of
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 9c8121e2a6d14..9cfe3e5c1742a 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -35,9 +35,10 @@ private[spark] abstract class StreamFileInputFormat[T]
   extends CombineFileInputFormat[String, T] {
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
   /**
-   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
+   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
+   * which is set through setMaxSplitSize
    */
-  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+  def setMinPartitions(context: JobContext, minPartitions: Int) {
     val files = listStatus(context)
     val totalLen = files.map { file =>
       if (file.isDir) 0L else file.getLen
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 3d1b5f1b543f5..c01196de6bad7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -17,22 +17,21 @@
 
 package org.apache.spark.rdd
 
-import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.conf.{ Configurable, Configuration }
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.input.StreamFileInputFormat
-import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.{ Partition, SparkContext }
 
 private[spark] class BinaryFileRDD[T](
-    sc : SparkContext,
-    inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
-    keyClass: Class[String],
-    valueClass: Class[T],
-    @transient conf: Configuration,
-    minPartitions: Int)
+  sc: SparkContext,
+  inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+  keyClass: Class[String],
+  valueClass: Class[T],
+  @transient conf: Configuration,
+  minPartitions: Int)
   extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
-
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -41,13 +40,12 @@ private[spark] class BinaryFileRDD[T](
       case _ =>
     }
     val jobContext = newJobContext(conf, jobId)
-    inputFormat.setMaxSplitSize(jobContext, minPartitions)
+    inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
       result(i) = new NewHadoopPartition(
-        id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]
-      )
+        id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
     }
     result
   }
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index e265b43fb79d1..3f4c7b0180486 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -243,7 +243,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
     file.close()
 
     val inRdd = sc.binaryFiles(outFileName)
-    val (infile: String, indata: PortableDataStream) = inRdd.first
+    val (infile: String, indata: PortableDataStream) = inRdd.collect.head
 
     // Make sure the name and array match
     assert(infile.contains(outFileName)) // a prefix may get added
@@ -274,7 +274,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
       curData: (String, PortableDataStream) =>
         (curData._2.getPath(),curData._2)
     }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.first
+    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
 
     // Try reading the output back as an object file
 
@@ -305,7 +305,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
       curData: (String, PortableDataStream) =>
         (curData._2.getPath(),curData._2)
     }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.first
+    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
 
     // Try reading the output back as an object file
 
@@ -376,7 +376,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(inRdd.count == testOutputCopies)
 
     // now just compare the first one
-    val indata: Array[Byte] = inRdd.first
+    val indata: Array[Byte] = inRdd.collect.head
     assert(indata === testOutput)
   }
 
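
For callers of the removed `binaryArrays` helper, the same result is available from `binaryFiles` followed by `mapValues(_.toArray())`, which is what the deleted method did internally. Below is a minimal sketch, not part of the patch: it assumes a live `SparkContext` named `sc`, a placeholder input directory `hdfs://a-hdfs-path`, and the Spark 1.x pair-RDD implicits; the `minPartitions` hint is forwarded by `BinaryFileRDD.getPartitions` to `StreamFileInputFormat.setMinPartitions`.

{{{
// Sketch only: `sc` and the input path are assumptions, not part of this patch.
import org.apache.spark.SparkContext._            // pair-RDD operations such as mapValues (Spark 1.x)
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

// One record per file: (file path, lazily readable stream).
val streams: RDD[(String, PortableDataStream)] =
  sc.binaryFiles("hdfs://a-hdfs-path", minPartitions = 4)

// Materialize each stream into a byte array, as binaryArrays used to do.
val bytes: RDD[(String, Array[Byte])] = streams.mapValues(_.toArray())
}}}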