[SPARK-7056][Streaming] Make the Write Ahead Log pluggable #5645

Closed · wants to merge 16 commits
@@ -30,6 +30,7 @@ import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}

import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
@@ -79,7 +80,7 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}
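
For context, the receiver-side WAL is still switched on by the same configuration key the old code read directly. A minimal sketch of enabling it from application code is shown below; the `spark.streaming.receiver.writeAheadLog.class` key for selecting a custom implementation is an assumption about the WriteAheadLogUtils introduced in this PR, and `org.example.MyWriteAheadLog` is a hypothetical class.

```scala
import org.apache.spark.SparkConf

// Sketch only: enable the receiver write ahead log (key confirmed by the old code above)
// and, assuming a *.writeAheadLog.class key is honoured, point it at a hypothetical
// custom implementation.
val conf = new SparkConf()
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.receiver.writeAheadLog.class", "org.example.MyWriteAheadLog") // hypothetical
```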

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.util;

import java.nio.ByteBuffer;
import java.util.Iterator;

/**
* This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming
* to save the received data (by receivers) and associated metadata to a reliable storage, so that
* they can be recovered after driver failures. See the Spark documentation for more information
* on how to plug in your own custom implementation of a write ahead log.
*/
@org.apache.spark.annotation.DeveloperApi
public abstract class WriteAheadLog {
/**
* Write the record to the log and return a record handle, which contains all the information
* necessary to read back the written record. The time is used to index the record,
* such that it can be cleaned later. Note that implementations of this abstract class must
* ensure that the written data is durable and readable (using the record handle) by the
* time this function returns.
*/
abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time);

/**
* Read a written record based on the given record handle.
*/
abstract public ByteBuffer read(WriteAheadLogRecordHandle handle);

/**
* Read and return an iterator of all the records that have been written but not yet cleaned up.
*/
abstract public Iterator<ByteBuffer> readAll();

/**
* Clean all the records that are older than the threshold time. It can wait for
* the completion of the deletion.
*/
abstract public void clean(long threshTime, boolean waitForCompletion);

/**
* Close this log and release any resources.
*/
abstract public void close();
}
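
To make the contract concrete, here is a minimal sketch of a custom implementation in Scala. It keeps records in memory only, so it is not durable and not a real WAL; the class and handle names are invented for illustration.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentSkipListMap
import java.util.{Iterator => JIterator}

import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

// Illustrative handle: the write timestamp doubles as the lookup key.
case class InMemoryRecordHandle(time: Long) extends WriteAheadLogRecordHandle

// Illustrative, non-durable implementation of the WriteAheadLog contract.
class InMemoryWriteAheadLog extends WriteAheadLog {
  private val records = new ConcurrentSkipListMap[java.lang.Long, ByteBuffer]()

  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
    records.put(time, record)
    InMemoryRecordHandle(time)
  }

  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = handle match {
    case InMemoryRecordHandle(time) => records.get(time)
    case other => throw new IllegalArgumentException(s"Unexpected handle type: $other")
  }

  override def readAll(): JIterator[ByteBuffer] = records.values().iterator()

  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
    // Deletion is synchronous here, so waitForCompletion has no extra effect.
    records.headMap(threshTime).clear()
  }

  override def close(): Unit = records.clear()
}
```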
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.util;

/**
* This abstract class represents a handle that refers to a record written in a
* {@link org.apache.spark.streaming.util.WriteAheadLog WriteAheadLog}.
* It must contain all the information necessary for the record to be read and returned by
* an implementation of the WriteAheadLog class.
*
* @see org.apache.spark.streaming.util.WriteAheadLog
*/
@org.apache.spark.annotation.DeveloperApi
public abstract class WriteAheadLogRecordHandle implements java.io.Serializable {
}
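
As a concrete illustration, the built-in file-based log identifies a record by where it sits in a log file. A handle of that shape would look roughly like the sketch below; this is an approximation modelled on the path/offset/length fields used later in this diff, not the PR's actual class.

```scala
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle

// Approximation only: everything needed to seek to and re-read a single record
// from a file-based write ahead log.
case class FileSegmentRecordHandle(path: String, offset: Long, length: Int)
  extends WriteAheadLogRecordHandle
```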
@@ -82,7 +82,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
// WriteAheadLogBackedBlockRDD else create simple BlockRDD.
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
val logSegments = blockStoreResults.map {
_.asInstanceOf[WriteAheadLogBasedStoreResult].segment
_.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
}.toArray
// Since storeInBlockManager = false, the storage level does not matter.
new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
@@ -16,57 +16,61 @@
*/
package org.apache.spark.streaming.rdd

import java.nio.ByteBuffer

import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.commons.io.FileUtils

import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader}
import org.apache.spark.streaming.util._

/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
* It contains information about the id of the blocks having this partition's data and
* the segment of the write ahead log that backs the partition.
* @param index index of the partition
* @param blockId id of the block having the partition data
* @param segment segment of the write ahead log having the partition data
* @param walRecordHandle Handle of the record in a write ahead log having the partition data
*/
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
val segment: WriteAheadLogFileSegment)
val walRecordHandle: WriteAheadLogRecordHandle)
extends Partition


/**
* This class represents a special case of the BlockRDD where the data blocks in
* the block manager are also backed by segments in write ahead logs. For reading
* the block manager are also backed by data in write ahead logs. For reading
* the data, this RDD first looks up the blocks by their ids in the block manager.
* If it does not find them, it looks up the corresponding file segment.
* If it does not find them, it looks up the corresponding data in the write ahead log.
*
* @param sc SparkContext
* @param blockIds Ids of the blocks that contains this RDD's data
* @param segments Segments in write ahead logs that contain this RDD's data
* @param storeInBlockManager Whether to store in the block manager after reading from the segment
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
* @param storeInBlockManager Whether to store in the block manager after reading
* from the WAL record
* @param storageLevel storage level to store when storing in block manager
* (applicable when storeInBlockManager = true)
*/
private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
@transient segments: Array[WriteAheadLogFileSegment],
@transient walRecordHandles: Array[WriteAheadLogRecordHandle],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
extends BlockRDD[T](sc, blockIds) {

require(
blockIds.length == segments.length,
blockIds.length == walRecordHandles.length,
s"Number of block ids (${blockIds.length}) must be " +
s"the same as number of segments (${segments.length}})!")
s"the same as number of WAL record handles (${walRecordHandles.length}})!")

// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
@@ -75,13 +79,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
assertValid()
Array.tabulate(blockIds.size) { i =>
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
}
}

/**
* Gets the partition data by getting the corresponding block from the block manager.
* If the block does not exist, then the data is read from the corresponding segment
* If the block does not exist, then the data is read from the corresponding record
* in write ahead log files.
*/
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
@@ -96,10 +100,29 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logDebug(s"Read partition data of $this from block manager, block $blockId")
iterator
case None => // Data not found in Block Manager, grab it from write ahead log file
val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
val dataRead = reader.read(partition.segment)
reader.close()
logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
var dataRead: ByteBuffer = null
I feel dirty seeing nulls in scala

    ByteBuffer.wrap(new byte[0])

Contributor Author: Why allocate two (at least) objects when it is completely obvious that they are not going to be used? The null does not get exposed to anything outside the function, and hence is okay to have.

If you look at the rest of the Spark source code, we don't strictly adhere to the Scala way of doing things; rather, we balance code understandability (limiting the levels of functional nesting) and efficiency (while loops instead of for when perf matters) with Scala style.

var writeAheadLog: WriteAheadLog = null
try {
val dummyDirectory = FileUtils.getTempDirectoryPath()
Contributor: Why do we need to use dummyDirectory here? Assuming the WAL may not be file-based, I'm not sure why this is needed.

Contributor Author: The default WAL is file-based, so a log directory is needed for it to work. However, the log directory is not really needed for reading a particular record. But to read even a single record you have to create a FileBasedWriteAheadLog object, which needs a log directory. Hence I am providing a dummy directory for this.

I know that this is a little awkward. This is the cost of defining a single interface for both writing and reading single records. Earlier there were two independent classes (WALWriter and WALRandomReader) that were used for these two purposes, which have different requirements. But since I am trying to make a single interface that can be used for all reading and writing, the log directory must be provided in the constructor of the default file-based WAL. This results in the awkwardness.

I don't quite like it myself, but it may practically be okay as long as we ensure that the FileBasedWAL does not create unnecessary directories/files when only reading a single record. I can add a test to ensure that.

writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
SparkEnv.get.conf, dummyDirectory, hadoopConf)
Contributor: Also, IIUC, if the journal system is not Hadoop-based, hadoopConf may not be available here.

Contributor Author: hadoopConf is always available through the SparkContext. Irrespective of whether a Hadoop file system is used, a Hadoop conf is created by the SparkContext, which is passed on to this location. If the WAL is not the default FileBasedWAL, then this parameter is just ignored (see the method WriteAheadLogUtils.createLogForReceiver).

Contributor: What I'm thinking is: do we need this parameter in the interface at all, or can we hide it inside the file-based WAL implementation?

Contributor Author: The log directory needs to be passed through WriteAheadLogUtils.createLogForXXX(). If you want to hide it from this method and pass it through the SparkConf instead, then every place where WriteAheadLogUtils.createLogForXXX() needs to be called, we would have to add the following:

```scala
val walConf = SparkEnv.get.conf.clone()
walConf.set("logdir", ....)
WriteAheadLogUtils.createLogForXXX(walConf, hadoopConf)
```

IMO that duplicates code everywhere and is uglier than this dummy-dir approach. Also, it does not handle the hadoopConf parameter, which is just as unnecessary as the log directory for arbitrary WAL implementations.

dataRead = writeAheadLog.read(partition.walRecordHandle)
} catch {
case NonFatal(e) =>
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
} finally {
if (writeAheadLog != null) {
writeAheadLog.close()
Contributor: Maybe reset writeAheadLog to null after close to avoid unexpected behavior :).

writeAheadLog = null

Contributor Author: Done.

}
}
if (dataRead == null) {
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
s"read returned null")
}
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
Contributor: Would be better to use string interpolation here.

Contributor Author: Since it has to be on the next line, using s"$..." is strictly more characters :)

if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
@@ -111,14 +134,20 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](

/**
* Get the preferred location of the partition. This returns the locations of the block
* if it is present in the block manager, else it returns the location of the
* corresponding segment in HDFS.
* if it is present in the block manager, else if FileBasedWriteAheadLogSegment is used,
* it returns the location of the corresponding file segment in HDFS.
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
blockLocations.getOrElse(
HdfsUtils.getFileSegmentLocations(
partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig))
blockLocations.getOrElse {
Contributor: It might make sense to add location info to the WALRecordHandle interface itself. This way, systems that are not HDFS but still benefit from preferred locations could use it.

Contributor Author: That's a good point. I wasn't sure whether it is a good idea to have it in the interface in this version. We can add it later and maintain binary compatibility, as the RecordHandle is an abstract class. Also, it is still a developer API. For now, I am going to merge this in to unblock #5732.

partition.walRecordHandle match {
case fileSegment: FileBasedWriteAheadLogSegment =>
HdfsUtils.getFileSegmentLocations(
fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
case _ =>
Seq.empty
}
}
}
}
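
Following up on the preferred-locations thread above: one way that suggestion could eventually look is a handle that optionally exposes location hints. This is purely a sketch of the reviewer's idea, not something this PR adds.

```scala
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle

// Sketch of a location-aware handle, so that non-HDFS WAL implementations could also
// feed locality information into getPreferredLocations.
abstract class LocationAwareRecordHandle extends WriteAheadLogRecordHandle {
  /** Hosts where reading this record is expected to be cheapest; may be empty. */
  def preferredLocations: Seq[String]
}
```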
@@ -17,18 +17,18 @@

package org.apache.spark.streaming.receiver

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.{existentials, postfixOps}

import WriteAheadLogBasedBlockHandler._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
import org.apache.spark.{Logging, SparkConf, SparkException}

/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
@@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
*/
private[streaming] case class WriteAheadLogBasedStoreResult(
blockId: StreamBlockId,
segment: WriteAheadLogFileSegment
walRecordHandle: WriteAheadLogRecordHandle
) extends ReceivedBlockStoreResult


@@ -116,10 +116,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(

private val blockStoreTimeout = conf.getInt(
"spark.streaming.receiver.blockStoreTimeout", 30).seconds
private val rollingInterval = conf.getInt(
"spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
private val maxFailures = conf.getInt(
"spark.streaming.receiver.writeAheadLog.maxFailures", 3)

private val effectiveStorageLevel = {
if (storageLevel.deserialized) {
@@ -139,13 +135,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
s"$effectiveStorageLevel when write ahead log is enabled")
}

// Manages rolling log files
private val logManager = new WriteAheadLogManager(
checkpointDirToLogDir(checkpointDir, streamId),
hadoopConf, rollingInterval, maxFailures,
callerName = this.getClass.getSimpleName,
clock = clock
)
// Write ahead log that the received blocks are written to
private val writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
conf, checkpointDirToLogDir(checkpointDir, streamId), hadoopConf)

// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
@@ -183,21 +175,21 @@ private[streaming] class WriteAheadLogBasedBlockHandler(

// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
logManager.writeToLog(serializedBlock)
writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}

// Combine the futures, wait for both to complete, and return the write ahead log segment
// Combine the futures, wait for both to complete, and return the write ahead log record handle
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
val segment = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, segment)
val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
}

def cleanupOldBlocks(threshTime: Long) {
logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
writeAheadLog.clean(threshTime, false)
}

def stop() {
logManager.stop()
writeAheadLog.close()
}
}
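
The storeBlock path above relies on a small futures pattern: the block-manager write and the WAL write run in parallel, and only the WAL's result (the record handle) is kept. A distilled, self-contained sketch of that pattern, with placeholder store functions, follows.

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object ParallelStoreSketch {
  // Two threads, mirroring the handler's dedicated pool for the BM and WAL writes.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  def storeInBlockManager(): Unit = ()          // placeholder for the real BM write
  def storeInWriteAheadLog(): String = "handle" // placeholder returning a record handle

  def storeBoth(timeout: FiniteDuration): String = {
    val bmFuture  = Future { storeInBlockManager() }
    val walFuture = Future { storeInWriteAheadLog() }
    // zip fails if either write fails; map(_._2) keeps only the WAL record handle.
    Await.result(bmFuture.zip(walFuture).map(_._2), timeout)
  }
}
```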

@@ -25,12 +25,13 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration

import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.util.{RpcUtils, Utils}
import org.apache.spark.{Logging, SparkEnv, SparkException}

/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -46,7 +47,7 @@ private[streaming] class ReceiverSupervisorImpl(
) extends ReceiverSupervisor(receiver, env.conf) with Logging {

private val receivedBlockHandler: ReceivedBlockHandler = {
if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +