Added unit test and fixed compilation error.
tdas committed Apr 23, 2015
1 parent bce5e75 commit 84ce469
Showing 2 changed files with 55 additions and 5 deletions.
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
import org.apache.spark.streaming.util.{WriteAheadLogSegment, WriteAheadLogUtils}
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
import org.apache.spark.{Logging, SparkConf, SparkException}

/** Trait that represents the metadata related to storage of blocks */
@@ -18,10 +18,12 @@ package org.apache.spark.streaming.util

import java.io._
import java.nio.ByteBuffer
+import java.util

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
+import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -146,7 +148,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
}

test("FileBasedWriteAheadLog - write rotating logs") {
-// Write data using manager
+// Write data with rotation using WriteAheadLog class
val dataToWrite = generateRandomData()
writeDataUsingWriteAheadLog(testDir, dataToWrite)

@@ -158,7 +160,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
}

test("FileBasedWriteAheadLog - read rotating logs") {
-// Write data manually for testing reading through manager
+// Write data manually for testing reading through WriteAheadLog
val writtenData = (1 to 10).map { i =>
val data = generateRandomData()
val file = testDir + s"/log-$i-$i"
@@ -243,10 +245,58 @@
val readData = readDataUsingWriteAheadLog(testDir)
assert(readData === dataToWrite2)
}

test("WriteAheadLogUtils - log creation") {
val logDir = Utils.createTempDir().getAbsolutePath

def assertLogClass[T: ClassTag](conf: SparkConf, forDriver: Boolean): Unit = {
val log = if (forDriver) {
WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf, 1, 1)
} else {
WriteAheadLogUtils.createLogForReceiver(conf, logDir, hadoopConf, 1, 1)
}
assert(log.isInstanceOf[T])
}

val emptyConf = new SparkConf() // no log configuration
assertLogClass[FileBasedWriteAheadLog](emptyConf, forDriver = true)
assertLogClass[FileBasedWriteAheadLog](emptyConf, forDriver = false)

val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
classOf[MockWriteAheadLog1].getCanonicalName)
assertLogClass[MockWriteAheadLog1](emptyConf, forDriver = true)
assertLogClass[FileBasedWriteAheadLog](emptyConf, forDriver = false)

val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
classOf[MockWriteAheadLog1].getCanonicalName)
assertLogClass[FileBasedWriteAheadLog](emptyConf, forDriver = true)
assertLogClass[MockWriteAheadLog1](emptyConf, forDriver = false)

val receiverWALConf2 = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
classOf[MockWriteAheadLog2].getCanonicalName)
assertLogClass[FileBasedWriteAheadLog](emptyConf, forDriver = true)
assertLogClass[MockWriteAheadLog2](emptyConf, forDriver = false)
}
}

object WriteAheadLogSuite {

+  /** Mock WAL whose constructor takes a SparkConf and the checkpoint directory. */
+  class MockWriteAheadLog1(val conf: SparkConf, val checkpointDir: String) extends WriteAheadLog {
+    override def write(record: ByteBuffer, time: Long): WriteAheadLogSegment = { null }
+    override def read(segment: WriteAheadLogSegment): ByteBuffer = { null }
+    override def readAll(): util.Iterator[ByteBuffer] = { null }
+    override def cleanup(threshTime: Long, waitForCompletion: Boolean): Unit = { }
+    override def close(): Unit = { }
+  }
+
+  /** Mock WAL whose constructor takes only a SparkConf. */
+  class MockWriteAheadLog2(val conf: SparkConf) extends WriteAheadLog {
+    override def write(record: ByteBuffer, time: Long): WriteAheadLogSegment = { null }
+    override def read(segment: WriteAheadLogSegment): ByteBuffer = { null }
+    override def readAll(): util.Iterator[ByteBuffer] = { null }
+    override def cleanup(threshTime: Long, waitForCompletion: Boolean): Unit = { }
+    override def close(): Unit = { }
+  }

private val hadoopConf = new Configuration()

/** Write data to a file directly and return an array of the file segments written. */
@@ -279,7 +329,7 @@ object WriteAheadLogSuite {
segments
}

-/** Write data to rotating files in log directory using the manager class. */
+/** Write data to rotating files in log directory using the WriteAheadLog class. */
def writeDataUsingWriteAheadLog(
logDirectory: String,
data: Seq[String],
@@ -347,7 +397,7 @@
readData
}

-/** Read all the data in the log file in a directory using the manager class. */
+/** Read all the data in the log file in a directory using the WriteAheadLog class. */
def readDataUsingWriteAheadLog(logDirectory: String): Seq[String] = {
import scala.collection.JavaConversions._
val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf)
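The new test exercises the pluggable-WAL selection this patch series introduces: spark.streaming.driver.writeAheadLog.class and spark.streaming.receiver.writeAheadLog.class name the implementation that WriteAheadLogUtils should instantiate, with FileBasedWriteAheadLog as the default when neither is set. The two mock classes with different constructor arities suggest the utility reflectively tries a (SparkConf, log directory) constructor before a SparkConf-only one. A minimal sketch of plugging in a custom implementation against this commit's interface (the MyWriteAheadLog name and its no-op bodies are illustrative assumptions, not part of the commit):

    import java.nio.ByteBuffer
    import java.util

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogSegment}

    // Hypothetical custom WAL; the (SparkConf, checkpointDir) constructor mirrors
    // the shape that MockWriteAheadLog1 verifies in the test above.
    class MyWriteAheadLog(conf: SparkConf, checkpointDir: String) extends WriteAheadLog {
      override def write(record: ByteBuffer, time: Long): WriteAheadLogSegment = { null } // stub
      override def read(segment: WriteAheadLogSegment): ByteBuffer = { null }             // stub
      override def readAll(): util.Iterator[ByteBuffer] = { null }                        // stub
      override def cleanup(threshTime: Long, waitForCompletion: Boolean): Unit = { }
      override def close(): Unit = { }
    }

    // Select the custom class for the driver and/or receiver logs via configuration.
    val conf = new SparkConf()
      .set("spark.streaming.driver.writeAheadLog.class", classOf[MyWriteAheadLog].getCanonicalName)
      .set("spark.streaming.receiver.writeAheadLog.class", classOf[MyWriteAheadLog].getCanonicalName)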
