Skip to content

Commit

Permalink
Protect TcpNetConnectionSupport calls
Browse files Browse the repository at this point in the history
5.0 introduced the `TcpNetConnectionSupport` user hook to create connection objects.

If a user-supplied instance threw an exception, the server socket would remain open
but the server `accept()` thread is gone.

- wrap the connection initialization code in try/catch
- close the server socket if necessary on an exception
  • Loading branch information
garyrussell authored and artembilan committed Feb 23, 2018
1 parent d5303ca commit 0e30349
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -140,22 +140,31 @@ public void run() {
if (isShuttingDown()) {
if (logger.isInfoEnabled()) {
logger.info("New connection from " + socket.getInetAddress().getHostAddress()
+ ":" + socket.getPort()
+ " rejected; the server is in the process of shutting down.");
}
socket.close();
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Accepted connection from " + socket.getInetAddress().getHostAddress());
logger.debug("Accepted connection from " + socket.getInetAddress().getHostAddress()
+ ":" + socket.getPort());
}
try {
setSocketAttributes(socket);
TcpConnectionSupport connection = this.tcpNetConnectionSupport.createNewConnection(socket, true,
isLookupHost(), getApplicationEventPublisher(), getComponentName());
connection = wrapConnection(connection);
initializeConnection(connection, socket);
getTaskExecutor().execute(connection);
harvestClosedConnections();
connection.publishConnectionOpenEvent();
}
catch (Exception e) {
this.logger.error("Failed to create and configure a TcpConnection for the new socket: "
+ socket.getInetAddress().getHostAddress() + ":" + socket.getPort(), e);
socket.close();
}
setSocketAttributes(socket);
TcpConnectionSupport connection = this.tcpNetConnectionSupport.createNewConnection(socket, true,
isLookupHost(), getApplicationEventPublisher(), getComponentName());
connection = wrapConnection(connection);
initializeConnection(connection, socket);
getTaskExecutor().execute(connection);
harvestClosedConnections();
connection.publishConnectionOpenEvent();
}
}
}
Expand All @@ -167,6 +176,12 @@ public void run() {
else if (isActive()) {
logger.error("Error on ServerSocket; port = " + getPort(), e);
publishServerExceptionEvent(e);
try {
this.serverSocket.close();
}
catch (IOException e1) {
// empty
}
}
}
finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -203,6 +203,7 @@ protected void doAccept(final Selector selector, ServerSocketChannel server, lon
if (isShuttingDown()) {
if (logger.isInfoEnabled()) {
logger.info("New connection from " + channel.socket().getInetAddress().getHostAddress()
+ ":" + channel.socket().getPort()
+ " rejected; the server is in the process of shutting down.");
}
channel.close();
Expand All @@ -226,7 +227,9 @@ protected void doAccept(final Selector selector, ServerSocketChannel server, lon
connection.publishConnectionOpenEvent();
}
catch (Exception e) {
logger.error("Exception accepting new connection", e);
logger.error("Exception accepting new connection from "
+ channel.socket().getInetAddress().getHostAddress()
+ ":" + channel.socket().getPort(), e);
channel.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.ip.tcp.connection;

import static org.assertj.core.api.Assertions.assertThat;

import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.SocketFactory;

import org.junit.Test;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;

/**
* @author Gary Russell
* @since 5.0.3
*
*/
public class TcpNetConnectionSupportTests {

@Test
public void testBadCode() throws Exception {
TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0);
AtomicReference<Message<?>> message = new AtomicReference<>();
CountDownLatch latch1 = new CountDownLatch(1);
server.registerListener(m -> {
message.set(m);
latch1.countDown();
return false;
});
AtomicBoolean firstTime = new AtomicBoolean(true);
server.setTcpNetConnectionSupport(new DefaultTcpNetConnectionSupport() {

@Override
public TcpNetConnection createNewConnection(Socket socket, boolean server, boolean lookupHost,
ApplicationEventPublisher applicationEventPublisher, String connectionFactoryName)
throws Exception {
if (firstTime.getAndSet(false)) {
throw new RuntimeException("intended");
}
return super.createNewConnection(socket, server, lookupHost, applicationEventPublisher, connectionFactoryName);
}

});
CountDownLatch latch2 = new CountDownLatch(1);
server.setApplicationEventPublisher(e -> {
if (e instanceof TcpConnectionServerListeningEvent) {
latch2.countDown();
}
});
server.afterPropertiesSet();
server.start();
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
Socket socket = SocketFactory.getDefault().createSocket("localhost", server.getPort());
socket.close();
socket = SocketFactory.getDefault().createSocket("localhost", server.getPort());
socket.getOutputStream().write("foo\r\n".getBytes());
socket.close();
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(message.get()).isNotNull();
server.stop();
}

}

0 comments on commit 0e30349

Please sign in to comment.