Skip to content

Commit

Permalink
INT-4465: Fix delay in close propagation with NIO
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4465

There is a one second delay before a socket close is propagated if there is an active
assembler. This is generally only a problem with deserializers that use EOF to signal
message end (such as the `ByteArrayElasticRawDeserializer`).

Attempt to insert an EOF marker into the buffer queue so that the `getNextBuffer()` will
exit immediately on `close()` if it is blocked awaiting a buffer.

**cherry-pick to 5.0.x, 4.3.x**

(cherry picked from commit d9186f1)
  • Loading branch information
garyrussell authored and artembilan committed May 9, 2018
1 parent 8e1f048 commit 9dcb28b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,6 +58,8 @@ public class TcpNioConnection extends TcpConnectionSupport {

private static final long DEFAULT_PIPE_TIMEOUT = 60000;

private static final byte[] EOF = new byte[0]; // EOF marker buffer

private final SocketChannel socketChannel;

private final ChannelOutputStream channelOutputStream;
Expand Down Expand Up @@ -714,7 +716,7 @@ private byte[] getNextBuffer() throws IOException {
while (buffer == null) {
try {
buffer = this.buffers.poll(1, TimeUnit.SECONDS);
if (buffer == null && this.isClosed) {
if (buffer == EOF || (buffer == null && this.isClosed)) {
return null;
}
}
Expand Down Expand Up @@ -758,6 +760,12 @@ public void write(ByteBuffer byteBuffer) throws IOException {
public void close() throws IOException {
super.close();
this.isClosed = true;
try {
this.buffers.offer(EOF, TcpNioConnection.this.pipeTimeout, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.integration.ip.tcp.connection;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -64,6 +65,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
Expand All @@ -90,6 +92,7 @@
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StopWatch;


/**
Expand Down Expand Up @@ -130,6 +133,7 @@ public void testWriteTimeout() throws Exception {
Socket s = server.accept();
// block so we fill the buffer
done.await(10, TimeUnit.SECONDS);
s.close();
}
catch (Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -806,6 +810,36 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
te.shutdown();
}

@Test
@Ignore // Timing is too short for CI/Travis
public void testNoDelayOnClose() throws Exception {
TcpNioServerConnectionFactory cf = new TcpNioServerConnectionFactory(0);
final CountDownLatch reading = new CountDownLatch(1);
final StopWatch watch = new StopWatch();
cf.setDeserializer(is -> {
reading.countDown();
watch.start();
is.read();
is.read();
watch.stop();
return null;
});
cf.registerListener(m -> false);
final CountDownLatch listening = new CountDownLatch(1);
cf.setApplicationEventPublisher(e -> {
listening.countDown();
});
cf.afterPropertiesSet();
cf.start();
assertTrue(listening.await(10, TimeUnit.SECONDS));
Socket socket = SocketFactory.getDefault().createSocket("localhost", cf.getPort());
socket.getOutputStream().write("x".getBytes());
assertTrue(reading.await(10, TimeUnit.SECONDS));
socket.close();
cf.stop();
assertThat(watch.getLastTaskTimeMillis(), lessThan(950L));
}

private void readFully(InputStream is, byte[] buff) throws IOException {
for (int i = 0; i < buff.length; i++) {
buff[i] = (byte) is.read();
Expand Down

0 comments on commit 9dcb28b

Please sign in to comment.