Skip to content

Commit

Permalink
Some logging and clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Feb 26, 2014
1 parent 4d88030 commit d238b88
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ object BlockFetcherIterator {
}

protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
// Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
// smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
// nodes, rather than blocking on reading output from one node.
val maxRequestSize = math.max(maxBytesInFlight / 5, 1L)
logInfo("maxBytesInFlight: " + maxBytesInFlight + ", maxRequestSize: " + maxRequestSize)

// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
// at most maxBytesInFlight in order to limit the amount of data in flight.
val remoteRequests = new ArrayBuffer[FetchRequest]
Expand All @@ -157,11 +163,6 @@ object BlockFetcherIterator {
_numBlocksToFetch += localBlocksToFetch.size
} else {
numRemote += blockInfos.size
// Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
// smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
// nodes, rather than blocking on reading output from one node.
val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = new ArrayBuffer[(BlockId, Long)]
Expand All @@ -176,11 +177,12 @@ object BlockFetcherIterator {
} else if (size < 0) {
throw new BlockException(blockId, "Negative block size " + size)
}
if (curRequestSize >= minRequestSize) {
if (curRequestSize >= maxRequestSize) {
// Add this FetchRequest
remoteRequests += new FetchRequest(address, curBlocks)
curRequestSize = 0
curBlocks = new ArrayBuffer[(BlockId, Long)]
logDebug(s"Creating fetch request of $curRequestSize at $address")
}
}
// Add in the final request
Expand All @@ -189,7 +191,7 @@ object BlockFetcherIterator {
}
}
}
logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
totalBlocks + " blocks")
remoteRequests
}
Expand Down Expand Up @@ -224,8 +226,8 @@ object BlockFetcherIterator {
sendRequest(fetchRequests.dequeue())
}

val numGets = remoteRequests.size - fetchRequests.size
logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime))
val numFetches = remoteRequests.size - fetchRequests.size
logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

// Get Local Blocks
startTime = System.currentTimeMillis
Expand Down Expand Up @@ -325,7 +327,7 @@ object BlockFetcherIterator {
}

copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
logInfo("Started " + fetchRequestsSync.size + " remote fetches in " +
Utils.getUsedTimeMs(startTime))

// Get Local Blocks
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,7 @@ private[spark] object Utils extends Logging {
}

/**
* Return the string to tell how long has passed in seconds. The passing parameter should be in
* millisecond.
* Return the string to tell how long has passed in milliseconds.
*/
def getUsedTimeMs(startTimeMs: Long): String = {
return " " + (System.currentTimeMillis - startTimeMs) + " ms"
Expand Down

0 comments on commit d238b88

Please sign in to comment.