Skip to content

Commit

Permalink
Implement java.io.Closeable interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin authored and aarondav committed Oct 10, 2014
1 parent 519d64d commit c066309
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

package org.apache.spark.network

import java.io.Closeable

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.storage.StorageLevel


private[spark]
abstract class BlockTransferService {
abstract class BlockTransferService extends Closeable {

/**
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
Expand All @@ -35,7 +37,7 @@ abstract class BlockTransferService {
/**
* Tear down the transfer service.
*/
def stop(): Unit
def close(): Unit

/**
* Port number the service is listening on, available only after [[init]] is invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.network.netty

import java.io.Closeable
import java.util.concurrent.TimeoutException

import io.netty.channel.{ChannelFuture, ChannelFutureListener}
Expand All @@ -39,7 +40,7 @@ import org.apache.spark.network.BlockFetchingListener
*/
@throws[TimeoutException]
private[netty]
class BlockClient(cf: ChannelFuture, handler: BlockClientHandler) extends Logging {
class BlockClient(cf: ChannelFuture, handler: BlockClientHandler) extends Closeable with Logging {

private[this] val serverAddr = cf.channel().remoteAddress().toString

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.network.netty

import java.io.Closeable
import java.util.concurrent.{ConcurrentHashMap, TimeoutException}

import io.netty.bootstrap.Bootstrap
Expand All @@ -41,7 +42,7 @@ import org.apache.spark.util.Utils
* for the same remote host. It also shares a single worker thread pool for all [[BlockClient]]s.
*/
private[netty]
class BlockClientFactory(val conf: NettyConfig) {
class BlockClientFactory(val conf: NettyConfig) extends Closeable {

def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))

Expand Down Expand Up @@ -140,7 +141,7 @@ class BlockClientFactory(val conf: NettyConfig) {
}

/** Close all connections in the connection pool, and shutdown the worker thread pool. */
def stop(): Unit = {
override def close(): Unit = {
val iter = connectionPool.entrySet().iterator()
while (iter.hasNext) {
val entry = iter.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.network.netty

import java.io.Closeable
import java.net.InetSocketAddress

import io.netty.bootstrap.ServerBootstrap
Expand All @@ -29,7 +30,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.socket.oio.OioServerSocketChannel
import io.netty.channel.{ChannelInitializer, ChannelFuture, ChannelOption}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.Logging
import org.apache.spark.network.BlockDataManager
import org.apache.spark.util.Utils

Expand All @@ -38,7 +39,8 @@ import org.apache.spark.util.Utils
* Server for the [[NettyBlockTransferService]].
*/
private[netty]
class BlockServer(conf: NettyConfig, dataProvider: BlockDataManager) extends Logging {
class BlockServer(conf: NettyConfig, dataProvider: BlockDataManager)
extends Closeable with Logging {

def port: Int = _port

Expand Down Expand Up @@ -115,7 +117,7 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataManager) extends Log
}

/** Shutdown the server. */
def stop(): Unit = {
def close(): Unit = {
if (channelFuture != null) {
channelFuture.channel().close().awaitUninterruptibly()
channelFuture = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ final class NettyBlockTransferService(conf: SparkConf) extends BlockTransferServ
clientFactory = new BlockClientFactory(nettyConf)
}

override def stop(): Unit = {
override def close(): Unit = {
if (server != null) {
server.stop()
server.close()
}
if (clientFactory != null) {
clientFactory.stop()
clientFactory.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
/**
* Tear down the transfer service.
*/
override def stop(): Unit = {
override def close(): Unit = {
if (cm != null) {
cm.stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ private[spark] class BlockManager(
}

def stop(): Unit = {
blockTransferService.stop()
blockTransferService.close()
diskBlockManager.stop()
actorSystem.stop(slaveActor)
blockInfo.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class BlockClientFactorySuite extends FunSuite with BeforeAndAfterAll {

override def afterAll() {
if (server1 != null) {
server1.stop()
server1.close()
}
if (server2 != null) {
server2.stop()
server2.close()
}
}

Expand All @@ -55,7 +55,7 @@ class BlockClientFactorySuite extends FunSuite with BeforeAndAfterAll {
assert(c3.isActive)
assert(c1 === c2)
assert(c1 !== c3)
factory.stop()
factory.close()
}

test("never return inactive clients") {
Expand All @@ -75,7 +75,7 @@ class BlockClientFactorySuite extends FunSuite with BeforeAndAfterAll {
// Create c2, which should be different from c1
val c2 = factory.createClient(server1.hostName, server1.port)
assert(c1 !== c2)
factory.stop()
factory.close()
}

test("BlockClients are close when BlockClientFactory is stopped") {
Expand All @@ -84,7 +84,7 @@ class BlockClientFactorySuite extends FunSuite with BeforeAndAfterAll {
val c2 = factory.createClient(server2.hostName, server2.port)
assert(c1.isActive)
assert(c2.isActive)
factory.stop()
factory.close()
assert(!c1.isActive)
assert(!c2.isActive)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll {
}

override def afterAll() = {
server.stop()
clientFactory.stop()
server.close()
clientFactory.close()
}

/** A ByteBuf for buffer_block */
Expand Down Expand Up @@ -162,7 +162,7 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll {

test("shutting down server should also close client") {
val client = clientFactory.createClient(server.hostName, server.port)
server.stop()
server.close()
eventually(timeout(Span(5, Seconds))) { assert(!client.isActive) }
}
}

0 comments on commit c066309

Please sign in to comment.