diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java index b02f5209ec48..72a5b590e4af 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java @@ -42,7 +42,7 @@ public class DefaultMigrationAddressComparator implements MigrationAddressCompar public static final String OLD_ADDRESS_SIZE = "OLD_ADDRESS_SIZE"; public static final String NEW_ADDRESS_SIZE = "NEW_ADDRESS_SIZE"; - private ConcurrentMap> serviceMigrationData = new ConcurrentHashMap<>(); + private final ConcurrentMap> serviceMigrationData = new ConcurrentHashMap<>(); @Override public boolean shouldMigrate(ClusterInvoker newInvoker, ClusterInvoker oldInvoker, MigrationRule rule) { @@ -90,10 +90,7 @@ public boolean shouldMigrate(ClusterInvoker newInvoker, ClusterInvoker return false; } - if (((float) newAddressSize / (float) oldAddressSize) >= threshold) { - return true; - } - return false; + return ((float) newAddressSize / (float) oldAddressSize) >= threshold; } private int getAddressSize(ClusterInvoker invoker) { @@ -115,6 +112,4 @@ private String getInvokerType(ClusterInvoker invoker) { } return "interface"; } - - } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java index 9fbe66fa0afc..d9176f4c43e6 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java @@ -49,17 +49,17 @@ import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY; public class MigrationInvoker implements MigrationClusterInvoker { - private ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(MigrationInvoker.class); + private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(MigrationInvoker.class); private URL url; - private URL consumerUrl; - private Cluster cluster; - private Registry registry; - private Class type; - private RegistryProtocol registryProtocol; + private final URL consumerUrl; + private final Cluster cluster; + private final Registry registry; + private final Class type; + private final RegistryProtocol registryProtocol; private MigrationRuleListener migrationRuleListener; - private ConsumerModel consumerModel; - private FrameworkStatusReportService reportService; + private final ConsumerModel consumerModel; + private final FrameworkStatusReportService reportService; private volatile ClusterInvoker invoker; private volatile ClusterInvoker serviceDiscoveryInvoker; @@ -77,6 +77,7 @@ public MigrationInvoker(RegistryProtocol registryProtocol, this(null, null, registryProtocol, cluster, registry, type, url, consumerUrl); } + @SuppressWarnings("unchecked") public MigrationInvoker(ClusterInvoker invoker, ClusterInvoker serviceDiscoveryInvoker, RegistryProtocol registryProtocol, @@ -249,6 +250,7 @@ public void migrateToApplicationFirstInvoker(MigrationRule newRule) { calcPreferredInvoker(newRule); } + @SuppressWarnings("all") private void waitAddressNotify(MigrationRule newRule, CountDownLatch latch) { // wait and compare threshold int delay = newRule.getDelay(consumerUrl); @@ -317,6 +319,7 @@ public boolean isAvailable() { : (invoker != null && invoker.isAvailable()) || (serviceDiscoveryInvoker != null && serviceDiscoveryInvoker.isAvailable()); } + @SuppressWarnings("unchecked") @Override public void destroy() { if (migrationRuleListener != null) { diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java index 334a180a4387..920e1d65c28a 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java @@ -30,10 +30,10 @@ public class MigrationRuleHandler { public static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.migration.step"; private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(MigrationRuleHandler.class); - private MigrationClusterInvoker migrationInvoker; - private MigrationStep currentStep; - private Float currentThreshold = 0f; - private URL consumerURL; + private final MigrationClusterInvoker migrationInvoker; + private volatile MigrationStep currentStep; + private volatile Float currentThreshold = 0f; + private final URL consumerURL; public MigrationRuleHandler(MigrationClusterInvoker invoker, URL url) { this.migrationInvoker = invoker; @@ -115,12 +115,6 @@ private void setMigrationRule(MigrationRule rule) { this.migrationInvoker.setMigrationRule(rule); } - private Float getMigrationThreshold(MigrationRule rule, Float threshold) { - Float configuredThreshold = rule.getThreshold(consumerURL); - threshold = configuredThreshold == null ? threshold : configuredThreshold; - return threshold; - } - private void setCurrentStepAndThreshold(MigrationStep currentStep, Float currentThreshold) { if (currentThreshold != null) { this.currentThreshold = currentThreshold; diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java index bc5c5a487623..02e877ca8993 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java @@ -51,10 +51,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_THREAD_INTERRUPTED_EXCEPTION; import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_PROPERTY_TYPE_MISMATCH; -import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_EMPTY_ADDRESS; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_THREAD_INTERRUPTED_EXCEPTION; import static org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_EMPTY_ADDRESS; import static org.apache.dubbo.common.constants.RegistryConstants.INIT; /** @@ -75,7 +75,7 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur private static final int MIGRATION_DEFAULT_DELAY_TIME = 60000; private String ruleKey; - protected final ConcurrentMap handlers = new ConcurrentHashMap<>(); + protected final ConcurrentMap, MigrationRuleHandler> handlers = new ConcurrentHashMap<>(); protected final LinkedBlockingQueue ruleQueue = new LinkedBlockingQueue<>(); private final AtomicBoolean executorSubmit = new AtomicBoolean(false); @@ -88,7 +88,7 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur private volatile String rawRule; private volatile MigrationRule rule; - private ModuleModel moduleModel; + private final ModuleModel moduleModel; public MigrationRuleListener(ModuleModel moduleModel) { this.moduleModel = moduleModel; @@ -175,44 +175,44 @@ public synchronized void process(ConfigChangedEvent event) { logger.info("Ignore duplicated rule"); continue; } + + logger.info("Using the following migration rule to migrate:"); + logger.info(rule); + + setRawRule(rule); + + if (CollectionUtils.isEmptyMap(handlers)) { + continue; + } + + ExecutorService executorService = null; try { - logger.info("Using the following migration rule to migrate:"); - logger.info(rule); - - setRawRule(rule); - - if (CollectionUtils.isNotEmptyMap(handlers)) { - ExecutorService executorService = Executors.newFixedThreadPool(100, new NamedThreadFactory("Dubbo-Invoker-Migrate")); - List> migrationFutures = new ArrayList<>(handlers.size()); - handlers.forEach((_key, handler) -> { - Future future = executorService.submit(() -> { - handler.doMigrate(this.rule); - }); - migrationFutures.add(future); - }); - - Throwable migrationException = null; - for (Future future : migrationFutures) { - try { - future.get(); - } catch (InterruptedException ie) { - logger.warn(INTERNAL_ERROR, "unknown error in registry module", "", "Interrupted while waiting for migration async task to finish."); - } catch (ExecutionException ee) { - migrationException = ee.getCause(); - } - } - if (migrationException != null) { - logger.error(INTERNAL_ERROR, "unknown error in registry module", "", "Migration async task failed.", migrationException); + executorService = Executors.newFixedThreadPool(Math.min(handlers.size(), 100), new NamedThreadFactory("Dubbo-Invoker-Migrate")); + List> migrationFutures = new ArrayList<>(handlers.size()); + for (MigrationRuleHandler handler : handlers.values()) { + Future future = executorService.submit(() -> handler.doMigrate(this.rule)); + migrationFutures.add(future); + } + + for (Future future : migrationFutures) { + try { + future.get(); + } catch (InterruptedException ie) { + logger.warn(INTERNAL_ERROR, "unknown error in registry module", "", "Interrupted while waiting for migration async task to finish."); + } catch (ExecutionException ee) { + logger.error(INTERNAL_ERROR, "unknown error in registry module", "", "Migration async task failed.", ee.getCause()); } - executorService.shutdown(); } } catch (Throwable t) { logger.error(INTERNAL_ERROR, "unknown error in registry module", "", "Error occurred when migration.", t); + } finally { + if (executorService != null) { + executorService.shutdown(); + } } } }); } - } public void setRawRule(String rawRule) { @@ -260,13 +260,11 @@ public void onDestroy() { if (localRuleMigrationFuture != null) { localRuleMigrationFuture.cancel(true); } - if (ruleManageExecutor != null) { - ruleManageExecutor.shutdown(); - } + ruleManageExecutor.shutdown(); ruleQueue.clear(); } - public Map getHandlers() { + public Map, MigrationRuleHandler> getHandlers() { return handlers; } diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparatorTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparatorTest.java index b6c43baa2c0e..f97f0b0c1a31 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparatorTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparatorTest.java @@ -35,6 +35,7 @@ class DefaultMigrationAddressComparatorTest { + @SuppressWarnings("all") @Test void test() { DefaultMigrationAddressComparator comparator = new DefaultMigrationAddressComparator(); @@ -69,13 +70,13 @@ void test() { Mockito.when(oldInvoker.hasProxyInvokers()).thenReturn(true); - List newInvokerList = new LinkedList<>(); + List> newInvokerList = new LinkedList<>(); newInvokerList.add(Mockito.mock(Invoker.class)); newInvokerList.add(Mockito.mock(Invoker.class)); newInvokerList.add(Mockito.mock(Invoker.class)); Mockito.when(newDirectory.getAllInvokers()).thenReturn(newInvokerList); - List oldInvokerList = new LinkedList<>(); + List> oldInvokerList = new LinkedList<>(); oldInvokerList.add(Mockito.mock(Invoker.class)); oldInvokerList.add(Mockito.mock(Invoker.class)); Mockito.when(oldDirectory.getAllInvokers()).thenReturn(oldInvokerList); diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java index 5bed016013bf..fd5cf5d11596 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationInvokerTest.java @@ -54,6 +54,7 @@ public void after() { FrameworkModel.destroyAll(); } + @SuppressWarnings("all") @Test void test() { RegistryProtocol registryProtocol = Mockito.mock(RegistryProtocol.class); @@ -73,10 +74,10 @@ void test() { Mockito.when(invoker.hasProxyInvokers()).thenReturn(true); Mockito.when(serviceDiscoveryInvoker.hasProxyInvokers()).thenReturn(true); - List invokers = new LinkedList<>(); + List> invokers = new LinkedList<>(); invokers.add(Mockito.mock(Invoker.class)); invokers.add(Mockito.mock(Invoker.class)); - List serviceDiscoveryInvokers = new LinkedList<>(); + List> serviceDiscoveryInvokers = new LinkedList<>(); serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class)); serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class)); Mockito.when(directory.getAllInvokers()).thenReturn(invokers); @@ -96,7 +97,7 @@ void test() { Mockito.when(invoker.getUrl()).thenReturn(consumerURL); Mockito.when(serviceDiscoveryInvoker.getUrl()).thenReturn(consumerURL); - MigrationInvoker migrationInvoker = new MigrationInvoker(registryProtocol, null, null, DemoService.class, null, consumerURL); + MigrationInvoker migrationInvoker = new MigrationInvoker<>(registryProtocol, null, null, DemoService.class, null, consumerURL); MigrationRule migrationRule = Mockito.mock(MigrationRule.class); Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(true); @@ -225,6 +226,7 @@ void test() { Assertions.assertTrue(System.currentTimeMillis() - currentTimeMillis >= 2000); } + @SuppressWarnings("all") @Test void testDecide() { RegistryProtocol registryProtocol = Mockito.mock(RegistryProtocol.class); @@ -244,10 +246,10 @@ void testDecide() { Mockito.when(invoker.hasProxyInvokers()).thenReturn(true); Mockito.when(serviceDiscoveryInvoker.hasProxyInvokers()).thenReturn(true); - List invokers = new LinkedList<>(); + List> invokers = new LinkedList<>(); invokers.add(Mockito.mock(Invoker.class)); invokers.add(Mockito.mock(Invoker.class)); - List serviceDiscoveryInvokers = new LinkedList<>(); + List> serviceDiscoveryInvokers = new LinkedList<>(); serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class)); serviceDiscoveryInvokers.add(Mockito.mock(Invoker.class)); Mockito.when(directory.getAllInvokers()).thenReturn(invokers); @@ -267,7 +269,7 @@ void testDecide() { Mockito.when(invoker.getUrl()).thenReturn(consumerURL); Mockito.when(serviceDiscoveryInvoker.getUrl()).thenReturn(consumerURL); - MigrationInvoker migrationInvoker = new MigrationInvoker(registryProtocol, null, null, DemoService.class, null, consumerURL); + MigrationInvoker migrationInvoker = new MigrationInvoker<>(registryProtocol, null, null, DemoService.class, null, consumerURL); MigrationRule migrationRule = Mockito.mock(MigrationRule.class); Mockito.when(migrationRule.getForce(Mockito.any())).thenReturn(true); diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandlerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandlerTest.java index a98a4f438546..318da69cbe31 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandlerTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandlerTest.java @@ -28,12 +28,12 @@ class MigrationRuleHandlerTest { @Test void test() { - MigrationClusterInvoker invoker = Mockito.mock(MigrationClusterInvoker.class); + MigrationClusterInvoker invoker = Mockito.mock(MigrationClusterInvoker.class); URL url = Mockito.mock(URL.class); Mockito.when(url.getDisplayServiceKey()).thenReturn("test"); Mockito.when(url.getParameter(Mockito.any(), (String) Mockito.any())).thenAnswer(i->i.getArgument(1)); Mockito.when(url.getOrDefaultApplicationModel()).thenReturn(ApplicationModel.defaultModel()); - MigrationRuleHandler handler = new MigrationRuleHandler(invoker, url); + MigrationRuleHandler handler = new MigrationRuleHandler<>(invoker, url); Mockito.when(invoker.migrateToForceApplicationInvoker(Mockito.any())).thenReturn(true); Mockito.when(invoker.migrateToForceInterfaceInvoker(Mockito.any())).thenReturn(true); @@ -61,7 +61,7 @@ void test() { testMigrationWithStepUnchanged(rule, url, handler, invoker); } - private void testMigrationFailed(MigrationRule rule, URL url, MigrationRuleHandler handler, MigrationClusterInvoker invoker) { + private void testMigrationFailed(MigrationRule rule, URL url, MigrationRuleHandler handler, MigrationClusterInvoker invoker) { Assertions.assertEquals(MigrationStep.FORCE_INTERFACE, handler.getMigrationStep()); Mockito.when(invoker.migrateToForceApplicationInvoker(Mockito.any())).thenReturn(false); @@ -72,7 +72,7 @@ private void testMigrationFailed(MigrationRule rule, URL url, MigrationRuleHandl Assertions.assertEquals(MigrationStep.FORCE_INTERFACE, handler.getMigrationStep()); } - private void testMigrationWithStepUnchanged(MigrationRule rule, URL url, MigrationRuleHandler handler, MigrationClusterInvoker invoker) { + private void testMigrationWithStepUnchanged(MigrationRule rule, URL url, MigrationRuleHandler handler, MigrationClusterInvoker invoker) { // set the same as Mockito.when(rule.getStep(url)).thenReturn(handler.getMigrationStep()); handler.doMigrate(rule); diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/model/MigrationRuleTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/model/MigrationRuleTest.java index 79b20de724e0..e801688f7db8 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/model/MigrationRuleTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/model/MigrationRuleTest.java @@ -30,13 +30,15 @@ import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_TYPE_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; class MigrationRuleTest { - private static ServiceNameMapping mapping = mock(ServiceNameMapping.class); + private static final ServiceNameMapping mapping = mock(ServiceNameMapping.class); @Test void test_parse() { @@ -87,14 +89,14 @@ void test_parse() { assertEquals(0.5f, migrationRule.getThreshold(url)); assertEquals(30, migrationRule.getProportion(url)); assertEquals(30, migrationRule.getDelay(url)); - assertEquals(true, migrationRule.getForce(url)); + assertTrue(migrationRule.getForce(url)); assertEquals(MigrationStep.APPLICATION_FIRST, migrationRule.getStep(url)); Mockito.when(url.getDisplayServiceKey()).thenReturn("GreetingService:1.0.0"); assertEquals(1.0f, migrationRule.getThreshold(url)); assertEquals(60, migrationRule.getProportion(url)); assertEquals(60, migrationRule.getDelay(url)); - assertEquals(false, migrationRule.getForce(url)); + assertFalse(migrationRule.getForce(url)); assertEquals(MigrationStep.FORCE_APPLICATION, migrationRule.getStep(url)); Mockito.when(url.getDisplayServiceKey()).thenReturn("GreetingService:1.0.1"); @@ -107,7 +109,7 @@ void test_parse() { assertEquals(0.3f, migrationRule.getThreshold(url)); assertEquals(20, migrationRule.getProportion(url)); assertEquals(10, migrationRule.getDelay(url)); - assertEquals(false, migrationRule.getForce(url)); + assertFalse(migrationRule.getForce(url)); assertEquals(MigrationStep.FORCE_INTERFACE, migrationRule.getStep(url)); when(mapping.getMapping(any(URL.class))).thenReturn(Collections.emptySet());