Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Use shared executors for broker and geo-replication clients #13839

Merged
merged 3 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -126,6 +127,7 @@
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
Expand Down Expand Up @@ -226,6 +228,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {

private final Consumer<Integer> processTerminator;
protected final EventLoopGroup ioEventLoopGroup;
private final ExecutorProvider brokerClientSharedInternalExecutorProvider;
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
private final Timer brokerClientSharedTimer;

private MetricsGenerator metricsGenerator;

Expand Down Expand Up @@ -318,6 +323,18 @@ public PulsarService(ServiceConfiguration config,

this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
new DefaultThreadFactory("pulsar-io"));
// the internal executor is not used in the broker client or replication clients since this executor is
// used for consumers and the transaction support in the client.
// since an instance is required, a single threaded shared instance is used for all broker client instances
this.brokerClientSharedInternalExecutorProvider =
new ExecutorProvider(1, "broker-client-shared-internal-executor");
// the external executor is not used in the broker client or replication clients since this executor is
// used for consumer listeners.
// since an instance is required, a single threaded shared instance is used for all broker client instances
Comment on lines +331 to +333
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this hint at a problem in the java client? It looks like a listener is declared on consumer initialization. We could change the behavior so that it only gets a thread when there is a listener.

We'll still have the issue of creating an ExecutorProvider per client, so this change still makes sense, but it'd be cheaper for end users to avoid the thread creation for consumers that lack listeners.

I am going to open a PR to update the Java Consumer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this hint at a problem in the java client? It looks like a listener is declared on consumer initialization. We could change the behavior so that it only gets a thread when there is a listener.

Yes the client isn't optimal, but I doubt that it causes an actual measurable overhead. I think that the thread is just pinned out of the single shared thread and nothing will get run on it (in the current usage patterns when client is used in the broker).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the client isn't optimal, but I doubt that it causes an actual measurable overhead. I think that the thread is just pinned out of the single shared thread and nothing will get run on it (in the current usage patterns when client is used in the broker).

I was thinking about user applications that have many consumers. I just looked a bit closer, and the upper limit here is NumListenerThreads in the client configuration. That is the max number of threads a client will spawn for consumer listeners. Was this a problem in the broker because we have many clients?

this.brokerClientSharedExternalExecutorProvider =
new ExecutorProvider(1, "broker-client-shared-external-executor");
this.brokerClientSharedTimer =
new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
}

public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
Expand Down Expand Up @@ -496,6 +513,9 @@ public CompletableFuture<Void> closeAsync() {
transactionReplayExecutor.shutdown();
}

brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();
ioEventLoopGroup.shutdownGracefully();

// add timeout handling for closing executors
Expand Down Expand Up @@ -1296,6 +1316,17 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp
return this.offloaderScheduler;
}

public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
throws PulsarClientException {
return PulsarClientImpl.builder()
.conf(clientConf)
.eventLoopGroup(ioEventLoopGroup)
.timer(brokerClientSharedTimer)
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.build();
}

public synchronized PulsarClient getClient() throws PulsarServerException {
if (this.client == null) {
try {
Expand Down Expand Up @@ -1329,7 +1360,7 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
}

conf.setStatsIntervalSeconds(0);
this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
this.client = createClientImpl(conf);
} catch (Exception e) {
throw new PulsarServerException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import io.netty.channel.EventLoopGroup;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -1239,7 +1238,7 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {

// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, (EventLoopGroup) pulsar.getBrokerService().executor());
return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.BindAddress;
Expand Down Expand Up @@ -1203,7 +1202,7 @@ public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> c
}
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down