Skip to content

Commit

Permalink
[HUDI-4086] Use CustomizedThreadFactory in async compaction and clust…
Browse files Browse the repository at this point in the history
…ering (apache#5563)

Co-authored-by: 苏承祥 <sucx@tuya.com>
  • Loading branch information
2 people authored and yihua committed Jun 3, 2022
1 parent 0632728 commit ba70ced
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

Expand All @@ -42,13 +43,12 @@
*/
public abstract class AsyncClusteringService extends HoodieAsyncTableService {

public static final String CLUSTERING_POOL_NAME = "hoodiecluster";
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
public static final String CLUSTERING_POOL_NAME = "hoodiecluster";

private final int maxConcurrentClustering;
private transient BaseClusterer clusteringClient;
protected transient HoodieEngineContext context;
private transient BaseClusterer clusteringClient;

public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) {
this(context, writeClient, false);
Expand All @@ -69,12 +69,7 @@ public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering,
r -> {
Thread t = new Thread(r, "async_clustering_thread");
t.setDaemon(isRunInDaemonMode());
return t;
});

new CustomizedThreadFactory("async_clustering_thread", isRunInDaemonMode()));
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set Compactor Pool Name for allowing users to prioritize compaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

Expand All @@ -39,17 +40,15 @@
*/
public abstract class AsyncCompactService extends HoodieAsyncTableService {

private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);

/**
* This is the job pool used by async compaction.
*/
public static final String COMPACT_POOL_NAME = "hoodiecompact";

private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
private final int maxConcurrentCompaction;
private transient BaseCompactor compactor;
protected transient HoodieEngineContext context;
private transient BaseCompactor compactor;

public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client) {
this(context, client, false);
Expand All @@ -70,11 +69,7 @@ public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient cl
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction,
r -> {
Thread t = new Thread(r, "async_compact_thread");
t.setDaemon(isRunInDaemonMode());
return t;
});
new CustomizedThreadFactory("async_compact_thread", isRunInDaemonMode()));
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set Compactor Pool Name for allowing users to prioritize compaction
Expand Down Expand Up @@ -107,9 +102,9 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
}, executor)).toArray(CompletableFuture[]::new)), executor);
}


/**
* Check whether compactor thread needs to be stopped.
*
* @return
*/
protected boolean shouldStopCompactor() {
Expand Down

0 comments on commit ba70ced

Please sign in to comment.