Skip to content

Commit

Permalink
allow to deactivate pulsar worker and set clientAuthenticationParameters
Browse files Browse the repository at this point in the history
and clientAuthenticationPlugin
  • Loading branch information
jetoile committed Nov 27, 2019
1 parent cad7abf commit a381ee0
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public class PulsarBootstrap implements Bootstrap {
private String ip;
private String tmpDirPath;
private int streamerStoragePort;
private boolean workerEnabled;
private String workerClientAuthenticationParameters;
private String workerClientAuthenticationPlugin;

private boolean authenticationEnabled;
private String authenticationProviders;
Expand Down Expand Up @@ -116,6 +119,10 @@ private void loadConfig() {
String[] extraConfsAsList = configuration.getStringArray(PulsarConfig.PULSAR_EXTRA_CONF_KEY);
extraConf = Arrays.asList(extraConfsAsList).stream().collect(Collectors.toMap(c -> c.split("=")[0], c -> c.split("=")[1]));

workerEnabled = configuration.getBoolean(PulsarConfig.PULSAR_WORKER_ENABLED_KEY, true);
workerClientAuthenticationParameters = configuration.getString(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PARAMETERS_KEY);
workerClientAuthenticationPlugin = configuration.getString(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PLUGIN_KEY);

zookeeperHost = configuration.getString(ZOOKEEPER_HOST_CLIENT_KEY);
zookeeperPort = configuration.getInt(ZOOKEEPER_PORT_KEY);
}
Expand Down Expand Up @@ -157,6 +164,15 @@ public void loadConfig(Map<String, String> configs) {
String[] extraConfsString = extraConfList.split(",");
extraConf = Arrays.asList(extraConfsString).stream().collect(Collectors.toMap(c -> c.split("=")[0], c -> c.split("=")[1]));
}
if (StringUtils.isNotEmpty(configs.get(PulsarConfig.PULSAR_WORKER_ENABLED_KEY))) {
workerEnabled = Boolean.parseBoolean(configs.get(PulsarConfig.PULSAR_WORKER_ENABLED_KEY));
}
if (StringUtils.isNotEmpty(configs.get(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PLUGIN_KEY))) {
workerClientAuthenticationPlugin = configs.get(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PLUGIN_KEY);
}
if (StringUtils.isNotEmpty(configs.get(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PARAMETERS_KEY))) {
workerClientAuthenticationParameters = configs.get(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PARAMETERS_KEY);
}

if (StringUtils.isNotEmpty(configs.get(ZOOKEEPER_PORT_KEY))) {
zookeeperPort = Integer.parseInt(configs.get(ZOOKEEPER_PORT_KEY));
Expand Down Expand Up @@ -200,43 +216,50 @@ private void build() throws IOException {
serviceConfiguration.setProperties(properties);
}

workerConfig = new WorkerConfig();

workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + port);
workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + httpPort);
workerConfig.setWorkerHostname(ip);
workerConfig.setWorkerPort(httpPort);
workerConfig.setWorkerId("c-" + name + "-fw-" + serviceConfiguration.getAdvertisedAddress() + "-" + workerConfig.getWorkerPort());
workerConfig.setConfigurationStoreServers(zookeeperServers);
workerConfig.setZooKeeperSessionTimeoutMillis(10000);
workerConfig.setZooKeeperOperationTimeoutSeconds(10000);
workerConfig.setDownloadDirectory(tmpDirPath + "/pulsar_functions");
workerConfig.setPulsarFunctionsNamespace("public/functions");
workerConfig.setPulsarFunctionsCluster(name);
workerConfig.setSchedulerClassName("org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler");
workerConfig.setRescheduleTimeoutMs(60000);
workerConfig.setFailureCheckFreqMs(30000);
workerConfig.setInitialBrokerReconnectMaxRetries(60);
workerConfig.setAssignmentWriteMaxRetries(60);
workerConfig.setInstanceLivenessCheckFreqMs(30000);
workerConfig.setTopicCompactionFrequencySec(1800);
workerConfig.setFunctionAssignmentTopicName("assignments");
workerConfig.setFunctionMetadataTopicName("metadata");
workerConfig.setClusterCoordinationTopicName("coordinate");
workerConfig.setProcessContainerFactory(new WorkerConfig.ProcessContainerFactory()
.setExtraFunctionDependenciesDir(tmpDirPath + "/extraFunctionDependencies")
.setJavaInstanceJarLocation(tmpDirPath + "/javaInstanceJar")
.setLogDirectory(tmpDirPath + "/log"));
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("functions-thread"));
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + streamerStoragePort);


functionsWorkerService = new WorkerService(workerConfig);

// init pulsar service
// pulsarService = new PulsarService(serviceConfiguration, Optional.empty());
pulsarService = new PulsarService(serviceConfiguration, Optional.ofNullable(functionsWorkerService));
if (workerEnabled) {
workerConfig = new WorkerConfig();

workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + port);
workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + httpPort);
workerConfig.setWorkerHostname(ip);
workerConfig.setWorkerPort(httpPort);
workerConfig.setWorkerId("c-" + name + "-fw-" + serviceConfiguration.getAdvertisedAddress() + "-" + workerConfig.getWorkerPort());
workerConfig.setConfigurationStoreServers(zookeeperServers);
workerConfig.setZooKeeperSessionTimeoutMillis(10000);
workerConfig.setZooKeeperOperationTimeoutSeconds(10000);
workerConfig.setDownloadDirectory(tmpDirPath + "/pulsar_functions");
workerConfig.setPulsarFunctionsNamespace("public/functions");
workerConfig.setPulsarFunctionsCluster(name);
workerConfig.setSchedulerClassName("org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler");
workerConfig.setRescheduleTimeoutMs(60000);
workerConfig.setFailureCheckFreqMs(30000);
workerConfig.setInitialBrokerReconnectMaxRetries(60);
workerConfig.setAssignmentWriteMaxRetries(60);
workerConfig.setInstanceLivenessCheckFreqMs(30000);
workerConfig.setTopicCompactionFrequencySec(1800);
workerConfig.setFunctionAssignmentTopicName("assignments");
workerConfig.setFunctionMetadataTopicName("metadata");
workerConfig.setClusterCoordinationTopicName("coordinate");
workerConfig.setProcessContainerFactory(new WorkerConfig.ProcessContainerFactory()
.setExtraFunctionDependenciesDir(tmpDirPath + "/extraFunctionDependencies")
.setJavaInstanceJarLocation(tmpDirPath + "/javaInstanceJar")
.setLogDirectory(tmpDirPath + "/log"));
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("functions-thread"));
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + streamerStoragePort);

if (StringUtils.isNotEmpty(workerClientAuthenticationPlugin)) {
workerConfig.setClientAuthenticationPlugin(workerClientAuthenticationPlugin);
}
if (StringUtils.isNotEmpty(workerClientAuthenticationParameters)) {
workerConfig.setClientAuthenticationParameters(workerClientAuthenticationParameters);
}

functionsWorkerService = new WorkerService(workerConfig);

pulsarService = new PulsarService(serviceConfiguration, Optional.ofNullable(functionsWorkerService));
} else {
pulsarService = new PulsarService(serviceConfiguration, Optional.empty());
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ public class PulsarConfig {
public static final String PULSAR_TEMP_DIR_KEY = "pulsar.temp.dir";
public static final String PULSAR_STREAMER_STORAGE_PORT_KEY = "pulsar.streamer.storage.port";

public static final String PULSAR_WORKER_ENABLED_KEY = "pulsar.worker.enabled";
public static final String PULSAR_WORKER_CLIENT_AUTHENTICATION_PARAMETERS_KEY = "pulsar.worker.clientAuthenticationParameters";
public static final String PULSAR_WORKER_CLIENT_AUTHENTICATION_PLUGIN_KEY = "pulsar.worker.clientAuthenticationPlugin";

public static final String PULSAR_AUTHENTICATION_ENABLED_KEY = "pulsar.authentication.enabled";
public static final String PULSAR_AUTHENTICATION_PROVIDERS_KEY = "pulsar.authentication.provider";
public static final String PULSAR_AUTHORIZATION_ENABLED_KEY = "pulsar.authorization.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ pulsar.http.port=22023
pulsar.temp.dir=/pulsar
pulsar.streamer.storage.port=4181

pulsar.worker.enabled=true
#pulsar.worker.clientAuthenticationParameters=
#pulsar.worker.clientAuthenticationPlugin=

#pulsar.authentication.enabled=true
#pulsar.authentication.provider=
#pulsar.authorization.enabled=true
Expand Down

0 comments on commit a381ee0

Please sign in to comment.