Skip to content

Commit

Permalink
[SPARK-3019] Pluggable block transfer interface (BlockTransferService)
Browse files Browse the repository at this point in the history
This pull request creates a new BlockTransferService interface for block fetch/upload and refactors the existing ConnectionManager to implement BlockTransferService (NioBlockTransferService).

Most of the changes are simply moving code around. The main class to inspect is ShuffleBlockFetcherIterator.

Review guide:
- Most of the ConnectionManager code is now in network.cm package
- ManagedBuffer is a new buffer abstraction backed by several different implementations (file segment, nio ByteBuffer, Netty ByteBuf)
- BlockTransferService is the main internal interface introduced in this PR
- NioBlockTransferService implements BlockTransferService and replaces the old BlockManagerWorker
- ShuffleBlockFetcherIterator replaces the told BlockFetcherIterator to use the new interface

TODOs that should be separate PRs:
- Implement NettyBlockTransferService
- Finalize the API/semantics for ManagedBuffer.release()

Author: Reynold Xin <rxin@apache.org>

Closes apache#2240 from rxin/blockTransferService and squashes the following commits:

64cd9d7 [Reynold Xin] Merge branch 'master' into blockTransferService
1dfd3d7 [Reynold Xin] Limit the length of the FileInputStream.
1332156 [Reynold Xin] Fixed style violation from refactoring.
2960c93 [Reynold Xin] Added ShuffleBlockFetcherIteratorSuite.
e29c721 [Reynold Xin] Updated comment for ShuffleBlockFetcherIterator.
8a1046e [Reynold Xin] Code review feedback:
2c6b1e1 [Reynold Xin] Removed println in test cases.
2a907e4 [Reynold Xin] Merge branch 'master' into blockTransferService-merge
07ccf0d [Reynold Xin] Added init check to CMBlockTransferService.
98c668a [Reynold Xin] Added failure handling and fixed unit tests.
ae05fcd [Reynold Xin] Updated tests, although DistributedSuite is hanging.
d8d595c [Reynold Xin] Merge branch 'master' of github.com:apache/spark into blockTransferService
9ef279c [Reynold Xin] Initial refactoring to move ConnectionManager to use the BlockTransferService.
  • Loading branch information
rxin committed Sep 8, 2014
1 parent 939a322 commit 08ce188
Show file tree
Hide file tree
Showing 38 changed files with 1,129 additions and 1,273 deletions.
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.ConnectionManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
Expand Down Expand Up @@ -59,8 +60,8 @@ class SparkEnv (
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockTransferService: BlockTransferService,
val blockManager: BlockManager,
val connectionManager: ConnectionManager,
val securityManager: SecurityManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
Expand Down Expand Up @@ -88,6 +89,8 @@ class SparkEnv (
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
// actorSystem.awaitTermination()

// Note that blockTransferService is stopped by BlockManager since it is started by it.
}

private[spark]
Expand Down Expand Up @@ -223,14 +226,14 @@ object SparkEnv extends Logging {

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

val blockTransferService = new NioBlockTransferService(conf, securityManager)

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, mapOutputTracker, shuffleManager)

val connectionManager = blockManager.connectionManager
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

Expand Down Expand Up @@ -278,8 +281,8 @@ object SparkEnv extends Logging {
mapOutputTracker,
shuffleManager,
broadcastManager,
blockTransferService,
blockManager,
connectionManager,
securityManager,
httpFileServer,
sparkFilesDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@

package org.apache.spark.network

import java.nio.ByteBuffer
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.storage.StorageLevel

private[spark] object ReceiverTest {
def main(args: Array[String]) {
val conf = new SparkConf
val manager = new ConnectionManager(9999, conf, new SecurityManager(conf))
println("Started connection manager with id = " + manager.id)

manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
/* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis) */
val buffer = ByteBuffer.wrap("response".getBytes("utf-8"))
Some(Message.createBufferMessage(buffer, msg.id))
})
Thread.currentThread.join()
}
}
trait BlockDataManager {

/**
* Interface to get local block data.
*
* @return Some(buffer) if the block exists locally, and None if it doesn't.
*/
def getBlockData(blockId: String): Option[ManagedBuffer]

/**
* Put the block locally, using the given storage level.
*/
def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network

import java.util.EventListener


/**
* Listener callback interface for [[BlockTransferService.fetchBlocks]].
*/
trait BlockFetchingListener extends EventListener {

/**
* Called once per successfully fetched block.
*/
def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit

/**
* Called upon failures. For each failure, this is called only once (i.e. not once per block).
*/
def onBlockFetchFailure(exception: Throwable): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.storage.StorageLevel


abstract class BlockTransferService {

/**
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
* local blocks or put local blocks.
*/
def init(blockDataManager: BlockDataManager)

/**
* Tear down the transfer service.
*/
def stop(): Unit

/**
* Port number the service is listening on, available only after [[init]] is invoked.
*/
def port: Int

/**
* Host name the service is listening on, available only after [[init]] is invoked.
*/
def hostName: String

/**
* Fetch a sequence of blocks from a remote node asynchronously,
* available only after [[init]] is invoked.
*
* Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
* while [[BlockFetchingListener.onBlockFetchFailure]] is called once per failure (not per block).
*
* Note that this API takes a sequence so the implementation can batch requests, and does not
* return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
* the data of a block is fetched, rather than waiting for all blocks to be fetched.
*/
def fetchBlocks(
hostName: String,
port: Int,
blockIds: Seq[String],
listener: BlockFetchingListener): Unit

/**
* Upload a single block to a remote node, available only after [[init]] is invoked.
*/
def uploadBlock(
hostname: String,
port: Int,
blockId: String,
blockData: ManagedBuffer,
level: StorageLevel): Future[Unit]

/**
* A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
*
* It is also only available after [[init]] is invoked.
*/
def fetchBlockSync(hostName: String, port: Int, blockId: String): ManagedBuffer = {
// A monitor for the thread to wait on.
val lock = new Object
@volatile var result: Either[ManagedBuffer, Throwable] = null
fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
override def onBlockFetchFailure(exception: Throwable): Unit = {
lock.synchronized {
result = Right(exception)
lock.notify()
}
}
override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
lock.synchronized {
result = Left(data)
lock.notify()
}
}
})

// Sleep until result is no longer null
lock.synchronized {
while (result == null) {
try {
lock.wait()
} catch {
case e: InterruptedException =>
}
}
}

result match {
case Left(data) => data
case Right(e) => throw e
}
}

/**
* Upload a single block to a remote node, available only after [[init]] is invoked.
*
* This method is similar to [[uploadBlock]], except this one blocks the thread
* until the upload finishes.
*/
def uploadBlockSync(
hostname: String,
port: Int,
blockId: String,
blockData: ManagedBuffer,
level: StorageLevel): Unit = {
Await.result(uploadBlock(hostname, port, blockId, blockData, level), Duration.Inf)
}
}

This file was deleted.

Loading

0 comments on commit 08ce188

Please sign in to comment.