From bde26b15f312a03dc2f3deb93c43e8551240cd97 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 28 Apr 2015 19:08:36 -0700
Subject: [PATCH] Renamed segment to record handle everywhere

---
 .../spark/streaming/util/WriteAheadLog.java   | 11 +++---
 .../dstream/ReceiverInputDStream.scala        |  2 +-
 .../rdd/WriteAheadLogBackedBlockRDD.scala     | 38 ++++++++++---------
 .../receiver/ReceivedBlockHandler.scala       |  8 ++--
 .../streaming/ReceivedBlockHandlerSuite.scala |  2 +-
 .../WriteAheadLogBackedBlockRDDSuite.scala    | 18 ++++-----
 .../streaming/util/WriteAheadLogSuite.scala   |  2 +-
 7 files changed, 42 insertions(+), 39 deletions(-)

diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
index 5226e59edb9a5..8c0fdfa9c7478 100644
--- a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
+++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
@@ -29,15 +29,16 @@
 @org.apache.spark.annotation.DeveloperApi
 public abstract class WriteAheadLog {
   /**
-   * Write the record to the log and return the segment information that is necessary to read
-   * back the written record. The time is used to the index the record, such that it can be
-   * cleaned later. Note that the written data must be durable and readable (using the
-   * segment info) by the time this function returns.
+   * Write the record to the log and return a record handle, which contains all the information
+   * necessary to read back the written record. The time is used to index the record,
+   * such that it can be cleaned later. Note that implementations of this abstract class must
+   * ensure that the written data is durable and readable (using the record handle) by the
+   * time this function returns.
    */
   abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time);
 
   /**
-   * Read a written record based on the given segment information.
+   * Read a written record based on the given record handle.
    */
   abstract public ByteBuffer read(WriteAheadLogRecordHandle handle);
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 8be04314c4285..4c7fd2c57c006 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -82,7 +82,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
         // WriteAheadLogBackedBlockRDD else create simple BlockRDD.
         if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
           val logSegments = blockStoreResults.map {
-            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
+            _.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
           }.toArray
           // Since storeInBlockManager = false, the storage level does not matter.
           new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index a22fe3e1d483c..2e9d248a95cf2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -34,26 +34,27 @@ import org.apache.spark.streaming.util._
  * the segment of the write ahead log that backs the partition.
  * @param index index of the partition
  * @param blockId id of the block having the partition data
- * @param segment segment of the write ahead log having the partition data
+ * @param walRecordHandle handle of the record in a write ahead log having the partition data
  */
 private[streaming]
 class WriteAheadLogBackedBlockRDDPartition(
     val index: Int,
     val blockId: BlockId,
-    val segment: WriteAheadLogRecordHandle)
+    val walRecordHandle: WriteAheadLogRecordHandle)
   extends Partition
 
 
 /**
  * This class represents a special case of the BlockRDD where the data blocks in
- * the block manager are also backed by segments in write ahead logs. For reading
+ * the block manager are also backed by data in write ahead logs. For reading
  * the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding file segment.
+ * If it does not find them, it looks up the corresponding data in the write ahead log.
  *
  * @param sc SparkContext
  * @param blockIds Ids of the blocks that contains this RDD's data
- * @param segments Segments in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
+ * @param storeInBlockManager Whether to store in the block manager after reading
+ *                            from the WAL record
 * @param storageLevel storage level to store when storing in block manager
 *        (applicable when storeInBlockManager = true)
 */
@@ -61,15 +62,15 @@ private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient blockIds: Array[BlockId],
-    @transient segments: Array[WriteAheadLogRecordHandle],
+    @transient walRecordHandles: Array[WriteAheadLogRecordHandle],
     storeInBlockManager: Boolean,
     storageLevel: StorageLevel)
   extends BlockRDD[T](sc, blockIds) {
 
   require(
-    blockIds.length == segments.length,
+    blockIds.length == walRecordHandles.length,
     s"Number of block ids (${blockIds.length}) must be " +
-      s"the same as number of segments (${segments.length}})!")
+      s"the same as number of WAL record handles (${walRecordHandles.length})!")
 
   // Hadoop configuration is not serializable, so broadcast it as a serializable.
   @transient private val hadoopConfig = sc.hadoopConfiguration
@@ -78,13 +79,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
   override def getPartitions: Array[Partition] = {
     assertValid()
     Array.tabulate(blockIds.size) { i =>
-      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
     }
   }
 
   /**
    * Gets the partition data by getting the corresponding block from the block manager.
-   * If the block does not exist, then the data is read from the corresponding record
+   * If the block does not exist, then the data is read from the corresponding record
    * in write ahead log files.
    */
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
@@ -105,11 +106,11 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
           val dummyDirectory = FileUtils.getTempDirectoryPath()
           writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
             SparkEnv.get.conf, dummyDirectory, hadoopConf)
-          dataRead = writeAheadLog.read(partition.segment)
+          dataRead = writeAheadLog.read(partition.walRecordHandle)
         } catch {
           case NonFatal(e) =>
             throw new SparkException(
-              s"Could not read data from write ahead log segment ${partition.segment}", e)
+              s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
         } finally {
           if (writeAheadLog != null) {
             writeAheadLog.close()
@@ -117,10 +118,11 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         }
         if (dataRead == null) {
           throw new SparkException(
-            s"Could not read data from write ahead log segment ${partition.segment}, " +
+            s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
              s"read returned null")
         }
-        logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
+        logInfo(s"Read partition data of $this from write ahead log, record handle " +
+          partition.walRecordHandle)
         if (storeInBlockManager) {
           blockManager.putBytes(blockId, dataRead, storageLevel)
           logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
@@ -132,14 +134,14 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
 
   /**
    * Get the preferred location of the partition. This returns the locations of the block
-   * if it is present in the block manager, else it returns the location of the
-   * corresponding segment in HDFS.
+   * if it is present in the block manager, else, if a FileBasedWriteAheadLogSegment is used,
+   * it returns the location of the corresponding file segment in HDFS.
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockLocations = getBlockIdLocations().get(partition.blockId)
     blockLocations.getOrElse {
-      partition.segment match {
+      partition.walRecordHandle match {
         case fileSegment: FileBasedWriteAheadLogSegment =>
           HdfsUtils.getFileSegmentLocations(
             fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 602e7167a1d37..4b3d9ee4b0090 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
  */
 private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
-    segment: WriteAheadLogRecordHandle
+    walRecordHandle: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult
 
 
@@ -178,10 +178,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
       writeAheadLog.write(serializedBlock, clock.getTimeMillis())
     }
 
-    // Combine the futures, wait for both to complete, and return the write ahead log segment
+    // Combine the futures, wait for both to complete, and return the write ahead log record handle
     val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
-    val segment = Await.result(combinedFuture, blockStoreTimeout)
-    WriteAheadLogBasedStoreResult(blockId, segment)
+    val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
+    WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 4a99519629885..d1628260e4fca 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -130,7 +130,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
         "Unexpected store result type"
       )
       // Verify the data in write ahead log files is correct
-      val walSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment}
+      val walSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle}
       val loggedData = walSegments.flatMap { walSegment =>
         val fileSegment = walSegment.asInstanceOf[FileBasedWriteAheadLogSegment]
         val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index d8160e41c12e5..deafd76183df7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -100,9 +100,9 @@ class WriteAheadLogBackedBlockRDDSuite
       blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
     }
 
-    // Generate write ahead log segments
-    val segments = generateFakeSegments(numPartitionsInBM) ++
-      writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))
+    // Generate write ahead log file segments
+    val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++
+      generateWALRecordHandles(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))
 
     // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
     require(
@@ -116,24 +116,24 @@ class WriteAheadLogBackedBlockRDDSuite
 
     // Make sure that the right `numPartitionsInWAL` blocks are in WALs, and other are not
     require(
-      segments.takeRight(numPartitionsInWAL).forall(s =>
+      recordHandles.takeRight(numPartitionsInWAL).forall(s =>
        new File(s.path.stripPrefix("file://")).exists()),
      "Expected blocks not in write ahead log"
    )
    require(
-      segments.take(numPartitionsInBM).forall(s =>
+      recordHandles.take(numPartitionsInBM).forall(s =>
        !new File(s.path.stripPrefix("file://")).exists()),
      "Unexpected blocks in write ahead log"
    )
 
    // Create the RDD and verify whether the returned data is correct
    val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
-      segments.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
+      recordHandles.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
    assert(rdd.collect() === data.flatten)
 
    if (testStoreInBM) {
      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
-        segments.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
+        recordHandles.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
      assert(rdd2.collect() === data.flatten)
      assert(
        blockIds.forall(blockManager.get(_).nonEmpty),
@@ -142,7 +142,7 @@ class WriteAheadLogBackedBlockRDDSuite
     }
   }
 
-  private def writeLogSegments(
+  private def generateWALRecordHandles(
       blockData: Seq[Seq[String]],
       blockIds: Seq[BlockId]
     ): Seq[FileBasedWriteAheadLogSegment] = {
@@ -155,7 +155,7 @@ class WriteAheadLogBackedBlockRDDSuite
     segments
   }
 
-  private def generateFakeSegments(count: Int): Seq[FileBasedWriteAheadLogSegment] = {
+  private def generateFakeRecordHandles(count: Int): Seq[FileBasedWriteAheadLogSegment] = {
     Array.fill(count)(new FileBasedWriteAheadLogSegment("random", 0L, 0))
   }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 7742b44e4b86f..58bde54918ac9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -294,7 +294,7 @@ object WriteAheadLogSuite {
 
   class MockWriteAheadLog0() extends WriteAheadLog {
     override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = { null }
-    override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = { null }
+    override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = { null }
     override def readAll(): util.Iterator[ByteBuffer] = { null }
     override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = { }
     override def close(): Unit = { }
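
To illustrate the contract the renamed API describes (write returns a record handle, read takes that same handle back, clean uses the time passed to write), here is a minimal sketch of a custom WriteAheadLog. It is not part of this patch: the InMemoryRecordHandle and InMemoryWriteAheadLog names are hypothetical, and the sketch only assumes the abstract signatures shown in the hunks above, plus WriteAheadLogRecordHandle being a no-argument serializable marker class.

import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

// Hypothetical handle: remembers the index of the record in the in-memory log.
case class InMemoryRecordHandle(index: Int) extends WriteAheadLogRecordHandle

// Hypothetical in-memory WAL, illustrating the write -> handle -> read round trip.
class InMemoryWriteAheadLog extends WriteAheadLog {

  // Records keyed by a monotonically increasing index, stored with the time passed to write().
  private val records = mutable.LinkedHashMap[Int, (Long, ByteBuffer)]()
  private var nextIndex = 0

  // Write the record, index it by `time`, and return a handle that can locate it later.
  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = synchronized {
    val handle = InMemoryRecordHandle(nextIndex)
    records(nextIndex) = (time, record)
    nextIndex += 1
    handle
  }

  // Read a single record back, using only the handle returned by write().
  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = synchronized {
    handle match {
      case InMemoryRecordHandle(index) => records(index)._2
      case other =>
        throw new IllegalArgumentException(s"Unexpected record handle type: ${other.getClass}")
    }
  }

  // Iterate over all records still in the log.
  override def readAll(): JIterator[ByteBuffer] = synchronized {
    records.values.map(_._2).iterator.asJava
  }

  // Drop every record whose write() time is older than threshTime.
  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = synchronized {
    val expired = records.collect { case (index, (time, _)) if time < threshTime => index }
    expired.foreach(records.remove)
  }

  override def close(): Unit = { }
}

The point of the handle abstraction is that WriteAheadLogBackedBlockRDD and WriteAheadLogBasedStoreResult never need to know whether the handle is a file segment (path/offset/length) or something else entirely; they only hold on to it and pass it back to read().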