[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles

Solves two JIRAs in one shot:
- Makes the ForeachDStream created by saveAsNewAPIHadoopFiles serializable for checkpoints (see the SerializableWritable sketch below this list)
- Makes the default configuration object used by saveAsNewAPIHadoopFiles the SparkContext's Hadoop configuration
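
As context, here is a minimal sketch of the serialization issue the first bullet refers to. It is not part of the patch: the SerializableConfDemo object and its roundTrip helper are hypothetical names, and it assumes the Spark 1.x org.apache.spark.SerializableWritable wrapper that the patch uses. A raw Hadoop Configuration is a Writable but not java.io.Serializable, so a DStream graph whose save closure captures one cannot be checkpointed; the wrapped form survives the round trip.

```scala
// Illustrative only: SerializableConfDemo and roundTrip are hypothetical and not part of
// this patch. Assumes Spark 1.x (public org.apache.spark.SerializableWritable) and a
// Hadoop Configuration on the classpath.
import java.io._

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

object SerializableConfDemo {

  // Serialize an object to bytes and read it back, as checkpointing does.
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    hadoopConf.set("demo.key", "demo.value")

    // roundTrip(hadoopConf) would not even compile: Configuration is a Hadoop Writable,
    // not java.io.Serializable, which is why closures capturing it break checkpointing.

    // Wrapping it, as saveAsHadoopFiles / saveAsNewAPIHadoopFiles now do, lets the value
    // survive the serialize/deserialize cycle performed when the graph is checkpointed.
    val wrapped = new SerializableWritable(hadoopConf)
    val restored = roundTrip(wrapped).value
    println(restored.get("demo.key"))  // prints "demo.value"
  }
}
```

The patch applies the same wrapping inside saveAsHadoopFiles and saveAsNewAPIHadoopFiles, as the diff below shows.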

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#3457 from tdas/savefiles-fix and squashes the following commits:

bb4729a [Tathagata Das] Same treatment for saveAsHadoopFiles
b382ea9 [Tathagata Das] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles.
tdas committed Nov 25, 2014
1 parent bf1a6aa commit 8838ad7
Showing 2 changed files with 70 additions and 16 deletions.
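
Before the diff, a hedged sketch of the kind of streaming job these changes target. It is not from the patch: the application name, socket source, host, port, and HDFS paths are placeholders, and the save call mirrors the new CheckpointSuite tests. With a checkpoint directory set, the ForeachDStream created by saveAsNewAPIHadoopFiles has to serialize cleanly, and the default Hadoop configuration now comes from the SparkContext.

```scala
// Hedged usage sketch, not part of the patch. Assumes Spark Streaming 1.2-era APIs;
// host, port, and HDFS paths are placeholders.
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object SaveAsNewAPIHadoopFilesExample {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "hdfs:///tmp/streaming-checkpoint"  // placeholder path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("SaveAsNewAPIHadoopFilesExample")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDir)

      val counts = ssc.socketTextStream("localhost", 9999)
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      // Before this commit, checkpointing a graph containing this output operation
      // could fail because the captured Hadoop Configuration was not serializable.
      counts.saveAsNewAPIHadoopFiles(
        "hdfs:///tmp/wordcounts",  // placeholder prefix
        "txt",
        classOf[Text],
        classOf[IntWritable],
        classOf[NewTextOutputFormat[Text, IntWritable]])

      ssc
    }

    // Recover from the checkpoint if one exists, otherwise build a fresh graph.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```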
PairDStreamFunctions.scala
@@ -17,20 +17,17 @@

 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.StreamingContext._
-
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.{Time, Duration}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+
+import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -671,11 +668,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -702,11 +701,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
+      conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsNewAPIHadoopFile(
+        file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
CheckpointSuite.scala
@@ -22,9 +22,14 @@ import java.nio.charset.Charset

 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
@@ -205,6 +210,51 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+  test("recovery with saveAsHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[TextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
@@ -391,7 +441,9 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock after advancing = " + clock.time)
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+      dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+    }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)
   }
 }
