Skip to content

Commit

Permalink
Remote BlockFetchTracker trait
Browse files Browse the repository at this point in the history
This trait seems to have been created a while ago when there
were multiple implementations; now that there's just one, I think it
makes sense to merge it into the BlockFetcherIterator trait.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #39 from kayousterhout/remove_tracker and squashes the following commits:

8173939 [Kay Ousterhout] Remote BlockFetchTracker.
  • Loading branch information
kayousterhout authored and pwendell committed Feb 28, 2014
1 parent 40e080a commit edf8a56
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 38 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,14 @@ import org.apache.spark.util.Utils
*/

private[storage]
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])]
with Logging with BlockFetchTracker {
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize()
def totalBlocks: Int
def numLocalBlocks: Int
def numRemoteBlocks: Int
def remoteFetchTime: Long
def fetchWaitTime: Long
def remoteBytesRead: Long
}


Expand Down Expand Up @@ -233,7 +238,16 @@ object BlockFetcherIterator {
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
}

//an iterator that will read fetched blocks off the queue as they arrive.
override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
override def remoteFetchTime: Long = _remoteFetchTime
override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead


// Implementing the Iterator methods with an iterator that reads fetched blocks off the queue
// as they arrive.
@volatile protected var resultsGotten = 0

override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
Expand All @@ -251,14 +265,6 @@ object BlockFetcherIterator {
}
(result.blockId, if (result.failed) None else Some(result.deserialize()))
}

// Implementing BlockFetchTracker trait.
override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
override def remoteFetchTime: Long = _remoteFetchTime
override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead
}
// End of BasicBlockFetcherIterator

Expand Down

0 comments on commit edf8a56

Please sign in to comment.