Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: the issue of possible infinite loop when cleaning up expired metadata info #15086

Open
wants to merge 7 commits into
base: 3.3
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
import org.apache.dubbo.registry.client.metadata.store.MetaCacheManager;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_METADATA_INFO_CACHE_EXPIRE;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_METADATA_INFO_CACHE_SIZE;
Expand All @@ -50,7 +51,6 @@
import static org.apache.dubbo.common.constants.CommonConstants.METADATA_INFO_CACHE_SIZE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_LOCAL_FILE_CACHE_ENABLED;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_FAILED_LOAD_METADATA;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY;
import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION;
Expand All @@ -70,7 +70,7 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
protected volatile ServiceInstance serviceInstance;
protected volatile MetadataInfo metadataInfo;
protected final ConcurrentHashMap<String, MetadataInfoStat> metadataInfos = new ConcurrentHashMap<>();
protected final ScheduledFuture<?> refreshCacheFuture;
protected volatile ScheduledFuture<?> refreshCacheFuture;
protected MetadataReport metadataReport;
protected String metadataType;
protected final MetaCacheManager metaCacheManager;
Expand Down Expand Up @@ -110,36 +110,44 @@ private AbstractServiceDiscovery(ApplicationModel applicationModel, String servi
registryURL.getParameter(METADATA_INFO_CACHE_EXPIRE_KEY, DEFAULT_METADATA_INFO_CACHE_EXPIRE);
int metadataInfoCacheSize =
registryURL.getParameter(METADATA_INFO_CACHE_SIZE_KEY, DEFAULT_METADATA_INFO_CACHE_SIZE);
startRefreshCache(metadataInfoCacheExpireTime / 2, metadataInfoCacheSize, metadataInfoCacheExpireTime);
}

private void removeExpiredMetadataInfo(int metadataInfoCacheSize, int metadataInfoCacheExpireTime) {
Long nextTime = null;
// Cache cleanup is only required when the cache size exceeds the cache limit.
if (metadataInfos.size() > metadataInfoCacheSize) {
List<MetadataInfoStat> values = new ArrayList<>(metadataInfos.values());
// Place the earliest data at the front
values.sort(Comparator.comparingLong(MetadataInfoStat::getUpdateTime));
for (MetadataInfoStat v : values) {
long time = System.currentTimeMillis() - v.getUpdateTime();
if (time > metadataInfoCacheExpireTime) {
metadataInfos.remove(v.metadataInfo.getRevision(), v);
} else {
// Calculate how long it will take for the next task to start
nextTime = metadataInfoCacheExpireTime - time;
break;
}
}
}
// If there is no metadata to clean up this time, the next task will start within half of the cache expiration
// time.
startRefreshCache(
nextTime == null ? metadataInfoCacheExpireTime / 2 : nextTime,
metadataInfoCacheSize,
metadataInfoCacheExpireTime);
}

private void startRefreshCache(long nextTime, int metadataInfoCacheSize, int metadataInfoCacheExpireTime) {
this.refreshCacheFuture = applicationModel
.getFrameworkModel()
.getBeanFactory()
.getBean(FrameworkExecutorRepository.class)
.getSharedScheduledExecutor()
.scheduleAtFixedRate(
() -> {
try {
while (metadataInfos.size() > metadataInfoCacheSize) {
AtomicReference<String> oldestRevision = new AtomicReference<>();
AtomicReference<MetadataInfoStat> oldestStat = new AtomicReference<>();
metadataInfos.forEach((k, v) -> {
if (System.currentTimeMillis() - v.getUpdateTime() > metadataInfoCacheExpireTime
&& (oldestStat.get() == null
|| oldestStat.get().getUpdateTime() > v.getUpdateTime())) {
oldestRevision.set(k);
oldestStat.set(v);
}
});
if (oldestStat.get() != null) {
metadataInfos.remove(oldestRevision.get(), oldestStat.get());
}
}
} catch (Throwable t) {
logger.error(
INTERNAL_ERROR, "", "", "Error occurred when clean up metadata info cache.", t);
}
},
metadataInfoCacheExpireTime / 2,
metadataInfoCacheExpireTime / 2,
.schedule(
() -> removeExpiredMetadataInfo(metadataInfoCacheSize, metadataInfoCacheExpireTime),
nextTime,
TimeUnit.MILLISECONDS);
}

Expand Down
Loading