diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 4bdb13f9ad3d0..bd17dae14d3a8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1176,13 +1176,17 @@ abstract class RDD[T: ClassTag]( def saveAsTextFile(path: String) { // https://issues.apache.org/jira/browse/SPARK-2075 // NullWritable is a Comparable rather than Comparable[NullWritable] in Hadoop 1.+, - // so the compiler cannot find an implicit Ordering for it. It will generate different - // anonymous classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. Therefore, here we - // provide an Ordering for NullWritable so that the compiler will generate same codes. - implicit val nullWritableOrdering = new Ordering[NullWritable] { + // so the compiler cannot find an implicit Ordering for it and will use the default `null`. + // It will generate different anonymous classes for `saveAsTextFile` in Hadoop 1.+ and + // Hadoop 2.+. Therefore, here we provide an Ordering for NullWritable so that the compiler + // will generate same bytecode. + val nullWritableOrdering = new Ordering[NullWritable] { override def compare(x: NullWritable, y: NullWritable): Int = 0 } - this.map(x => (NullWritable.get(), new Text(x.toString))) + val nullWritableClassTag = implicitly[ClassTag[NullWritable]] + val textClassTag = implicitly[ClassTag[Text]] + val r = this.map(x => (NullWritable.get(), new Text(x.toString))) + RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, nullWritableOrdering) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) } @@ -1191,10 +1195,13 @@ abstract class RDD[T: ClassTag]( */ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) { // https://issues.apache.org/jira/browse/SPARK-2075 - implicit val nullWritableOrdering = new Ordering[NullWritable] { + val nullWritableOrdering = new Ordering[NullWritable] { override def compare(x: NullWritable, y: NullWritable): Int = 0 } - this.map(x => (NullWritable.get(), new Text(x.toString))) + val nullWritableClassTag = implicitly[ClassTag[NullWritable]] + val textClassTag = implicitly[ClassTag[Text]] + val r = this.map(x => (NullWritable.get(), new Text(x.toString))) + RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, nullWritableOrdering) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec) }