Skip to content

Commit

Permalink
Merge pull request #8 from apache/master
Browse files Browse the repository at this point in the history
Master sync
  • Loading branch information
khanimteyaz authored Dec 27, 2018
2 parents 565aa1a + dc1dec5 commit 5b67c0f
Show file tree
Hide file tree
Showing 61 changed files with 403 additions and 728 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import java.util.concurrent.ConcurrentMap;

/**
* If you want to provide a Router implementation based on design of v2.7.0, please extend from this abstract class.
* For 2.6.x style Router, please implement and use RouterFactory directly.
* If you want to provide a router implementation based on design of v2.7.0, please extend from this abstract class.
* For 2.6.x style router, please implement and use RouterFactory directly.
*/
public abstract class AbstractRouterFactory implements RouterFactory {
public abstract class CacheableRouterFactory implements RouterFactory {
private ConcurrentMap<String, Router> routerMap = new ConcurrentHashMap<>();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.dubbo.rpc.RpcException;

import java.util.List;
import java.util.Map;

/**
* Router. (SPI, Prototype, ThreadSafe)
Expand All @@ -34,7 +33,7 @@
*/
public interface Router extends Comparable<Router> {
/**
* get the router url.
* Get the router url.
*
* @return url
*/
Expand All @@ -43,44 +42,47 @@ public interface Router extends Comparable<Router> {
/**
* Filter invokers with current routing rule and only return the invokers that comply with the rule.
*
* @param invokers
* @param invokers invoker list
* @param url refer url
* @param invocation
* @param invocation invocation
* @return routed invokers
* @throws RpcException
*/
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

default <T> Map<String, List<Invoker<T>>> preRoute(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
return null;
}

/**
* Each router has a reference of the router chain.
* Notify the router the invoker list. Invoker list may change from time to time. This method gives the router a
* chance to prepare before {@link Router#route(List, URL, Invocation)} gets called.
*
* @param routerChain
* @param invokers invoker list
* @param <T> invoker's type
*/
void addRouterChain(RouterChain routerChain);
default <T> void notify(List<Invoker<T>> invokers) {

}

/**
* To decide whether this router need to execute every time an RPC comes or should only execute when addresses or rule change.
* To decide whether this router need to execute every time an RPC comes or should only execute when addresses or
* rule change.
*
* @return
* @return true if the router need to execute every time.
*/
boolean isRuntime();

/**
* To decide whether this router should take effect when none of the invoker can match the router rule, which means the {@link #route(List, URL, Invocation)} would be empty.
* Most of time, most router implementation would default this value to false.
* To decide whether this router should take effect when none of the invoker can match the router rule, which
* means the {@link #route(List, URL, Invocation)} would be empty. Most of time, most router implementation would
* default this value to false.
*
* @return
* @return true to execute if none of invokers matches the current router
*/
boolean isForce();

/**
* used to sort routers.
* Router's priority, used to sort routers.
*
* @return
* @return router's priority
*/
int getPriority();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,129 +28,89 @@
import java.util.stream.Collectors;

/**
*
* Router chain
*/
public class RouterChain<T> {

// full list of addresses from registry, classified by method name.
private List<Invoker<T>> fullInvokers;
private URL url;
private List<Invoker<T>> invokers = Collections.emptyList();

// containing all routers, reconstruct every time 'route://' urls change.
private List<Router> routers = new CopyOnWriteArrayList<>();
// Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the instance will never delete or recreate.
private List<Router> residentRouters;
private volatile List<Router> routers = Collections.emptyList();

// Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the
// instance will never delete or recreate.
private List<Router> builtinRouters = Collections.emptyList();

public static <T> RouterChain<T> buildChain(URL url) {
RouterChain<T> routerChain = new RouterChain<>(url);
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class).getActivateExtension(url, (String[]) null);
List<Router> routers = extensionFactories.stream()
.map(factory -> {
Router router = factory.getRouter(url);
router.addRouterChain(routerChain);
return router;
}).collect(Collectors.toList());
routerChain.setResidentRouters(routers);
return routerChain;
return new RouterChain<>(url);
}

protected RouterChain(List<Router> routers) {
this.routers.addAll(routers);
}
private RouterChain(URL url) {
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, (String[]) null);

protected RouterChain(URL url) {
this.url = url;
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());

initWithRouters(routers);
}

/**
* the resident routers must being initialized before address notification.
*
* @param residentRouters
* FIXME: this method should not be public
*/
public void setResidentRouters(List<Router> residentRouters) {
this.residentRouters = residentRouters;
this.routers.addAll(residentRouters);
public void initWithRouters(List<Router> builtinRouters) {
this.builtinRouters = builtinRouters;
this.routers = new CopyOnWriteArrayList<>(builtinRouters);
this.sort();
}

public void addRouter(Router router) {
this.routers.add(router);
}

/**
* If we use route:// protocol in version before 2.7.0, each URL will generate a Router instance,
* so we should keep the routers up to date, that is, each time router URLs changes, we should update the routers list,
* only keep the residentRouters which are available all the time and the latest notified routers which are generated from URLs.
* If we use route:// protocol in version before 2.7.0, each URL will generate a Router instance, so we should
* keep the routers up to date, that is, each time router URLs changes, we should update the routers list, only
* keep the builtinRouters which are available all the time and the latest notified routers which are generated
* from URLs.
*
* @param generatedRouters routers from 'router://' rules in 2.6.x or before.
* @param routers routers from 'router://' rules in 2.6.x or before.
*/
public void setGeneratedRouters(List<Router> generatedRouters) {
public void addRouters(List<Router> routers) {
List<Router> newRouters = new CopyOnWriteArrayList<>();
newRouters.addAll(residentRouters);
newRouters.addAll(generatedRouters);
newRouters.addAll(builtinRouters);
newRouters.addAll(routers);
CollectionUtils.sort(routers);
this.routers = newRouters;
// FIXME will sort cause concurrent problem? since it's kind of a write operation.
this.sort();
/* if (fullInvokers != null) {
this.preRoute(fullInvokers, url, null);
}*/
}

public void sort() {
private void sort() {
Collections.sort(routers);
}

/**
* TODO
*
* Building of router cache can be triggered from within different threads, for example, registry notification and governance notification.
* So this operation should be synchronized.
* @param invokers
* @param url
* @param invocation
*/
public void preRoute(List<Invoker<T>> invokers, URL url, Invocation invocation) {
for (Router router : routers) {
router.preRoute(invokers, url, invocation);
}
}

/**
*
* @param url
* @param invocation
* @return
*/
public List<Invoker<T>> route(URL url, Invocation invocation) {
List<Invoker<T>> finalInvokers = fullInvokers;
List<Invoker<T>> finalInvokers = invokers;
for (Router router : routers) {
// if (router.isRuntime()) {
// finalInvokers = router.route(finalInvokers, url, invocation);
// }
finalInvokers = router.route(finalInvokers, url, invocation);
}
return finalInvokers;
}

/**
* When any of the router's rule changed, notify the router chain to rebuild cache from scratch.
*/
public void notifyRuleChanged() {
if (CollectionUtils.isEmpty(this.fullInvokers)) {
return;
}
preRoute(this.fullInvokers, url, null);
}

/**
* Notify router chain of the initial addresses from registry at the first time.
* Notify whenever addresses in registry change.
*
* @param invokers
* @param url
*/
public void notifyFullInvokers(List<Invoker<T>> invokers, URL url) {
setFullMethodInvokers(invokers);
preRoute(invokers, url, null);
}

public void setFullMethodInvokers(List<Invoker<T>> fullInvokers) {
this.fullInvokers = (fullInvokers == null ? Collections.emptyList() : fullInvokers);
public void setInvokers(List<Invoker<T>> invokers) {
this.invokers = (invokers == null ? Collections.emptyList() : invokers);
routers.forEach(router -> router.notify(this.invokers));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,20 @@
*
* @see org.apache.dubbo.rpc.cluster.Cluster#join(Directory)
* @see org.apache.dubbo.rpc.cluster.Directory#list(org.apache.dubbo.rpc.Invocation)
*
* Note Router has a different behaviour since 2.7.0, for each type of Router, there will only has one Router instance for each service.
* See {@link AbstractRouterFactory} and {@link RouterChain} for how to extend a new Router or how the Router instances are loaded.
* <p>
* Note Router has a different behaviour since 2.7.0, for each type of Router, there will only has one Router instance
* for each service. See {@link CacheableRouterFactory} and {@link RouterChain} for how to extend a new Router or how
* the Router instances are loaded.
*/
@SPI
public interface RouterFactory {

/**
* Create router.
* Since 2.7.0, most of the time, we will not use @Adaptive feature, so it's keeped only for compatibility.
* Since 2.7.0, most of the time, we will not use @Adaptive feature, so it's kept only for compatibility.
*
* @param url
* @return router
* @param url url
* @return router instance
*/
@Adaptive("protocol")
Router getRouter(URL url);
Expand Down
Loading

0 comments on commit 5b67c0f

Please sign in to comment.