diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 1c7b7787e19f4..23295bf658712 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -117,9 +117,9 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( */ override def getPreferredLocations(split: Partition): Seq[String] = { val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] - def blockLocations = getBlockIdLocations().get(partition.blockId) + val blockLocations = getBlockIdLocations().get(partition.blockId) def segmentLocations = HdfsUtils.getFileSegmentLocations( partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig) - blockLocations.orElse(segmentLocations).getOrElse(Seq.empty) + blockLocations.getOrElse(segmentLocations) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index 59947891cb6c3..27a28bab83ed5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -54,12 +54,12 @@ private[streaming] object HdfsUtils { /** Get the locations of the HDFS blocks containing the given file segment. */ def getFileSegmentLocations( - path: String, offset: Long, length: Long, conf: Configuration): Option[Seq[String]] = { + path: String, offset: Long, length: Long, conf: Configuration): Array[String] = { val dfsPath = new Path(path) val dfs = getFileSystemForPath(dfsPath, conf) val fileStatus = dfs.getFileStatus(dfsPath) val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, offset, length)) - blockLocs.map(_.flatMap(_.getHosts)) + blockLocs.map(_.flatMap(_.getHosts)).getOrElse(Array.empty) } def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {