diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index 4763afee0ed11..17b59f6ce8715 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -129,26 +129,28 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO int clientIndex = rand.nextInt(numConnectionsPerPeer); TransportClient cachedClient = clientPool.clients[clientIndex]; - if (cachedClient != null) { - if (cachedClient.isActive()) { - logger.trace("Returning cached connection to {}: {}", address, cachedClient); - return cachedClient; - } else { - logger.info("Found inactive connection to {}, closing it.", address); - clientPool.clients[clientIndex] = null; // Remove inactive clients. - } + + if (cachedClient != null && cachedClient.isActive()) { + logger.trace("Returning cached connection to {}: {}", address, cachedClient); + return cachedClient; } // If we reach here, we don't have an existing connection open. Let's create a new one. - // Multiple threads might race here to create new connections. Let's keep only one of them - // active at anytime. + // Multiple threads might race here to create new connections. Keep only one of them active. synchronized (clientPool.locks[clientIndex]) { - if (clientPool.clients[clientIndex] == null || !clientPool.clients[clientIndex].isActive()) { - clientPool.clients[clientIndex] = createClient(address); + cachedClient = clientPool.clients[clientIndex]; + + if (cachedClient != null) { + if (cachedClient.isActive()) { + logger.trace("Returning cached connection to {}: {}", address, cachedClient); + return cachedClient; + } else { + logger.info("Found inactive connection to {}, creating a new one.", address); + } } + clientPool.clients[clientIndex] = createClient(address); + return clientPool.clients[clientIndex]; } - - return clientPool.clients[clientIndex]; } /** Create a completely new {@link TransportClient} to the remote address. */ @@ -203,7 +205,7 @@ public void initChannel(SocketChannel ch) { long postBootstrap = System.nanoTime(); logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)", - address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000); + address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000); return client; }