diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 2673ec22509e9..fffa1911f5bc2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -84,5 +84,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
         "Attempted to use %s after its blocks have been removed!".format(toString))
     }
   }
+
+  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
+    locations_
+  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
new file mode 100644
index 0000000000000..dec2f1b56b2da
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.BlockRDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader}
+import org.apache.spark._
+
+private[streaming]
+class HDFSBackedBlockRDDPartition(
+    val blockId: BlockId, idx: Int, val segment: WriteAheadLogFileSegment) extends Partition {
+  val index = idx
+}
+
+private[streaming]
+class HDFSBackedBlockRDD[T: ClassTag](
+    @transient sc: SparkContext,
+    @transient hadoopConfiguration: Configuration,
+    @transient override val blockIds: Array[BlockId],
+    @transient val segments: Array[WriteAheadLogFileSegment],
+    val storeInBlockManager: Boolean,
+    val storageLevel: StorageLevel
+  ) extends BlockRDD[T](sc, blockIds) {
+
+  if (blockIds.length != segments.length) {
+    throw new IllegalStateException("Number of block ids must be the same as number of segments!")
+  }
+
+  // Hadoop Configuration is not serializable, so wrap it in a SerializableWritable and broadcast it.
+  val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
+    .asInstanceOf[Broadcast[SerializableWritable[Configuration]]]
+
+  override def getPartitions: Array[Partition] = {
+    assertValid()
+    (0 until blockIds.size).map { i =>
+      new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i))
+    }.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    assertValid()
+    val hadoopConf = broadcastedHadoopConf.value.value
+    val blockManager = SparkEnv.get.blockManager
+    val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
+    val blockId = partition.blockId
+    blockManager.get(blockId) match {
+      // Data is in the BlockManager, grab it from there.
+      case Some(block) =>
+        block.data.asInstanceOf[Iterator[T]]
+      // Data not found in the BlockManager, so read it from the write ahead log on HDFS.
+      case None =>
+        logInfo("Reading partition data from write ahead log " + partition.segment.path)
+        val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
+        val dataRead = reader.read(partition.segment)
+        reader.close()
+        // Currently, we only support storing the data in the BlockManager in serialized form,
+        // not in deserialized form.
+        if (storeInBlockManager) {
+          blockManager.putBytes(blockId, dataRead, storageLevel)
+        }
+        dataRead.rewind()
+        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+    }
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
+    val locations = getBlockIdLocations()
+    locations.getOrElse(partition.blockId,
+      HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration)
+        .getOrElse(new Array[String](0)).toSeq)
+  }
+}
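For context on how the RDD above is meant to be used, here is a minimal, illustrative sketch that is not part of this patch. It wires up an HDFSBackedBlockRDD whose blocks are still cached in the BlockManager, mirroring the "Data available in BM but not in HDFS" case in the test suite below. The object name, block ids, sample data, and placeholder segments are assumptions made up for the example, and it is declared in org.apache.spark.streaming.rdd only because the RDD and SparkContext.env are package-private.

package org.apache.spark.streaming.rdd

import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.WriteAheadLogFileSegment

// Illustrative sketch only; all ids, paths and data below are made-up placeholders.
object HDFSBackedBlockRDDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("HDFSBackedBlockRDDExample"))
    val blockManager = sc.env.blockManager

    // Store two blocks in the BlockManager, as a receiver would have done.
    val blockIds: Array[BlockId] = Array(StreamBlockId(0, 0), StreamBlockId(0, 1))
    val data = Seq(Seq("a", "b"), Seq("c", "d"))
    blockIds.zip(data).foreach { case (id, block) =>
      blockManager.putIterator(id, block.iterator, StorageLevel.MEMORY_ONLY_SER)
    }

    // One segment per block id. In production these would point at real write ahead log
    // records; placeholders suffice here because every block is found in the BlockManager.
    val segments = blockIds.map(_ => new WriteAheadLogFileSegment("random", 0L, 0))

    val rdd = new HDFSBackedBlockRDD[String](sc, new Configuration(), blockIds, segments,
      storeInBlockManager = false, storageLevel = StorageLevel.MEMORY_ONLY)
    assert(rdd.collect().toSeq == data.flatten)
    sc.stop()
  }
}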
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
new file mode 100644
index 0000000000000..9e1ad312e52fb
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.rdd
+
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
+import org.apache.spark.{SparkConf, SparkContext}
+
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
+  val conf = new SparkConf()
+    .setMaster("local[2]")
+    .setAppName(this.getClass.getSimpleName)
+  val sparkContext = new SparkContext(conf)
+  val hadoopConf = new Configuration()
+  val blockManager = sparkContext.env.blockManager
+  // Since the same BlockManager is reused in all tests, use an atomic int to generate unique ids
+  val idGenerator = new AtomicInteger(0)
+  var file: File = null
+  var dir: File = null
+
+  before {
+    dir = Files.createTempDir()
+    file = new File(dir, "BlockManagerWrite")
+  }
+
+  after {
+    file.delete()
+    dir.delete()
+  }
+
+  test("Data available in BM and HDFS") {
+    doTestHDFSBackedRDD(5, 5, 20, 5)
+  }
+
+  test("Data available in BM but not in HDFS") {
+    doTestHDFSBackedRDD(5, 0, 20, 5)
+  }
+
+  test("Data available in HDFS but not in BM") {
+    doTestHDFSBackedRDD(0, 5, 20, 5)
+  }
+
+  test("Data partially available in BM, and the rest in HDFS") {
+    doTestHDFSBackedRDD(3, 2, 20, 5)
+  }
+
+  /**
+   * Test the HDFSBackedBlockRDD by writing some of the blocks to the BlockManager and the rest
+   * only to HDFS (via the write ahead log), then verifying that reading the RDD returns all the
+   * data, so that not all reads need to happen from HDFS.
+   * @param writeToBMCount - Number of blocks to write to the BlockManager
+   * @param writeToHDFSCount - Number of blocks to write to HDFS
+   * @param total - Total number of Strings to write
+   * @param blockCount - Number of blocks to write (therefore, total # of events per block =
+   *                     total / blockCount)
+   */
+  private def doTestHDFSBackedRDD(
+      writeToBMCount: Int,
+      writeToHDFSCount: Int,
+      total: Int,
+      blockCount: Int
+    ) {
+    val countPerBlock = total / blockCount
+    val blockIds = (0 until blockCount).map { i =>
+      StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
+    }
+
+    val writtenStrings = generateData(total, countPerBlock)
+
+    if (writeToBMCount != 0) {
+      (0 until writeToBMCount).foreach { i =>
+        blockManager
+          .putIterator(blockIds(i), writtenStrings(i).iterator, StorageLevel.MEMORY_ONLY_SER)
+      }
+    }
+
+    val segments = new ArrayBuffer[WriteAheadLogFileSegment]
+    if (writeToHDFSCount != 0) {
+      // Generate some fake segments for the blocks already in the BlockManager so the RDD
+      // does not complain about a mismatch between block ids and segments
+      segments ++= generateFakeSegments(writeToBMCount)
+      segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
+        blockIds.slice(writeToBMCount, blockCount))
+    } else {
+      segments ++= generateFakeSegments(blockCount)
+    }
+
+    val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
+      segments.toArray, false, StorageLevel.MEMORY_ONLY)
+
+    val dataFromRDD = rdd.collect()
+    // Verify that the data read back from the RDD matches what was written, block by block
+    assert(writtenStrings.flatten === dataFromRDD)
+  }
+
+  /**
+   * Generate random strings and group them into blocks of data.
+   * @param count - Total number of Strings to generate
+   * @param countPerBlock - Number of Strings per block
+   * @return - Seq of Seqs, each inner Seq representing the data that goes into one block
+   */
+  private def generateData(
+      count: Int,
+      countPerBlock: Int
+    ): Seq[Seq[String]] = {
+    val strings = (0 until count).map { _ => scala.util.Random.nextString(50) }
+    strings.grouped(countPerBlock).toSeq
+  }
+
+  /**
+   * Write the given blocks to the write ahead log on HDFS, one record per block.
+   * @param blockData - Seq of Seqs, each inner Seq holding the data of one block
+   * @param blockIds - Block ids corresponding to the blocks in blockData
+   * @return - Seq of WriteAheadLogFileSegments, one for each block written to HDFS
+   */
+  private def writeDataToHDFS(
+      blockData: Seq[Seq[String]],
+      blockIds: Seq[BlockId]
+    ): Seq[WriteAheadLogFileSegment] = {
+    assert(blockData.size === blockIds.size)
+    val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
+    val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
+    blockData.zip(blockIds).foreach { case (data, id) =>
+      segments += writer.write(blockManager.dataSerialize(id, data.iterator))
+    }
+    writer.close()
+    segments
+  }
+
+  private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = {
+    (0 until count).map { _ => new WriteAheadLogFileSegment("random", 0L, 0) }
+  }
+}
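To make the write ahead log read path in compute() concrete, here is a second minimal sketch, again not part of this patch, that round-trips one block through the same utilities the suite above exercises: BlockManager.dataSerialize, WriteAheadLogWriter.write, WriteAheadLogRandomReader.read, and BlockManager.dataDeserialize. The object name, temporary log location, and sample data are assumptions for illustration; the calls themselves are the ones used in HDFSBackedBlockRDD.compute() and writeDataToHDFS().

package org.apache.spark.streaming.rdd

import java.io.File

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.{WriteAheadLogRandomReader, WriteAheadLogWriter}

// Illustrative sketch only; the log file lives in a throwaway temp directory.
object WriteAheadLogRoundTripExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("WriteAheadLogRoundTripExample"))
    val blockManager = sc.env.blockManager
    val hadoopConf = new Configuration()
    val logFile = new File(Files.createTempDir(), "log-0").toString

    val blockId = StreamBlockId(0, 0)
    val data = Seq("a", "b", "c")

    // Write: serialize the block through the BlockManager and append it to the log.
    // write() returns the WriteAheadLogFileSegment (path, offset, length) needed to
    // locate this exact record later.
    val writer = new WriteAheadLogWriter(logFile, hadoopConf)
    val segment = writer.write(blockManager.dataSerialize(blockId, data.iterator))
    writer.close()

    // Read: this mirrors what HDFSBackedBlockRDD.compute() does when the block is no
    // longer present in the BlockManager.
    val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf)
    val bytes = reader.read(segment)
    reader.close()
    val readBack = blockManager.dataDeserialize(blockId, bytes).toSeq
    assert(readBack == data)
    sc.stop()
  }
}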