> invokers, URL url, Invocation
}
@Override
- public int getPriority() {
- return priority;
+ public boolean isRuntime() {
+ return this.url.getParameter(Constants.RUNTIME_KEY, false);
}
@Override
- public int compareTo(Router o) {
- if (o == null) {
- throw new IllegalArgumentException();
- }
- if (this.priority == o.getPriority()) {
- if (o instanceof ScriptRouter) {
- ScriptRouter c = (ScriptRouter) o;
- return rule.compareTo(c.rule);
- }
- return 0;
- } else {
- return this.priority > o.getPriority() ? 1 : -1;
- }
+ public boolean isForce() {
+ return url.getParameter(Constants.FORCE_KEY, false);
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return url.getParameter(Constants.ENABLED_KEY, false);
}
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/script/ScriptRouterFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/script/ScriptRouterFactory.java
index 56976ed731a..b29f25e23dd 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/script/ScriptRouterFactory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/script/ScriptRouterFactory.java
@@ -25,7 +25,7 @@
*
* Example URLS used by Script Router Factory:
*
- * - script://registyAddress?type=js&rule=xxxx
+ *
- script://registryAddress?type=js&rule=xxxx
*
- script:///path/to/routerfile.js?type=js&rule=xxxx
*
- script://D:\path\to\routerfile.js?type=js&rule=xxxx
*
- script://C:/path/to/routerfile.js?type=js&rule=xxxx
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
index c7526ce9c5c..8843b3327c8 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
@@ -20,36 +20,65 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.configcenter.ConfigChangeEvent;
+import org.apache.dubbo.configcenter.ConfigChangeType;
+import org.apache.dubbo.configcenter.ConfigurationListener;
+import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Router;
+import org.apache.dubbo.rpc.cluster.RouterChain;
+import org.apache.dubbo.rpc.cluster.router.AbstractRouter;
+import org.apache.dubbo.rpc.cluster.router.tag.model.TagRouterRule;
+import org.apache.dubbo.rpc.cluster.router.tag.model.TagRuleParser;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
/**
- * TagRouter
+ *
*/
-public class TagRouter implements Router {
-
+public class TagRouter extends AbstractRouter implements Comparable, ConfigurationListener {
+ public static final String NAME = "TAG_ROUTER";
+ private static final int DEFAULT_PRIORITY = 100;
private static final Logger logger = LoggerFactory.getLogger(TagRouter.class);
+ private static final String TAGROUTERRULES_DATAID = ".tagrouters"; // acts
+ private TagRouterRule tagRouterRule;
+ private String application;
- private final int priority;
- private final URL url;
+ private boolean inited = false;
- public static final URL ROUTER_URL = new URL("tag", Constants.ANYHOST_VALUE, 0, Constants.ANY_VALUE).addParameters(Constants.RUNTIME_KEY, "true");
+ public TagRouter(DynamicConfiguration configuration, URL url) {
+ super(configuration, url);
+ }
- public TagRouter(URL url) {
- this.url = url;
- this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
+ protected TagRouter() {
}
- public TagRouter() {
- this.url = ROUTER_URL;
- this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
+ @Override
+ public synchronized void process(ConfigChangeEvent event) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Notification of tag rule, change type is: " + event.getChangeType() + ", raw rule is:\n " + event
+ .getValue());
+ }
+
+ try {
+ if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
+ this.tagRouterRule = null;
+ } else {
+ this.tagRouterRule = TagRuleParser.parse(event.getValue());
+ }
+
+ routerChains.forEach(RouterChain::notifyRuleChanged);
+
+ } catch (Exception e) {
+ logger.error("Failed to parse the raw tag router rule and it will not take effect, please check if the rule matches with the template, the raw rule is:\n ", e);
+ }
}
@Override
@@ -57,40 +86,152 @@ public URL getUrl() {
return url;
}
+ /**
+ *
+ * @param invokers
+ * @param url
+ * @param invocation
+ * @param
+ * @return
+ * @throws RpcException
+ */
@Override
public List> route(List> invokers, URL url, Invocation invocation) throws RpcException {
- // filter
- List> result = new ArrayList<>();
- try {
- // Dynamic param
- String tag = RpcContext.getContext().getAttachment(Constants.REQUEST_TAG_KEY);
- // Tag request
- if (!StringUtils.isEmpty(tag)) {
- // Select tag invokers first
- for (Invoker invoker : invokers) {
- if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
- result.add(invoker);
- }
+ if (CollectionUtils.isEmpty(invokers)) {
+ return invokers;
+ }
+
+ if (tagRouterRule == null || !tagRouterRule.isValid() || !tagRouterRule.isEnabled()) {
+ // the invokers must have been preRouted by static tag configuration, so this invoker list is just what we want.
+ return invokers;
+ }
+
+ List> result = invokers;
+ String tag = StringUtils.isEmpty(invocation.getAttachment(Constants.TAG_KEY)) ? url.getParameter(Constants.TAG_KEY) : invocation.getAttachment(Constants.TAG_KEY);
+ // if we are requesting for a Provider with a specific tag
+ if (StringUtils.isNotEmpty(tag)) {
+ List addresses = tagRouterRule.getTagnameToAddresses().get(tag);
+ // filter by dynamic tag group first
+ if (CollectionUtils.isNotEmpty(addresses)) {
+ result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
+ // if result is not null OR it's null but force=true, return result directly
+ if (CollectionUtils.isNotEmpty(result) || tagRouterRule.isForce()) {
+ return result;
}
+ } else {
+ // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by dynamic tag group but force=false.
+ // check static tag
+ result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl()
+ .getParameter(Constants.TAG_KEY)));
+ }
+ // If there's no tagged providers that can match the current tagged request. force.tag is set by default to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
+ if (CollectionUtils.isNotEmpty(result) || Boolean.valueOf(invocation.getAttachment(Constants.FORCE_USE_TAG, url.getParameter(Constants.FORCE_USE_TAG, "false")))) {
+ return result;
}
- // If Constants.REQUEST_TAG_KEY unspecified or no invoker be selected, downgrade to normal invokers
- if (result.isEmpty()) {
- for (Invoker invoker : invokers) {
- if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
- result.add(invoker);
- }
+ // FAILOVER: return all Providers without any tags.
+ else {
+ List> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), tagRouterRule
+ .getAddresses()));
+ return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl()
+ .getParameter(Constants.TAG_KEY)));
+ }
+ } else {
+ // List addresses = tagRouterRule.filter(providerApp);
+ // return all addresses in dynamic tag group.
+ List addresses = tagRouterRule.getAddresses();
+ if (CollectionUtils.isNotEmpty(addresses)) {
+ result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses));
+ // 1. all addresses are in dynamic tag group, return empty list.
+ if (CollectionUtils.isEmpty(result)) {
+ return result;
}
+ // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group.
}
- return result;
- } catch (Exception e) {
- logger.error("Route by tag error,return all invokers.", e);
+ return filterInvoker(result, invoker -> {
+ String localTag = invoker.getUrl().getParameter(Constants.TAG_KEY);
+ if (StringUtils.isEmpty(localTag) || !tagRouterRule.getTagNames().contains(localTag)) {
+ return true;
+ }
+ return false;
+ });
+ }
+ }
+
+ /**
+ * This method is reserved for building router cache.
+ * Currently, we rely on this method to do the init task since it will get triggered before route() really happens.
+ *
+ * @param invokers
+ * @param url
+ * @param invocation
+ * @param
+ * @return
+ * @throws RpcException
+ */
+ @Override
+ public Map>> preRoute(List> invokers, URL url, Invocation invocation) throws RpcException {
+ if (CollectionUtils.isNotEmpty(invokers)) {
+ checkAndInit(invokers.get(0).getUrl());
}
- // Downgrade to all invokers
- return invokers;
+ return super.preRoute(invokers, url, invocation);
}
@Override
public int getPriority() {
- return priority;
+ return DEFAULT_PRIORITY;
+ }
+
+ @Override
+ public boolean isRuntime() {
+ return tagRouterRule != null && tagRouterRule.isRuntime();
+// return false;
+ }
+
+ @Override
+ public boolean isForce() {
+ // FIXME
+ return tagRouterRule != null && tagRouterRule.isForce();
+ }
+
+ private List> filterInvoker(List> invokers, Predicate> predicate) {
+ return invokers.stream()
+ .filter(predicate)
+ .collect(Collectors.toList());
+ }
+
+ private boolean addressMatches(URL url, List addresses) {
+ return addresses != null && addresses.contains(url.getAddress());
+ }
+
+ private boolean addressNotMatches(URL url, List addresses) {
+ return addresses == null || !addresses.contains(url.getAddress());
+ }
+
+ public void setApplication(String app) {
+ this.application = app;
+ }
+
+ private synchronized void checkAndInit(URL url) {
+ String providerApplication = url.getParameter(Constants.REMOTE_APPLICATION_KEY);
+ if (StringUtils.isEmpty(application) || !application.equals(providerApplication)) {
+ setApplication(providerApplication);
+ inited = false;
+ }
+
+ if (StringUtils.isEmpty(application)) {
+ logger.error("TagRouter must getConfig from or subscribe to a specific application, but the application in this TagRouter is not specified.");
+ return;
+ }
+
+ if (!inited) {
+ inited = true;
+ String key = application + TAGROUTERRULES_DATAID;
+ configuration.addListener(key, this);
+ String rawRule = configuration.getConfig(key);
+ if (rawRule != null) {
+ this.process(new ConfigChangeEvent(key, rawRule));
+ }
+ }
}
+
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterFactory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterFactory.java
index f827b08d51f..a05275806fb 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterFactory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouterFactory.java
@@ -17,15 +17,21 @@
package org.apache.dubbo.rpc.cluster.router.tag;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.configcenter.DynamicConfiguration;
+import org.apache.dubbo.rpc.cluster.AbstractRouterFactory;
import org.apache.dubbo.rpc.cluster.Router;
-import org.apache.dubbo.rpc.cluster.RouterFactory;
-public class TagRouterFactory implements RouterFactory {
+/**
+ *
+ */
+@Activate(order = 100)
+public class TagRouterFactory extends AbstractRouterFactory {
public static final String NAME = "tag";
@Override
- public Router getRouter(URL url) {
- return new TagRouter(url);
+ protected Router createRouter(URL url) {
+ return new TagRouter(DynamicConfiguration.getDynamicConfiguration(), url);
}
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/model/Tag.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/model/Tag.java
new file mode 100644
index 00000000000..28628a5d63a
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/model/Tag.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.tag.model;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class Tag {
+ private String name;
+ private List addresses;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public List getAddresses() {
+ return addresses;
+ }
+
+ public void setAddresses(List addresses) {
+ this.addresses = addresses;
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/model/TagRouterRule.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/model/TagRouterRule.java
new file mode 100644
index 00000000000..827518bda8e
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/model/TagRouterRule.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.tag.model;
+
+import org.apache.dubbo.rpc.cluster.router.AbstractRouterRule;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * %YAML1.2
+ * ---
+ * force: true
+ * runtime: false
+ * enabled: true
+ * priority: 1
+ * key: demo-provider
+ * tags:
+ * - name: tag1
+ * addresses: [ip1, ip2]
+ * - name: tag2
+ * addresses: [ip3, ip4]
+ * ...
+ */
+public class TagRouterRule extends AbstractRouterRule {
+ private List tags;
+
+ private Map> addressToTagnames = new HashMap<>();
+ private Map> tagnameToAddresses = new HashMap<>();
+
+ public void init() {
+ if (!isValid()) {
+ return;
+ }
+
+ tags.forEach(tag -> {
+ tagnameToAddresses.put(tag.getName(), tag.getAddresses());
+ tag.getAddresses().forEach(addr -> {
+ List tagNames = addressToTagnames.computeIfAbsent(addr, k -> new ArrayList<>());
+ tagNames.add(tag.getName());
+ });
+ });
+ }
+
+ public List getAddresses() {
+ return tags.stream().flatMap(tag -> tag.getAddresses().stream()).collect(Collectors.toList());
+ }
+
+ public List getTagNames() {
+ return tags.stream().map(Tag::getName).collect(Collectors.toList());
+ }
+
+ public Map> getAddressToTagnames() {
+ return addressToTagnames;
+ }
+
+
+ public Map> getTagnameToAddresses() {
+ return tagnameToAddresses;
+ }
+
+ public List getTags() {
+ return tags;
+ }
+
+ public void setTags(List tags) {
+ this.tags = tags;
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/model/TagRuleParser.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/model/TagRuleParser.java
new file mode 100644
index 00000000000..4f6669f1b25
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/model/TagRuleParser.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.router.tag.model;
+
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.yaml.snakeyaml.TypeDescription;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+/**
+ *
+ */
+public class TagRuleParser {
+
+ public static TagRouterRule parse(String rawRule) {
+ Constructor constructor = new Constructor(TagRouterRule.class);
+ TypeDescription tagDescription = new TypeDescription(TagRouterRule.class);
+ tagDescription.addPropertyParameters("tags", Tag.class);
+ constructor.addTypeDescription(tagDescription);
+
+ Yaml yaml = new Yaml(constructor);
+ TagRouterRule rule = yaml.load(rawRule);
+ rule.setRawRule(rawRule);
+ if (CollectionUtils.isEmpty(rule.getTags())) {
+ rule.setValid(false);
+ }
+
+ rule.init();
+ return rule;
+ }
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
index 2d3602aead4..63486aa5c5c 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
@@ -257,7 +257,7 @@ public String toString() {
protected void checkInvokers(List> invokers, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
- throw new RpcException("Failed to invoke the method "
+ throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". No provider available for the service " + directory.getUrl().getServiceKey()
+ " from registry " + directory.getUrl().getAddress()
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
index 2e02c92acda..da5e04d13d6 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
@@ -36,7 +36,6 @@ public static URL mergeUrl(URL remoteUrl, Map localMap) {
Map map = new HashMap();
Map remoteMap = remoteUrl.getParameters();
-
if (remoteMap != null && remoteMap.size() > 0) {
map.putAll(remoteMap);
@@ -78,7 +77,12 @@ public static URL mergeUrl(URL remoteUrl, Map localMap) {
}
if (localMap != null && localMap.size() > 0) {
+ // All providers come to here have been filtered by group, which means only those providers that have the exact same group value with the consumer could come to here.
+ // So, generally, we don't need to care about the group value here.
+ // But when comes to group merger, there is an exception, the consumer group may be '*' while the provider group can be empty or any other values.
+ String remoteGroup = map.get(Constants.GROUP_KEY);
map.putAll(localMap);
+ map.put(Constants.GROUP_KEY, remoteGroup);
}
if (remoteMap != null && remoteMap.size() > 0) {
// Use version passed from provider side
@@ -90,10 +94,6 @@ public static URL mergeUrl(URL remoteUrl, Map localMap) {
if (version != null && version.length() > 0) {
map.put(Constants.VERSION_KEY, version);
}
- String group = remoteMap.get(Constants.GROUP_KEY);
- if (group != null && group.length() > 0) {
- map.put(Constants.GROUP_KEY, group);
- }
String methods = remoteMap.get(Constants.METHODS_KEY);
if (methods != null && methods.length() > 0) {
map.put(Constants.METHODS_KEY, methods);
@@ -103,6 +103,11 @@ public static URL mergeUrl(URL remoteUrl, Map localMap) {
if (remoteTimestamp != null && remoteTimestamp.length() > 0) {
map.put(Constants.REMOTE_TIMESTAMP_KEY, remoteMap.get(Constants.TIMESTAMP_KEY));
}
+
+ // TODO, for compatibility consideration, we cannot simply change the value behind APPLICATION_KEY from Consumer to Provider. So just add an extra key here.
+ // Reserve application name from provider.
+ map.put(Constants.REMOTE_APPLICATION_KEY, remoteMap.get(Constants.APPLICATION_KEY));
+
// Combine filters and listeners on Provider and Consumer
String remoteFilter = remoteMap.get(Constants.REFERENCE_FILTER_KEY);
String localFilter = localMap.get(Constants.REFERENCE_FILTER_KEY);
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
index b8ae095af6b..dd68d7ef3dd 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
@@ -16,26 +16,24 @@
*/
package org.apache.dubbo.rpc.cluster.support;
+import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.common.timer.Timer;
+import org.apache.dubbo.common.timer.TimerTask;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
-import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -43,78 +41,124 @@
* Especially useful for services of notification.
*
* Failback
- *
*/
public class FailbackClusterInvoker extends AbstractClusterInvoker {
private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
- private static final long RETRY_FAILED_PERIOD = 5 * 1000;
+ private static final long RETRY_FAILED_PERIOD = 5;
- /**
- * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
- * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
- */
- private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
- new NamedInternalThreadFactory("failback-cluster-timer", true));
+ private final int retries;
- private final ConcurrentMap> failed = new ConcurrentHashMap<>();
- private volatile ScheduledFuture> retryFuture;
+ private final int failbackTasks;
+
+ private volatile Timer failTimer;
public FailbackClusterInvoker(Directory directory) {
super(directory);
+
+ int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
+ if (retriesConfig <= 0) {
+ retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
+ }
+ int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
+ if (failbackTasksConfig <= 0) {
+ failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
+ }
+ retries = retriesConfig;
+ failbackTasks = failbackTasksConfig;
}
- private void addFailed(Invocation invocation, AbstractClusterInvoker> invoker) {
- if (retryFuture == null) {
+ private void addFailed(LoadBalance loadbalance, Invocation invocation, List> invokers, Invoker lastInvoker) {
+ if (failTimer == null) {
synchronized (this) {
- if (retryFuture == null) {
- retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
-
- @Override
- public void run() {
- // collect retry statistics
- try {
- retryFailed();
- } catch (Throwable t) { // Defensive fault tolerance
- logger.error("Unexpected error occur at collect statistic", t);
- }
- }
- }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
+ if (failTimer == null) {
+ failTimer = new HashedWheelTimer(
+ new NamedThreadFactory("failback-cluster-timer", true),
+ 1,
+ TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
- failed.put(invocation, invoker);
- }
-
- void retryFailed() {
- if (failed.isEmpty()) {
- return;
- }
- for (Map.Entry> entry : new HashMap<>(failed).entrySet()) {
- Invocation invocation = entry.getKey();
- Invoker> invoker = entry.getValue();
- try {
- invoker.invoke(invocation);
- failed.remove(invocation);
- } catch (Throwable e) {
- logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
- }
+ RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
+ try {
+ failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
+ } catch (Throwable e) {
+ logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
}
}
@Override
protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
+ Invoker invoker = null;
try {
checkInvokers(invokers, invocation);
- Invoker invoker = select(loadbalance, invocation, invokers, null);
+ invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
- addFailed(invocation, this);
+ addFailed(loadbalance, invocation, invokers, invoker);
return new RpcResult(); // ignore
}
}
+ @Override
+ public void destroy() {
+ super.destroy();
+ if (failTimer != null) {
+ failTimer.stop();
+ }
+ }
+
+ /**
+ * RetryTimerTask
+ */
+ private class RetryTimerTask implements TimerTask {
+ private final Invocation invocation;
+ private final LoadBalance loadbalance;
+ private final List> invokers;
+ private final int retries;
+ private final long tick;
+ private Invoker lastInvoker;
+ private int retryTimes = 0;
+
+ RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List> invokers, Invoker lastInvoker, int retries, long tick) {
+ this.loadbalance = loadbalance;
+ this.invocation = invocation;
+ this.invokers = invokers;
+ this.retries = retries;
+ this.tick = tick;
+ this.lastInvoker=lastInvoker;
+ }
+
+ @Override
+ public void run(Timeout timeout) {
+ try {
+ Invoker retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
+ lastInvoker = retryInvoker;
+ retryInvoker.invoke(invocation);
+ } catch (Throwable e) {
+ logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
+ if ((++retryTimes) >= retries) {
+ logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
+ } else {
+ rePut(timeout);
+ }
+ }
+ }
+
+ private void rePut(Timeout timeout) {
+ if (timeout == null) {
+ return;
+ }
+
+ Timer timer = timeout.timer();
+ if (timer.isStop() || timeout.isCancelled()) {
+ return;
+ }
+
+ timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
+ }
+ }
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java
index 0c68eab84ed..82a0e8a2cb7 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java
@@ -30,6 +30,7 @@
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
+import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.Merger;
import org.apache.dubbo.rpc.cluster.merger.MergerFactory;
@@ -47,26 +48,31 @@
import java.util.concurrent.TimeUnit;
@SuppressWarnings("unchecked")
-public class MergeableClusterInvoker implements Invoker {
+public class MergeableClusterInvoker extends AbstractClusterInvoker {
private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class);
- private final Directory directory;
private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));
public MergeableClusterInvoker(Directory directory) {
- this.directory = directory;
+ super(directory);
}
@Override
- @SuppressWarnings("rawtypes")
- public Result invoke(final Invocation invocation) throws RpcException {
- List> invokers = directory.list(invocation);
-
+ protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
+ checkInvokers(invokers, invocation);
String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
for (final Invoker invoker : invokers) {
if (invoker.isAvailable()) {
- return invoker.invoke(invocation);
+ try {
+ return invoker.invoke(invocation);
+ } catch (RpcException e) {
+ if (e.isNoInvokerAvailableAfterFilter()) {
+ log.debug("No available provider for service" + directory.getUrl().getServiceKey() + " on group " + invoker.getUrl().getParameter(Constants.GROUP_KEY) + ", will continue to try another group.");
+ } else {
+ throw e;
+ }
+ }
}
}
return invokers.iterator().next().invoke(invocation);
@@ -101,8 +107,8 @@ public Result call() throws Exception {
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
if (r.hasException()) {
- log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
- " failed: " + r.getException().getMessage(),
+ log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
+ " failed: " + r.getException().getMessage(),
r.getException());
} else {
resultList.add(r);
@@ -128,7 +134,7 @@ public Result call() throws Exception {
try {
method = returnType.getMethod(merger, returnType);
} catch (NoSuchMethodException e) {
- throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " +
+ throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " +
returnType.getClass().getName() + " ]");
}
if (!Modifier.isPublic(method.getModifiers())) {
@@ -170,6 +176,7 @@ public Result call() throws Exception {
return new RpcResult(result);
}
+
@Override
public Class getInterface() {
return directory.getInterface();
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareCluster.java
new file mode 100644
index 00000000000..81b21a3dbbf
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareCluster.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.support;
+
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
+import org.apache.dubbo.rpc.cluster.Directory;
+
+/**
+ *
+ */
+public class RegistryAwareCluster implements Cluster {
+
+ public final static String NAME = "registryaware";
+
+ @Override
+ public Invoker join(Directory directory) throws RpcException {
+ return new RegistryAwareClusterInvoker(directory);
+ }
+
+}
\ No newline at end of file
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareClusterInvoker.java
new file mode 100644
index 00000000000..1d5d42b7b4f
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareClusterInvoker.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.support;
+
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Directory;
+import org.apache.dubbo.rpc.cluster.LoadBalance;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class RegistryAwareClusterInvoker extends AbstractClusterInvoker