Commit
Initial refactoring to move ConnectionManager to use the BlockTransferService.
rxin committed Aug 28, 2014
1 parent b21ae5b commit 9ef279c
Showing 33 changed files with 784 additions and 859 deletions.
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,7 +31,8 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.ConnectionManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.cm.CMBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
@@ -59,8 +60,8 @@ class SparkEnv (
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockTransferService: BlockTransferService,
val blockManager: BlockManager,
val connectionManager: ConnectionManager,
val securityManager: SecurityManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
@@ -79,6 +80,7 @@ class SparkEnv (
Option(httpFileServer).foreach(_.stop())
mapOutputTracker.stop()
shuffleManager.stop()
blockTransferService.stop()
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
@@ -223,14 +225,14 @@ object SparkEnv extends Logging {

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

val blockTransferService = new CMBlockTransferService(conf, securityManager)

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, mapOutputTracker, shuffleManager)

val connectionManager = blockManager.connectionManager
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

@@ -278,8 +280,8 @@ object SparkEnv extends Logging {
mapOutputTracker,
shuffleManager,
broadcastManager,
blockTransferService,
blockManager,
connectionManager,
securityManager,
httpFileServer,
sparkFilesDir,
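The SparkEnv changes above make the transfer service a constructor member of SparkEnv, so call sites that previously went through blockManager.connectionManager can reach it directly. A minimal, hypothetical call site (the host, port, and block id below are placeholders, not values from this commit):

// Hypothetical usage on an executor with an initialized SparkEnv.
import org.apache.spark.SparkEnv

val env = SparkEnv.get
val buffer = env.blockTransferService.fetchBlock("remote-host", 12345, "shuffle_0_1_2")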
@@ -17,21 +17,20 @@

package org.apache.spark.network

import java.nio.ByteBuffer
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.storage.StorageLevel

private[spark] object ReceiverTest {
def main(args: Array[String]) {
val conf = new SparkConf
val manager = new ConnectionManager(9999, conf, new SecurityManager(conf))
println("Started connection manager with id = " + manager.id)

manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
/* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis) */
val buffer = ByteBuffer.wrap("response".getBytes("utf-8"))
Some(Message.createBufferMessage(buffer, msg.id))
})
Thread.currentThread.join()
}
}
trait BlockDataManager {

/**
* Interface to get local block data.
*
* @return Some(buffer) if the block exists locally, and None if it doesn't.
*/
def getBlockData(blockId: String): Option[ManagedBuffer]

/**
* Put the block locally, using the given storage level.
*/
def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
}
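For illustration, a minimal in-memory implementation of this trait could look like the sketch below. InMemoryBlockDataManager is a hypothetical name, not part of this commit; the code assumes it lives in the org.apache.spark.network package so BlockDataManager and ManagedBuffer resolve without extra imports.

import scala.collection.concurrent.TrieMap
import org.apache.spark.storage.StorageLevel

// Hypothetical in-memory BlockDataManager, for illustration only.
class InMemoryBlockDataManager extends BlockDataManager {
  private val blocks = TrieMap.empty[String, ManagedBuffer]

  // Some(buffer) if the block is stored here, None otherwise.
  override def getBlockData(blockId: String): Option[ManagedBuffer] = blocks.get(blockId)

  // Keeps everything on heap and ignores the requested storage level.
  override def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit = {
    blocks.put(blockId, data)
  }
}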
37 changes: 37 additions & 0 deletions core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network

import java.util.EventListener


/**
* Listener callback interface for [[BlockTransferService.fetchBlocks]].
*/
trait BlockFetchingListener extends EventListener {

/**
* Called once per successfully fetched block.
*/
def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit

/**
* Called upon failures.
*/
def onBlockFetchFailure(exception: Exception): Unit
}
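A sketch of how a caller might implement this listener, collecting successful fetches and remembering the first failure. The class name is illustrative, not from this commit, and the code assumes the org.apache.spark.network package so ManagedBuffer is in scope.

import java.util.concurrent.ConcurrentHashMap

// Hypothetical listener that accumulates results, for illustration only.
class CollectingListener extends BlockFetchingListener {
  val results = new ConcurrentHashMap[String, ManagedBuffer]()
  @volatile var failure: Option[Exception] = None

  // Invoked once for each block that arrives successfully.
  override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    results.put(blockId, data)
  }

  // Invoked when the fetch fails; keep the exception for the caller to inspect.
  override def onBlockFetchFailure(exception: Exception): Unit = {
    failure = Some(exception)
  }
}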
81 changes: 81 additions & 0 deletions core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network

import org.apache.spark.storage.StorageLevel


abstract class BlockTransferService {

/**
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
* local blocks or put local blocks.
*/
def init(blockDataManager: BlockDataManager)

/**
* Tear down the transfer service.
*/
def stop(): Unit

/**
* Port number the service is listening on, available only after [[init]] is invoked.
*/
def port: Int

/**
* Host name the service is listening on, available only after [[init]] is invoked.
*/
def hostName: String

/**
* Fetch a sequence of blocks from a remote node, available only after [[init]] is invoked.
*
* This takes a sequence so the implementation can batch requests.
*/
def fetchBlocks(
hostName: String,
port: Int,
blockIds: Seq[String],
listener: BlockFetchingListener): Unit

/**
* Fetch a single block from a remote node, available only after [[init]] is invoked.
*
* This is functionally equivalent to
* {{{
* fetchBlocks(hostName, port, Seq(blockId)).iterator().next()._2
* }}}
*/
def fetchBlock(hostName: String, port: Int, blockId: String): ManagedBuffer = {
//fetchBlocks(hostName, port, Seq(blockId)).iterator().next()._2
null
}

/**
* Upload a single block to a remote node, available only after [[init]] is invoked.
*
* This call blocks until the upload completes, or throws an exception upon failures.
*/
def uploadBlock(
hostname: String,
port: Int,
blockId: String,
blockData: ManagedBuffer,
level: StorageLevel): Unit
}
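The fetchBlock default above is still a stub (it returns null). One way it could eventually be implemented on top of the asynchronous fetchBlocks is by bridging the callback-based listener with a Promise; the following is a sketch under that assumption, not code from this commit, and it assumes the org.apache.spark.network package for the types involved.

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Sketch: a blocking single-block fetch built on the asynchronous fetchBlocks API.
def blockingFetch(
    service: BlockTransferService,
    hostName: String,
    port: Int,
    blockId: String): ManagedBuffer = {
  val result = Promise[ManagedBuffer]()
  service.fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
    override def onBlockFetchSuccess(id: String, data: ManagedBuffer): Unit =
      result.trySuccess(data)
    override def onBlockFetchFailure(exception: Exception): Unit =
      result.tryFailure(exception)
  })
  // Block the calling thread until the block arrives or the fetch fails.
  Await.result(result.future, Duration.Inf)
}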

This file was deleted.
