From d144f1ef68affad3b52f9aef591f6e12b52dc2dc Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 1 Jun 2022 02:14:49 -0500 Subject: [PATCH] Configure DLog Bookie, Pulsar, and Admin clients via pass through config (#15818) (cherry picked from commit aa673498f88d0ed4f9d5788a5036355834ea5119) --- conf/broker.conf | 14 ++- conf/functions_worker.yml | 20 +++ conf/proxy.conf | 4 + conf/websocket.conf | 4 + .../broker/BookKeeperClientFactoryImpl.java | 16 +-- .../apache/pulsar/broker/PulsarService.java | 33 +++-- .../broker/namespace/NamespaceService.java | 6 + .../pulsar/broker/service/BrokerService.java | 21 +++- .../pulsar/compaction/CompactorTool.java | 6 + ...ternalClientConfigurationOverrideTest.java | 115 ++++++++++++++++++ ...PulsarClientConfigurationOverrideTest.java | 56 +++++++++ .../proxy/ProxyConfigurationTest.java | 6 + .../client/admin/PulsarAdminBuilder.java | 23 ++++ .../internal/PulsarAdminBuilderImpl.java | 9 +- .../client/internal/PropertiesUtils.java | 64 ++++++++++ .../src/test/resources/test_worker_config.yml | 3 + .../functions/worker/PulsarWorkerService.java | 12 +- .../pulsar/functions/worker/WorkerUtils.java | 45 ++++++- .../functions/worker/WorkerUtilsTest.java | 19 +++ .../bookkeeper/BookKeeperPackagesStorage.java | 8 ++ ...ookKeeperPackagesStorageConfiguration.java | 4 + .../core/PackagesStorageConfiguration.java | 6 + .../DefaultPackagesStorageConfiguration.java | 5 + .../pulsar/proxy/server/ProxyConnection.java | 15 ++- .../pulsar/websocket/WebSocketService.java | 7 +- site2/docs/reference-configuration.md | 21 ++++ 26 files changed, 506 insertions(+), 36 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java diff --git a/conf/broker.conf b/conf/broker.conf index 779a70ddef2b1..d97daf3c9f7ca 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -680,6 +680,9 @@ brokerClientTlsCiphers= # used by the internal client to authenticate with Pulsar brokers brokerClientTlsProtocols= +# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client +# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration +# and before the above brokerClient configurations named above. ### --- Metadata Store --- ### @@ -936,8 +939,11 @@ managedLedgerDefaultAckQuorum=2 # in case of lack of enough bookies #bookkeeper_opportunisticStriping=false -# you can add other configuration options for the BookKeeper client -# by prefixing them with bookkeeper_ +# You can add other configuration options for the BookKeeper client +# by prefixing them with "bookkeeper_". These configurations are applied +# to all bookkeeper clients started by the broker (including the managed ledger bookkeeper clients as well as +# the BookkeeperPackagesStorage bookkeeper client), except the distributed log bookkeeper client. +# The dlog bookkeeper client is configured in the functions worker configuration file. # How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). # Default is 60 seconds @@ -1403,6 +1409,10 @@ packagesReplicas=1 # The bookkeeper ledger root path packagesManagementLedgerRootPath=/ledgers +# When using BookKeeperPackagesStorageProvider, you can configure the +# bookkeeper client by prefixing configurations with "bookkeeper_". +# This config applies to managed ledger bookkeeper clients, as well. + ### --- Packages management service configuration variables (end) --- ### diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index f12832595c527..733778f16bf1f 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -379,6 +379,26 @@ validateConnectorConfig: false # If it is set to true, you must ensure that it has been initialized by "bin/pulsar initialize-cluster-metadata" command. initializedDlogMetadata: false +########################### +# Arbitrary Configuration +########################### +# When a configuration parameter is not explicitly named in the WorkerConfig class, it is only accessible from the +# properties map. This map can be configured by supplying values to the properties map in this config file. + +# Configure the DLog bookkeeper client by prefixing configurations with "bookkeeper_". Because these are arbitrary, they +# must be added to the properties map to get correctly applied. This configuration applies to the Dlog bookkeeper client +# in both the standalone function workers and function workers initialized in the broker. + +# You can add extra configuration options for the Pulsar Client and the Pulsar Admin Client +# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration +# and before the above brokerClient configurations named above. + +## For example, when using the token authentication provider (AuthenticationProviderToken), you must configure several +## custom configurations. Here is a sample for configuring one of the necessary configs: +#properties: +# tokenPublicKey: "file:///path/to/my/key" +# tokenPublicAlg: "RSA256" + ### --- Deprecated settings --- ### configurationStoreServers: localhost:2181 diff --git a/conf/proxy.conf b/conf/proxy.conf index 9f83278d7a02b..d7484c8148642 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -176,6 +176,10 @@ tlsEnabledWithBroker=false # Tls cert refresh duration in seconds (set 0 to check on every new connection) tlsCertRefreshCheckDurationSec=300 +# You can add extra configuration options for the Pulsar Client +# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration +# and before the above brokerClient configurations named above. + ##### --- Rate Limiting --- ##### # Max concurrent inbound connections. The proxy will reject requests beyond that. diff --git a/conf/websocket.conf b/conf/websocket.conf index 797ea357176d4..1410fe131f113 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -93,6 +93,10 @@ brokerClientAuthenticationPlugin= brokerClientAuthenticationParameters= brokerClientTrustCertsFilePath= +# You can add extra configuration options for the Pulsar Client +# by prefixing them with "brokerClient_". These configurations are applied after hard coded configuration +# and before the above brokerClient configurations named above. + # When this parameter is not empty, unauthenticated users perform as anonymousUserRole anonymousUserRole= diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index 7a59ef4629fde..62fdd20b1ac9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; @@ -43,6 +42,7 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; +import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.metadata.api.MetadataStore; @@ -150,15 +150,11 @@ ClientConfiguration createBkClientConfiguration(MetadataStoreExtended store, Ser conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS); bkConf.setGetBookieInfoRetryIntervalSeconds( conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS); - Properties allProps = conf.getProperties(); - allProps.forEach((key, value) -> { - String sKey = key.toString(); - if (sKey.startsWith("bookkeeper_") && value != null) { - String bkExtraConfigKey = sKey.substring(11); - log.info("Extra BookKeeper client configuration {}, setting {}={}", sKey, bkExtraConfigKey, value); - bkConf.setProperty(bkExtraConfigKey, value); - } - }); + PropertiesUtils.filterAndMapProperties(conf.getProperties(), "bookkeeper_") + .forEach((key, value) -> { + log.info("Applying BookKeeper client configuration setting {}={}", key, value); + bkConf.setProperty(key, value); + }); return bkConf; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 1c226b2705a2b..e1ffbe757ad8f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -127,6 +127,8 @@ import org.apache.pulsar.client.api.transaction.TransactionBufferClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; +import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; @@ -1363,10 +1365,19 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) public synchronized PulsarClient getClient() throws PulsarServerException { if (this.client == null) { try { - ClientConfigurationData conf = new ClientConfigurationData(); + ClientConfigurationData initialConf = new ClientConfigurationData(); - // Disable memory limit for broker client - conf.setMemoryLimitBytes(0); + // Disable memory limit for broker client and disable stats + initialConf.setMemoryLimitBytes(0); + initialConf.setStatsIntervalSeconds(0); + + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + Map overrides = PropertiesUtils + .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_"); + ClientConfigurationData conf = + ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class); conf.setServiceUrl(this.getConfiguration().isTlsEnabled() ? this.brokerServiceUrlTls : this.brokerServiceUrl); @@ -1395,8 +1406,6 @@ public synchronized PulsarClient getClient() throws PulsarServerException { this.getConfiguration().getBrokerClientAuthenticationPlugin(), this.getConfiguration().getBrokerClientAuthenticationParameters())); } - - conf.setStatsIntervalSeconds(0); this.client = createClientImpl(conf); } catch (Exception e) { throw new PulsarServerException(e); @@ -1416,10 +1425,16 @@ public synchronized PulsarAdmin getAdminClient() throws PulsarServerException { + ", webServiceAddressTls: " + webServiceAddressTls + ", webServiceAddress: " + webServiceAddress); } - PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) // - .authentication(// - conf.getBrokerClientAuthenticationPlugin(), // - conf.getBrokerClientAuthenticationParameters()); + PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl); + + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_")); + + builder.authentication( + conf.getBrokerClientAuthenticationPlugin(), + conf.getBrokerClientAuthenticationParameters()); if (conf.isBrokerClientTlsEnabled()) { builder.tlsCiphers(config.getBrokerClientTlsCiphers()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 6b078107b6d98..58abf48d9b0ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -66,6 +66,7 @@ import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -1255,6 +1256,11 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) { .enableTcpNoDelay(false) .statsInterval(0, TimeUnit.SECONDS); + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_")); + if (pulsar.getConfiguration().isAuthenticationEnabled()) { clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), pulsar.getConfiguration().getBrokerClientAuthenticationParameters()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6fe48994bf0c3..e05bd7bd9c282 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -130,6 +130,7 @@ import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.BindAddress; import org.apache.pulsar.common.configuration.FieldContext; @@ -1204,6 +1205,12 @@ public PulsarClient getReplicationClient(String cluster, Optional c // Disable memory limit for replication client clientBuilder.memoryLimit(0, SizeUnit.BYTES); + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(pulsar.getConfiguration().getProperties(), + "brokerClient_")); + if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) { clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters()); } else if (pulsar.getConfiguration().isAuthenticationEnabled()) { @@ -1279,10 +1286,16 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && isNotBlank(data.getServiceUrlTls()); String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : data.getServiceUrl(); - PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) - .authentication( - conf.getBrokerClientAuthenticationPlugin(), - conf.getBrokerClientAuthenticationParameters()); + PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl); + + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + builder.loadConf(PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_")); + + builder.authentication( + conf.getBrokerClientAuthenticationPlugin(), + conf.getBrokerClientAuthenticationParameters()); if (isTlsUrl) { builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java index 8e397afcf910d..691217d9d77bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java @@ -38,6 +38,7 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.CmdGenerateDocs; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -105,6 +106,11 @@ public static void main(String[] args) throws Exception { ClientBuilder clientBuilder = PulsarClient.builder(); + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(brokerConfig.getProperties(), "brokerClient_")); + if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) { clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(), brokerConfig.getBrokerClientAuthenticationParameters()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java new file mode 100644 index 0000000000000..775636c9489fb --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Optional; +import java.util.Properties; + +public class BrokerInternalClientConfigurationOverrideTest extends BrokerTestBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testPulsarServiceAdminClientConfiguration() throws PulsarServerException { + Properties config = pulsar.getConfiguration().getProperties(); + config.setProperty("brokerClient_operationTimeoutMs", "60000"); + config.setProperty("brokerClient_statsIntervalSeconds", "10"); + ClientConfigurationData clientConf = ((PulsarAdminImpl) pulsar.getAdminClient()).getClientConfigData(); + Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000); + Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10); + } + + @Test + public void testPulsarServicePulsarClientConfiguration() throws PulsarServerException { + Properties config = pulsar.getConfiguration().getProperties(); + config.setProperty("brokerClient_operationTimeoutMs", "60000"); + config.setProperty("brokerClient_statsIntervalSeconds", "10"); + pulsar.getConfiguration().setBrokerClientAuthenticationParameters("sensitive"); + ClientConfigurationData clientConf = ((PulsarClientImpl) pulsar.getClient()).getConfiguration(); + Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000); + // Config should override internal default, which is 0. + Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10); + Assert.assertEquals(clientConf.getAuthParams(), "sensitive"); + } + + @Test + public void testBrokerServicePulsarClientConfiguration() { + // This data only needs to have the service url for this test. + ClusterData data = ClusterData.builder().serviceUrl("http://localhost:8080").build(); + + // Set the configs and set some configs that won't apply + Properties config = pulsar.getConfiguration().getProperties(); + config.setProperty("brokerClient_operationTimeoutMs", "60000"); + config.setProperty("brokerClient_statsIntervalSeconds", "10"); + config.setProperty("memoryLimitBytes", "10"); + config.setProperty("brokerClient_memoryLimitBytes", "100000"); + + PulsarClientImpl client = (PulsarClientImpl) pulsar.getBrokerService() + .getReplicationClient("an_arbitrary_name", Optional.of(data)); + ClientConfigurationData clientConf = client.getConfiguration(); + Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000); + // Config should override internal default, which is 0. + Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10); + // This config defaults to 0 (for good reason), but it could be overridden by configuration. + Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000); + } + + @Test + public void testNamespaceServicePulsarClientConfiguration() { + // This data only needs to have the service url for this test. + ClusterDataImpl data = (ClusterDataImpl) ClusterData.builder().serviceUrl("http://localhost:8080").build(); + + // Set the configs and set some configs that won't apply + Properties config = pulsar.getConfiguration().getProperties(); + config.setProperty("brokerClient_operationTimeoutMs", "60000"); + config.setProperty("brokerClient_statsIntervalSeconds", "10"); + config.setProperty("memoryLimitBytes", "10"); + config.setProperty("brokerClient_memoryLimitBytes", "100000"); + + PulsarClientImpl client = pulsar.getNamespaceService().getNamespaceClient(data); + ClientConfigurationData clientConf = client.getConfiguration(); + Assert.assertEquals(clientConf.getOperationTimeoutMs(), 60000); + // Config should override internal default, which is 0. + Assert.assertEquals(clientConf.getStatsIntervalSeconds(), 10); + // This config defaults to 0 (for good reason), but it could be overridden by configuration. + Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java new file mode 100644 index 0000000000000..4f885ecc46b07 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarClientConfigurationOverrideTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.internal.PropertiesUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Map; + +public class PulsarClientConfigurationOverrideTest { + @Test + public void testFilterAndMapProperties() { + // Create a default config + ServiceConfiguration conf = new ServiceConfiguration(); + conf.getProperties().setProperty("keepAliveIntervalSeconds", "15"); + conf.getProperties().setProperty("brokerClient_keepAliveIntervalSeconds", "25"); + + // Apply the filtering and mapping logic + Map result = PropertiesUtils.filterAndMapProperties(conf.getProperties(), "brokerClient_"); + + // Ensure the results match expectations + Assert.assertEquals(result.size(), 1, "The filtered map should have one entry."); + Assert.assertNull(result.get("brokerClient_keepAliveIntervalSeconds"), + "The mapped prop should not be in the result."); + Assert.assertEquals(result.get("keepAliveIntervalSeconds"), "25", "The original value is overridden."); + + // Create sample ClientBuilder + ClientBuilder builder = PulsarClient.builder(); + Assert.assertEquals( + ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 30); + // Note: this test would fail if any @Secret fields were set before the loadConf and the accessed afterwards. + builder.loadConf(result); + Assert.assertEquals( + ((ClientBuilderImpl) builder).getClientConfigurationData().getKeepAliveIntervalSeconds(), 25); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java index f60b1bc0add72..26cf8a0e1549f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java @@ -63,6 +63,9 @@ public Object[][] setProxyConfig() { public void configTest(int numIoThreads, int connectionsPerBroker) throws Exception { config.setWebSocketNumIoThreads(numIoThreads); config.setWebSocketConnectionsPerBroker(connectionsPerBroker); + config.getProperties().setProperty("brokerClient_serviceUrl", "https://broker.com:8080"); + config.setServiceUrl("http://localhost:8080"); + config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100"); WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); service.start(); @@ -70,6 +73,9 @@ public void configTest(int numIoThreads, int connectionsPerBroker) throws Except PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient(); assertEquals(client.getConfiguration().getNumIoThreads(), numIoThreads); assertEquals(client.getConfiguration().getConnectionsPerBroker(), connectionsPerBroker); + assertEquals(client.getConfiguration().getServiceUrl(), "http://localhost:8080", + "brokerClient_ configs take precedence"); + assertEquals(client.getConfiguration().getLookupTimeoutMs(), 100); service.close(); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java index 9f8b4be140908..c685c1f77936d 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java @@ -36,6 +36,29 @@ public interface PulsarAdminBuilder { */ PulsarAdmin build() throws PulsarClientException; + /** + * Load the configuration from provided config map. + * + *

Example: + * + *

+     * {@code
+     * Map config = new HashMap<>();
+     * config.put("serviceHttpUrl", "http://localhost:6650");
+     *
+     * PulsarAdminBuilder builder = ...;
+     * builder = builder.loadConf(config);
+     *
+     * PulsarAdmin client = builder.build();
+     * }
+     * 
+ * + * @param config + * configuration to load + * @return the client builder instance + */ + PulsarAdminBuilder loadConf(Map config); + /** * Create a copy of the current client builder. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java index 70463b7fb4e9a..d86b9e73457ca 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java @@ -28,10 +28,11 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { - protected final ClientConfigurationData conf; + protected ClientConfigurationData conf; private int connectTimeout = PulsarAdminImpl.DEFAULT_CONNECT_TIMEOUT_SECONDS; private int readTimeout = PulsarAdminImpl.DEFAULT_READ_TIMEOUT_SECONDS; private int requestTimeout = PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS; @@ -62,6 +63,12 @@ public PulsarAdminBuilder clone() { return new PulsarAdminBuilderImpl(conf.clone()); } + @Override + public PulsarAdminBuilder loadConf(Map config) { + conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class); + return this; + } + @Override public PulsarAdminBuilder serviceHttpUrl(String serviceHttpUrl) { conf.setServiceUrl(serviceHttpUrl); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java new file mode 100644 index 0000000000000..4a418b1d5158e --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PropertiesUtils.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.internal; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Internal utility methods for filtering and mapping {@link Properties} objects. + */ +public class PropertiesUtils { + + /** + * Filters the {@link Properties} object so that only properties with the configured prefix are retained, + * and then removes that prefix and puts the key value pairs into the result map. + * @param props - the properties object to filter + * @param prefix - the prefix to filter against and then remove for keys in the resulting map + * @return a map of properties + */ + public static Map filterAndMapProperties(Properties props, String prefix) { + return filterAndMapProperties(props, prefix, ""); + } + + /** + * Filters the {@link Properties} object so that only properties with the configured prefix are retained, + * and then replaces the srcPrefix with the targetPrefix when putting the key value pairs in the resulting map. + * @param props - the properties object to filter + * @param srcPrefix - the prefix to filter against and then remove for keys in the resulting map + * @param targetPrefix - the prefix to add to keys in the result map + * @return a map of properties + */ + public static Map filterAndMapProperties(Properties props, String srcPrefix, String targetPrefix) { + Map result = new HashMap<>(); + int prefixLength = srcPrefix.length(); + props.forEach((keyObject, value) -> { + if (!(keyObject instanceof String)) { + return; + } + String key = (String) keyObject; + if (key.startsWith(srcPrefix) && value != null) { + String truncatedKey = key.substring(prefixLength); + result.put(targetPrefix + truncatedKey, value); + } + }); + return result; + } +} diff --git a/pulsar-functions/src/test/resources/test_worker_config.yml b/pulsar-functions/src/test/resources/test_worker_config.yml index 4614ca3cfd1c2..f0ecf2bd71bc6 100644 --- a/pulsar-functions/src/test/resources/test_worker_config.yml +++ b/pulsar-functions/src/test/resources/test_worker_config.yml @@ -23,4 +23,7 @@ pulsarServiceUrl: pulsar://localhost:6650 functionMetadataTopicName: test-function-metadata-topic numFunctionPackageReplicas: 3 maxPendingAsyncRequests: 200 +properties: + # Fake Bookkeeper Client config to be applied to the DLog Bookkeeper Client + bookkeeper_testKey: "fakeValue" diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java index 93ce035e94ffb..6dbdb592eaff2 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java @@ -133,7 +133,8 @@ public PulsarAdmin newPulsarAdmin(String pulsarServiceUrl, WorkerConfig workerCo workerConfig.getBrokerClientAuthenticationParameters(), workerConfig.getBrokerClientTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(), - workerConfig.isTlsEnableHostnameVerification()); + workerConfig.isTlsEnableHostnameVerification(), + workerConfig); } else { return WorkerUtils.getPulsarAdminClient( pulsarServiceUrl, @@ -141,7 +142,8 @@ public PulsarAdmin newPulsarAdmin(String pulsarServiceUrl, WorkerConfig workerCo null, null, workerConfig.isTlsAllowInsecureConnection(), - workerConfig.isTlsEnableHostnameVerification()); + workerConfig.isTlsEnableHostnameVerification(), + workerConfig); } } @@ -156,7 +158,8 @@ public PulsarClient newPulsarClient(String pulsarServiceUrl, WorkerConfig worker workerConfig.isUseTls(), workerConfig.getBrokerClientTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(), - workerConfig.isTlsEnableHostnameVerification()); + workerConfig.isTlsEnableHostnameVerification(), + workerConfig); } else { return WorkerUtils.getPulsarClient( pulsarServiceUrl, @@ -165,7 +168,8 @@ public PulsarClient newPulsarClient(String pulsarServiceUrl, WorkerConfig worker null, null, workerConfig.isTlsAllowInsecureConnection(), - workerConfig.isTlsEnableHostnameVerification()); + workerConfig.isTlsEnableHostnameVerification(), + workerConfig); } } }; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java index d5733ce3e0ab3..d50d57ee27c19 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl; @@ -160,6 +161,13 @@ public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) workerConfig.getBookkeeperClientAuthenticationParameters()); } } + // Map arbitrary bookkeeper client configuration into DLog Config. Note that this only configures the + // bookie client. + PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "bookkeeper_", "bkc.") + .forEach((key, value) -> { + log.info("Applying DLog BookKeeper client configuration setting {}={}", key, value); + conf.setProperty(key, value); + }); return conf; } @@ -219,12 +227,20 @@ public static URI initializeDlogNamespace(InternalConfigurationData internalConf } public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) { - return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null); + return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null, null); } public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams, String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection, Boolean enableTlsHostnameVerificationEnable) { + return getPulsarAdminClient(pulsarWebServiceUrl, authPlugin, authParams, tlsTrustCertsFilePath, + allowTlsInsecureConnection, enableTlsHostnameVerificationEnable, null); + } + + public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams, + String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection, + Boolean enableTlsHostnameVerificationEnable, + WorkerConfig workerConfig) { log.info("Create Pulsar Admin to service url {}: " + "authPlugin = {}, authParams = {}, " + "tlsTrustCerts = {}, allowTlsInsecureConnector = {}, enableTlsHostnameVerification = {}", @@ -232,6 +248,13 @@ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, Strin tlsTrustCertsFilePath, allowTlsInsecureConnection, enableTlsHostnameVerificationEnable); try { PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl); + if (workerConfig != null) { + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + adminBuilder.loadConf( + PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "brokerClient_")); + } if (isNotBlank(authPlugin) && isNotBlank(authParams)) { adminBuilder.authentication(authPlugin, authParams); } @@ -244,6 +267,7 @@ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, Strin if (enableTlsHostnameVerificationEnable != null) { adminBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable); } + return adminBuilder.build(); } catch (PulsarClientException e) { log.error("Error creating pulsar admin client", e); @@ -253,17 +277,33 @@ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, Strin public static PulsarClient getPulsarClient(String pulsarServiceUrl) { return getPulsarClient(pulsarServiceUrl, null, null, null, - null, null, null); + null, null, null, null); } public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authPlugin, String authParams, Boolean useTls, String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection, Boolean enableTlsHostnameVerificationEnable) { + return getPulsarClient(pulsarServiceUrl, authPlugin, authParams, useTls, tlsTrustCertsFilePath, + allowTlsInsecureConnection, enableTlsHostnameVerificationEnable, null); + } + + public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authPlugin, String authParams, + Boolean useTls, String tlsTrustCertsFilePath, + Boolean allowTlsInsecureConnection, + Boolean enableTlsHostnameVerificationEnable, + WorkerConfig workerConfig) { try { ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl); + if (workerConfig != null) { + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + clientBuilder.loadConf( + PropertiesUtils.filterAndMapProperties(workerConfig.getProperties(), "brokerClient_")); + } if (isNotBlank(authPlugin) && isNotBlank(authParams)) { clientBuilder.authentication(authPlugin, authParams); @@ -280,7 +320,6 @@ && isNotBlank(authParams)) { if (enableTlsHostnameVerificationEnable != null) { clientBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable); } - return clientBuilder.build(); } catch (PulsarClientException e) { log.error("Error creating pulsar client", e); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java index d899db1323748..b2e0f0f354cbc 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java @@ -40,8 +40,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import org.apache.distributedlog.DistributedLogConfiguration; public class WorkerUtilsTest { @@ -99,4 +104,18 @@ public Boolean get() { } } + + @Test + public void testDLogConfiguration() throws URISyntaxException, IOException { + // The config yml is seeded with a fake bookie config. + URL yamlUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + WorkerConfig config = WorkerConfig.load(yamlUrl.toURI().getPath()); + + // Map the config. + DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(config); + + // Verify the outcome. + assertEquals(dlogConf.getString("bkc.testKey"), "fakeValue", + "The bookkeeper client config mapping should apply."); + } } \ No newline at end of file diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java index 3da138475ce8c..1571fd504ad13 100644 --- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java +++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java @@ -37,6 +37,7 @@ import org.apache.distributedlog.impl.metadata.BKDLConfig; import org.apache.distributedlog.metadata.DLMetadata; import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.packages.management.core.PackagesStorage; import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration; import org.apache.zookeeper.KeeperException; @@ -73,6 +74,13 @@ public void initialize() { configuration.getBookkeeperClientAuthenticationParameters()); } } + // Map arbitrary bookkeeper client configuration into DLog Config. Note that this only configures the + // bookie client. + PropertiesUtils.filterAndMapProperties(configuration.getProperties(), "bookkeeper_", "bkc.") + .forEach((key, value) -> { + log.info("Applying DLog BookKeeper client configuration setting {}={}", key, value); + conf.setProperty(key, value); + }); try { this.namespace = NamespaceBuilder.newBuilder() .conf(conf).clientId(NS_CLIENT_ID).uri(initializeDlogNamespace()).build(); diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java index 226b80abeaa30..ce6acecdd5100 100644 --- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java +++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java @@ -58,6 +58,10 @@ String getBookkeeperClientAuthenticationParameters() { return getProperty("bookkeeperClientAuthenticationParameters"); } + @Override + public Properties getProperties() { + return configuration.getProperties(); + } @Override public String getProperty(String key) { diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java index b4044a6338c20..5c346a0d05c42 100644 --- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java +++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java @@ -50,4 +50,10 @@ public interface PackagesStorageConfiguration { * a group of the property */ void setProperty(Properties properties); + + /** + * Get all properties for the configuration. + * @return all properties for the configuration + */ + Properties getProperties(); } diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java index cb35048a360b5..d3c5d7494b370 100644 --- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java +++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java @@ -39,4 +39,9 @@ public void setProperty(String key, String value) { public void setProperty(Properties properties) { this.properties = properties; } + + @Override + public Properties getProperties() { + return this.properties; + } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 08c7c3e53bd77..d770e63dcee2e 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -31,6 +31,7 @@ import java.net.SocketAddress; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; @@ -50,6 +51,8 @@ import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarChannelInitializer; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; +import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandConnect; @@ -519,9 +522,17 @@ protected void handleLookup(CommandLookupTopic lookup) { } ClientConfigurationData createClientConfiguration() { - ClientConfigurationData clientConf = new ClientConfigurationData(); - clientConf.setServiceUrl(service.getServiceUrl()); + ClientConfigurationData initialConf = new ClientConfigurationData(); + initialConf.setServiceUrl(service.getServiceUrl()); ProxyConfiguration proxyConfig = service.getConfiguration(); + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + Map overrides = PropertiesUtils + .filterAndMapProperties(proxyConfig.getProperties(), "brokerClient_"); + ClientConfigurationData clientConf = ConfigurationDataUtils + .loadData(overrides, initialConf, ClientConfigurationData.class); + clientConf.setAuthentication(this.getClientAuthentication()); if (proxyConfig.isTlsEnabledWithBroker()) { clientConf.setUseTls(true); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 5a81d9f21a2fe..dbac405a0e0cf 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -181,6 +182,11 @@ private PulsarClient createClientInstance(ClusterData clusterData) throws IOExce .ioThreads(config.getWebSocketNumIoThreads()) // .connectionsPerBroker(config.getWebSocketConnectionsPerBroker()); + // Apply all arbitrary configuration. This must be called before setting any fields annotated as + // @Secret on the ClientConfigurationData object because of the way they are serialized. + // See https://github.com/apache/pulsar/issues/8509 for more information. + clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_")); + if (isNotBlank(config.getBrokerClientAuthenticationPlugin()) && isNotBlank(config.getBrokerClientAuthenticationParameters())) { clientBuilder.authentication(config.getBrokerClientAuthenticationPlugin(), @@ -198,7 +204,6 @@ && isNotBlank(config.getBrokerClientAuthenticationParameters())) { } else { clientBuilder.serviceUrl(clusterData.getServiceUrl()); } - return clientBuilder.build(); } diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index 4ca94c5a65f6e..449f159f2ff8c 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -371,6 +371,15 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater | brokerEntryMetadataInterceptors | Set broker entry metadata interceptors.

Multiple interceptors should be separated by commas.

Available values:

  • org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor
  • org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor


  • Example
    brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor, org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor|N/A | | enableExposingBrokerEntryMetadataToClient|Whether to expose broker entry metadata to client or not.

    Available values:
  • true
  • false

  • Example
    enableExposingBrokerEntryMetadataToClient=true | false | +#### Configuration Override For Clients Internal to Broker + +It's possible to configure some clients by using the appropriate prefix. + +|Prefix|Description| +|brokerClient_| Configure **all** the broker's Pulsar Clients and Pulsar Admin Clients. These configurations are applied after hard coded configuration and before the above brokerClient configurations named above.| +|bookkeeper_| Configure the broker's bookkeeper clients used by managed ledgers and the BookkeeperPackagesStorage bookkeeper client. Takes precedence over most other configuration values.| + +Note: when running the function worker within the broker, these prefixed configurations do not apply to any of those clients. You must instead configure those clients using the `functions_worker.yml`. #### Deprecated parameters of Broker The following parameters have been deprecated in the `conf/broker.conf` file. @@ -737,6 +746,12 @@ The following parameters have been deprecated in the `conf/standalone.conf` file |tlsCertificateFilePath||| |tlsKeyFilePath ||| |tlsTrustCertsFilePath||| +#### Configuration Override For Clients Internal to WebSocket + +It's possible to configure some clients by using the appropriate prefix. + +|Prefix|Description| +|brokerClient_| Configure **all** the broker's Pulsar Clients. These configurations are applied after hard coded configuration and before the above brokerClient configurations named above.| #### Deprecated parameters of WebSocket The following parameters have been deprecated in the `conf/websocket.conf` file. @@ -802,6 +817,12 @@ The [Pulsar proxy](concepts-architecture-overview.md#pulsar-proxy) can be config |haProxyProtocolEnabled | Enable or disable the [HAProxy](http://www.haproxy.org/) protocol. |false| | numIOThreads | Number of threads used for Netty IO. | 2 * Runtime.getRuntime().availableProcessors() | | numAcceptorThreads | Number of threads used for Netty Acceptor. | 1 | +#### Configuration Override For Clients Internal to Proxy + +It's possible to configure some clients by using the appropriate prefix. + +|Prefix|Description| +|brokerClient_| Configure **all** the proxy's Pulsar Clients. These configurations are applied after hard coded configuration and before the above brokerClient configurations named above.| #### Deprecated parameters of Pulsar proxy The following parameters have been deprecated in the `conf/proxy.conf` file.