From 8982c55b6da9499d73c703d3d75dbb73b4af7970 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 16 Jan 2024 22:23:17 +0800 Subject: [PATCH] Use DefaultUncaughtExceptionHandler to log the uncached exception --- .../dolphinscheduler/alert/AlertServer.java | 4 ++ .../alert/metrics/AlertServerMetrics.java | 6 ++ .../api/ApiApplicationServer.java | 4 ++ .../api/metrics/ApiServerMetrics.java | 8 +++ .../common/thread/BaseDaemonThread.java | 2 + .../DefaultUncaughtExceptionHandler.java | 47 ++++++++++++++ .../common/thread/ThreadUtils.java | 20 +++--- .../common/thread/ThreadUtilsTest.java | 37 +++++++++++ .../extract/base/NettyClientHandler.java | 2 +- .../extract/base/NettyRemotingClient.java | 16 ++--- .../extract/base/NettyRemotingServer.java | 17 ++---- .../base/server/JdkDynamicServerHandler.java | 2 +- .../base/utils/NamedThreadFactory.java | 61 ------------------- .../server/master/MasterServer.java | 4 ++ .../master/metrics/MasterServerMetrics.java | 6 ++ .../master/registry/ServerNodeManager.java | 6 +- .../AsyncMasterTaskDelayQueueLooper.java | 2 +- .../MasterAsyncTaskExecutorThreadPool.java | 2 +- .../MasterSyncTaskExecutorThreadPool.java | 2 +- .../task/api/AbstractCommandExecutor.java | 2 +- .../server/worker/WorkerServer.java | 3 + .../worker/metrics/WorkerServerMetrics.java | 6 ++ .../runner/WorkerTaskExecutorThreadPool.java | 4 +- 23 files changed, 159 insertions(+), 104 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java delete mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index aeb2baf7a22ae..fd3d4b02e34a1 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.alert; +import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics; import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient; import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer; @@ -24,6 +25,7 @@ import org.apache.dolphinscheduler.alert.service.ListenerEventPostService; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import javax.annotation.PreDestroy; @@ -54,6 +56,8 @@ public class AlertServer { private AlertRegistryClient alertRegistryClient; public static void main(String[] args) { + AlertServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount); + Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance()); Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER); new SpringApplicationBuilder(AlertServer.class).run(args); } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java index 4784aa3b62b2e..ed75e4671aa5d 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java @@ -45,6 +45,12 @@ public void registerPendingAlertGauge(final Supplier supplier) { .register(Metrics.globalRegistry); } + public static void registerUncachedException(final Supplier supplier) { + Gauge.builder("ds.master.uncached.exception", supplier) + .description("number of uncached exception") + .register(Metrics.globalRegistry); + } + public void incAlertSuccessCount() { alertSuccessCounter.increment(); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java index 2bb9b7cf519f5..c7e6d9778f606 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java @@ -17,7 +17,9 @@ package org.apache.dolphinscheduler.api; +import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics; import org.apache.dolphinscheduler.common.enums.PluginType; +import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler; import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.entity.PluginDefine; import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; @@ -51,6 +53,8 @@ public class ApiApplicationServer { private PluginDao pluginDao; public static void main(String[] args) { + ApiServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount); + Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance()); SpringApplication.run(ApiApplicationServer.class); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java index 08f114dc2edad..6135a5ec2f21a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java @@ -18,10 +18,12 @@ package org.apache.dolphinscheduler.api.metrics; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import lombok.experimental.UtilityClass; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; @@ -120,4 +122,10 @@ public void cleanUpApiResponseTimeMetricsByUserId(final int userId) { "ds.api.response.time", "user.id", String.valueOf(userId))); } + + public static void registerUncachedException(final Supplier supplier) { + Gauge.builder("ds.master.uncached.exception", supplier) + .description("number of uncached exception") + .register(Metrics.globalRegistry); + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java index 88a44004cbeab..b495bb7c562e6 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java @@ -25,12 +25,14 @@ public abstract class BaseDaemonThread extends Thread { protected BaseDaemonThread(Runnable runnable) { super(runnable); this.setDaemon(true); + this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance()); } protected BaseDaemonThread(String threadName) { super(); this.setName(threadName); this.setDaemon(true); + this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance()); } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java new file mode 100644 index 0000000000000..62a0fd6911810 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.thread; + +import java.util.concurrent.atomic.LongAdder; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + + private static final DefaultUncaughtExceptionHandler INSTANCE = new DefaultUncaughtExceptionHandler(); + + private static final LongAdder uncaughtExceptionCount = new LongAdder(); + + private DefaultUncaughtExceptionHandler() { + } + + public static DefaultUncaughtExceptionHandler getInstance() { + return INSTANCE; + } + + public static long getUncaughtExceptionCount() { + return uncaughtExceptionCount.longValue(); + } + + @Override + public void uncaughtException(Thread t, Throwable e) { + uncaughtExceptionCount.add(1); + log.error("Caught an exception in {}.", t, e); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java index 5eef04ed82b8e..c4271919b5da1 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java @@ -31,24 +31,20 @@ @Slf4j public class ThreadUtils { - /** - * Wrapper over newDaemonFixedThreadExecutor. - * - * @param threadName threadName - * @param threadsNum threadsNum - * @return ExecutorService - */ public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadName, int threadsNum) { - ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build(); - return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, threadFactory); + return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, newDaemonThreadFactory(threadName)); } public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) { - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat(threadName) + return Executors.newSingleThreadScheduledExecutor(newDaemonThreadFactory(threadName)); + } + + public static ThreadFactory newDaemonThreadFactory(String threadName) { + return new ThreadFactoryBuilder() .setDaemon(true) + .setNameFormat(threadName) + .setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance()) .build(); - return Executors.newSingleThreadScheduledExecutor(threadFactory); } /** diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java new file mode 100644 index 0000000000000..7d7b2c0ac6da8 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.thread; + +import java.util.concurrent.ThreadPoolExecutor; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class ThreadUtilsTest { + + @Test + void newDaemonFixedThreadExecutor() throws InterruptedException { + ThreadPoolExecutor threadPoolExecutor = ThreadUtils.newDaemonFixedThreadExecutor("DemonThread", 1); + threadPoolExecutor.execute(() -> { + throw new IllegalArgumentException("I am an exception"); + }); + Thread.sleep(1_000); + Assertions.assertEquals(1, DefaultUncaughtExceptionHandler.getUncaughtExceptionCount()); + + } +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java index 2b49b2fec26d7..b0d998af83eea 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java @@ -67,7 +67,7 @@ private void processReceived(final Transporter transporter) { future.release(); if (future.getInvokeCallback() != null) { future.removeFuture(); - this.callbackExecutor.submit(future::executeInvokeCallback); + this.callbackExecutor.execute(future::executeInvokeCallback); } else { future.putResponse(deserialize); } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java index 1ca87091afb4c..e4682f5224bea 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.extract.base; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig; import org.apache.dolphinscheduler.extract.base.exception.RemotingException; import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException; @@ -30,7 +31,6 @@ import org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy; import org.apache.dolphinscheduler.extract.base.utils.Constants; import org.apache.dolphinscheduler.extract.base.utils.Host; -import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory; import org.apache.dolphinscheduler.extract.base.utils.NettyUtils; import java.net.InetSocketAddress; @@ -40,6 +40,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -80,12 +81,11 @@ public class NettyRemotingClient implements AutoCloseable { public NettyRemotingClient(final NettyClientConfig clientConfig) { this.clientConfig = clientConfig; + ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-"); if (Epoll.isAvailable()) { - this.workerGroup = - new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient")); + this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory); } else { - this.workerGroup = - new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient")); + this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory); } this.callbackExecutor = new ThreadPoolExecutor( Constants.CPUS, @@ -93,12 +93,12 @@ public NettyRemotingClient(final NettyClientConfig clientConfig) { 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), - new NamedThreadFactory("CallbackExecutor"), + ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"), new CallerThreadExecutePolicy()); this.clientHandler = new NettyClientHandler(this, callbackExecutor); - this.responseFutureExecutor = - Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor")); + this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-")); this.start(); } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java index 7655b804fe177..365a17dd030f8 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.extract.base; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.exception.RemoteException; import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder; @@ -27,15 +28,11 @@ import org.apache.dolphinscheduler.extract.base.utils.NettyUtils; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; @@ -55,8 +52,8 @@ public class NettyRemotingServer { private final ServerBootstrap serverBootstrap = new ServerBootstrap(); - private final ExecutorService defaultExecutor = - Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + private final ExecutorService defaultExecutor = ThreadUtils + .newDaemonFixedThreadExecutor("NettyRemotingServerThread", Runtime.getRuntime().availableProcessors() * 2); private final EventLoopGroup bossGroup; @@ -68,16 +65,12 @@ public class NettyRemotingServer { private final AtomicBoolean isStarted = new AtomicBoolean(false); - private static final String NETTY_BIND_FAILURE_MSG = "NettyRemotingServer bind %s fail"; - public NettyRemotingServer(final NettyServerConfig serverConfig) { this.serverConfig = serverConfig; ThreadFactory bossThreadFactory = - new ThreadFactoryBuilder().setDaemon(true).setNameFormat(serverConfig.getServerName() + "BossThread_%s") - .build(); + ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "BossThread_%s"); ThreadFactory workerThreadFactory = - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat(serverConfig.getServerName() + "WorkerThread_%s").build(); + ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "WorkerThread_%s"); if (Epoll.isAvailable()) { this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory); this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory); diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java index a98362209db15..b4978172f1248 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java @@ -90,7 +90,7 @@ private void processReceived(final Channel channel, final Transporter transporte channel.writeAndFlush(response); return; } - nettyRemotingServer.getDefaultExecutor().submit(() -> { + nettyRemotingServer.getDefaultExecutor().execute(() -> { StandardRpcResponse iRpcResponse; try { StandardRpcRequest standardRpcRequest = diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java deleted file mode 100644 index 19589a969854d..0000000000000 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.extract.base.utils; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * thread factory - */ -public class NamedThreadFactory implements ThreadFactory { - - private final AtomicInteger increment = new AtomicInteger(1); - - /** - * name - */ - private final String name; - - /** - * count - */ - private final int count; - - public NamedThreadFactory(String name) { - this(name, 0); - } - - public NamedThreadFactory(String name, int count) { - this.name = name; - this.count = count; - } - - /** - * create thread - * @param r runnable - * @return thread - */ - @Override - public Thread newThread(Runnable r) { - final String threadName = count > 0 ? String.format("%s_%d_%d", name, count, increment.getAndIncrement()) - : String.format("%s_%d", name, increment.getAndIncrement()); - Thread t = new Thread(r, threadName); - t.setDaemon(true); - return t; - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 82f3bda7606c4..7d6600cc43390 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; @@ -84,6 +85,9 @@ public class MasterServer implements IStoppable { private MasterSlotManager masterSlotManager; public static void main(String[] args) { + MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount); + + Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance()); Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER); SpringApplication.run(MasterServer.class); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java index 78eb0b7216107..09ba1cb4ba3e7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java @@ -61,6 +61,12 @@ public void registerMasterMemoryUsageGauge(Supplier supplier) { .register(Metrics.globalRegistry); } + public static void registerUncachedException(final Supplier supplier) { + Gauge.builder("ds.master.uncached.exception", supplier) + .description("number of uncached exception") + .register(Metrics.globalRegistry); + } + public void incMasterOverload() { masterOverloadCounter.increment(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 5fc0a1990f6f7..7c7095971fe02 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -20,11 +20,11 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; -import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory; import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.Event.Type; import org.apache.dolphinscheduler.registry.api.RegistryClient; @@ -116,8 +116,8 @@ public void afterPropertiesSet() { refreshNodesAndGroupMappings(); // init executor service - executorService = - Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); + executorService = Executors + .newSingleThreadScheduledExecutor(ThreadUtils.newDaemonThreadFactory("ServerNodeManagerExecutor")); executorService.scheduleWithFixedDelay( new WorkerNodeInfoAndGroupDbSyncTask(), 0, diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java index 86b711f245ed2..e7babaa4174a6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java @@ -78,7 +78,7 @@ public void run() { "Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed, will stop the async master task"); continue; } - masterAsyncTaskExecutorThreadPool.getThreadPool().submit(() -> { + masterAsyncTaskExecutorThreadPool.getThreadPool().execute(() -> { final AsyncTaskExecuteFunction asyncTaskExecuteFunction = asyncTaskExecutionContext.getAsyncTaskExecuteFunction(); final AsyncTaskCallbackFunction asyncTaskCallbackFunction = diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java index 868b66b6df822..2761958cd6412 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java @@ -41,7 +41,7 @@ public MasterAsyncTaskExecutorThreadPool(MasterConfig masterConfig) { public boolean submitMasterTaskExecutor(AsyncMasterTaskExecutor asyncMasterTaskExecutor) { synchronized (MasterAsyncTaskExecutorThreadPool.class) { // todo: check if the thread pool is overload - threadPoolExecutor.submit(asyncMasterTaskExecutor); + threadPoolExecutor.execute(asyncMasterTaskExecutor); return true; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java index 3f683076e6379..4ce59f1f49284 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java @@ -41,7 +41,7 @@ public MasterSyncTaskExecutorThreadPool(MasterConfig masterConfig) { public boolean submitMasterTaskExecutor(SyncMasterTaskExecutor syncMasterTaskExecutor) { synchronized (MasterSyncTaskExecutorThreadPool.class) { // todo: check if the thread pool is overload - threadPoolExecutor.submit(syncMasterTaskExecutor); + threadPoolExecutor.execute(syncMasterTaskExecutor); return true; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 4203516f421ad..e20da27bbd9bf 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -265,7 +265,7 @@ private void parseProcessOutput(Process process) { // todo: remove this this thread pool. ExecutorService getOutputLogService = ThreadUtils .newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName()); - getOutputLogService.submit(() -> { + getOutputLogService.execute(() -> { TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 5af0c2617ef69..2420ae52535be 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; @@ -75,6 +76,8 @@ public class WorkerServer implements IStoppable { * @param args arguments */ public static void main(String[] args) { + WorkerServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount); + Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance()); Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); SpringApplication.run(WorkerServer.class); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java index a73ce541a3ece..a7673f1240b7d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java @@ -127,4 +127,10 @@ public void registerWorkerMemoryUsageGauge(Supplier supplier) { .register(Metrics.globalRegistry); } + public static void registerUncachedException(final Supplier supplier) { + Gauge.builder("ds.master.uncached.exception", supplier) + .description("number of uncached exception") + .register(Metrics.globalRegistry); + } + } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java index 0e4bb98080a23..4606d9f7e90fc 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java @@ -49,7 +49,7 @@ public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) { synchronized (WorkerTaskExecutorThreadPool.class) { if (TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy())) { WorkerTaskExecutorHolder.put(workerTaskExecutor); - threadPoolExecutor.submit(workerTaskExecutor); + threadPoolExecutor.execute(workerTaskExecutor); return true; } if (isOverload()) { @@ -58,7 +58,7 @@ public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) { return false; } WorkerTaskExecutorHolder.put(workerTaskExecutor); - threadPoolExecutor.submit(workerTaskExecutor); + threadPoolExecutor.execute(workerTaskExecutor); return true; } }