diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Channel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Channel.java index 8c9531f858a..aa54b7a6566 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Channel.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Channel.java @@ -21,8 +21,6 @@ /** * Channel. (API/SPI, Prototype, ThreadSafe) * - * - * * @see org.apache.dubbo.remoting.Client * @see org.apache.dubbo.remoting.Server#getChannels() * @see org.apache.dubbo.remoting.Server#getChannel(InetSocketAddress) @@ -73,5 +71,4 @@ public interface Channel extends Endpoint { * @param key key. */ void removeAttribute(String key); - } \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java index fa6d5bf4107..3922626fdfb 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java @@ -57,5 +57,4 @@ public interface ExchangeChannel extends Channel { */ @Override void close(int timeout); - } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Response.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Response.java index bfd72f5d7e6..c24e5e4d9ac 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Response.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Response.java @@ -40,6 +40,11 @@ public class Response { */ public static final byte SERVER_TIMEOUT = 31; + /** + * channel inactive, directly return the unfinished requests. + */ + public static final byte CHANNEL_INACTIVE = 35; + /** * request format error. */ diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index ae9c95bd7e5..5fa6cfe9a47 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -48,9 +48,9 @@ public class DefaultFuture implements ResponseFuture { private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class); - private static final Map CHANNELS = new ConcurrentHashMap(); + private static final Map CHANNELS = new ConcurrentHashMap<>(); - private static final Map FUTURES = new ConcurrentHashMap(); + private static final Map FUTURES = new ConcurrentHashMap<>(); public static final Timer TIME_OUT_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-future-timeout", true), @@ -87,10 +87,21 @@ private static void timeoutCheck(DefaultFuture future) { TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); } + /** + * init a DefaultFuture + * 1.init a DefaultFuture + * 2.timeout check + * + * @param channel channel + * @param request the request + * @param timeout timeout + * @return a new DefaultFuture + */ public static DefaultFuture newFuture(Channel channel, Request request, int timeout) { - final DefaultFuture defaultFuture = new DefaultFuture(channel, request, timeout); - timeoutCheck(defaultFuture); - return defaultFuture; + final DefaultFuture future = new DefaultFuture(channel, request, timeout); + // timeout check + timeoutCheck(future); + return future; } public static DefaultFuture getFuture(long id) { @@ -108,6 +119,29 @@ public static void sent(Channel channel, Request request) { } } + /** + * close a channel when a channel is inactive + * directly return the unfinished requests. + * + * @param channel channel to close + */ + public static void closeChannel(Channel channel) { + for (long id : CHANNELS.keySet()) { + if (channel.equals(CHANNELS.get(id))) { + DefaultFuture future = getFuture(id); + if (future != null && !future.isDone()) { + Response disconnectResponse = new Response(future.getId()); + disconnectResponse.setStatus(Response.CHANNEL_INACTIVE); + disconnectResponse.setErrorMessage("Channel " + + channel + + " is inactive. Directly return the unFinished request : " + + future.getRequest()); + DefaultFuture.received(channel, disconnectResponse); + } + } + } + } + public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); @@ -212,6 +246,7 @@ public void run(Timeout timeout) { timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. DefaultFuture.received(future.getChannel(), timeoutResponse); + } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java index 5532620d7a2..1be8827a8db 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java @@ -146,6 +146,7 @@ public void disconnected(Channel channel) throws RemotingException { try { handler.disconnected(exchangeChannel); } finally { + DefaultFuture.closeChannel(channel); HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java index 58429d0000f..bacc2646bfd 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java @@ -43,5 +43,4 @@ public void send(Object message, boolean sent) throws RemotingException { public String toString() { return getLocalAddress() + " -> " + getRemoteAddress(); } - } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java index 892a45d7027..ced3d067751 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java @@ -131,7 +131,6 @@ public void test_Lazy_ChannelReadOnly() throws Exception { } //invoke method --> init client - IDemoService service = (IDemoService) proxy.getProxy(invoker); Assert.assertEquals("ok", service.get()); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/MultiThreadTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/MultiThreadTest.java index 4e573b8fd69..ce43473025b 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/MultiThreadTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/MultiThreadTest.java @@ -23,30 +23,31 @@ import org.apache.dubbo.rpc.ProxyFactory; import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService; import org.apache.dubbo.rpc.protocol.dubbo.support.DemoServiceImpl; - -import junit.framework.TestCase; +import org.junit.Assert; +import org.junit.Test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -public class MultiThreadTest extends TestCase { +public class MultiThreadTest { private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); + @Test public void testDubboMultiThreadInvoke() throws Exception { Exporter rpcExporter = protocol.export(proxy.getInvoker(new DemoServiceImpl(), DemoService.class, URL.valueOf("dubbo://127.0.0.1:20259/TestService"))); final AtomicInteger counter = new AtomicInteger(); final DemoService service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("dubbo://127.0.0.1:20259/TestService"))); - assertEquals(service.getSize(new String[]{"123", "456", "789"}), 3); + Assert.assertEquals(service.getSize(new String[]{"123", "456", "789"}), 3); final StringBuffer sb = new StringBuffer(); for (int i = 0; i < 1024 * 64 + 32; i++) sb.append('A'); - assertEquals(sb.toString(), service.echo(sb.toString())); + Assert.assertEquals(sb.toString(), service.echo(sb.toString())); ExecutorService exec = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { @@ -55,7 +56,7 @@ public void testDubboMultiThreadInvoke() throws Exception { public void run() { for (int i = 0; i < 30; i++) { System.out.println(fi + ":" + counter.getAndIncrement()); - assertEquals(service.echo(sb.toString()), sb.toString()); + Assert.assertEquals(service.echo(sb.toString()), sb.toString()); } } }); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java index 2a315073452..ea7743ad524 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java @@ -123,6 +123,14 @@ public void test_counter_error() { Assert.assertEquals("should not warning message", 0, LogUtil.findMessage(errorMsg)); // counter is incorrect, invocation still succeeds client.close(); + + // wait close done. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Assert.fail(); + } + Assert.assertEquals("hello", helloService.hello()); Assert.assertEquals("should warning message", 1, LogUtil.findMessage(errorMsg)); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/RpcFilterTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/RpcFilterTest.java index c1ac717140a..f9cca922c11 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/RpcFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/RpcFilterTest.java @@ -24,23 +24,25 @@ import org.apache.dubbo.rpc.protocol.dubbo.support.DemoServiceImpl; import org.apache.dubbo.rpc.service.EchoService; -import junit.framework.TestCase; +import org.junit.Assert; +import org.junit.Test; -public class RpcFilterTest extends TestCase { +public class RpcFilterTest { private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); + @Test public void testRpcFilter() throws Exception { DemoService service = new DemoServiceImpl(); URL url = URL.valueOf("dubbo://127.0.0.1:9010/org.apache.dubbo.rpc.DemoService?service.filter=echo"); protocol.export(proxy.getInvoker(service, DemoService.class, url)); service = proxy.getProxy(protocol.refer(DemoService.class, url)); - assertEquals("123", service.echo("123")); + Assert.assertEquals("123", service.echo("123")); // cast to EchoService EchoService echo = proxy.getProxy(protocol.refer(EchoService.class, url)); - assertEquals(echo.$echo("test"), "test"); - assertEquals(echo.$echo("abcdefg"), "abcdefg"); - assertEquals(echo.$echo(1234), 1234); + Assert.assertEquals(echo.$echo("test"), "test"); + Assert.assertEquals(echo.$echo("abcdefg"), "abcdefg"); + Assert.assertEquals(echo.$echo(1234), 1234); } } \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java index 0a372728c11..a21e3786bd6 100644 --- a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java +++ b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java @@ -38,7 +38,6 @@ import org.apache.thrift.transport.TIOStreamTransport; import org.apache.thrift.transport.TTransport; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import java.io.ByteArrayInputStream;