diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala deleted file mode 100644 index 2e0b0e6eda765..0000000000000 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -private[spark] trait BlockFetchTracker { - def totalBlocks : Int - def numLocalBlocks: Int - def numRemoteBlocks: Int - def remoteFetchTime : Long - def fetchWaitTime: Long - def remoteBytesRead : Long -} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 925022e7fe6fb..fb50b45bd4197 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -44,9 +44,14 @@ import org.apache.spark.util.Utils */ private[storage] -trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] - with Logging with BlockFetchTracker { +trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging { def initialize() + def totalBlocks: Int + def numLocalBlocks: Int + def numRemoteBlocks: Int + def remoteFetchTime: Long + def fetchWaitTime: Long + def remoteBytesRead: Long } @@ -233,7 +238,16 @@ object BlockFetcherIterator { logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") } - //an iterator that will read fetched blocks off the queue as they arrive. + override def totalBlocks: Int = numLocal + numRemote + override def numLocalBlocks: Int = numLocal + override def numRemoteBlocks: Int = numRemote + override def remoteFetchTime: Long = _remoteFetchTime + override def fetchWaitTime: Long = _fetchWaitTime + override def remoteBytesRead: Long = _remoteBytesRead + + + // Implementing the Iterator methods with an iterator that reads fetched blocks off the queue + // as they arrive. @volatile protected var resultsGotten = 0 override def hasNext: Boolean = resultsGotten < _numBlocksToFetch @@ -251,14 +265,6 @@ object BlockFetcherIterator { } (result.blockId, if (result.failed) None else Some(result.deserialize())) } - - // Implementing BlockFetchTracker trait. - override def totalBlocks: Int = numLocal + numRemote - override def numLocalBlocks: Int = numLocal - override def numRemoteBlocks: Int = numRemote - override def remoteFetchTime: Long = _remoteFetchTime - override def fetchWaitTime: Long = _fetchWaitTime - override def remoteBytesRead: Long = _remoteBytesRead } // End of BasicBlockFetcherIterator