Skip to content

Commit

Permalink
[Client] Support passing existing executor providers to the client
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Sep 14, 2021
1 parent 7180c9c commit 8a965f3
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@
package org.apache.pulsar.client.impl;

import static org.apache.commons.lang3.StringUtils.isBlank;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;

import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -49,7 +46,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import lombok.Builder;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
Expand Down Expand Up @@ -93,12 +90,13 @@ public class PulsarClientImpl implements PulsarClient {
private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);

protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;
private LookupService lookup;
private final ConnectionPool cnxPool;
private final Timer timer;
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorService;
private final ExecutorProvider internalExecutorProvider;
private final boolean createdEventLoopGroup;
private final boolean createdCnxPool;

Expand Down Expand Up @@ -133,48 +131,53 @@ public SchemaInfoProvider load(String topicName) {
private TransactionCoordinatorClientImpl tcClient;

public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, getEventLoopGroup(conf), true);
this(conf, null, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, false, true);
this(conf, eventLoopGroup, null, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, null, false, false);
this(conf, eventLoopGroup, cnxPool, null, null, null);
}

public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer)
throws PulsarClientException {
this(conf, eventLoopGroup, cnxPool, timer, false, false);
this(conf, eventLoopGroup, cnxPool, timer, null, null);
}

private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, boolean createdEventLoopGroup)
throws PulsarClientException {
this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup), null, createdEventLoopGroup, true);
}

private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer,
boolean createdEventLoopGroup, boolean createdCnxPool) throws PulsarClientException {
@Builder(builderClassName = "PulsarClientImplBuilder")
private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider) throws PulsarClientException {
EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
try {
this.createdEventLoopGroup = createdEventLoopGroup;
this.createdCnxPool = createdCnxPool;
if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
this.createdEventLoopGroup = eventLoopGroup == null;
this.createdCnxPool = connectionPool == null;
if ((externalExecutorProvider == null) != (internalExecutorProvider == null)) {
throw new IllegalStateException("Both externalExecutorProvider and internalExecutorProvider must be specified or unspecified.");
}
this.createdExecutorProviders = externalExecutorProvider == null;
eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf);
this.eventLoopGroup = eventLoopGroupReference;
if (conf == null || isBlank(conf.getServiceUrl()) || this.eventLoopGroup == null) {
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
this.eventLoopGroup = eventLoopGroup;
setAuth(conf);
this.conf = conf;
clientClock = conf.getClock();
conf.getAuthentication().start();
this.cnxPool = cnxPool;
externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
connectionPoolReference = connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
this.cnxPool = connectionPoolReference;
this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor());
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), this.externalExecutorProvider.getExecutor());
}
if (timer == null) {
this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
Expand All @@ -199,8 +202,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
state.set(State.Open);
} catch (Throwable t) {
shutdown();
shutdownEventLoopGroup(eventLoopGroup);
closeCnxPool(cnxPool);
shutdownEventLoopGroup(eventLoopGroupReference);
closeCnxPool(connectionPoolReference);
throw t;
}
}
Expand Down Expand Up @@ -786,27 +789,29 @@ private void shutdownEventLoopGroup(EventLoopGroup eventLoopGroup) throws Pulsar
}

private void shutdownExecutors() throws PulsarClientException {
PulsarClientException pulsarClientException = null;
if (createdExecutorProviders) {
PulsarClientException pulsarClientException = null;

if (externalExecutorProvider != null && !externalExecutorProvider.isShutdown()) {
try {
externalExecutorProvider.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown externalExecutorProvider", t);
pulsarClientException = PulsarClientException.unwrap(t);
if (externalExecutorProvider != null && !externalExecutorProvider.isShutdown()) {
try {
externalExecutorProvider.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown externalExecutorProvider", t);
pulsarClientException = PulsarClientException.unwrap(t);
}
}
}
if (internalExecutorService != null && !internalExecutorService.isShutdown()) {
try {
internalExecutorService.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown internalExecutorService", t);
pulsarClientException = PulsarClientException.unwrap(t);
if (internalExecutorProvider != null && !internalExecutorProvider.isShutdown()) {
try {
internalExecutorProvider.shutdownNow();
} catch (Throwable t) {
log.warn("Failed to shutdown internalExecutorService", t);
pulsarClientException = PulsarClientException.unwrap(t);
}
}
}

if (pulsarClientException != null) {
throw pulsarClientException;
if (pulsarClientException != null) {
throw pulsarClientException;
}
}
}

Expand Down Expand Up @@ -1044,7 +1049,7 @@ protected <T> CompletableFuture<Schema<T>> preProcessSchemaBeforeSubscribe(Pulsa
}

public ExecutorService getInternalExecutorService() {
return internalExecutorService.getExecutor();
return internalExecutorProvider.getExecutor();
}
//
// Transaction related API
Expand All @@ -1059,5 +1064,4 @@ public TransactionBuilder newTransaction() throws PulsarClientException {
}
return new TransactionBuilderImpl(this, tcClient);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@
import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;

import lombok.Cleanup;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -217,4 +219,32 @@ public void testResourceCleanup() throws PulsarClientException {
assertFalse(eventLoopGroup.isShutdown());
}
}

@Test
public void testInitializingWithExecutorProviders() throws PulsarClientException {
ClientConfigurationData conf = clientImpl.conf;
@Cleanup("shutdownNow")
ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor");
@Cleanup
PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
.internalExecutorProvider(executorProvider)
.externalExecutorProvider(executorProvider)
.build();
@Cleanup
PulsarClientImpl client3 = PulsarClientImpl.builder().conf(conf)
.internalExecutorProvider(executorProvider)
.externalExecutorProvider(executorProvider)
.build();
}

@Test(expectedExceptions = IllegalStateException.class)
public void testBothExecutorProvidersMustBeSpecified() throws PulsarClientException {
ClientConfigurationData conf = clientImpl.conf;
@Cleanup("shutdownNow")
ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor");
@Cleanup
PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
.internalExecutorProvider(executorProvider)
.build();
}
}

0 comments on commit 8a965f3

Please sign in to comment.