diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index aa6b6ca1448e4..4941477bd09b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
 import org.apache.spark.streaming.util.{WriteAheadLogSegment, WriteAheadLogUtils}
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 import org.apache.spark.{Logging, SparkConf, SparkException}
 
 /** Trait that represents the metadata related to storage of blocks */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 4affdb5c1aff9..113e970ed4961 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -18,10 +18,12 @@ package org.apache.spark.streaming.util
 
 import java.io._
 import java.nio.ByteBuffer
+import java.util
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.{implicitConversions, postfixOps}
+import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -146,7 +148,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("FileBasedWriteAheadLog - write rotating logs") {
-    // Write data using manager
+    // Write data with rotation using WriteAheadLog class
     val dataToWrite = generateRandomData()
     writeDataUsingWriteAheadLog(testDir, dataToWrite)
 
@@ -158,7 +160,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("FileBasedWriteAheadLog - read rotating logs") {
-    // Write data manually for testing reading through manager
+    // Write data manually for testing reading through WriteAheadLog
     val writtenData = (1 to 10).map { i =>
       val data = generateRandomData()
       val file = testDir + s"/log-$i-$i"
@@ -243,10 +245,58 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     val readData = readDataUsingWriteAheadLog(testDir)
     assert(readData === dataToWrite2)
   }
+
+  test("WriteAheadLogUtils - log creation") {
+    val logDir = Utils.createTempDir().getAbsolutePath
+
+    def assertLogClass[T: ClassTag](conf: SparkConf, forDriver: Boolean): Unit = {
+      val log = if (forDriver) {
+        WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf, 1, 1)
+      } else {
+        WriteAheadLogUtils.createLogForReceiver(conf, logDir, hadoopConf, 1, 1)
+      }
+      assert(implicitly[ClassTag[T]].runtimeClass.isInstance(log))
+    }
+
+    val emptyConf = new SparkConf() // no log configuration
+    assertLogClass[FileBasedWriteAheadLog](emptyConf, forDriver = true)
+    assertLogClass[FileBasedWriteAheadLog](emptyConf, forDriver = false)
+
+    val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
+      classOf[MockWriteAheadLog1].getCanonicalName)
+    assertLogClass[MockWriteAheadLog1](driverWALConf, forDriver = true)
+    assertLogClass[FileBasedWriteAheadLog](driverWALConf, forDriver = false)
+
+    val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
+      classOf[MockWriteAheadLog1].getCanonicalName)
+    assertLogClass[FileBasedWriteAheadLog](receiverWALConf, forDriver = true)
+    assertLogClass[MockWriteAheadLog1](receiverWALConf, forDriver = false)
+
+    val receiverWALConf2 = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
+      classOf[MockWriteAheadLog2].getCanonicalName)
+    assertLogClass[FileBasedWriteAheadLog](receiverWALConf2, forDriver = true)
+    assertLogClass[MockWriteAheadLog2](receiverWALConf2, forDriver = false)
+  }
 }
 
 object WriteAheadLogSuite {
 
+  class MockWriteAheadLog1(val conf: SparkConf, val checkpointDir: String) extends WriteAheadLog {
+    override def write(record: ByteBuffer, time: Long): WriteAheadLogSegment = { null }
+    override def read(segment: WriteAheadLogSegment): ByteBuffer = { null }
+    override def readAll(): util.Iterator[ByteBuffer] = { null }
+    override def cleanup(threshTime: Long, waitForCompletion: Boolean): Unit = { }
+    override def close(): Unit = { }
+  }
+
+  class MockWriteAheadLog2(val conf: SparkConf) extends WriteAheadLog {
+    override def write(record: ByteBuffer, time: Long): WriteAheadLogSegment = { null }
+    override def read(segment: WriteAheadLogSegment): ByteBuffer = { null }
+    override def readAll(): util.Iterator[ByteBuffer] = { null }
+    override def cleanup(threshTime: Long, waitForCompletion: Boolean): Unit = { }
+    override def close(): Unit = { }
+  }
+
   private val hadoopConf = new Configuration()
 
   /** Write data to a file directly and return an array of the file segments written. */
@@ -279,7 +329,7 @@ object WriteAheadLogSuite {
     segments
   }
 
-  /** Write data to rotating files in log directory using the manager class. */
+  /** Write data to rotating files in log directory using the WriteAheadLog class. */
   def writeDataUsingWriteAheadLog(
       logDirectory: String,
       data: Seq[String],
@@ -347,7 +397,7 @@ object WriteAheadLogSuite {
     readData
   }
 
-  /** Read all the data in the log file in a directory using the manager class. */
+  /** Read all the data in the log file in a directory using the WriteAheadLog class. */
   def readDataUsingWriteAheadLog(logDirectory: String): Seq[String] = {
     import scala.collection.JavaConversions._
     val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf)
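
For context, the configuration keys exercised in the new test ("spark.streaming.driver.writeAheadLog.class" and "spark.streaming.receiver.writeAheadLog.class") are what make the write ahead log pluggable. Below is a minimal sketch of wiring in a custom implementation. It assumes the WriteAheadLog trait and WriteAheadLogSegment live in org.apache.spark.streaming.util (the package of the test suite above) and that the reflective lookup accepts a (SparkConf, String) constructor, as MockWriteAheadLog1 suggests; InMemoryWriteAheadLog is a hypothetical name, not part of the patch.

import java.nio.ByteBuffer
import java.util

import org.apache.spark.SparkConf
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogSegment}

// Hypothetical custom WAL; method signatures mirror the mocks in WriteAheadLogSuite,
// and the (SparkConf, String) constructor mirrors MockWriteAheadLog1.
class InMemoryWriteAheadLog(conf: SparkConf, logDir: String) extends WriteAheadLog {
  private val records = new util.concurrent.ConcurrentLinkedQueue[ByteBuffer]()

  override def write(record: ByteBuffer, time: Long): WriteAheadLogSegment = {
    records.add(record)
    null // a real implementation would return a handle describing where the record was stored
  }
  override def read(segment: WriteAheadLogSegment): ByteBuffer = null
  override def readAll(): util.Iterator[ByteBuffer] = records.iterator()
  override def cleanup(threshTime: Long, waitForCompletion: Boolean): Unit = records.clear()
  override def close(): Unit = records.clear()
}

object InMemoryWriteAheadLogExample {
  def main(args: Array[String]): Unit = {
    // Point both the driver-side and receiver-side WAL at the custom class via the
    // configuration keys exercised in the new test.
    val sparkConf = new SparkConf()
      .set("spark.streaming.driver.writeAheadLog.class",
        classOf[InMemoryWriteAheadLog].getName)
      .set("spark.streaming.receiver.writeAheadLog.class",
        classOf[InMemoryWriteAheadLog].getName)
  }
}

With no keys set, WriteAheadLogUtils falls back to FileBasedWriteAheadLog, which is exactly what the emptyConf assertions in the test verify.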