Skip to content

Commit

Permalink
Migration bugfix & optimization (apache#11629)
Browse files Browse the repository at this point in the history
* Code optimize

* Thread pool leak fix

* Fix volatile problem
  • Loading branch information
mytang0 authored and lcb11 committed Mar 31, 2023
1 parent 4c93b1b commit 357899d
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class DefaultMigrationAddressComparator implements MigrationAddressCompar
public static final String OLD_ADDRESS_SIZE = "OLD_ADDRESS_SIZE";
public static final String NEW_ADDRESS_SIZE = "NEW_ADDRESS_SIZE";

private ConcurrentMap<String, Map<String, Integer>> serviceMigrationData = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Map<String, Integer>> serviceMigrationData = new ConcurrentHashMap<>();

@Override
public <T> boolean shouldMigrate(ClusterInvoker<T> newInvoker, ClusterInvoker<T> oldInvoker, MigrationRule rule) {
Expand Down Expand Up @@ -90,10 +90,7 @@ public <T> boolean shouldMigrate(ClusterInvoker<T> newInvoker, ClusterInvoker<T>
return false;
}

if (((float) newAddressSize / (float) oldAddressSize) >= threshold) {
return true;
}
return false;
return ((float) newAddressSize / (float) oldAddressSize) >= threshold;
}

private <T> int getAddressSize(ClusterInvoker<T> invoker) {
Expand All @@ -115,6 +112,4 @@ private String getInvokerType(ClusterInvoker<?> invoker) {
}
return "interface";
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;

public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
private ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(MigrationInvoker.class);
private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(MigrationInvoker.class);

private URL url;
private URL consumerUrl;
private Cluster cluster;
private Registry registry;
private Class<T> type;
private RegistryProtocol registryProtocol;
private final URL consumerUrl;
private final Cluster cluster;
private final Registry registry;
private final Class<T> type;
private final RegistryProtocol registryProtocol;
private MigrationRuleListener migrationRuleListener;
private ConsumerModel consumerModel;
private FrameworkStatusReportService reportService;
private final ConsumerModel consumerModel;
private final FrameworkStatusReportService reportService;

private volatile ClusterInvoker<T> invoker;
private volatile ClusterInvoker<T> serviceDiscoveryInvoker;
Expand All @@ -77,6 +77,7 @@ public MigrationInvoker(RegistryProtocol registryProtocol,
this(null, null, registryProtocol, cluster, registry, type, url, consumerUrl);
}

@SuppressWarnings("unchecked")
public MigrationInvoker(ClusterInvoker<T> invoker,
ClusterInvoker<T> serviceDiscoveryInvoker,
RegistryProtocol registryProtocol,
Expand Down Expand Up @@ -249,6 +250,7 @@ public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
calcPreferredInvoker(newRule);
}

@SuppressWarnings("all")
private void waitAddressNotify(MigrationRule newRule, CountDownLatch latch) {
// wait and compare threshold
int delay = newRule.getDelay(consumerUrl);
Expand Down Expand Up @@ -317,6 +319,7 @@ public boolean isAvailable() {
: (invoker != null && invoker.isAvailable()) || (serviceDiscoveryInvoker != null && serviceDiscoveryInvoker.isAvailable());
}

@SuppressWarnings("unchecked")
@Override
public void destroy() {
if (migrationRuleListener != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ public class MigrationRuleHandler<T> {
public static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.migration.step";
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(MigrationRuleHandler.class);

private MigrationClusterInvoker<T> migrationInvoker;
private MigrationStep currentStep;
private Float currentThreshold = 0f;
private URL consumerURL;
private final MigrationClusterInvoker<T> migrationInvoker;
private volatile MigrationStep currentStep;
private volatile Float currentThreshold = 0f;
private final URL consumerURL;

public MigrationRuleHandler(MigrationClusterInvoker<T> invoker, URL url) {
this.migrationInvoker = invoker;
Expand Down Expand Up @@ -115,12 +115,6 @@ private void setMigrationRule(MigrationRule rule) {
this.migrationInvoker.setMigrationRule(rule);
}

private Float getMigrationThreshold(MigrationRule rule, Float threshold) {
Float configuredThreshold = rule.getThreshold(consumerURL);
threshold = configuredThreshold == null ? threshold : configuredThreshold;
return threshold;
}

private void setCurrentStepAndThreshold(MigrationStep currentStep, Float currentThreshold) {
if (currentThreshold != null) {
this.currentThreshold = currentThreshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_THREAD_INTERRUPTED_EXCEPTION;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_PROPERTY_TYPE_MISMATCH;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_EMPTY_ADDRESS;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_THREAD_INTERRUPTED_EXCEPTION;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_EMPTY_ADDRESS;
import static org.apache.dubbo.common.constants.RegistryConstants.INIT;

/**
Expand All @@ -75,7 +75,7 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
private static final int MIGRATION_DEFAULT_DELAY_TIME = 60000;
private String ruleKey;

protected final ConcurrentMap<MigrationInvoker, MigrationRuleHandler> handlers = new ConcurrentHashMap<>();
protected final ConcurrentMap<MigrationInvoker<?>, MigrationRuleHandler<?>> handlers = new ConcurrentHashMap<>();
protected final LinkedBlockingQueue<String> ruleQueue = new LinkedBlockingQueue<>();

private final AtomicBoolean executorSubmit = new AtomicBoolean(false);
Expand All @@ -88,7 +88,7 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur

private volatile String rawRule;
private volatile MigrationRule rule;
private ModuleModel moduleModel;
private final ModuleModel moduleModel;

public MigrationRuleListener(ModuleModel moduleModel) {
this.moduleModel = moduleModel;
Expand Down Expand Up @@ -175,44 +175,44 @@ public synchronized void process(ConfigChangedEvent event) {
logger.info("Ignore duplicated rule");
continue;
}

logger.info("Using the following migration rule to migrate:");
logger.info(rule);

setRawRule(rule);

if (CollectionUtils.isEmptyMap(handlers)) {
continue;
}

ExecutorService executorService = null;
try {
logger.info("Using the following migration rule to migrate:");
logger.info(rule);

setRawRule(rule);

if (CollectionUtils.isNotEmptyMap(handlers)) {
ExecutorService executorService = Executors.newFixedThreadPool(100, new NamedThreadFactory("Dubbo-Invoker-Migrate"));
List<Future<?>> migrationFutures = new ArrayList<>(handlers.size());
handlers.forEach((_key, handler) -> {
Future<?> future = executorService.submit(() -> {
handler.doMigrate(this.rule);
});
migrationFutures.add(future);
});

Throwable migrationException = null;
for (Future<?> future : migrationFutures) {
try {
future.get();
} catch (InterruptedException ie) {
logger.warn(INTERNAL_ERROR, "unknown error in registry module", "", "Interrupted while waiting for migration async task to finish.");
} catch (ExecutionException ee) {
migrationException = ee.getCause();
}
}
if (migrationException != null) {
logger.error(INTERNAL_ERROR, "unknown error in registry module", "", "Migration async task failed.", migrationException);
executorService = Executors.newFixedThreadPool(Math.min(handlers.size(), 100), new NamedThreadFactory("Dubbo-Invoker-Migrate"));
List<Future<?>> migrationFutures = new ArrayList<>(handlers.size());
for (MigrationRuleHandler<?> handler : handlers.values()) {
Future<?> future = executorService.submit(() -> handler.doMigrate(this.rule));
migrationFutures.add(future);
}

for (Future<?> future : migrationFutures) {
try {
future.get();
} catch (InterruptedException ie) {
logger.warn(INTERNAL_ERROR, "unknown error in registry module", "", "Interrupted while waiting for migration async task to finish.");
} catch (ExecutionException ee) {
logger.error(INTERNAL_ERROR, "unknown error in registry module", "", "Migration async task failed.", ee.getCause());
}
executorService.shutdown();
}
} catch (Throwable t) {
logger.error(INTERNAL_ERROR, "unknown error in registry module", "", "Error occurred when migration.", t);
} finally {
if (executorService != null) {
executorService.shutdown();
}
}
}
});
}

}

public void setRawRule(String rawRule) {
Expand Down Expand Up @@ -260,13 +260,11 @@ public void onDestroy() {
if (localRuleMigrationFuture != null) {
localRuleMigrationFuture.cancel(true);
}
if (ruleManageExecutor != null) {
ruleManageExecutor.shutdown();
}
ruleManageExecutor.shutdown();
ruleQueue.clear();
}

public Map<MigrationInvoker, MigrationRuleHandler> getHandlers() {
public Map<MigrationInvoker<?>, MigrationRuleHandler<?>> getHandlers() {
return handlers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

class DefaultMigrationAddressComparatorTest {

@SuppressWarnings("all")
@Test
void test() {
DefaultMigrationAddressComparator comparator = new DefaultMigrationAddressComparator();
Expand Down Expand Up @@ -69,13 +70,13 @@ void test() {

Mockito.when(oldInvoker.hasProxyInvokers()).thenReturn(true);

List<Invoker> newInvokerList = new LinkedList<>();
List<Invoker<?>> newInvokerList = new LinkedList<>();
newInvokerList.add(Mockito.mock(Invoker.class));
newInvokerList.add(Mockito.mock(Invoker.class));
newInvokerList.add(Mockito.mock(Invoker.class));
Mockito.when(newDirectory.getAllInvokers()).thenReturn(newInvokerList);

List<Invoker> oldInvokerList = new LinkedList<>();
List<Invoker<?>> oldInvokerList = new LinkedList<>();
oldInvokerList.add(Mockito.mock(Invoker.class));
oldInvokerList.add(Mockito.mock(Invoker.class));
Mockito.when(oldDirectory.getAllInvokers()).thenReturn(oldInvokerList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void after() {
FrameworkModel.destroyAll();
}

@SuppressWarnings("all")
@Test
void test() {
RegistryProtocol registryProtocol = Mockito.mock(RegistryProtocol.class);
Expand All @@ -73,10 +74,10 @@ void test() {
Mockito.when(invoker.hasProxyInvokers()).thenReturn(true);
Mockito.when(serviceDiscoveryInvoker.hasProxyInvokers()).thenReturn(true);

List<Invoker> invokers = new LinkedList<>();
List<Invoker<?>> invokers = new LinkedList<>();
invokers.add(Mockito.mock(Invoker.class));
invokers.add(Mockito.mock(Invoker.class));
List<Invoker> serviceDiscoveryInvokers = new LinkedList<>();
List<Invoker<?>> serviceDiscoveryInvokers = new LinkedList<>();
serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class));
serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class));
Mockito.when(directory.getAllInvokers()).thenReturn(invokers);
Expand All @@ -96,7 +97,7 @@ void test() {
Mockito.when(invoker.getUrl()).thenReturn(consumerURL);
Mockito.when(serviceDiscoveryInvoker.getUrl()).thenReturn(consumerURL);

MigrationInvoker migrationInvoker = new MigrationInvoker(registryProtocol, null, null, DemoService.class, null, consumerURL);
MigrationInvoker<?> migrationInvoker = new MigrationInvoker<>(registryProtocol, null, null, DemoService.class, null, consumerURL);

MigrationRule migrationRule = Mockito.mock(MigrationRule.class);
Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(true);
Expand Down Expand Up @@ -225,6 +226,7 @@ void test() {
Assertions.assertTrue(System.currentTimeMillis() - currentTimeMillis >= 2000);
}

@SuppressWarnings("all")
@Test
void testDecide() {
RegistryProtocol registryProtocol = Mockito.mock(RegistryProtocol.class);
Expand All @@ -244,10 +246,10 @@ void testDecide() {
Mockito.when(invoker.hasProxyInvokers()).thenReturn(true);
Mockito.when(serviceDiscoveryInvoker.hasProxyInvokers()).thenReturn(true);

List<Invoker> invokers = new LinkedList<>();
List<Invoker<?>> invokers = new LinkedList<>();
invokers.add(Mockito.mock(Invoker.class));
invokers.add(Mockito.mock(Invoker.class));
List<Invoker> serviceDiscoveryInvokers = new LinkedList<>();
List<Invoker<?>> serviceDiscoveryInvokers = new LinkedList<>();
serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class));
serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class));
Mockito.when(directory.getAllInvokers()).thenReturn(invokers);
Expand All @@ -267,7 +269,7 @@ void testDecide() {
Mockito.when(invoker.getUrl()).thenReturn(consumerURL);
Mockito.when(serviceDiscoveryInvoker.getUrl()).thenReturn(consumerURL);

MigrationInvoker migrationInvoker = new MigrationInvoker(registryProtocol, null, null, DemoService.class, null, consumerURL);
MigrationInvoker<?> migrationInvoker = new MigrationInvoker<>(registryProtocol, null, null, DemoService.class, null, consumerURL);

MigrationRule migrationRule = Mockito.mock(MigrationRule.class);
Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
class MigrationRuleHandlerTest {
@Test
void test() {
MigrationClusterInvoker invoker = Mockito.mock(MigrationClusterInvoker.class);
MigrationClusterInvoker<?> invoker = Mockito.mock(MigrationClusterInvoker.class);
URL url = Mockito.mock(URL.class);
Mockito.when(url.getDisplayServiceKey()).thenReturn("test");
Mockito.when(url.getParameter(Mockito.any(), (String) Mockito.any())).thenAnswer(i->i.getArgument(1));
Mockito.when(url.getOrDefaultApplicationModel()).thenReturn(ApplicationModel.defaultModel());
MigrationRuleHandler handler = new MigrationRuleHandler(invoker, url);
MigrationRuleHandler<?> handler = new MigrationRuleHandler<>(invoker, url);

Mockito.when(invoker.migrateToForceApplicationInvoker(Mockito.any())).thenReturn(true);
Mockito.when(invoker.migrateToForceInterfaceInvoker(Mockito.any())).thenReturn(true);
Expand Down Expand Up @@ -61,7 +61,7 @@ void test() {
testMigrationWithStepUnchanged(rule, url, handler, invoker);
}

private void testMigrationFailed(MigrationRule rule, URL url, MigrationRuleHandler handler, MigrationClusterInvoker invoker) {
private void testMigrationFailed(MigrationRule rule, URL url, MigrationRuleHandler<?> handler, MigrationClusterInvoker<?> invoker) {
Assertions.assertEquals(MigrationStep.FORCE_INTERFACE, handler.getMigrationStep());

Mockito.when(invoker.migrateToForceApplicationInvoker(Mockito.any())).thenReturn(false);
Expand All @@ -72,7 +72,7 @@ private void testMigrationFailed(MigrationRule rule, URL url, MigrationRuleHandl
Assertions.assertEquals(MigrationStep.FORCE_INTERFACE, handler.getMigrationStep());
}

private void testMigrationWithStepUnchanged(MigrationRule rule, URL url, MigrationRuleHandler handler, MigrationClusterInvoker invoker) {
private void testMigrationWithStepUnchanged(MigrationRule rule, URL url, MigrationRuleHandler<?> handler, MigrationClusterInvoker<?> invoker) {
// set the same as
Mockito.when(rule.getStep(url)).thenReturn(handler.getMigrationStep());
handler.doMigrate(rule);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@

import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_TYPE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class MigrationRuleTest {
private static ServiceNameMapping mapping = mock(ServiceNameMapping.class);
private static final ServiceNameMapping mapping = mock(ServiceNameMapping.class);

@Test
void test_parse() {
Expand Down Expand Up @@ -87,14 +89,14 @@ void test_parse() {
assertEquals(0.5f, migrationRule.getThreshold(url));
assertEquals(30, migrationRule.getProportion(url));
assertEquals(30, migrationRule.getDelay(url));
assertEquals(true, migrationRule.getForce(url));
assertTrue(migrationRule.getForce(url));
assertEquals(MigrationStep.APPLICATION_FIRST, migrationRule.getStep(url));

Mockito.when(url.getDisplayServiceKey()).thenReturn("GreetingService:1.0.0");
assertEquals(1.0f, migrationRule.getThreshold(url));
assertEquals(60, migrationRule.getProportion(url));
assertEquals(60, migrationRule.getDelay(url));
assertEquals(false, migrationRule.getForce(url));
assertFalse(migrationRule.getForce(url));
assertEquals(MigrationStep.FORCE_APPLICATION, migrationRule.getStep(url));

Mockito.when(url.getDisplayServiceKey()).thenReturn("GreetingService:1.0.1");
Expand All @@ -107,7 +109,7 @@ void test_parse() {
assertEquals(0.3f, migrationRule.getThreshold(url));
assertEquals(20, migrationRule.getProportion(url));
assertEquals(10, migrationRule.getDelay(url));
assertEquals(false, migrationRule.getForce(url));
assertFalse(migrationRule.getForce(url));
assertEquals(MigrationStep.FORCE_INTERFACE, migrationRule.getStep(url));
when(mapping.getMapping(any(URL.class))).thenReturn(Collections.emptySet());

Expand Down

0 comments on commit 357899d

Please sign in to comment.