Skip to content

Commit

Permalink
[Broker] Use shared executors for broker and geo-replication clients (#…
Browse files Browse the repository at this point in the history
…13839)

* [Broker] Use shared executors for broker clients and geo-replication clients

* Remove brokerClientNumIOThreads configuration key and default to 1

* Revisit the shared timer creation

- don't ever make it a daemon thread

(cherry picked from commit 4924e6d)
  • Loading branch information
lhotari authored and congbobo184 committed Nov 10, 2022
1 parent df3e2ce commit a75a22d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -234,6 +235,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {

private final Consumer<Integer> processTerminator;
protected final EventLoopGroup ioEventLoopGroup;
private final ExecutorProvider brokerClientSharedInternalExecutorProvider;
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
private final Timer brokerClientSharedTimer;

private MetricsGenerator metricsGenerator;

Expand Down Expand Up @@ -326,6 +330,18 @@ public PulsarService(ServiceConfiguration config,

this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
new DefaultThreadFactory("pulsar-io"));
// the internal executor is not used in the broker client or replication clients since this executor is
// used for consumers and the transaction support in the client.
// since an instance is required, a single threaded shared instance is used for all broker client instances
this.brokerClientSharedInternalExecutorProvider =
new ExecutorProvider(1, "broker-client-shared-internal-executor");
// the external executor is not used in the broker client or replication clients since this executor is
// used for consumer listeners.
// since an instance is required, a single threaded shared instance is used for all broker client instances
this.brokerClientSharedExternalExecutorProvider =
new ExecutorProvider(1, "broker-client-shared-external-executor");
this.brokerClientSharedTimer =
new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
}

public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
Expand Down Expand Up @@ -518,6 +534,9 @@ public CompletableFuture<Void> closeAsync() {
transactionExecutorProvider.shutdownNow();
}

brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();
ioEventLoopGroup.shutdownGracefully();

// add timeout handling for closing executors
Expand Down Expand Up @@ -1344,6 +1363,17 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp
return this.offloaderScheduler;
}

public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
throws PulsarClientException {
return PulsarClientImpl.builder()
.conf(clientConf)
.eventLoopGroup(ioEventLoopGroup)
.timer(brokerClientSharedTimer)
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.build();
}

public synchronized PulsarClient getClient() throws PulsarServerException {
if (this.client == null) {
try {
Expand Down Expand Up @@ -1388,7 +1418,7 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
}

conf.setStatsIntervalSeconds(0);
this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
this.client = createClientImpl(conf);
} catch (Exception e) {
throw new PulsarServerException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import io.netty.channel.EventLoopGroup;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -1306,7 +1305,7 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {

// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, (EventLoopGroup) pulsar.getBrokerService().executor());
return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand Down Expand Up @@ -1177,7 +1176,7 @@ public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> c
}
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit a75a22d

Please sign in to comment.