From a381ee054c16d59c072b84cb2eb691330a6b6315 Mon Sep 17 00:00:00 2001 From: jetoile Date: Wed, 27 Nov 2019 15:18:25 +0100 Subject: [PATCH] allow to deactivate pulsar worker and set clientAuthenticationParameters and clientAuthenticationPlugin --- .../hadoopunit/component/PulsarBootstrap.java | 95 ++++++++++++------- .../hadoopunit/component/PulsarConfig.java | 4 + .../resources/hadoop-unit-default.properties | 4 + 3 files changed, 67 insertions(+), 36 deletions(-) diff --git a/hadoop-unit-pulsar/src/main/java/fr/jetoile/hadoopunit/component/PulsarBootstrap.java b/hadoop-unit-pulsar/src/main/java/fr/jetoile/hadoopunit/component/PulsarBootstrap.java index 46dd6783..f6d6c804 100644 --- a/hadoop-unit-pulsar/src/main/java/fr/jetoile/hadoopunit/component/PulsarBootstrap.java +++ b/hadoop-unit-pulsar/src/main/java/fr/jetoile/hadoopunit/component/PulsarBootstrap.java @@ -48,6 +48,9 @@ public class PulsarBootstrap implements Bootstrap { private String ip; private String tmpDirPath; private int streamerStoragePort; + private boolean workerEnabled; + private String workerClientAuthenticationParameters; + private String workerClientAuthenticationPlugin; private boolean authenticationEnabled; private String authenticationProviders; @@ -116,6 +119,10 @@ private void loadConfig() { String[] extraConfsAsList = configuration.getStringArray(PulsarConfig.PULSAR_EXTRA_CONF_KEY); extraConf = Arrays.asList(extraConfsAsList).stream().collect(Collectors.toMap(c -> c.split("=")[0], c -> c.split("=")[1])); + workerEnabled = configuration.getBoolean(PulsarConfig.PULSAR_WORKER_ENABLED_KEY, true); + workerClientAuthenticationParameters = configuration.getString(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PARAMETERS_KEY); + workerClientAuthenticationPlugin = configuration.getString(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PLUGIN_KEY); + zookeeperHost = configuration.getString(ZOOKEEPER_HOST_CLIENT_KEY); zookeeperPort = configuration.getInt(ZOOKEEPER_PORT_KEY); } @@ -157,6 +164,15 @@ public void loadConfig(Map configs) { String[] extraConfsString = extraConfList.split(","); extraConf = Arrays.asList(extraConfsString).stream().collect(Collectors.toMap(c -> c.split("=")[0], c -> c.split("=")[1])); } + if (StringUtils.isNotEmpty(configs.get(PulsarConfig.PULSAR_WORKER_ENABLED_KEY))) { + workerEnabled = Boolean.parseBoolean(configs.get(PulsarConfig.PULSAR_WORKER_ENABLED_KEY)); + } + if (StringUtils.isNotEmpty(configs.get(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PLUGIN_KEY))) { + workerClientAuthenticationPlugin = configs.get(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PLUGIN_KEY); + } + if (StringUtils.isNotEmpty(configs.get(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PARAMETERS_KEY))) { + workerClientAuthenticationParameters = configs.get(PulsarConfig.PULSAR_WORKER_CLIENT_AUTHENTICATION_PARAMETERS_KEY); + } if (StringUtils.isNotEmpty(configs.get(ZOOKEEPER_PORT_KEY))) { zookeeperPort = Integer.parseInt(configs.get(ZOOKEEPER_PORT_KEY)); @@ -200,43 +216,50 @@ private void build() throws IOException { serviceConfiguration.setProperties(properties); } - workerConfig = new WorkerConfig(); - - workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + port); - workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + httpPort); - workerConfig.setWorkerHostname(ip); - workerConfig.setWorkerPort(httpPort); - workerConfig.setWorkerId("c-" + name + "-fw-" + serviceConfiguration.getAdvertisedAddress() + "-" + workerConfig.getWorkerPort()); - workerConfig.setConfigurationStoreServers(zookeeperServers); - workerConfig.setZooKeeperSessionTimeoutMillis(10000); - workerConfig.setZooKeeperOperationTimeoutSeconds(10000); - workerConfig.setDownloadDirectory(tmpDirPath + "/pulsar_functions"); - workerConfig.setPulsarFunctionsNamespace("public/functions"); - workerConfig.setPulsarFunctionsCluster(name); - workerConfig.setSchedulerClassName("org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler"); - workerConfig.setRescheduleTimeoutMs(60000); - workerConfig.setFailureCheckFreqMs(30000); - workerConfig.setInitialBrokerReconnectMaxRetries(60); - workerConfig.setAssignmentWriteMaxRetries(60); - workerConfig.setInstanceLivenessCheckFreqMs(30000); - workerConfig.setTopicCompactionFrequencySec(1800); - workerConfig.setFunctionAssignmentTopicName("assignments"); - workerConfig.setFunctionMetadataTopicName("metadata"); - workerConfig.setClusterCoordinationTopicName("coordinate"); - workerConfig.setProcessContainerFactory(new WorkerConfig.ProcessContainerFactory() - .setExtraFunctionDependenciesDir(tmpDirPath + "/extraFunctionDependencies") - .setJavaInstanceJarLocation(tmpDirPath + "/javaInstanceJar") - .setLogDirectory(tmpDirPath + "/log")); - workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("functions-thread")); - workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + streamerStoragePort); - - - functionsWorkerService = new WorkerService(workerConfig); - - // init pulsar service -// pulsarService = new PulsarService(serviceConfiguration, Optional.empty()); - pulsarService = new PulsarService(serviceConfiguration, Optional.ofNullable(functionsWorkerService)); + if (workerEnabled) { + workerConfig = new WorkerConfig(); + + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + port); + workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + httpPort); + workerConfig.setWorkerHostname(ip); + workerConfig.setWorkerPort(httpPort); + workerConfig.setWorkerId("c-" + name + "-fw-" + serviceConfiguration.getAdvertisedAddress() + "-" + workerConfig.getWorkerPort()); + workerConfig.setConfigurationStoreServers(zookeeperServers); + workerConfig.setZooKeeperSessionTimeoutMillis(10000); + workerConfig.setZooKeeperOperationTimeoutSeconds(10000); + workerConfig.setDownloadDirectory(tmpDirPath + "/pulsar_functions"); + workerConfig.setPulsarFunctionsNamespace("public/functions"); + workerConfig.setPulsarFunctionsCluster(name); + workerConfig.setSchedulerClassName("org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler"); + workerConfig.setRescheduleTimeoutMs(60000); + workerConfig.setFailureCheckFreqMs(30000); + workerConfig.setInitialBrokerReconnectMaxRetries(60); + workerConfig.setAssignmentWriteMaxRetries(60); + workerConfig.setInstanceLivenessCheckFreqMs(30000); + workerConfig.setTopicCompactionFrequencySec(1800); + workerConfig.setFunctionAssignmentTopicName("assignments"); + workerConfig.setFunctionMetadataTopicName("metadata"); + workerConfig.setClusterCoordinationTopicName("coordinate"); + workerConfig.setProcessContainerFactory(new WorkerConfig.ProcessContainerFactory() + .setExtraFunctionDependenciesDir(tmpDirPath + "/extraFunctionDependencies") + .setJavaInstanceJarLocation(tmpDirPath + "/javaInstanceJar") + .setLogDirectory(tmpDirPath + "/log")); + workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("functions-thread")); + workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + streamerStoragePort); + + if (StringUtils.isNotEmpty(workerClientAuthenticationPlugin)) { + workerConfig.setClientAuthenticationPlugin(workerClientAuthenticationPlugin); + } + if (StringUtils.isNotEmpty(workerClientAuthenticationParameters)) { + workerConfig.setClientAuthenticationParameters(workerClientAuthenticationParameters); + } + functionsWorkerService = new WorkerService(workerConfig); + + pulsarService = new PulsarService(serviceConfiguration, Optional.ofNullable(functionsWorkerService)); + } else { + pulsarService = new PulsarService(serviceConfiguration, Optional.empty()); + } } diff --git a/hadoop-unit-pulsar/src/main/java/fr/jetoile/hadoopunit/component/PulsarConfig.java b/hadoop-unit-pulsar/src/main/java/fr/jetoile/hadoopunit/component/PulsarConfig.java index 4212e2c7..2b5e3692 100644 --- a/hadoop-unit-pulsar/src/main/java/fr/jetoile/hadoopunit/component/PulsarConfig.java +++ b/hadoop-unit-pulsar/src/main/java/fr/jetoile/hadoopunit/component/PulsarConfig.java @@ -23,6 +23,10 @@ public class PulsarConfig { public static final String PULSAR_TEMP_DIR_KEY = "pulsar.temp.dir"; public static final String PULSAR_STREAMER_STORAGE_PORT_KEY = "pulsar.streamer.storage.port"; + public static final String PULSAR_WORKER_ENABLED_KEY = "pulsar.worker.enabled"; + public static final String PULSAR_WORKER_CLIENT_AUTHENTICATION_PARAMETERS_KEY = "pulsar.worker.clientAuthenticationParameters"; + public static final String PULSAR_WORKER_CLIENT_AUTHENTICATION_PLUGIN_KEY = "pulsar.worker.clientAuthenticationPlugin"; + public static final String PULSAR_AUTHENTICATION_ENABLED_KEY = "pulsar.authentication.enabled"; public static final String PULSAR_AUTHENTICATION_PROVIDERS_KEY = "pulsar.authentication.provider"; public static final String PULSAR_AUTHORIZATION_ENABLED_KEY = "pulsar.authorization.enabled"; diff --git a/hadoop-unit-pulsar/src/main/resources/hadoop-unit-default.properties b/hadoop-unit-pulsar/src/main/resources/hadoop-unit-default.properties index e77608f2..bb663df5 100644 --- a/hadoop-unit-pulsar/src/main/resources/hadoop-unit-default.properties +++ b/hadoop-unit-pulsar/src/main/resources/hadoop-unit-default.properties @@ -143,6 +143,10 @@ pulsar.http.port=22023 pulsar.temp.dir=/pulsar pulsar.streamer.storage.port=4181 +pulsar.worker.enabled=true +#pulsar.worker.clientAuthenticationParameters= +#pulsar.worker.clientAuthenticationPlugin= + #pulsar.authentication.enabled=true #pulsar.authentication.provider= #pulsar.authorization.enabled=true