[SPARK-4183] Close transport-related resources between SparkContexts #3053

Closed
wants to merge 3 commits
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
val shuffleMemoryManager = new ShuffleMemoryManager(conf)

val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
new NettyBlockTransferService(conf)
case "nio" =>
@@ -106,5 +106,8 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
result.future
}

override def close(): Unit = server.close()
override def close(): Unit = {
server.close()
clientFactory.close()
Contributor Author:
This is the most significant leak; we'd leak one event loop per SparkContext.

}
}
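
A rough sketch of the usage pattern behind the comment above (an assumption about how the leak shows up, not part of the patch): every SparkContext builds a NettyBlockTransferService whose client factory owns a Netty event loop group, so before this change a sequence of create/stop cycles in one JVM — the pattern the test suites exercise — left one live event loop group per iteration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TransportLeakSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("transport-leak-sketch")
      .set("spark.shuffle.blockTransferService", "netty")
    // Sequential contexts, as in ExternalShuffleIntegrationSuite and friends.
    (1 to 5).foreach { _ =>
      val sc = new SparkContext(conf)
      sc.stop() // with this patch, stop() now also closes the client factory
    }
  }
}
```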
@@ -1178,6 +1178,10 @@ private[spark] class BlockManager(

def stop(): Unit = {
blockTransferService.close()
if (shuffleClient ne blockTransferService) {
// Closing should be idempotent, but maybe not for the NioBlockTransferService.
shuffleClient.close()
Contributor Author:
This and the other leaks would only occur during the ExternalShuffleIntegrationSuite, which creates only a handful of SparkContexts.

}
diskBlockManager.stop()
actorSystem.stop(slaveActor)
blockInfo.clear()
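
The `ne` guard above matters because shuffleClient can simply alias blockTransferService. A self-contained sketch with toy types (not Spark's real classes) of the close pattern:

```scala
// Toy stand-ins for BlockTransferService / ExternalShuffleClient, used only to
// illustrate why stop() closes shuffleClient just when it is a distinct object,
// so an aliased client is not closed twice.
trait Transport { def close(): Unit }

class TransferService extends Transport {
  def close(): Unit = println("block transfer service closed")
}

class ExternalClient extends Transport {
  def close(): Unit = println("external shuffle client closed")
}

object StopSketch {
  def main(args: Array[String]): Unit = {
    val blockTransferService: Transport = new TransferService
    // With the external shuffle service enabled this would be a separate
    // client; otherwise it is the very same object as blockTransferService.
    val shuffleClient: Transport = new ExternalClient

    blockTransferService.close()
    if (shuffleClient ne blockTransferService) {
      shuffleClient.close()
    }
  }
}
```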
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
/**
* Test add and remove behavior of ExecutorAllocationManager.
*/
class ExecutorAllocationManagerSuite extends FunSuite {
class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
Contributor Author:
@andrewor14 please take a look at the changes to this test

Contributor:
LGTM
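
For context on the test changes below: the tests now assign to a shared `sc` var and drop their per-test sc.stop() calls, because the LocalSparkContext mixin stops the context after every test. This is an approximation of the trait's shape, not the actual Spark source:

```scala
import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterEach, Suite}

// Approximate shape (assumption): a shared `sc` var plus an afterEach hook
// that stops whatever context a test created, whether it passed or failed.
trait LocalSparkContextSketch extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override def afterEach(): Unit = {
    try {
      if (sc != null) {
        sc.stop()
        sc = null
      }
    } finally {
      super.afterEach()
    }
  }
}
```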

import ExecutorAllocationManager._
import ExecutorAllocationManagerSuite._

@@ -36,17 +36,21 @@ class ExecutorAllocationManagerSuite extends FunSuite {
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
intercept[SparkException] { new SparkContext(conf) }
SparkEnv.get.stop() // cleanup the created environment

// Only min
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
intercept[SparkException] { new SparkContext(conf1) }
SparkEnv.get.stop()

// Only max
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
intercept[SparkException] { new SparkContext(conf2) }
SparkEnv.get.stop()

// Both min and max, but min > max
intercept[SparkException] { createSparkContext(2, 1) }
SparkEnv.get.stop()

// Both min and max, and min == max
val sc1 = createSparkContext(1, 1)
@@ -60,18 +64,17 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}

test("starting state") {
val sc = createSparkContext()
sc = createSparkContext()
val manager = sc.executorAllocationManager.get
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
assert(executorIds(manager).isEmpty)
assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
assert(removeTimes(manager).isEmpty)
sc.stop()
}

test("add executors") {
val sc = createSparkContext(1, 10)
sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get

// Keep adding until the limit is reached
@@ -112,11 +115,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
assert(addExecutors(manager) === 0)
assert(numExecutorsPending(manager) === 6)
assert(numExecutorsToAdd(manager) === 1)
sc.stop()
}

test("remove executors") {
val sc = createSparkContext(5, 10)
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
(1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }

@@ -163,11 +165,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
assert(executorsPendingToRemove(manager).isEmpty)
assert(!removeExecutor(manager, "8"))
assert(executorsPendingToRemove(manager).isEmpty)
sc.stop()
}

test ("interleaving add and remove") {
val sc = createSparkContext(5, 10)
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get

// Add a few executors
@@ -232,11 +233,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
onExecutorAdded(manager, "15")
onExecutorAdded(manager, "16")
assert(executorIds(manager).size === 10)
sc.stop()
}

test("starting/canceling add timer") {
val sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10)
val clock = new TestClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -268,7 +268,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}

test("starting/canceling remove timers") {
val sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10)
val clock = new TestClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -313,7 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}

test("mock polling loop with no events") {
val sc = createSparkContext(1, 20)
sc = createSparkContext(1, 20)
val manager = sc.executorAllocationManager.get
val clock = new TestClock(2020L)
manager.setClock(clock)
@@ -339,7 +339,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}

test("mock polling loop add behavior") {
val sc = createSparkContext(1, 20)
sc = createSparkContext(1, 20)
val clock = new TestClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}

test("mock polling loop remove behavior") {
val sc = createSparkContext(1, 20)
sc = createSparkContext(1, 20)
val clock = new TestClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -449,7 +449,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}

test("listeners trigger add executors correctly") {
val sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(addTime(manager) === NOT_SET)

@@ -479,7 +479,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}

test("listeners trigger remove executors correctly") {
val sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(removeTimes(manager).isEmpty)

@@ -510,7 +510,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}

test("listeners trigger add and remove executor callbacks correctly") {
val sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
21 changes: 20 additions & 1 deletion core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils

class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
class MapOutputTrackerSuite extends FunSuite {
Contributor Author:
@pwendell maybe take a look at the changes here? No one in particular is responsible for this test, I think.

Contributor:
LGTM

Contributor:
Since the shutdown code isn't in a try-finally block or after() function, I guess it's still possible that it won't be called when tests fail. So, one test failure still might trigger spurious failures of later tests.

Contributor:
Maybe this is less of a concern, though, since I guess we're more worried about leaked resources from a passing test causing a subsequent test to fail.

Contributor Author:
Yeah, either would be better, but since each test creates a different set of things, and sometimes calling stop() throws an exception, I decided to just do it in the non-failing case for this PR.
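
A sketch of the after()-style alternative discussed in this thread, which would run the cleanup even when an assertion fails mid-test (hypothetical names; not what the PR does, precisely because stop() can itself throw for some of these tests):

```scala
import akka.actor.ActorSystem
import org.scalatest.{BeforeAndAfterEach, FunSuite}

class MapOutputTrackerCleanupSketch extends FunSuite with BeforeAndAfterEach {
  // Each test stores whatever actor system it creates here instead of a local val.
  private var actorSystem: ActorSystem = _

  override def afterEach(): Unit = {
    try {
      if (actorSystem != null) {
        actorSystem.shutdown() // Akka 2.x API; later versions use terminate()
        actorSystem = null
      }
    } finally {
      super.afterEach()
    }
  }

  test("cleanup runs even if the body throws") {
    actorSystem = ActorSystem("sketch")
    assert(actorSystem != null)
  }
}
```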

private val conf = new SparkConf

test("master start and stop") {
@@ -37,6 +37,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.trackerActor =
actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.stop()
actorSystem.shutdown()
}

test("master register shuffle and fetch") {
@@ -56,6 +57,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
(BlockManagerId("b", "hostB", 1000), size10000)))
tracker.stop()
actorSystem.shutdown()
}

test("master register and unregister shuffle") {
@@ -74,6 +76,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.unregisterShuffle(10)
assert(!tracker.containsShuffle(10))
assert(tracker.getServerStatuses(10, 0).isEmpty)

tracker.stop()
actorSystem.shutdown()
}

test("master register shuffle and unregister map output and fetch") {
@@ -97,6 +102,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted.
intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }

tracker.stop()
actorSystem.shutdown()
}

test("remote fetch") {
@@ -136,6 +144,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {

// failure should be cached
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }

masterTracker.stop()
slaveTracker.stop()
actorSystem.shutdown()
slaveSystem.shutdown()
}

test("remote fetch below akka frame size") {
@@ -154,6 +167,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
masterActor.receive(GetMapOutputStatuses(10))

// masterTracker.stop() // this throws an exception
actorSystem.shutdown()
}

test("remote fetch exceeds akka frame size") {
@@ -176,5 +192,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }

// masterTracker.stop() // this throws an exception
actorSystem.shutdown()
}
}
@@ -17,20 +17,20 @@

package org.apache.spark

import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
import org.scalatest.{FunSuite, PrivateMethodTester}

import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend

class SparkContextSchedulerCreationSuite
extends FunSuite with PrivateMethodTester with Logging with BeforeAndAfterEach {
extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {

def createTaskScheduler(master: String): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val createTaskSchedulerMethod =
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
@@ -145,11 +145,16 @@ class FlumePollingStreamSuite extends TestSuiteBase {
outputStream.register()

ssc.start()
writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
assertChannelIsEmpty(channel)
assertChannelIsEmpty(channel2)
sink.stop()
channel.stop()
try {
writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
assertChannelIsEmpty(channel)
assertChannelIsEmpty(channel2)
} finally {
sink.stop()
sink2.stop()
channel.stop()
channel2.stop()
}
}

def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
@@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable {
private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool;

private final Class<? extends Channel> socketChannelClass;
private final EventLoopGroup workerGroup;
private EventLoopGroup workerGroup;

public TransportClientFactory(TransportContext context) {
this.context = context;
@@ -150,6 +150,7 @@ public void close() {

if (workerGroup != null) {
workerGroup.shutdownGracefully();
workerGroup = null;
}
}

@@ -49,6 +49,7 @@ public class TransportServer implements Closeable {
private ChannelFuture channelFuture;
private int port = -1;

/** Creates a TransportServer that binds to the given port, or to any available if 0. */
Contributor Author:
This was a comment @rxin asked for; it probably won't hurt anything.

Contributor:
+1 :)

public TransportServer(TransportContext context, int portToBind) {
this.context = context;
this.conf = context.getConf();
@@ -67,7 +68,7 @@ private void init(int portToBind) {

IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;

bootstrap = new ServerBootstrap()
@@ -105,7 +106,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
@Override
public void close() {
if (channelFuture != null) {
// close is a local operation and should finish with milliseconds; timeout just to be safe
// close is a local operation and should finish within milliseconds; timeout just to be safe
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
channelFuture = null;
}
@@ -17,6 +17,8 @@

package org.apache.spark.network.shuffle;

import java.io.Closeable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -85,4 +87,9 @@ public void registerWithShuffleServer(
JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */);
}

@Override
public void close() {
clientFactory.close();
}
}
@@ -17,8 +17,10 @@

package org.apache.spark.network.shuffle;

import java.io.Closeable;

/** Provides an interface for reading shuffle files, either from an Executor or external service. */
public interface ShuffleClient {
public interface ShuffleClient extends Closeable {
/**
* Fetch a sequence of blocks from a remote node asynchronously,
*
@@ -46,6 +46,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
after {
if (ssc != null) {
ssc.stop()
if (ssc.sc != null) {
// Calling ssc.stop() does not always stop the associated SparkContext.
Contributor Author:
@tdas I have reverted the changes, though the fact that we have to do this shows that the API is not clean.

ssc.sc.stop()
}
ssc = null
}
if (sc != null) {
Expand Down