Skip to content

Commit

Permalink
Use ThreadPoolExecutor instead of EventLoop (apache#8208)
Browse files Browse the repository at this point in the history
### Motivation
When Netty's `EventLoop` receives a new task,it will call `eventFdWrite`, and then trigger system calls, such as: system_call_fastpath, eventfd_write
After we replaced EventLoop with a native jdk thread pool, the performance improved

### Modifications
Use ThreadPoolExecutor instead of EventLoop

### Verifying this change
We use pulsar perf for testing
before:
Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s

after:
Aggregated throughput stats --- 18392800 records received --- 133314.602 msg/s --- 1041.520 Mbit/s
  • Loading branch information
315157973 authored and flowchartsman committed Nov 17, 2020
1 parent ed6172c commit f164959
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,7 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
client.getInternalExecutorService().execute(() -> receiveMessageFromConsumer(consumer));
}
});
}
Expand Down Expand Up @@ -298,7 +296,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
break;
}

client.eventLoopGroup().execute(() -> {
client.getInternalExecutorService().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -65,7 +67,6 @@
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
Expand Down Expand Up @@ -96,6 +97,7 @@ public class PulsarClientImpl implements PulsarClient {
private final ConnectionPool cnxPool;
private final Timer timer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorService;

public enum State {
Open, Closing, Closed
Expand Down Expand Up @@ -145,6 +147,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
conf.getAuthentication().start();
this.cnxPool = cnxPool;
externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener"));
internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), getThreadFactory("pulsar-client-internal"));
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
Expand Down Expand Up @@ -591,6 +594,7 @@ public void shutdown() throws PulsarClientException {
cnxPool.close();
timer.stop();
externalExecutorProvider.shutdownNow();
internalExecutorService.shutdownNow();
conf.getAuthentication().close();
} catch (Throwable t) {
log.warn("Failed to shutdown Pulsar client", t);
Expand Down Expand Up @@ -815,6 +819,9 @@ protected <T> CompletableFuture<Schema<T>> preProcessSchemaBeforeSubscribe(Pulsa
return CompletableFuture.completedFuture(schema);
}

public ExecutorService getInternalExecutorService() {
return internalExecutorService.getExecutor();
}
//
// Transaction related API
//
Expand Down

0 comments on commit f164959

Please sign in to comment.