Skip to content

Commit

Permalink
optimize DistroConsistencyServiceImpl task process
Browse files Browse the repository at this point in the history
  • Loading branch information
zrlw committed Dec 11, 2021
1 parent a891d5b commit b646b97
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,14 @@ public class Notifier implements Runnable {
*/
public void addTask(String datumKey, DataOperation action) {

if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
if (services.putIfAbsent(datumKey, StringUtils.EMPTY) != null) {
// if previous change task is waiting for handling.
return;
}
} else if (action == DataOperation.DELETE) {
// remove previous change task to permit new change task.
services.remove(datumKey);
}
tasks.offer(Pair.with(datumKey, action));
}
Expand Down Expand Up @@ -412,7 +415,18 @@ private void handle(Pair<String, DataOperation> pair) {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();

services.remove(datumKey);
if (action == DataOperation.CHANGE) {
// remove current change task to permit new change task.
if (services.remove(datumKey) == null) {
// if current change task is removed by other.
return;
}
} else if (action == DataOperation.DELETE) {
if (services.contains(datumKey)) {
// if new change task is waiting for handling.
return;
}
}

int count = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ public void createServiceIfAbsent(String namespaceId, String serviceName, boolea
}
service.validate();

putServiceAndInit(service);
if (!local) {
boolean success = putServiceAndInit(service);
if (success && !local) {
addOrReplaceService(service);
}
}
Expand Down Expand Up @@ -889,15 +889,20 @@ public void putService(Service service) {
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}

private void putServiceAndInit(Service service) throws NacosException {
private boolean putServiceAndInit(Service service) throws NacosException {
putService(service);
service = getService(service.getNamespaceId(), service.getName());
Service existedService = getService(service.getNamespaceId(), service.getName());
if (existedService != service) {
Loggers.SRV_LOG.warn("[NEW-SERVICE] {} failed as the same service is already existed.", service.toJson());
return false;
}
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
return true;
}

/**
Expand Down

0 comments on commit b646b97

Please sign in to comment.