Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more code review. #3083

Merged
merged 50 commits into from
Dec 29, 2018
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
9a4bc92
refactor ScriptRouter
beiwei30 Dec 27, 2018
919bfc6
refactor TagRouter
beiwei30 Dec 27, 2018
9b6c9ec
refactor AbstractConfiguratorListener
beiwei30 Dec 27, 2018
7f47626
make sure parameter should not be null
beiwei30 Dec 27, 2018
a334c06
correct comments
beiwei30 Dec 27, 2018
e495884
make ReferenceConfigurationListener private static
beiwei30 Dec 27, 2018
60506f2
avoid dup code in init
beiwei30 Dec 27, 2018
daf7f6e
add fixme for potential useless code
beiwei30 Dec 27, 2018
ffc6c4d
clean up useless variables
beiwei30 Dec 27, 2018
849c305
move methods into UrlUtils
beiwei30 Dec 27, 2018
642ac47
make method private
beiwei30 Dec 27, 2018
58d0869
reformat javadoc
beiwei30 Dec 27, 2018
a98e1e2
avoid dup code
beiwei30 Dec 27, 2018
85263e2
reformat log message
beiwei30 Dec 27, 2018
aae6be0
reformat log message
beiwei30 Dec 27, 2018
c7fa8ec
reformat the code
beiwei30 Dec 27, 2018
c261ab1
remove useless imports
beiwei30 Dec 27, 2018
0a35848
Merge branch 'master' into code-review
beiwei30 Dec 27, 2018
3790680
remove useless code
beiwei30 Dec 27, 2018
462de88
Merge branch 'code-review' of https://github.com/beiwei30/incubator-d…
beiwei30 Dec 27, 2018
1d63715
refactor ScriptRouter
beiwei30 Dec 27, 2018
7022fef
refactor TagRouter
beiwei30 Dec 27, 2018
d3f2a68
refactor AbstractConfiguratorListener
beiwei30 Dec 27, 2018
b7559dd
Add comment
chickenlj Dec 27, 2018
5d53967
Fix UT
chickenlj Dec 27, 2018
4fdbf80
make sure parameter should not be null
beiwei30 Dec 27, 2018
77e94f7
correct comments
beiwei30 Dec 27, 2018
65946a4
make ReferenceConfigurationListener private static
beiwei30 Dec 27, 2018
0bb7f5a
Revert demo changes
chickenlj Dec 27, 2018
24d367a
Revert code to avoid NPE in RPC wire after providers are cleared.
chickenlj Dec 27, 2018
7e56894
make ListenableRouter code thread safe
chickenlj Dec 27, 2018
a376db3
Fix UT
chickenlj Dec 27, 2018
647c2ae
Remove assert check to continue with execute.
chickenlj Dec 27, 2018
f070470
avoid dup code in init
beiwei30 Dec 27, 2018
82adeb4
solve compile error
chickenlj Dec 27, 2018
6ffcfdd
add fixme for potential useless code
beiwei30 Dec 27, 2018
7a99a01
clean up useless variables
beiwei30 Dec 27, 2018
2aa03e4
move methods into UrlUtils
beiwei30 Dec 27, 2018
c1e37be
make method private
beiwei30 Dec 27, 2018
0a59ed8
reformat javadoc
beiwei30 Dec 27, 2018
20882ae
avoid dup code
beiwei30 Dec 27, 2018
aa26f11
reformat log message
beiwei30 Dec 27, 2018
e37453e
reformat log message
beiwei30 Dec 27, 2018
81596df
reformat the code
beiwei30 Dec 27, 2018
cb83a45
remove useless imports
beiwei30 Dec 27, 2018
90ca4d6
remove useless code
beiwei30 Dec 27, 2018
1878000
Merge branch 'code-review' of https://github.com/beiwei30/incubator-d…
beiwei30 Dec 28, 2018
de6bddf
Merge branch 'master' into code-review
beiwei30 Dec 28, 2018
d8e4642
code review comments from @khanimteyaz
beiwei30 Dec 28, 2018
f597d39
code review from @khanimteyaz
beiwei30 Dec 28, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.Constants.CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.Constants.OVERRIDE_PROTOCOL;
import static org.apache.dubbo.common.Constants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.Constants.ROUTERS_CATEGORY;
import static org.apache.dubbo.common.Constants.ROUTE_PROTOCOL;

public class UrlUtils {

/**
Expand Down Expand Up @@ -445,6 +451,22 @@ public static List<URL> classifyUrls(List<URL> urls, Predicate<URL> predicate) {
return urls.stream().filter(predicate).collect(Collectors.toList());
}

public static boolean isConfigure(URL url) {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
return CONFIGURATORS_CATEGORY.equals(url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
|| OVERRIDE_PROTOCOL.equals(url.getProtocol());
}

public static boolean isRoute(URL url) {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
return ROUTE_PROTOCOL.equals(url.getProtocol())
|| ROUTERS_CATEGORY.equals(url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY));
}

public static boolean isProvider(URL url) {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
return PROVIDERS_CATEGORY.equals(url.getParameter(Constants.CATEGORY_KEY, PROVIDERS_CATEGORY))
&& !OVERRIDE_PROTOCOL.equals(url.getProtocol())
&& !ROUTE_PROTOCOL.equals(url.getProtocol());
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Check if the given value matches the given pattern. The pattern supports wildcard "*".
*
Expand All @@ -459,4 +481,4 @@ static boolean isItemMatch(String pattern, String value) {
return "*".equals(pattern) || pattern.equals(value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
Expand All @@ -34,7 +35,6 @@
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Configurator;
import org.apache.dubbo.rpc.cluster.ConfiguratorFactory;
import org.apache.dubbo.rpc.cluster.Router;
import org.apache.dubbo.rpc.cluster.RouterChain;
import org.apache.dubbo.rpc.cluster.RouterFactory;
Expand All @@ -47,7 +47,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -61,7 +60,6 @@
import static org.apache.dubbo.common.Constants.CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.Constants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.Constants.DYNAMIC_CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.Constants.OVERRIDE_PROTOCOL;
import static org.apache.dubbo.common.Constants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.Constants.ROUTERS_CATEGORY;
import static org.apache.dubbo.common.Constants.ROUTE_PROTOCOL;
Expand All @@ -80,13 +78,10 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getAdaptiveExtension();

private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getAdaptiveExtension();
private final String serviceKey; // Initialization at construction time, assertion not null
private final Class<T> serviceType; // Initialization at construction time, assertion not null
private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
private final URL directoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
private final String[] serviceMethods;
private final boolean multiGroup;
private Protocol protocol; // Initialization at the time of injection, the assertion is not null
private Registry registry; // Initialization at the time of injection, the assertion is not null
Expand All @@ -106,9 +101,6 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
private volatile List<Invoker<T>> invokers;

// Map<methodName, Invoker> cache service method to invokers mapping.
// private volatile Map<String, List<Invoker<T>>> methodInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference

// Set<invokerUrls> cache invokeUrls to invokers mapping.
private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference

Expand All @@ -130,8 +122,6 @@ public RegistryDirectory(Class<T> serviceType, URL url) {
this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
String methods = queryMap.get(Constants.METHODS_KEY);
this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
}

private URL turnRegistryUrlToConsumerUrl(URL url) {
Expand Down Expand Up @@ -187,37 +177,38 @@ public void destroy() {

@Override
public synchronized void notify(List<URL> urls) {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
List<URL> categoryUrls = urls.stream().filter(this::isValidCategory).filter(this::isNotCompatibleFor26x).collect(Collectors.toList());
List<URL> categoryUrls = urls.stream()
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.toList());

/**
* TODO Try to refactor the processing of these three type of urls using Collectors.groupBy()?
*/
this.configurators = Configurator.toConfigurators(classifyUrls(categoryUrls, url -> (CONFIGURATORS_CATEGORY.equals(url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))
|| OVERRIDE_PROTOCOL.equals(url.getProtocol())))).orElse(configurators);
this.configurators = Configurator.toConfigurators(classifyUrls(categoryUrls, UrlUtils::isConfigure))
.orElse(configurators);

toRouters(classifyUrls(categoryUrls, url -> {
return ROUTE_PROTOCOL.equals(url.getProtocol())
|| ROUTERS_CATEGORY.equals(url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY));
})).ifPresent(this::addRouters);
toRouters(classifyUrls(categoryUrls, UrlUtils::isRoute)).ifPresent(this::addRouters);
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved

// providers
refreshOverrideAndInvoker(classifyUrls(categoryUrls, url -> PROVIDERS_CATEGORY.equals(url.getParameter(Constants.CATEGORY_KEY, PROVIDERS_CATEGORY))
&& !OVERRIDE_PROTOCOL.equals(url.getProtocol())
&& !ROUTE_PROTOCOL.equals(url.getProtocol()))
);
refreshOverrideAndInvoker(classifyUrls(categoryUrls, UrlUtils::isProvider));
}

public void refreshOverrideAndInvoker(List<URL> urls) {
private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
refreshInvoker(urls);
}

/**
* Convert the invokerURL list to the Invoker Map. The rules of the conversion are as follows:
* 1.If URL has been converted to invoker, it is no longer re-referenced and obtained directly from the cache, and notice that any parameter changes in the URL will be re-referenced.
* 2.If the incoming invoker list is not empty, it means that it is the latest invoker list
* 3.If the list of incoming invokerUrl is empty, It means that the rule is only a override rule or a route rule, which needs to be re-contrasted to decide whether to re-reference.
* <ol>
* <li> If URL has been converted to invoker, it is no longer re-referenced and obtained directly from the cache,
* and notice that any parameter changes in the URL will be re-referenced.</li>
* <li>If the incoming invoker list is not empty, it means that it is the latest invoker list.</li>
* <li>If the list of incoming invokerUrl is empty, It means that the rule is only a override rule or a route
* rule, which needs to be re-contrasted to decide whether to re-reference.</li>
* </ol>
*
* @param invokerUrls this parameter can't be null
*/
Expand Down Expand Up @@ -248,7 +239,6 @@ private void refreshInvoker(List<URL> invokerUrls) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
// Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map

// state change
// If the calculation is wrong, it is not processed.
Expand All @@ -262,7 +252,6 @@ private void refreshInvoker(List<URL> invokerUrls) {
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
routerChain.setInvokers(newInvokers);
// this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;

Expand Down Expand Up @@ -360,10 +349,10 @@ private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl()
.getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + ExtensionLoader
.getExtensionLoader(Protocol.class)
.getSupportedExtensions()));
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);
Expand Down Expand Up @@ -437,71 +426,22 @@ private URL mergeUrl(URL providerUrl) {
}

private URL overrideWithConfigurator(URL providerUrl) {
List<Configurator> localConfigurators = this.configurators; // local reference
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
providerUrl = configurator.configure(providerUrl);
}
}

List<Configurator> localAppDynamicConfigurators = consumerConfigurationListener.getConfigurators(); // local reference
if (localAppDynamicConfigurators != null && !localAppDynamicConfigurators.isEmpty()) {
for (Configurator configurator : localAppDynamicConfigurators) {
providerUrl = configurator.configure(providerUrl);
}
}

providerUrl = overrideWithConfigurators(this.configurators, providerUrl);
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
providerUrl = overrideWithConfigurators(consumerConfigurationListener.getConfigurators(), providerUrl);
if (serviceConfigurationListener != null) {
List<Configurator> localDynamicConfigurators = serviceConfigurationListener.getConfigurators(); // local reference
if (localDynamicConfigurators != null && !localDynamicConfigurators.isEmpty()) {
for (Configurator configurator : localDynamicConfigurators) {
providerUrl = configurator.configure(providerUrl);
}
}
providerUrl = overrideWithConfigurators(serviceConfigurationListener.getConfigurators(), providerUrl);
}

return providerUrl;
}

/**
* Transform the invokers list into a mapping relationship with a method
*
* @param invokersMap Invoker Map
* @return Mapping relation between Invoker and method
*/
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
// According to the methods classification declared by the provider URL, the methods is compatible with the registry to execute the filtered methods
List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
if (invokersMap != null && invokersMap.size() > 0) {
for (Invoker<T> invoker : invokersMap.values()) {
String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
if (parameter != null && parameter.length() > 0) {
String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
if (methods != null && methods.length > 0) {
for (String method : methods) {
if (method != null && method.length() > 0 && !Constants.ANY_VALUE.equals(method)) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null) {
methodInvokers = new ArrayList<Invoker<T>>();
newMethodInvokerMap.put(method, methodInvokers);
}
methodInvokers.add(invoker);
}
}
}
}
invokersList.add(invoker);
private URL overrideWithConfigurators(List<Configurator> configurators, URL url) {
if (configurators != null && !configurators.isEmpty()) {
for (Configurator configurator : configurators) {
url = configurator.configure(url);
}
}
newMethodInvokerMap.put(Constants.ANY_VALUE, invokersList);
// sort and unmodifiable
for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
Collections.sort(methodInvokers, InvokerComparator.getComparator());
newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
}
return Collections.unmodifiableMap(newMethodInvokerMap);
return url;
}

/**
Expand Down Expand Up @@ -571,8 +511,10 @@ private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl()
.getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
", please check status of providers(disabled, not registered or in blacklist).");
}

if (multiGroup) {
Expand Down Expand Up @@ -647,33 +589,16 @@ public List<Invoker<T>> getInvokers() {
return invokers;
}

private static class InvokerComparator implements Comparator<Invoker<?>> {

private static final InvokerComparator comparator = new InvokerComparator();

private InvokerComparator() {
}

public static InvokerComparator getComparator() {
return comparator;
}

@Override
public int compare(Invoker<?> o1, Invoker<?> o2) {
return o1.getUrl().toString().compareTo(o2.getUrl().toString());
}

}

private boolean isValidCategory(URL url) {
String category = url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
if ((ROUTERS_CATEGORY.equals(category) || ROUTE_PROTOCOL.equals(url.getProtocol())) || PROVIDERS_CATEGORY.equals(category) || CONFIGURATORS_CATEGORY
.equals(category) || DYNAMIC_CONFIGURATORS_CATEGORY.equals(category) || APP_DYNAMIC_CONFIGURATORS_CATEGORY
.equals(category)) {
if ((ROUTERS_CATEGORY.equals(category) || ROUTE_PROTOCOL.equals(url.getProtocol())) ||
PROVIDERS_CATEGORY.equals(category) ||
CONFIGURATORS_CATEGORY.equals(category) || DYNAMIC_CONFIGURATORS_CATEGORY.equals(category) ||
APP_DYNAMIC_CONFIGURATORS_CATEGORY.equals(category)) {
return true;
}
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils
.getLocalHost());
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " +
getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
return false;
}

Expand Down