Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple shared links #2457

Merged
merged 49 commits into from
Feb 3, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
d7092c8
Merge pull request #1 from alibaba/master
Mar 9, 2018
dc602c8
Merge branch 'master' of github.com:manzhizhen/dubbo
Jul 12, 2018
444d70d
Merge pull request #2 from apache/master
Sep 4, 2018
99c6e63
make dubbo support multiple shared links, upgrading RPC throughput
Sep 5, 2018
32197c5
Fix compilation error
Sep 6, 2018
5ecdbe7
Fix compilation error
Sep 6, 2018
71c1ac2
opti import
Sep 6, 2018
1467d21
if add {}
Sep 6, 2018
52a3ac7
checkstyle fail
Sep 6, 2018
1cb48fc
fix getSharedClient referenceCount calculation error bug
Sep 7, 2018
d6ddcd5
优化 import
Sep 7, 2018
f376c1e
Merge pull request #3 from apache/master
Sep 26, 2018
06e29bc
Merge pull request #4 from apache/master
Oct 3, 2018
63962a1
Merge pull request #5 from apache/master
Nov 7, 2018
52d69f3
Merge branch 'master' into mult_shared_links
Nov 7, 2018
b3fdee6
Fix the problem that the getSharedClient thread is not safe
Nov 10, 2018
e75855c
Fix the problem that the getSharedClient thread is not safe
Nov 10, 2018
268f8c4
Merge pull request #6 from apache/master
Nov 10, 2018
39a433c
Merge branch 'master' into mult_shared_links
Nov 10, 2018
560dfb9
Try fixing ci error, https://travis-ci.org/apache/incubator-dubbo/job…
Nov 10, 2018
cf47e78
将DEFAULT_CONNECTIONS_KEY修改成SERVICE_CONNECTIONS_KEY
Nov 11, 2018
13fbc63
Merge pull request #7 from apache/master
Nov 15, 2018
f511240
Merge pull request #8 from apache/master
Nov 20, 2018
7b907c9
Merge pull request #9 from apache/master
Dec 20, 2018
a9e7d87
Merge pull request #10 from apache/master
Dec 22, 2018
768b7b5
Merge branch 'master' into mult_shared_links
Dec 22, 2018
593f4a3
Merge pull request #11 from apache/master
Jan 5, 2019
c75808c
合并master的代码
Jan 5, 2019
d739ded
Merge pull request #12 from apache/master
Jan 12, 2019
9dc2f65
Merge branch 'master' into mult_shared_links
Jan 12, 2019
52efa43
dubbo.xsd add shareconnections attribute,
Jan 13, 2019
33039fc
Optimize code format
Jan 13, 2019
4a5943e
Merge pull request #13 from apache/master
Jan 19, 2019
5aea875
resolve conflict
Jan 19, 2019
9c19fd4
Merge pull request #14 from apache/master
Jan 23, 2019
4cb08bc
merge master,fix conflicts
Jan 23, 2019
8f4efc0
Fix mult connect ghost connect problem
Jan 28, 2019
42fd2c7
format code
Jan 28, 2019
de4c9be
Remove the concept of ghostClientMap and ghost connection. In fact, g…
Jan 31, 2019
209f966
Optimize the ReferenceCountExchangeClient and remove the reference to…
Feb 1, 2019
4d00308
Merge pull request #15 from apache/master
Feb 1, 2019
30557d3
fix conflicts
Feb 1, 2019
9dc5c1b
format code
Feb 1, 2019
dc2bad3
try remove close lock
Feb 2, 2019
53aa630
Restore close method
Feb 2, 2019
5042de1
Merge pull request #16 from apache/master
Feb 2, 2019
6b68053
Merge branch 'master' into mult_shared_links
Feb 2, 2019
35b9e31
Restore ReferenceCountExchangeClient reference to LazyConnectExchange…
Feb 2, 2019
51ba855
Optimize the logic of using the LazyConnectExchangeClient inside the …
Feb 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ public class Constants {

public static final int DEFAULT_ALIVE = 60 * 1000;

public static final int DEFAULT_CONNECTIONS = 0;
public static final String DEFAULT_CONNECTIONS = "1";
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved

public static final String DEFAULT_CONNECTIONS_KEY = "default.connections.key";
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved

public static final int DEFAULT_ACCEPTS = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ protected Result doInvoke(final Invocation invocation) throws Throwable {

@Override
public boolean isAvailable() {
if (!super.isAvailable())
if (!super.isAvailable()) {
return false;
}
for (ExchangeClient client : clients) {
if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) {
//cannot write == not Available ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.serialize.support.SerializableClassRegistry;
import org.apache.dubbo.common.serialize.support.SerializationOptimizer;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NetUtils;
Expand Down Expand Up @@ -50,6 +51,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -67,9 +69,8 @@ public class DubboProtocol extends AbstractProtocol {
private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
private static DubboProtocol INSTANCE;
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
private final Map<String, List<ReferenceCountExchangeClient>> referenceClientMap = new ConcurrentHashMap<>(); // <host:port,Exchanger>
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>();
private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
private final Set<String> optimizers = new ConcurrentHashSet<String>();
//consumer side export a stub service for dispatching event
//servicekey-stubmethods
Expand Down Expand Up @@ -225,8 +226,9 @@ Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException

DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

if (exporter == null)
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}

return exporter.getInvoker();
}
Expand Down Expand Up @@ -297,8 +299,9 @@ private ExchangeServer createServer(URL url) {
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}

url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
Expand Down Expand Up @@ -364,16 +367,18 @@ private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
connections = Integer.parseInt(ConfigUtils.getProperty(Constants.DEFAULT_CONNECTIONS_KEY,
Constants.DEFAULT_CONNECTIONS));
shareClients = getSharedClient(url, connections);
}

ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
clients[i] = shareClients.get(i);
} else {
clients[i] = initClient(url);
}
Expand All @@ -383,32 +388,57 @@ private ExchangeClient[] getClients(URL url) {

/**
* Get shared connection
*
* @param url
* @param connectNum
*/
private ExchangeClient getSharedClient(URL url) {
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
boolean firstBuild = false;

if (clients == null) {
List<ReferenceCountExchangeClient> referenceCountExchangeClients = buildReferenceCountExchangeClientList(url, key, connectNum);
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
referenceClientMap.put(key, referenceCountExchangeClients);

clients = referenceCountExchangeClients;

firstBuild = true;
}

for (int i = 0; i < clients.size(); i++) {
ReferenceCountExchangeClient client = clients.get(i);
if (client.isClosed()) {
client = buildReferenceCountExchangeClient(url, key);
clients.set(i, client);

} else if (!firstBuild) {
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}

locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
if (referenceClientMap.containsKey(key)) {
return referenceClientMap.get(key);
}
return clients;
}

private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, String key, int connectNum) {

ExchangeClient exchangeClient = initClient(url);
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
locks.remove(key);
return client;
List<ReferenceCountExchangeClient> clients = new ArrayList<ReferenceCountExchangeClient>(connectNum);

for (int i = 0; i < connectNum; i++) {
clients.add(buildReferenceCountExchangeClient(url, key));
}

return clients;
}

private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url, String key) {

ExchangeClient exchangeClient = initClient(url);

ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ghostClientMap is a mapping between URL and client, but referenceClientMap is a mapping between URL and a list of clients.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delayed connections are used to reduce the number of long connections. Create a long connection when a call is initiated. <dubbo:protocol name="dubbo" lazy="true" />Note: This configuration only works for dubbo protocols that use long connections.
Later I read the code about the delayed connection and found that LazyConnectExchangeClient has two purposes:

  1. Create a connection like the delay mentioned earlier, minimizing unnecessary long connection overhead.
  2. A kind of protection when closing the connection resource. When the ReferenceCountExchangeClient is closed (multiple URLs refer to the same ReferenceCountExchangeClient, the last URL that will reference the counter to 0 can actually close the ReferenceCountExchangeClient) will be replaced with LazyConnectExchangeClient. When used again, the warn log will be printed. The put operation of ghostClientMap is only available here.

So, no matter how many ExchangeClients a URL corresponds to, it only needs a ghost connection in the ghostClientMap, which is a LazyConnectExchangeClient.

Also, only the shared ExchangeClient will use the ReferenceCountExchangeClient, so the default client used to share a TCP connection is the ReferenceCountExchangeClient, and the separate TCP connection set by connections does not need or use the ReferenceCountExchangeClient.

延迟连接用于减少长连接数。当有调用发起时,再创建长连接。<dubbo:protocol name="dubbo" lazy="true" />注意:该配置只对使用长连接的 dubbo 协议生效。
后面我有仔细阅读了关于延迟连接的代码,发现LazyConnectExchangeClient有两个用途:

  1. 像前面提到的延迟创建连接,尽可能的减少不必要的长连接开销。
  2. 关闭连接资源时的一种保护,当ReferenceCountExchangeClient被关闭时(多个URL引用同一个ReferenceCountExchangeClient时,最后那个将引用计数器变为0的URL可以真正关闭该ReferenceCountExchangeClient),会被替换成LazyConnectExchangeClient,一旦再被使用,会打印warn日志。而ghostClientMap的put操作,只有在这里才有。

所以,不管一个URL对应多少个ExchangeClient,它在ghostClientMap中最多只需要一个幽灵连接,即一个LazyConnectExchangeClient。

并且,只有共享的ExchangeClient才会用到ReferenceCountExchangeClient,所以默认的共享1条TCP连接所使用的client就是ReferenceCountExchangeClient,而通过connections设置的单独TCP连接不需要也没有使用ReferenceCountExchangeClient。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the mistake, please correct me, thank you

ghostClientMap.remove(key);

return client;
}

/**
Expand Down Expand Up @@ -460,15 +490,20 @@ public void destroy() {
}

for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
ExchangeClient client = referenceClientMap.remove(key);
if (client != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
List<ReferenceCountExchangeClient> clients = referenceClientMap.remove(key);

if (CollectionUtils.isNotEmpty(clients)) {
for (ReferenceCountExchangeClient client : clients) {
if (client != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
client.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
client.close(ConfigUtils.getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
Expand Down