Skip to content

Commit

Permalink
fix empty protocol in consule registry. apache#4294
Browse files Browse the repository at this point in the history
  • Loading branch information
cvictory committed Jun 20, 2019
1 parent 60c9d9d commit b8e05d2
Showing 1 changed file with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.dubbo.registry.consul;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
Expand Down Expand Up @@ -49,6 +51,8 @@

import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;

Expand Down Expand Up @@ -136,12 +140,12 @@ public void doSubscribe(URL url, NotifyListener listener) {
Response<Map<String, List<String>>> response = getAllServices(-1, buildWatchTimeout(url));
index = response.getConsulIndex();
List<HealthService> services = getHealthServices(response.getValue());
urls = convert(services);
urls = convert(services, url);
} else {
String service = url.getServiceKey();
Response<List<HealthService>> response = getHealthServices(service, -1, buildWatchTimeout(url));
index = response.getConsulIndex();
urls = convert(response.getValue());
urls = convert(response.getValue(), url);
}

notify(url, listener, urls);
Expand Down Expand Up @@ -175,7 +179,7 @@ public List<URL> lookup(URL url) {
if (result == null || result.getValue() == null || result.getValue().isEmpty()) {
return new ArrayList<>();
} else {
return convert(result.getValue());
return convert(result.getValue(), url);
}
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from consul " + getUrl() + ", cause: " + e.getMessage(), e);
Expand Down Expand Up @@ -241,7 +245,10 @@ private boolean isProviderSide(URL url) {
return url.getProtocol().equals(PROVIDER_PROTOCOL);
}

private List<URL> convert(List<HealthService> services) {
private List<URL> convert(List<HealthService> services, URL consumerURL) {
if (CollectionUtils.isEmpty(services)) {
return emptyURL(consumerURL);
}
return services.stream()
.map(HealthService::getService)
.filter(Objects::nonNull)
Expand All @@ -252,6 +259,17 @@ private List<URL> convert(List<HealthService> services) {
.collect(Collectors.toList());
}

private List<URL> emptyURL(URL consumerURL) {
// No Category Parameter
URL empty = URLBuilder.from(consumerURL)
.setProtocol(EMPTY_PROTOCOL)
.removeParameter(CATEGORY_KEY)
.build();
List<URL> result = new ArrayList<URL>();
result.add(empty);
return result;
}

private NewService buildService(URL url) {
NewService service = new NewService();
service.setAddress(url.getHost());
Expand Down Expand Up @@ -318,7 +336,7 @@ private void processService() {
if (currentIndex != null && currentIndex > consulIndex) {
consulIndex = currentIndex;
List<HealthService> services = response.getValue();
List<URL> urls = convert(services);
List<URL> urls = convert(services, url);
for (NotifyListener listener : getSubscribed().get(url)) {
doNotify(url, listener, urls);
}
Expand All @@ -331,7 +349,7 @@ private void processServices() {
if (currentIndex != null && currentIndex > consulIndex) {
consulIndex = currentIndex;
List<HealthService> services = getHealthServices(response.getValue());
List<URL> urls = convert(services);
List<URL> urls = convert(services, url);
for (NotifyListener listener : getSubscribed().get(url)) {
doNotify(url, listener, urls);
}
Expand Down

0 comments on commit b8e05d2

Please sign in to comment.