[Broker] Use shared executors for broker and geo-replication clients #13839

Merged
merged 3 commits into from
Jan 20, 2022
Changes from 1 commit
@@ -245,6 +245,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();

@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of shared threads to use for broker clients."
+ " Default is set to `2 * Runtime.getRuntime().availableProcessors()`"
)
private int brokerClientNumIOThreads = 2 * Runtime.getRuntime().availableProcessors();
Contributor

I'd say that we should probably reuse the broker IO threads here.

Member Author

The problem is that the broker IO threads are an EventLoopGroup instance which is already used in the Pulsar Client.

this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
new DefaultThreadFactory("pulsar-io"));

The Pulsar Client has these internal/IO and external/listener executors in addition to the eventLoopGroup.

this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider :
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

It might be a risky change in the Pulsar Client to replace the internal executor with one that delegates directly to the eventLoopGroup.

This is why I think it's necessary to have a separate setting to control the number of threads used to run the Pulsar Client internal executors. Before this PR, each client instance had its own single-threaded internal executor.
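
The fallback in the constructor snippet above (use the injected provider if there is one, otherwise build a private one) can be illustrated with a minimal sketch that uses only `java.util.concurrent`. `SketchExecutorProvider`, `SketchClient` and `SharedProviderSketch` are hypothetical stand-ins, not Pulsar classes, and the round-robin provider is only an assumption about how such a provider might hand out executors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an executor provider (not the Pulsar class):
// a fixed list of single-threaded executors handed out round-robin.
final class SketchExecutorProvider {
    private final List<ExecutorService> executors = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    SketchExecutorProvider(int numThreads, String poolName) {
        if (numThreads < 1) {
            throw new IllegalArgumentException("numThreads must be >= 1");
        }
        for (int i = 0; i < numThreads; i++) {
            String threadName = poolName + "-" + i;
            executors.add(Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, threadName);
                t.setDaemon(true);
                return t;
            }));
        }
    }

    ExecutorService getExecutor() {
        return executors.get(Math.floorMod(next.getAndIncrement(), executors.size()));
    }

    int numThreads() {
        return executors.size();
    }

    void shutdownNow() {
        executors.forEach(ExecutorService::shutdownNow);
    }
}

// Hypothetical client: uses the injected provider if present, else creates its own.
final class SketchClient {
    private final SketchExecutorProvider internalExecutorProvider;
    private final boolean ownsProvider;

    SketchClient(int numIoThreads, SketchExecutorProvider injected) {
        this.ownsProvider = injected == null;
        this.internalExecutorProvider = injected != null
                ? injected
                : new SketchExecutorProvider(numIoThreads, "sketch-client-internal");
    }

    SketchExecutorProvider internalExecutorProvider() {
        return internalExecutorProvider;
    }

    void close() {
        // Only shut down a provider this client created itself.
        if (ownsProvider) {
            internalExecutorProvider.shutdownNow();
        }
    }
}

final class SharedProviderSketch {
    // Total internal executor threads behind the clients, counting each provider once.
    static int totalInternalThreads(List<SketchClient> clients) {
        Set<SketchExecutorProvider> seen = new HashSet<>();
        int total = 0;
        for (SketchClient c : clients) {
            if (seen.add(c.internalExecutorProvider())) {
                total += c.internalExecutorProvider().numThreads();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        SketchExecutorProvider shared = new SketchExecutorProvider(1, "sketch-shared-internal");
        List<SketchClient> sharing = Arrays.asList(
                new SketchClient(1, shared), new SketchClient(1, shared), new SketchClient(1, shared));
        List<SketchClient> separate = Arrays.asList(
                new SketchClient(1, null), new SketchClient(1, null), new SketchClient(1, null));
        System.out.println("shared=" + totalInternalThreads(sharing)
                + " separate=" + totalInternalThreads(separate)); // prints "shared=1 separate=3"
        sharing.forEach(SketchClient::close);  // no-op: these clients do not own the provider
        separate.forEach(SketchClient::close);
        shared.shutdownNow();
    }
}
```

With an injected provider the thread count stays fixed no matter how many clients are created; without one it grows with every client instance.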

Contributor

Yes, I think the confusion comes from the name brokerClientNumIOThreads: it refers to the client internal executor, not the client event loop, which is already shared.

Additionally, the client internal executor shouldn't really be used much in the broker, since it's used mainly for dispatching the partitioned/multi-topic consumers, so we could just default to 1 thread instead.

Member Author

> Yes, I think the confusion comes from the name brokerClientNumIOThreads, since it's client internal executor and not the client event loop which is already shared.

That's true. The Pulsar Client uses the numIOThreads parameter for configuring the number of threads for the eventLoopGroup and also for the internal/IO executor.

return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), conf.isEnableBusyWait(), threadFactory);

this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");

This is confusing. The internal executor was introduced in #8208, and that change also uses the numIOThreads parameter to configure the number of threads for the internal executor. @merlimat I wonder what would be a good way to improve this situation?
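
To make the coupling concrete, here is a minimal sketch in which plain `java.util.concurrent` pools stand in for the event loop group and the internal executor. `SketchClientConf` and `SketchPools` are hypothetical names, not Pulsar or Netty types; the only point is that a single `numIoThreads` value ends up sizing two independent pools.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical configuration holder with a single thread-count setting.
final class SketchClientConf {
    private final int numIoThreads;

    SketchClientConf(int numIoThreads) {
        this.numIoThreads = numIoThreads;
    }

    int getNumIoThreads() {
        return numIoThreads;
    }
}

// Two separate pools, both sized from the same setting.
final class SketchPools {
    final ThreadPoolExecutor eventLoopStandIn;
    final ThreadPoolExecutor internalStandIn;

    SketchPools(SketchClientConf conf) {
        this.eventLoopStandIn = fixedPool(conf.getNumIoThreads());
        this.internalStandIn = fixedPool(conf.getNumIoThreads());
    }

    private static ThreadPoolExecutor fixedPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    // Upper bound on threads the two pools can start together.
    int maxThreads() {
        return eventLoopStandIn.getMaximumPoolSize() + internalStandIn.getMaximumPoolSize();
    }

    void shutdown() {
        eventLoopStandIn.shutdownNow();
        internalStandIn.shutdownNow();
    }

    public static void main(String[] args) {
        SketchPools pools = new SketchPools(new SketchClientConf(4));
        System.out.println(pools.maxThreads()); // prints 8
        pools.shutdown();
    }
}
```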

Member Author

> Additionally, the client internal executor shouldn't really be used a lot in broker, since it's used mainly for dispatching the partitioned/multi topic consumers, so we could just default to 1 thread instead.

Yes, that is true. I wonder if the value needs to be configurable at all. I'll remove the brokerClientNumIOThreads key and just use the value 1, as I did for the external executor.
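
A rough back-of-the-envelope sketch of what sharing saves, assuming each unshared client instance would start its own internal executor, external executor and one timer thread. `ThreadCountSketch`, its method names and the example figures are illustrative assumptions, not measurements from a broker; the event loop threads are left out because they were already shared.

```java
final class ThreadCountSketch {
    // Assumption: every unshared client starts its own executors plus one timer thread.
    static int threadsWithoutSharing(int clients, int internalThreads, int externalThreads) {
        int timerThreads = 1;
        return clients * (internalThreads + externalThreads + timerThreads);
    }

    // Assumption: one shared internal pool, one single-threaded external pool, one timer.
    static int threadsWithSharing(int sharedInternalThreads) {
        int sharedExternalThreads = 1;
        int sharedTimerThreads = 1;
        return sharedInternalThreads + sharedExternalThreads + sharedTimerThreads;
    }

    public static void main(String[] args) {
        // Hypothetical broker holding 5 client instances.
        System.out.println(threadsWithoutSharing(5, 1, 1)); // prints 15
        System.out.println(threadsWithSharing(1));          // prints 3
    }
}
```

Under these assumptions the shared total no longer depends on the number of client instances, which is why a fixed value of 1 is a plausible default.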

Member Author

I've made the changes now. @merlimat PTAL

Contributor

👍


@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for orderedExecutor."
@@ -31,6 +31,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -126,6 +127,7 @@
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
@@ -226,6 +228,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {

private final Consumer<Integer> processTerminator;
protected final EventLoopGroup ioEventLoopGroup;
private final ExecutorProvider brokerClientSharedInternalExecutorProvider;
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
private final Timer brokerClientSharedTimer;

private MetricsGenerator metricsGenerator;

@@ -318,6 +323,14 @@ public PulsarService(ServiceConfiguration config,

this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
new DefaultThreadFactory("pulsar-io"));
this.brokerClientSharedInternalExecutorProvider =
new ExecutorProvider(config.getBrokerClientNumIOThreads(), "broker-client-shared-internal-executor");
// the external executor is not used in the Pulsar Proxy since this executor is used for consumer listeners
// since an instance is required, a single threaded shared instance is used for all broker client instances
this.brokerClientSharedExternalExecutorProvider =
new ExecutorProvider(1, "broker-client-shared-external-executor");
this.brokerClientSharedTimer = new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer",
Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
}

public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
@@ -496,6 +509,9 @@ public CompletableFuture<Void> closeAsync() {
transactionReplayExecutor.shutdown();
}

brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();
ioEventLoopGroup.shutdownGracefully();

// add timeout handling for closing executors
@@ -1296,6 +1312,17 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp
return this.offloaderScheduler;
}

public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
throws PulsarClientException {
return PulsarClientImpl.builder()
.conf(clientConf)
.eventLoopGroup(ioEventLoopGroup)
.timer(brokerClientSharedTimer)
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.build();
}

public synchronized PulsarClient getClient() throws PulsarServerException {
if (this.client == null) {
try {
@@ -1329,7 +1356,7 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
}

conf.setStatsIntervalSeconds(0);
- this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
+ this.client = createClientImpl(conf);
} catch (Exception e) {
throw new PulsarServerException(e);
}
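
The hunks above create the shared timer and executor providers once in the service, pass them to every broker client through `createClientImpl`, and stop them once in `closeAsync`. A minimal sketch of that ownership split follows, assuming a client leaves injected resources alone when it closes. `BorrowingClient` and `OwningService` are hypothetical names, and a `ScheduledExecutorService` stands in for the Netty timer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical client that borrows a timer and never stops it.
final class BorrowingClient implements AutoCloseable {
    private final ScheduledExecutorService sharedTimer;
    private volatile boolean closed;

    BorrowingClient(ScheduledExecutorService sharedTimer) {
        this.sharedTimer = sharedTimer;
    }

    boolean timerUsable() {
        return !sharedTimer.isShutdown();
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        // Assumption: closing a client must not stop resources it did not create.
        closed = true;
    }
}

// Hypothetical owner: creates the shared timer once and stops it once.
final class OwningService implements AutoCloseable {
    private final ScheduledExecutorService sharedTimer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "sketch-shared-timer");
                t.setDaemon(true);
                return t;
            });

    BorrowingClient createClient() {
        return new BorrowingClient(sharedTimer);
    }

    boolean timerStopped() {
        return sharedTimer.isShutdown();
    }

    @Override
    public void close() {
        sharedTimer.shutdownNow();
    }

    public static void main(String[] args) {
        OwningService service = new OwningService();
        BorrowingClient first = service.createClient();
        BorrowingClient second = service.createClient();
        first.close();
        System.out.println(second.timerUsable()); // prints true
        second.close();
        service.close();
        System.out.println(service.timerStopped()); // prints true
    }
}
```

Keeping shutdown in the owner avoids one client's close breaking the timer for the others.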
@@ -25,7 +25,6 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
- import io.netty.channel.EventLoopGroup;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
@@ -1239,7 +1238,7 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {

// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
- return new PulsarClientImpl(conf, (EventLoopGroup) pulsar.getBrokerService().executor());
+ return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -128,7 +128,6 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
- import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.BindAddress;
@@ -1203,7 +1202,7 @@ public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> c
}
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
- return new PulsarClientImpl(conf, workerGroup);
+ return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}