[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles #3457

Status: Closed · 2 commits
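For context on the change below: DStream checkpointing Java-serializes the entire DStream graph, including the ForeachDStream (and its closure) that saveAsHadoopFiles / saveAsNewAPIHadoopFiles register. Hadoop's Configuration and JobConf are not java.io.Serializable, so a closure that captures one of them directly makes the checkpoint write fail with NotSerializableException; wrapping the conf in SerializableWritable serializes it through Hadoop's Writable protocol instead. The following is a minimal standalone sketch of that difference, not part of this PR; it assumes Spark 1.x on the classpath, where org.apache.spark.SerializableWritable is a public developer API, and the object and method names are invented for illustration.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SerializableWritable

object ConfSerializationSketch {

  // Java-serialize an object the same way DStream checkpointing serializes the DStream graph.
  def trySerialize(obj: AnyRef): Either[Throwable, Int] = {
    try {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(obj)
      out.close()
      Right(bytes.size())
    } catch {
      case e: Exception => Left(e)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // A closure that captures the raw Configuration, as the old save functions did.
    val rawClosure = () => conf.get("fs.defaultFS")
    // Expected: Left(java.io.NotSerializableException: org.apache.hadoop.conf.Configuration)
    println(trySerialize(rawClosure))

    // The same closure built over a SerializableWritable wrapper serializes fine, because
    // SerializableWritable writes the Configuration via Hadoop's Writable protocol.
    val wrapped = new SerializableWritable(conf)
    val wrappedClosure = () => wrapped.value.get("fs.defaultFS")
    // Expected: Right(<serialized size in bytes>)
    println(trySerialize(wrappedClosure))
  }
}

The other change in this diff makes the default conf start from ssc.sparkContext.hadoopConfiguration rather than a fresh, empty configuration, so Hadoop settings applied to the SparkContext are respected by default.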
streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -17,20 +17,17 @@

 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.StreamingContext._
-
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.{Time, Duration}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+
+import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -671,11 +668,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -702,11 +701,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
+      conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
Review comment (Member), on the line above: "Ah you're already on it, yep, that looks good."
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsNewAPIHadoopFile(
+        file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
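To see the effect of the two changes above from the caller's side, here is a hedged usage sketch, not taken from this PR: a checkpointed word-count stream writing each batch with saveAsNewAPIHadoopFiles. The socket source, host/port, application name, and output/checkpoint paths are placeholders invented for the example; the API calls are the standard Spark 1.x Streaming ones shown in this diff.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object SaveAsHadoopFilesExample {
  def main(args: Array[String]): Unit = {
    // Placeholder paths, host, and port, purely for illustration.
    val checkpointDir = "/tmp/streaming-checkpoint"

    def createContext(): StreamingContext = {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SaveAsHadoopFilesExample")
      val ssc = new StreamingContext(sparkConf, Seconds(1))
      ssc.checkpoint(checkpointDir)

      val counts = ssc.socketTextStream("localhost", 9999)
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      // With this patch, the Hadoop conf defaults to ssc.sparkContext.hadoopConfiguration and the
      // captured conf is wrapped in SerializableWritable, so this output operation no longer
      // breaks serialization of the DStream graph when checkpoints are written.
      counts.saveAsNewAPIHadoopFiles(
        "/tmp/wordcounts", "txt",
        classOf[Text], classOf[IntWritable],
        classOf[NewTextOutputFormat[Text, IntWritable]])
      ssc
    }

    // Recover the graph from the checkpoint if one exists, otherwise build it fresh.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}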
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -22,9 +22,14 @@ import java.nio.charset.Charset

 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
@@ -205,6 +210,51 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+  test("recovery with saveAsHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[TextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
@@ -391,7 +441,9 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock after advancing = " + clock.time)
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+      dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+    }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)
   }
 }
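The last hunk is needed because the new tests register extra output streams: each saveAs*HadoopFiles call adds its own ForeachDStream to the graph, so getOutputStreams.head is no longer guaranteed to be the TestOutputStreamWithPartitions the helper wants. Below is a small standalone sketch of that selection pattern; the case classes are invented stand-ins, not Spark types, and it only illustrates picking the first element of a specific runtime class out of a mixed sequence.

// Sketch of the selection pattern used above: filter a mixed collection down to one class,
// then take the head. Type parameters are erased at runtime, so only the outer class is
// checked, which is the same caveat that applies to the isInstanceOf check in the test helper.
object SelectByClassSketch {
  sealed trait OutputStream
  case class TestOutput(values: Seq[Int]) extends OutputStream
  case class SaveToHadoopOutput(path: String) extends OutputStream

  def main(args: Array[String]): Unit = {
    // With the new tests, the graph holds more than one output stream, in registration order.
    val outputStreams: Seq[OutputStream] =
      Seq(SaveToHadoopOutput("/tmp/result"), TestOutput(Seq(1, 2, 3)))

    // `.head` alone would return the save-to-Hadoop stream; select by class first.
    val testOutput = outputStreams.collect { case t: TestOutput => t }.head
    println(testOutput.values.sum)  // 6
  }
}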