From 0f5b9eae89b4a18b2ab9cb2d33549cc1dba29753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E6=89=BF=E7=A5=A5?= Date: Thu, 12 May 2022 14:59:16 +0800 Subject: [PATCH] thread name fix --- .../hudi/async/AsyncClusteringService.java | 13 ++++--------- .../apache/hudi/async/AsyncCompactService.java | 17 ++++++----------- 2 files changed, 10 insertions(+), 20 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java index 7fece5c885f8a..1e4d4d1f593af 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.CustomizedThreadFactory; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; @@ -42,13 +43,12 @@ */ public abstract class AsyncClusteringService extends HoodieAsyncTableService { + public static final String CLUSTERING_POOL_NAME = "hoodiecluster"; private static final long serialVersionUID = 1L; private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class); - public static final String CLUSTERING_POOL_NAME = "hoodiecluster"; - private final int maxConcurrentClustering; - private transient BaseClusterer clusteringClient; protected transient HoodieEngineContext context; + private transient BaseClusterer clusteringClient; public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) { this(context, writeClient, false); @@ -69,12 +69,7 @@ public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient @Override protected Pair startService() { ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering, - r -> { - Thread t = new Thread(r, "async_clustering_thread"); - t.setDaemon(isRunInDaemonMode()); - return t; - }); - + new CustomizedThreadFactory("async_clustering_thread", isRunInDaemonMode())); return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> { try { // Set Compactor Pool Name for allowing users to prioritize compaction diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java index f1f7f416e466c..a62beae02bbdb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.CustomizedThreadFactory; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; @@ -39,17 +40,15 @@ */ public abstract class AsyncCompactService extends HoodieAsyncTableService { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class); - /** * This is the job pool used by async compaction. */ public static final String COMPACT_POOL_NAME = "hoodiecompact"; - + private static final long serialVersionUID = 1L; + private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class); private final int maxConcurrentCompaction; - private transient BaseCompactor compactor; protected transient HoodieEngineContext context; + private transient BaseCompactor compactor; public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client) { this(context, client, false); @@ -70,11 +69,7 @@ public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient cl @Override protected Pair startService() { ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction, - r -> { - Thread t = new Thread(r, "async_compact_thread"); - t.setDaemon(isRunInDaemonMode()); - return t; - }); + new CustomizedThreadFactory("async_compact_thread", isRunInDaemonMode())); return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> { try { // Set Compactor Pool Name for allowing users to prioritize compaction @@ -107,9 +102,9 @@ protected Pair startService() { }, executor)).toArray(CompletableFuture[]::new)), executor); } - /** * Check whether compactor thread needs to be stopped. + * * @return */ protected boolean shouldStopCompactor() {