[SPARK-3453] Netty-based BlockTransferService, extracted from Spark core
This PR encapsulates #2330, which is itself a continuation of #2240. The first goal of this
PR is to provide an alternate, simpler, Netty-based implementation of the ConnectionManager.

In addition, we want to resolve [SPARK-3796](https://issues.apache.org/jira/browse/SPARK-3796), which calls for a
standalone shuffle service that can be integrated into the YARN NodeManager, the Standalone Worker, or
run on its own. This PR takes the first step in that direction by ensuring that the actual Netty service
is as small as possible and extracted from Spark core. Given this, we should be able to construct
a standalone jar that can be included in other JVMs without incurring significant dependency or
runtime issues. The actual work to make such a standalone shuffle service work in Spark
is left for a future PR, however.

In order to minimize dependencies and allow the service to be long-running (possibly
much longer-running than Spark, and possibly having to support multiple versions of Spark
simultaneously), the entire service has been ported to Java, where we have full control
over the binary compatibility of the components and do not depend on the Scala runtime or
version.

The following issues have been addressed by folding in #2330:

SPARK-3453: Refactor Netty module to use BlockTransferService interface
SPARK-3018: Release all buffers upon task completion/failure
SPARK-3002: Create a connection pool and reuse clients across different threads
SPARK-3017: Integration tests and unit tests for connection failures
SPARK-3049: Make sure client doesn't block when server/connection has error(s)
SPARK-3502: SO_RCVBUF and SO_SNDBUF should be bootstrap childOption, not option (see the sketch after this list)
SPARK-3503: Disable thread local cache in PooledByteBufAllocator
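
To make the SPARK-3502 item concrete, here is a minimal, illustrative Netty sketch (not code
from this PR): option() configures only the listening (parent) channel, while childOption()
configures each accepted connection, which is where per-socket receive/send buffer sizes
actually take effect. The buffer sizes below are placeholders.

    import io.netty.bootstrap.ServerBootstrap
    import io.netty.channel.{ChannelInitializer, ChannelOption}
    import io.netty.channel.nio.NioEventLoopGroup
    import io.netty.channel.socket.SocketChannel
    import io.netty.channel.socket.nio.NioServerSocketChannel

    val bootstrap = new ServerBootstrap()
      .group(new NioEventLoopGroup(1), new NioEventLoopGroup())
      .channel(classOf[NioServerSocketChannel])
      // Parent (accept) socket option: fine as a plain option().
      .option[Integer](ChannelOption.SO_BACKLOG, 100)
      // Per-connection socket options: must be childOption(), or they
      // silently never reach the sockets that actually carry block data.
      .childOption[Integer](ChannelOption.SO_RCVBUF, 1024 * 1024)
      .childOption[Integer](ChannelOption.SO_SNDBUF, 1024 * 1024)
      .childHandler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = { /* add handlers here */ }
      })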

TODO before mergeable:
[ ] Implement uploadBlock()
[ ] Unit tests for RPC side of code
[ ] Performance testing
[ ] Turn OFF by default (currently on for unit testing)
aarondav committed Oct 10, 2014
1 parent f7e7568 commit 29c6dcf
Showing 76 changed files with 3,579 additions and 1,899 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -44,6 +44,11 @@
       </exclusion>
     </exclusions>
   </dependency>
+  <dependency>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>network</artifactId>
+    <version>${project.version}</version>
+  </dependency>
   <dependency>
     <groupId>net.java.dev.jets3t</groupId>
     <artifactId>jets3t</artifactId>
17 changes: 9 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -32,15 +32,14 @@ import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.BlockTransferService
-import org.apache.spark.network.netty.NettyBlockTransferService
+import org.apache.spark.network.netty.{NettyBlockTransferService}
 import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
 import org.apache.spark.storage._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
-
 /**
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -233,12 +232,14 @@ object SparkEnv extends Logging {
 
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
-    // TODO(rxin): Config option based on class name, similar to shuffle mgr and compression codec.
-    val blockTransferService = if (conf.getBoolean("spark.shuffle.use.netty", false)) {
-      new NettyBlockTransferService(conf)
-    } else {
-      new NioBlockTransferService(conf, securityManager)
-    }
+    // TODO: This is only netty by default for initial testing -- it should not be merged as such!!!
+    val blockTransferService =
+      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
+        case "netty" =>
+          new NettyBlockTransferService(conf)
+        case "nio" =>
+          new NioBlockTransferService(conf, securityManager)
+      }
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
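
For reference, the new spark.shuffle.blockTransferService setting shown above (values "netty"
and "nio", per the diff) can be set explicitly through SparkConf. A minimal usage sketch; the
app name and master are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // Choose the transport explicitly instead of relying on the temporary
    // "netty" default flagged in the TODO above ("nio" restores the old path).
    val conf = new SparkConf()
      .setAppName("block-transfer-demo")
      .setMaster("local[2]")
      .set("spark.shuffle.blockTransferService", "nio")

    val sc = new SparkContext(conf)
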
core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.network
 
-import org.apache.spark.storage.StorageLevel
-
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.storage.{BlockId, StorageLevel}
 
 private[spark]
 trait BlockDataManager {
@@ -27,10 +27,10 @@ trait BlockDataManager {
    * Interface to get local block data. Throws an exception if the block cannot be found or
    * cannot be read successfully.
    */
-  def getBlockData(blockId: String): ManagedBuffer
+  def getBlockData(blockId: BlockId): ManagedBuffer
 
   /**
    * Put the block locally, using the given storage level.
   */
-  def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
+  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit
 }
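
To illustrate the contract above, a hypothetical in-memory implementation might look like the
sketch below (illustrative only; since the trait is private[spark], a real implementation such
as BlockManager lives inside the org.apache.spark namespace, and getBlockData must throw rather
than return null for unknown blocks):

    import scala.collection.concurrent.TrieMap

    import org.apache.spark.network.buffer.ManagedBuffer
    import org.apache.spark.storage.{BlockId, StorageLevel}

    // Toy implementation of the trait's contract; ignores the storage level.
    class InMemoryBlockDataManager extends BlockDataManager {
      private val blocks = TrieMap.empty[BlockId, ManagedBuffer]

      override def getBlockData(blockId: BlockId): ManagedBuffer =
        blocks.getOrElse(blockId,
          throw new IllegalArgumentException(s"Block $blockId not found"))

      override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit = {
        blocks.put(blockId, data)
      }
    }
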
core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
@@ -19,6 +19,8 @@ package org.apache.spark.network
 
 import java.util.EventListener
 
+import org.apache.spark.network.buffer.ManagedBuffer
+
 
 /**
  * Listener callback interface for [[BlockTransferService.fetchBlocks]].
core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -18,16 +18,18 @@
 package org.apache.spark.network
 
 import java.io.Closeable
 import java.nio.ByteBuffer
 
+import org.apache.spark.network.buffer.ManagedBuffer
+
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration
 
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.spark.util.Utils
 
 private[spark]
-abstract class BlockTransferService extends Closeable {
+abstract class BlockTransferService extends Closeable with Logging {
 
   /**
    * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
@@ -92,10 +94,7 @@
       }
       override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
         lock.synchronized {
-          val ret = ByteBuffer.allocate(data.size.toInt)
-          ret.put(data.nioByteBuffer())
-          ret.flip()
-          result = Left(new NioManagedBuffer(ret))
+          result = Left(data)
           lock.notify()
         }
       }
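
The rewritten success callback above hands the fetched ManagedBuffer directly to the waiting
caller rather than copying it into a freshly allocated ByteBuffer. The surrounding lock/notify
handshake that fetchBlockSync relies on follows a standard pattern; a self-contained sketch
with illustrative names, not code from this commit:

    // Convert an asynchronous, callback-based fetch into a blocking call:
    // the caller waits on a monitor until one of the callbacks fills in the
    // result and notifies it.
    def blockingFetch[T](startAsync: (T => Unit, Throwable => Unit) => Unit): T = {
      val lock = new Object
      var result: Option[Either[Throwable, T]] = None

      startAsync(
        data => lock.synchronized { result = Some(Right(data)); lock.notify() },
        err => lock.synchronized { result = Some(Left(err)); lock.notify() }
      )

      lock.synchronized {
        while (result.isEmpty) {
          lock.wait()  // re-check on wakeup to guard against spurious wakeups
        }
      }
      result.get.fold(e => throw e, identity)
    }
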
187 changes: 0 additions & 187 deletions core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala

This file was deleted.

