diff --git a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSampler.java b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSampler.java index ec4a74b98de..9b6c9b4710a 100644 --- a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSampler.java +++ b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSampler.java @@ -56,6 +56,7 @@ public class ThreadPoolMetricsSampler implements MetricsSampler { private DataStore dataStore; private final Map sampleThreadPoolExecutor = new ConcurrentHashMap<>(); private final ConcurrentHashMap threadPoolMetricMap = new ConcurrentHashMap<>(); + public ThreadPoolMetricsSampler(DefaultMetricsCollector collector) { this.collector = collector; } @@ -87,23 +88,29 @@ private List createMetricsSample(String name, ThreadPoolExecutor e list.add(new GaugeMetricSample<>(MetricsKey.THREAD_POOL_ACTIVE_SIZE, threadPoolMetric.getTags(), THREAD_POOL, threadPoolMetric, ThreadPoolMetric::getActiveCount)); list.add(new GaugeMetricSample<>(MetricsKey.THREAD_POOL_THREAD_COUNT, threadPoolMetric.getTags(), THREAD_POOL, threadPoolMetric, ThreadPoolMetric::getPoolSize)); list.add(new GaugeMetricSample<>(MetricsKey.THREAD_POOL_QUEUE_SIZE, threadPoolMetric.getTags(), THREAD_POOL, threadPoolMetric, ThreadPoolMetric::getQueueSize)); - return list; } - public void registryDefaultSampleThreadPoolExecutor() { ApplicationModel applicationModel = collector.getApplicationModel(); if (applicationModel == null) { return; } - try { - if (this.frameworkExecutorRepository == null) { - this.frameworkExecutorRepository = collector.getApplicationModel().getBeanFactory() - .getBean(FrameworkExecutorRepository.class); + addRpcExecutors(); + addFrameworkExecutors(); + addExecutorRejectMetrics(); + } + + private void addExecutorRejectMetrics() { + ThreadRejectMetricsCountSampler threadRejectMetricsCountSampler = new ThreadRejectMetricsCountSampler(collector); + this.sampleThreadPoolExecutor.entrySet().stream().filter(entry -> entry.getKey().startsWith(SERVER_THREAD_POOL_NAME)).forEach(entry -> { + if (entry.getValue().getRejectedExecutionHandler() instanceof AbortPolicyWithReport) { + MetricThreadPoolExhaustedListener metricThreadPoolExhaustedListener = new MetricThreadPoolExhaustedListener(entry.getKey(), threadRejectMetricsCountSampler); + ((AbortPolicyWithReport) entry.getValue().getRejectedExecutionHandler()).addThreadPoolExhaustedEventListener(metricThreadPoolExhaustedListener); } - } catch (Exception ex) { - logger.warn(COMMON_METRICS_COLLECTOR_EXCEPTION, "", "", "ThreadPoolMetricsSampler! frameworkExecutorRepository non-init"); - } + }); + } + + private void addRpcExecutors() { if (this.dataStore == null) { this.dataStore = collector.getApplicationModel().getExtensionLoader(DataStore.class).getDefaultExtension(); } @@ -113,7 +120,7 @@ public void registryDefaultSampleThreadPoolExecutor() { for (Map.Entry entry : executors.entrySet()) { ExecutorService executor = (ExecutorService) entry.getValue(); if (executor instanceof ThreadPoolExecutor) { - this.addExecutors( SERVER_THREAD_POOL_NAME + "-" + entry.getKey(), executor); + this.addExecutors(SERVER_THREAD_POOL_NAME + "-" + entry.getKey(), executor); } } executors = dataStore.get(CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY); @@ -123,18 +130,29 @@ public void registryDefaultSampleThreadPoolExecutor() { this.addExecutors(CLIENT_THREAD_POOL_NAME + "-" + entry.getKey(), executor); } } + } + } - ThreadRejectMetricsCountSampler threadRejectMetricsCountSampler = new ThreadRejectMetricsCountSampler(collector); - this.sampleThreadPoolExecutor.entrySet().stream().filter(entry->entry.getKey().startsWith(SERVER_THREAD_POOL_NAME)).forEach(entry->{ - if(entry.getValue().getRejectedExecutionHandler() instanceof AbortPolicyWithReport) { - MetricThreadPoolExhaustedListener metricThreadPoolExhaustedListener=new MetricThreadPoolExhaustedListener(entry.getKey(),threadRejectMetricsCountSampler); - ((AbortPolicyWithReport) entry.getValue().getRejectedExecutionHandler()).addThreadPoolExhaustedEventListener(metricThreadPoolExhaustedListener); - } - }); + private void addFrameworkExecutors() { + try { + if (this.frameworkExecutorRepository == null) { + this.frameworkExecutorRepository = collector.getApplicationModel().getBeanFactory() + .getBean(FrameworkExecutorRepository.class); + } + } catch (Exception ex) { + logger.warn(COMMON_METRICS_COLLECTOR_EXCEPTION, "", "", "ThreadPoolMetricsSampler! frameworkExecutorRepository non-init"); } - if (this.frameworkExecutorRepository != null) { - this.addExecutors("sharedExecutor", frameworkExecutorRepository.getSharedExecutor()); + if (this.frameworkExecutorRepository == null) { + return; } + this.addExecutors("poolRouterExecutor", frameworkExecutorRepository.getPoolRouterExecutor()); + this.addExecutors("metadataRetryExecutor", frameworkExecutorRepository.getMetadataRetryExecutor()); + this.addExecutors("internalServiceExecutor", frameworkExecutorRepository.getInternalServiceExecutor()); + this.addExecutors("connectivityScheduledExecutor", frameworkExecutorRepository.getConnectivityScheduledExecutor()); + this.addExecutors("cacheRefreshingScheduledExecutor", frameworkExecutorRepository.getCacheRefreshingScheduledExecutor()); + this.addExecutors("sharedExecutor", frameworkExecutorRepository.getSharedExecutor()); + this.addExecutors("sharedScheduledExecutor", frameworkExecutorRepository.getSharedScheduledExecutor()); + this.addExecutors("mappingRefreshingExecutor", frameworkExecutorRepository.getMappingRefreshingExecutor()); } - } + diff --git a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSamplerTest.java b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSamplerTest.java index 2c8e2f984d8..dab94857717 100644 --- a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSamplerTest.java +++ b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSamplerTest.java @@ -169,7 +169,7 @@ public void testRegistryDefaultSampleThreadPoolExecutor() throws NoSuchFieldExce f.setAccessible(true); Map executors = (Map) f.get(sampler2); - Assertions.assertEquals(3, executors.size()); + Assertions.assertEquals(8, executors.size()); Assertions.assertTrue(executors.containsKey("DubboServerHandler-server1")); Assertions.assertTrue(executors.containsKey("DubboClientHandler-client1")); Assertions.assertTrue(executors.containsKey("sharedExecutor"));