diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 925022e7fe6fb..962b79f46c78a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -146,6 +146,12 @@ object BlockFetcherIterator {
     }
 
     protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
+      // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
+      // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
+      // nodes, rather than blocking on reading output from one node.
+      val maxRequestSize = math.max(maxBytesInFlight / 5, 1L)
+      logInfo("maxBytesInFlight: " + maxBytesInFlight + ", maxRequestSize: " + maxRequestSize)
+
       // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
       // at most maxBytesInFlight in order to limit the amount of data in flight.
       val remoteRequests = new ArrayBuffer[FetchRequest]
@@ -157,11 +163,6 @@ object BlockFetcherIterator {
           _numBlocksToFetch += localBlocksToFetch.size
         } else {
           numRemote += blockInfos.size
-          // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
-          // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
-          // nodes, rather than blocking on reading output from one node.
-          val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
-          logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
           val iterator = blockInfos.iterator
           var curRequestSize = 0L
           var curBlocks = new ArrayBuffer[(BlockId, Long)]
@@ -176,11 +177,12 @@ object BlockFetcherIterator {
            } else if (size < 0) {
              throw new BlockException(blockId, "Negative block size " + size)
            }
-           if (curRequestSize >= minRequestSize) {
+           if (curRequestSize >= maxRequestSize) {
              // Add this FetchRequest
              remoteRequests += new FetchRequest(address, curBlocks)
+             logDebug(s"Creating fetch request of $curRequestSize at $address")
              curRequestSize = 0
              curBlocks = new ArrayBuffer[(BlockId, Long)]
            }
          }
          // Add in the final request
@@ -189,7 +191,7 @@ object BlockFetcherIterator {
          }
        }
      }
-      logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
+      logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
        totalBlocks + " blocks")
      remoteRequests
    }
@@ -224,8 +226,8 @@ object BlockFetcherIterator {
        sendRequest(fetchRequests.dequeue())
      }
 
-      val numGets = remoteRequests.size - fetchRequests.size
-      logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime))
+      val numFetches = remoteRequests.size - fetchRequests.size
+      logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
 
      // Get Local Blocks
      startTime = System.currentTimeMillis
@@ -325,7 +327,7 @@ object BlockFetcherIterator {
      }
 
      copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
-      logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
+      logInfo("Started " + fetchRequestsSync.size + " remote fetches in" +
        Utils.getUsedTimeMs(startTime))
 
      // Get Local Blocks
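For reference, here is a self-contained sketch of the request-splitting behavior after this change. The FetchRequest shape, the block (id, size) tuples, and the maxBytesInFlight / 5 threshold mirror the patch above; the RequestSplitSketch object, the splitBlocks helper, and the simplified BlockId alias are illustrative assumptions, not part of the patch:

    import scala.collection.mutable.ArrayBuffer

    object RequestSplitSketch {
      type BlockId = String
      case class FetchRequest(blocks: ArrayBuffer[(BlockId, Long)])

      // Group non-empty (blockId, size) pairs into requests of roughly
      // maxBytesInFlight / 5 bytes each, so up to five requests can be in
      // flight in parallel rather than one large request per node.
      def splitBlocks(blocks: Seq[(BlockId, Long)], maxBytesInFlight: Long): Seq[FetchRequest] = {
        val maxRequestSize = math.max(maxBytesInFlight / 5, 1L)
        val requests = new ArrayBuffer[FetchRequest]
        var curBlocks = new ArrayBuffer[(BlockId, Long)]
        var curRequestSize = 0L
        for ((blockId, size) <- blocks if size > 0) {
          curBlocks += ((blockId, size))
          curRequestSize += size
          // Close the current request once the threshold is reached; a single
          // oversized block can therefore still push one request past it.
          if (curRequestSize >= maxRequestSize) {
            requests += FetchRequest(curBlocks)
            curBlocks = new ArrayBuffer[(BlockId, Long)]
            curRequestSize = 0
          }
        }
        // Add in the final, partially filled request
        if (curBlocks.nonEmpty) requests += FetchRequest(curBlocks)
        requests.toSeq
      }
    }

For example, with maxBytesInFlight = 48 the threshold is 9 bytes, so ten 4-byte blocks from one node split into four requests of three, three, three, and one block.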
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8e69f1d3351b5..bd760c8d90335 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -463,8 +463,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return the string to tell how long has passed in seconds. The passing parameter should be in
-   * millisecond.
+   * Return a string reporting how much time has passed since startTimeMs, in milliseconds.
    */
   def getUsedTimeMs(startTimeMs: Long): String = {
     return " " + (System.currentTimeMillis - startTimeMs) + " ms"
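Note that getUsedTimeMs returns its result with a leading space (e.g. " 42 ms"), which is why the BlockFetcherIterator call sites above concatenate "remote fetches in" with no trailing space. A minimal usage sketch; the getUsedTimeMs body is copied from the hunk above, while the wrapping UsedTimeSketch object and main method are hypothetical:

    object UsedTimeSketch {
      // Same body as Utils.getUsedTimeMs above: note the leading space in the result.
      def getUsedTimeMs(startTimeMs: Long): String =
        " " + (System.currentTimeMillis - startTimeMs) + " ms"

      def main(args: Array[String]): Unit = {
        val startTime = System.currentTimeMillis
        Thread.sleep(50)
        // Prints e.g.: Started 3 remote fetches in 52 ms
        println("Started " + 3 + " remote fetches in" + getUsedTimeMs(startTime))
      }
    }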