Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Direct return when the server goes down unnormally. #2185

Merged
merged 34 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f2c59b9
Keep the unfinished request in every channel.
carryxyh Aug 4, 2018
29c4f4e
default method
carryxyh Aug 4, 2018
348b8e4
direct return unfinished requests when remote inactive or disconnect
carryxyh Aug 4, 2018
60f49ca
direct return unfinished requests when remote inactive or disconnect
carryxyh Aug 4, 2018
51a90ae
name and doc
carryxyh Aug 4, 2018
dd0f8c5
name and doc
carryxyh Aug 4, 2018
11be40d
fix unit test
carryxyh Aug 4, 2018
076f648
fix unit test
carryxyh Aug 4, 2018
35d3065
fix unit test
carryxyh Aug 6, 2018
9fb70d3
close heartbeat to fix unit test
carryxyh Aug 6, 2018
c049d26
sth test
carryxyh Aug 6, 2018
467138c
sth test
carryxyh Aug 6, 2018
b01dfa8
sth test
carryxyh Aug 6, 2018
b32df7e
sth test
carryxyh Aug 7, 2018
fd04bb4
close when client close
carryxyh Aug 7, 2018
e39034c
fix unit test
carryxyh Aug 7, 2018
c745802
fix unit test
carryxyh Aug 7, 2018
6bbc849
fix unit test
carryxyh Aug 7, 2018
76b909f
ignore remote invoke unit test
carryxyh Aug 7, 2018
f3f2fca
don't clear request when channelInactive
carryxyh Aug 7, 2018
2429402
replace import *
carryxyh Aug 7, 2018
35785b7
remove unuse import
carryxyh Aug 7, 2018
ee8723a
optimize
carryxyh Aug 7, 2018
36926b8
optimize
carryxyh Aug 7, 2018
89070b3
fix ci failed
carryxyh Aug 7, 2018
41a9733
fix ci failed
carryxyh Aug 7, 2018
aa02691
fix ci failed
carryxyh Aug 7, 2018
45a81a3
fix ci failed
carryxyh Aug 7, 2018
482be3d
fix ci failed
carryxyh Aug 7, 2018
b41eb41
fix ci failed
carryxyh Aug 7, 2018
d800e8d
fix ci failed
carryxyh Aug 7, 2018
b423621
optimize unfinished request keeper
carryxyh Aug 18, 2018
b8283e3
rebase onto master
carryxyh Aug 21, 2018
1079239
fix review code
carryxyh Aug 23, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
/**
* Channel. (API/SPI, Prototype, ThreadSafe)
*
*
*
* @see org.apache.dubbo.remoting.Client
* @see org.apache.dubbo.remoting.Server#getChannels()
* @see org.apache.dubbo.remoting.Server#getChannel(InetSocketAddress)
Expand Down Expand Up @@ -73,5 +71,4 @@ public interface Channel extends Endpoint {
* @param key key.
*/
void removeAttribute(String key);

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,4 @@ public interface ExchangeChannel extends Channel {
*/
@Override
void close(int timeout);

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public class Response {
*/
public static final byte SERVER_TIMEOUT = 31;

/**
* channel inactive, directly return the unfinished requests.
*/
public static final byte CHANNEL_INACTIVE = 35;

/**
* request format error.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public class DefaultFuture implements ResponseFuture {

private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-future-timeout", true),
Expand Down Expand Up @@ -87,10 +87,21 @@ private static void timeoutCheck(DefaultFuture future) {
TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}

/**
* init a DefaultFuture
* 1.init a DefaultFuture
* 2.timeout check
*
* @param channel channel
* @param request the request
* @param timeout timeout
* @return a new DefaultFuture
*/
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
final DefaultFuture defaultFuture = new DefaultFuture(channel, request, timeout);
timeoutCheck(defaultFuture);
return defaultFuture;
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
// timeout check
timeoutCheck(future);
return future;
}

public static DefaultFuture getFuture(long id) {
Expand All @@ -108,6 +119,29 @@ public static void sent(Channel channel, Request request) {
}
}

/**
* close a channel when a channel is inactive
* directly return the unfinished requests.
*
* @param channel channel to close
*/
public static void closeChannel(Channel channel) {
for (long id : CHANNELS.keySet()) {
if (channel.equals(CHANNELS.get(id))) {
DefaultFuture future = getFuture(id);
if (future != null && !future.isDone()) {
Response disconnectResponse = new Response(future.getId());
disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
disconnectResponse.setErrorMessage("Channel " +
channel +
" is inactive. Directly return the unFinished request : " +
future.getRequest());
DefaultFuture.received(channel, disconnectResponse);
}
}
}
}

public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
Expand Down Expand Up @@ -212,6 +246,7 @@ public void run(Timeout timeout) {
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public void disconnected(Channel channel) throws RemotingException {
try {
handler.disconnected(exchangeChannel);
} finally {
DefaultFuture.closeChannel(channel);
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,4 @@ public void send(Object message, boolean sent) throws RemotingException {
public String toString() {
return getLocalAddress() + " -> " + getRemoteAddress();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ public void test_Lazy_ChannelReadOnly() throws Exception {

}
//invoke method --> init client

IDemoService service = (IDemoService) proxy.getProxy(invoker);
Assert.assertEquals("ok", service.get());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,31 @@
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoServiceImpl;

import junit.framework.TestCase;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadTest extends TestCase {
public class MultiThreadTest {

private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

@Test
public void testDubboMultiThreadInvoke() throws Exception {
Exporter<?> rpcExporter = protocol.export(proxy.getInvoker(new DemoServiceImpl(), DemoService.class, URL.valueOf("dubbo://127.0.0.1:20259/TestService")));

final AtomicInteger counter = new AtomicInteger();
final DemoService service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("dubbo://127.0.0.1:20259/TestService")));
assertEquals(service.getSize(new String[]{"123", "456", "789"}), 3);
Assert.assertEquals(service.getSize(new String[]{"123", "456", "789"}), 3);

final StringBuffer sb = new StringBuffer();
for (int i = 0; i < 1024 * 64 + 32; i++)
sb.append('A');
assertEquals(sb.toString(), service.echo(sb.toString()));
Assert.assertEquals(sb.toString(), service.echo(sb.toString()));

ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
Expand All @@ -55,7 +56,7 @@ public void testDubboMultiThreadInvoke() throws Exception {
public void run() {
for (int i = 0; i < 30; i++) {
System.out.println(fi + ":" + counter.getAndIncrement());
assertEquals(service.echo(sb.toString()), sb.toString());
Assert.assertEquals(service.echo(sb.toString()), sb.toString());
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ public void test_counter_error() {
Assert.assertEquals("should not warning message", 0, LogUtil.findMessage(errorMsg));
// counter is incorrect, invocation still succeeds
client.close();

// wait close done.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Assert.fail();
}

Assert.assertEquals("hello", helloService.hello());
Assert.assertEquals("should warning message", 1, LogUtil.findMessage(errorMsg));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,25 @@
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoServiceImpl;
import org.apache.dubbo.rpc.service.EchoService;

import junit.framework.TestCase;
import org.junit.Assert;
import org.junit.Test;

public class RpcFilterTest extends TestCase {
public class RpcFilterTest {
private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

@Test
public void testRpcFilter() throws Exception {
DemoService service = new DemoServiceImpl();
URL url = URL.valueOf("dubbo://127.0.0.1:9010/org.apache.dubbo.rpc.DemoService?service.filter=echo");
protocol.export(proxy.getInvoker(service, DemoService.class, url));
service = proxy.getProxy(protocol.refer(DemoService.class, url));
assertEquals("123", service.echo("123"));
Assert.assertEquals("123", service.echo("123"));
// cast to EchoService
EchoService echo = proxy.getProxy(protocol.refer(EchoService.class, url));
assertEquals(echo.$echo("test"), "test");
assertEquals(echo.$echo("abcdefg"), "abcdefg");
assertEquals(echo.$echo(1234), 1234);
Assert.assertEquals(echo.$echo("test"), "test");
Assert.assertEquals(echo.$echo("abcdefg"), "abcdefg");
Assert.assertEquals(echo.$echo(1234), 1234);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.io.ByteArrayInputStream;
Expand Down