Skip to content

Commit

Permalink
Refactor nacos to update (#11313)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ authored Jan 17, 2023
1 parent 62ccf46 commit 71e0c46
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class NacosNamingServiceWrapper {
public NacosNamingServiceWrapper(NacosConnectionManager nacosConnectionManager, int retryTimes, int sleepMsBetweenRetries) {
this.nacosConnectionManager = nacosConnectionManager;
this.isSupportBatchRegister = MethodUtils.findMethod(NamingService.class, "batchRegisterInstance", String.class, String.class, List.class) != null;
logger.info("Nacos batch register enable: " + isSupportBatchRegister);
this.retryTimes = Math.max(retryTimes, 0);
this.sleepMsBetweenRetries = sleepMsBetweenRetries;
}
Expand Down Expand Up @@ -141,6 +142,7 @@ public void registerInstance(String serviceName, String group, Instance instance
instancesInfo.setBatchRegistered(true);
return;
} catch (NacosException e) {
logger.info("Failed to batch register to nacos. Service Name: " + serviceName + ". Maybe nacos server not support. Will fallback to multi connection register.");
// ignore
}
}
Expand Down Expand Up @@ -170,6 +172,52 @@ public void registerInstance(String serviceName, String group, Instance instance
}
}

public void updateInstance(String serviceName, String group, Instance oldInstance, Instance newInstance) throws NacosException {
String nacosServiceName = handleInnerSymbol(serviceName);
InstancesInfo instancesInfo;
try {
mapLock.lock();
instancesInfo = registerStatus.computeIfAbsent(new InstanceId(nacosServiceName, group), id -> new InstancesInfo());
} finally {
mapLock.unlock();
}

try {
instancesInfo.lock();
if (!instancesInfo.isValid() || instancesInfo.getInstances().isEmpty()) {
throw new IllegalArgumentException(serviceName + " has not been registered to nacos.");
}

Optional<InstanceInfo> optional = instancesInfo.getInstances()
.stream()
.filter(instanceInfo -> instanceInfo.getInstance().equals(oldInstance))
.findAny();

if (!optional.isPresent()) {
throw new IllegalArgumentException(oldInstance + " has not been registered to nacos.");
}

InstanceInfo oldInstanceInfo = optional.get();
instancesInfo.getInstances().remove(oldInstanceInfo);
instancesInfo.getInstances().add(new InstanceInfo(newInstance, oldInstanceInfo.getNamingService()));

if (isSupportBatchRegister && instancesInfo.isBatchRegistered()) {
NamingService namingService = oldInstanceInfo.getNamingService();
List<Instance> instanceListToRegister = instancesInfo.getInstances().stream()
.map(InstanceInfo::getInstance)
.collect(Collectors.toList());

accept(() -> namingService.batchRegisterInstance(nacosServiceName, group, instanceListToRegister));
return;
}

// fallback to register one by one
accept(() -> oldInstanceInfo.getNamingService().registerInstance(nacosServiceName, group, newInstance));
} finally {
instancesInfo.unlock();
}
}

public void deregisterInstance(String serviceName, String group, String ip, int port) throws NacosException {
String nacosServiceName = handleInnerSymbol(serviceName);
InstancesInfo instancesInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand All @@ -34,6 +35,7 @@
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.nacos.util.NacosNamingServiceUtils;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ApplicationModel;

import com.alibaba.nacos.api.exception.NacosException;
Expand All @@ -51,6 +53,7 @@
import static org.apache.dubbo.registry.nacos.util.NacosNamingServiceUtils.createNamingService;
import static org.apache.dubbo.registry.nacos.util.NacosNamingServiceUtils.getGroup;
import static org.apache.dubbo.registry.nacos.util.NacosNamingServiceUtils.toInstance;
import static org.apache.dubbo.rpc.RpcException.REGISTRY_EXCEPTION;

/**
* Nacos {@link ServiceDiscovery} implementation
Expand Down Expand Up @@ -102,14 +105,31 @@ public void doUnregister(ServiceInstance serviceInstance) throws RuntimeExceptio

@Override
protected void doUpdate(ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) throws RuntimeException {
// register first to ensure that consumer will not throw no provider exception
if (!EMPTY_REVISION.equals(getExportedServicesRevision(serviceInstance))) {
if (EMPTY_REVISION.equals(getExportedServicesRevision(newServiceInstance))) {
super.doUpdate(oldServiceInstance, newServiceInstance);
return;
}

if (!Objects.equals(newServiceInstance.getHost(), oldServiceInstance.getHost()) ||
!Objects.equals(newServiceInstance.getPort(), oldServiceInstance.getPort())) {
// Ignore if id changed. Should unregister first.
super.doUpdate(oldServiceInstance, newServiceInstance);
return;
}

Instance oldInstance = toInstance(oldServiceInstance);
Instance newInstance = toInstance(newServiceInstance);

try {
this.serviceInstance = newServiceInstance;
reportMetadata(newServiceInstance.getServiceMetadata());
this.doRegister(newServiceInstance);
execute(namingService, service -> {
Instance instance = toInstance(serviceInstance);
service.updateInstance(instance.getServiceName(), group, oldInstance, newInstance);
});
} catch (Exception e) {
throw new RpcException(REGISTRY_EXCEPTION, "Failed register instance " + newServiceInstance.toString(), e);
}

this.doUnregister(oldServiceInstance);
}

@Override
Expand Down

0 comments on commit 71e0c46

Please sign in to comment.