diff --git a/sdk/spring/CHANGELOG.md b/sdk/spring/CHANGELOG.md index e170dc8976815..8a16e8d36f457 100644 --- a/sdk/spring/CHANGELOG.md +++ b/sdk/spring/CHANGELOG.md @@ -8,6 +8,7 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module. #### Bugs Fixed - Fix bug: Registered the empty value for ineligible definition, it causes NPE when sending message via bean `StreamBridge`. [#43366](https://github.com/Azure/azure-sdk-for-java/issues/43366). +- Fix bug: Not working when using Spring Kafka and Kafka Binder via connection string auth [#43853](https://github.com/Azure/azure-sdk-for-java/issues/43853). ### Spring Messaging Azure Service Bus This section includes changes in the `spring-messaging-azure-servicebus` module. diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/context/AzureServiceClientBuilderFactoryConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/context/AzureServiceClientBuilderFactoryConfiguration.java index 599d3f6c1e6ad..83ba863cb8406 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/context/AzureServiceClientBuilderFactoryConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/context/AzureServiceClientBuilderFactoryConfiguration.java @@ -4,10 +4,8 @@ package com.azure.spring.cloud.autoconfigure.implementation.context; import com.azure.spring.cloud.core.implementation.factory.AbstractAzureServiceClientBuilderFactory; -import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Role; /** * {@code @Configuration} class that registers a {@link AzureServiceClientBuilderFactoryPostProcessor} @@ -16,7 +14,6 @@ * @since 5.19.0 */ @Configuration(proxyBeanMethods = false) -@Role(BeanDefinition.ROLE_INFRASTRUCTURE) class AzureServiceClientBuilderFactoryConfiguration { /** @@ -24,7 +21,6 @@ class AzureServiceClientBuilderFactoryConfiguration { * @return the BPP. */ @Bean - @Role(BeanDefinition.ROLE_INFRASTRUCTURE) static AzureServiceClientBuilderFactoryPostProcessor builderFactoryBeanPostProcessor() { return new AzureServiceClientBuilderFactoryPostProcessor(); } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfiguration.java index b2bbf55eacfa6..ed73cf8d43373 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfiguration.java @@ -14,13 +14,13 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; import org.springframework.core.env.Environment; import org.springframework.kafka.core.KafkaTemplate; @@ -36,7 +36,6 @@ @ConditionalOnClass(KafkaTemplate.class) @ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true) @AutoConfigureAfter({ AzureEventHubsAutoConfiguration.class, AzureEventHubsResourceManagerAutoConfiguration.class }) -@Import(KafkaPropertiesConfiguration.class) public class AzureEventHubsKafkaAutoConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(AzureEventHubsKafkaAutoConfiguration.class); @@ -56,4 +55,10 @@ StaticConnectionStringProvider eventHubsKafkaConnect return new StaticConnectionStringProvider<>(AzureServiceType.EVENT_HUBS, connectionString); } + + @Bean + @ConditionalOnBean(value = AzureServiceType.EventHubs.class, parameterizedContainer = ServiceConnectionStringProvider.class) + static KafkaPropertiesBeanPostProcessor kafkaPropertiesBeanPostProcessor() { + return new KafkaPropertiesBeanPostProcessor(); + } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesBeanPostProcessor.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesBeanPostProcessor.java index b55ce88e88976..b8e0e0fef93ed 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesBeanPostProcessor.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesBeanPostProcessor.java @@ -8,8 +8,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.ResolvableType; import java.util.ArrayList; import java.util.Collections; @@ -22,16 +26,12 @@ /** * {@link BeanPostProcessor} for {@link KafkaProperties} to configure connection string credentials. */ -class KafkaPropertiesBeanPostProcessor implements BeanPostProcessor { +class KafkaPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPropertiesBeanPostProcessor.class); private static final String SASL_CONFIG_VALUE = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";%s"; - private final ServiceConnectionStringProvider connectionStringProvider; - - KafkaPropertiesBeanPostProcessor(ServiceConnectionStringProvider connectionStringProvider) { - this.connectionStringProvider = connectionStringProvider; - } + private ApplicationContext applicationContext; @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { @@ -43,8 +43,17 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw + " instead, which can be set as spring.kafka.boostrap-servers=EventHubsNamespacesFQDN:9093."); KafkaProperties kafkaProperties = (KafkaProperties) bean; - String connectionString = connectionStringProvider.getConnectionString(); + ResolvableType provider = ResolvableType.forClassWithGenerics(ServiceConnectionStringProvider.class, AzureServiceType.EventHubs.class); + ObjectProvider> beanProvider = applicationContext.getBeanProvider(provider); + + ServiceConnectionStringProvider connectionStringProvider = beanProvider.getIfAvailable(); + if (connectionStringProvider == null) { + LOGGER.debug("Cannot find a bean of type ServiceConnectionStringProvider, " + + "Spring Cloud Azure will skip performing JAAS enhancements on the KafkaProperties bean."); + return bean; + } + String connectionString = connectionStringProvider.getConnectionString(); String bootstrapServer = new EventHubsConnectionString(connectionString).getFullyQualifiedNamespace() + ":9093"; kafkaProperties.setBootstrapServers(new ArrayList<>(Collections.singletonList(bootstrapServer))); kafkaProperties.getProperties().put(SECURITY_PROTOCOL_CONFIG, SASL_SSL.name()); @@ -55,4 +64,8 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw return bean; } + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesConfiguration.java deleted file mode 100644 index e8c882df3b70b..0000000000000 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesConfiguration.java +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.spring.cloud.autoconfigure.implementation.eventhubs.kafka; - -import com.azure.spring.cloud.autoconfigure.implementation.kafka.AzureEventHubsKafkaOAuth2AutoConfiguration; -import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider; -import com.azure.spring.cloud.core.service.AzureServiceType; -import org.springframework.beans.factory.config.BeanDefinition; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Role; - -/** - * {@code @Configuration} class that registers a {@link KafkaPropertiesBeanPostProcessor} - * bean capable of processing Kafka properties @{@link KafkaProperties}. - * - * @since 5.19.0 - * @deprecated 4.3.0 in favor of {@link AzureEventHubsKafkaOAuth2AutoConfiguration}. - */ -@Deprecated -@Configuration(proxyBeanMethods = false) -@Role(BeanDefinition.ROLE_INFRASTRUCTURE) -class KafkaPropertiesConfiguration { - - @Bean - @Role(BeanDefinition.ROLE_INFRASTRUCTURE) - @ConditionalOnBean(value = AzureServiceType.EventHubs.class, parameterizedContainer = ServiceConnectionStringProvider.class) - static KafkaPropertiesBeanPostProcessor kafkaPropertiesBeanPostProcessor( - ServiceConnectionStringProvider connectionStringProvider) { - return new KafkaPropertiesBeanPostProcessor(connectionStringProvider); - } -} diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jdbc/AzureJdbcAutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jdbc/AzureJdbcAutoConfiguration.java index 38a987fc74f06..ac381ec552136 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jdbc/AzureJdbcAutoConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jdbc/AzureJdbcAutoConfiguration.java @@ -3,7 +3,6 @@ package com.azure.spring.cloud.autoconfigure.implementation.jdbc; import com.azure.identity.extensions.implementation.template.AzureAuthenticationTemplate; -import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; @@ -13,7 +12,6 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Role; /** @@ -23,7 +21,6 @@ * @since 4.5.0 */ @Configuration(proxyBeanMethods = false) -@Role(BeanDefinition.ROLE_INFRASTRUCTURE) @ConditionalOnBean(DataSourceProperties.class) @ConditionalOnClass(AzureAuthenticationTemplate.class) @AutoConfigureAfter(DataSourceAutoConfiguration.class) @@ -31,7 +28,6 @@ public class AzureJdbcAutoConfiguration { @Bean @ConditionalOnMissingBean - @Role(BeanDefinition.ROLE_INFRASTRUCTURE) static JdbcPropertiesBeanPostProcessor jdbcConfigurationPropertiesBeanPostProcessor() { return new JdbcPropertiesBeanPostProcessor(); } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/AzureServiceBusJmsPropertiesBeanPostProcessor.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/AzureServiceBusJmsPropertiesBeanPostProcessor.java index 4db04c3dca824..4c600055bba6b 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/AzureServiceBusJmsPropertiesBeanPostProcessor.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/AzureServiceBusJmsPropertiesBeanPostProcessor.java @@ -9,14 +9,18 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.ResolvableType; import org.springframework.util.StringUtils; -class AzureServiceBusJmsPropertiesBeanPostProcessor implements BeanPostProcessor { +class AzureServiceBusJmsPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware { - private final ObjectProvider> connectionStringProviders; + private ApplicationContext applicationContext; - AzureServiceBusJmsPropertiesBeanPostProcessor(ObjectProvider> connectionStringProviders) { - this.connectionStringProviders = connectionStringProviders; + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; } @Override @@ -24,6 +28,8 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro if (bean instanceof AzureServiceBusJmsProperties) { AzureServiceBusJmsProperties jmsProperties = (AzureServiceBusJmsProperties) bean; if (!StringUtils.hasText(jmsProperties.getConnectionString())) { + ResolvableType providerType = ResolvableType.forClassWithGenerics(ServiceConnectionStringProvider.class, AzureServiceType.ServiceBus.class); + ObjectProvider> connectionStringProviders = applicationContext.getBeanProvider(providerType); connectionStringProviders.ifAvailable(provider -> jmsProperties.setConnectionString(provider.getConnectionString())); } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsPropertiesConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsPropertiesConfiguration.java index 9097b83924b22..a73e20636a0ee 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsPropertiesConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsPropertiesConfiguration.java @@ -5,14 +5,9 @@ import com.azure.spring.cloud.autoconfigure.implementation.condition.ConditionalOnMissingProperty; import com.azure.spring.cloud.autoconfigure.implementation.jms.properties.AzureServiceBusJmsProperties; -import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider; -import com.azure.spring.cloud.core.service.AzureServiceType; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Role; /** * {@code @Configuration} class that registers a {@link AzureServiceBusJmsPropertiesBeanPostProcessor} @@ -21,15 +16,12 @@ * @since 5.19.0 */ @Configuration(proxyBeanMethods = false) -@Role(BeanDefinition.ROLE_INFRASTRUCTURE) class ServiceBusJmsPropertiesConfiguration { @Bean @ConditionalOnMissingBean - @Role(BeanDefinition.ROLE_INFRASTRUCTURE) @ConditionalOnMissingProperty(prefix = "spring.jms.servicebus", name = "connection-string") - static AzureServiceBusJmsPropertiesBeanPostProcessor azureServiceBusJmsPropertiesBeanPostProcessor( - ObjectProvider> connectionStringProviders) { - return new AzureServiceBusJmsPropertiesBeanPostProcessor(connectionStringProviders); + static AzureServiceBusJmsPropertiesBeanPostProcessor azureServiceBusJmsPropertiesBeanPostProcessor() { + return new AzureServiceBusJmsPropertiesBeanPostProcessor(); } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaPropertiesBeanPostProcessor.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaPropertiesBeanPostProcessor.java index cf9147764a699..5e34104415015 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaPropertiesBeanPostProcessor.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaPropertiesBeanPostProcessor.java @@ -13,9 +13,13 @@ import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.util.ReflectionUtils; import java.lang.reflect.InvocationTargetException; @@ -39,14 +43,16 @@ import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; import static org.springframework.util.StringUtils.delimitedListToStringArray; -abstract class AbstractKafkaPropertiesBeanPostProcessor implements BeanPostProcessor { +abstract class AbstractKafkaPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractKafkaPropertiesBeanPostProcessor.class); static final String SECURITY_PROTOCOL_CONFIG_SASL = SASL_SSL.name(); static final String SASL_MECHANISM_OAUTH = OAUTHBEARER_MECHANISM; static final String AZURE_CONFIGURED_JAAS_OPTIONS_KEY = "azure.configured"; static final String AZURE_CONFIGURED_JAAS_OPTIONS_VALUE = "true"; static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH = KafkaOAuth2AuthenticateCallbackHandler.class.getName(); + protected ApplicationContext applicationContext; protected static final PropertyMapper PROPERTY_MAPPER = new PropertyMapper(); private static final Map KAFKA_OAUTH_CONFIGS; private static final String LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE = "OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials."; @@ -63,18 +69,21 @@ abstract class AbstractKafkaPropertiesBeanPostProcessor implements BeanPostPr KAFKA_OAUTH_CONFIGS = Collections.unmodifiableMap(configs); } - private final AzureGlobalProperties azureGlobalProperties; - - AbstractKafkaPropertiesBeanPostProcessor(AzureGlobalProperties azureGlobalProperties) { - this.azureGlobalProperties = azureGlobalProperties; - } + private AzureGlobalProperties azureGlobalProperties; @SuppressWarnings("unchecked") @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if (needsPostProcess(bean)) { - T properties = (T) bean; + ObjectProvider beanProvider = applicationContext.getBeanProvider(AzureGlobalProperties.class); + azureGlobalProperties = beanProvider.getIfAvailable(); + if (azureGlobalProperties == null) { + LOGGER.debug("Cannot find a bean of type AzureGlobalProperties, " + + "Spring Cloud Azure will skip performing JAAS enhancements on the {} bean.", beanName); + return bean; + } + T properties = (T) bean; replaceAzurePropertiesWithJaas(getMergedProducerProperties(properties), getRawProducerProperties(properties)); replaceAzurePropertiesWithJaas(getMergedConsumerProperties(properties), getRawConsumerProperties(properties)); replaceAzurePropertiesWithJaas(getMergedAdminProperties(properties), getRawAdminProperties(properties)); @@ -129,6 +138,11 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro protected abstract Logger getLogger(); + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + /** * Process Kafka Spring properties for any customized operations. * @param properties the Kafka Spring properties diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java index aa5a62787c0e0..cd86a70cc667d 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java @@ -2,15 +2,12 @@ // Licensed under the MIT License. package com.azure.spring.cloud.autoconfigure.implementation.kafka; -import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties; -import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Role; import org.springframework.kafka.core.KafkaTemplate; import static com.azure.spring.cloud.autoconfigure.implementation.context.AzureContextUtils.PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME; @@ -22,14 +19,12 @@ * @since 4.3.0 */ @Configuration(proxyBeanMethods = false) -@Role(BeanDefinition.ROLE_INFRASTRUCTURE) @ConditionalOnClass(KafkaTemplate.class) @ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true) public class AzureEventHubsKafkaOAuth2AutoConfiguration { @Bean(PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME) - @Role(BeanDefinition.ROLE_INFRASTRUCTURE) - static BeanPostProcessor kafkaPropertiesBeanPostProcessor(AzureGlobalProperties properties) { - return new KafkaPropertiesBeanPostProcessor(properties); + static BeanPostProcessor kafkaPropertiesBeanPostProcessor() { + return new KafkaPropertiesBeanPostProcessor(); } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaSpringCloudStreamConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaSpringCloudStreamConfiguration.java index 44d9b60d10168..b0eeff8eab50f 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaSpringCloudStreamConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaSpringCloudStreamConfiguration.java @@ -3,14 +3,11 @@ package com.azure.spring.cloud.autoconfigure.implementation.kafka; import com.azure.spring.cloud.autoconfigure.implementation.context.AzureGlobalPropertiesAutoConfiguration; -import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties; -import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import org.springframework.context.annotation.Role; /** * Configuration for OAuth2 support on Spring Cloud Stream Kafka Binder. Provide Azure Identity-based @@ -19,15 +16,12 @@ * @since 4.3.0 */ @Configuration(proxyBeanMethods = false) -@Role(BeanDefinition.ROLE_INFRASTRUCTURE) @ConditionalOnClass(KafkaMessageChannelBinder.class) @Import(AzureGlobalPropertiesAutoConfiguration.class) public class AzureKafkaSpringCloudStreamConfiguration { @Bean - @Role(BeanDefinition.ROLE_INFRASTRUCTURE) - static KafkaBinderConfigurationPropertiesBeanPostProcessor kafkaBinderConfigurationPropertiesBeanPostProcessor( - AzureGlobalProperties azureGlobalProperties) { - return new KafkaBinderConfigurationPropertiesBeanPostProcessor(azureGlobalProperties); + static KafkaBinderConfigurationPropertiesBeanPostProcessor kafkaBinderConfigurationPropertiesBeanPostProcessor() { + return new KafkaBinderConfigurationPropertiesBeanPostProcessor(); } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaBinderConfigurationPropertiesBeanPostProcessor.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaBinderConfigurationPropertiesBeanPostProcessor.java index 6db44fb776ca7..c93ad1aa10166 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaBinderConfigurationPropertiesBeanPostProcessor.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaBinderConfigurationPropertiesBeanPostProcessor.java @@ -23,10 +23,6 @@ class KafkaBinderConfigurationPropertiesBeanPostProcessor extends AbstractKafkaP private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBinderConfigurationPropertiesBeanPostProcessor.class); - KafkaBinderConfigurationPropertiesBeanPostProcessor(AzureGlobalProperties azureGlobalProperties) { - super(azureGlobalProperties); - } - @Override protected Map getMergedProducerProperties(KafkaBinderConfigurationProperties properties) { return mergeNonAdminProperties(properties.mergedProducerConfiguration(), properties.getConfiguration()); diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaPropertiesBeanPostProcessor.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaPropertiesBeanPostProcessor.java index 3d5d4beee9cc6..fffbd5885a180 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaPropertiesBeanPostProcessor.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaPropertiesBeanPostProcessor.java @@ -2,7 +2,6 @@ // Licensed under the MIT License. package com.azure.spring.cloud.autoconfigure.implementation.kafka; -import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; @@ -13,10 +12,6 @@ class KafkaPropertiesBeanPostProcessor extends AbstractKafkaPropertiesBeanPostPr private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPropertiesBeanPostProcessor.class); - KafkaPropertiesBeanPostProcessor(AzureGlobalProperties azureGlobalProperties) { - super(azureGlobalProperties); - } - @Override protected Map getMergedProducerProperties(KafkaProperties properties) { return invokeBuildKafkaProperties(properties, "buildProducerProperties"); diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfigurationTests.java index ce5c41189e19d..8eb24d6d593ee 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfigurationTests.java @@ -34,8 +34,7 @@ class AzureEventHubsKafkaAutoConfigurationTests { "Endpoint=sb://%s.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=key"; private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(AzureEventHubsKafkaAutoConfiguration.class, - KafkaPropertiesConfiguration.class, KafkaAutoConfiguration.class)); + .withConfiguration(AutoConfigurations.of(AzureEventHubsKafkaAutoConfiguration.class, KafkaAutoConfiguration.class)); @Test void shouldNotConfigureWhenAzureEventHubsKafkaDisabled() { this.contextRunner diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaConfigurationTests.java index 7cbe582e2066f..cd4bb8a1e5aed 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaConfigurationTests.java @@ -36,8 +36,8 @@ class AzureEventHubsKafkaConfigurationTests { private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() .withConfiguration(AutoConfigurations.of(AzureEventHubsKafkaOAuth2AutoConfiguration.class, AzureEventHubsKafkaAutoConfiguration.class, AzureGlobalPropertiesAutoConfiguration.class, - AzureTokenCredentialAutoConfiguration.class, KafkaPropertiesConfiguration.class, - KafkaAutoConfiguration.class, AzureKafkaSpringCloudStreamConfiguration.class, KafkaBinderConfiguration.class)); + AzureTokenCredentialAutoConfiguration.class, KafkaAutoConfiguration.class, + AzureKafkaSpringCloudStreamConfiguration.class, KafkaBinderConfiguration.class)); @Test diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaOAuth2BinderConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaOAuth2BinderConfigurationTests.java index 2824636e5c970..14b9cbee0bce6 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaOAuth2BinderConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaOAuth2BinderConfigurationTests.java @@ -40,7 +40,7 @@ class AzureKafkaOAuth2BinderConfigurationTests extends AbstractAzureKafkaOAuth2AutoConfigurationTests { AzureKafkaOAuth2BinderConfigurationTests() { - super(new KafkaBinderConfigurationPropertiesBeanPostProcessor(new AzureGlobalProperties())); + super(new KafkaBinderConfigurationPropertiesBeanPostProcessor()); } private final ApplicationContextRunner contextRunnerWithoutEventHubsURL = new ApplicationContextRunner() diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaOAuth2BootConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaOAuth2BootConfigurationTests.java index 0f018403653f9..797e6dde7b201 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaOAuth2BootConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureKafkaOAuth2BootConfigurationTests.java @@ -36,7 +36,7 @@ class AzureKafkaOAuth2BootConfigurationTests extends AbstractAzureKafkaOAuth2AutoConfigurationTests { AzureKafkaOAuth2BootConfigurationTests() { - super(new KafkaPropertiesBeanPostProcessor(new AzureGlobalProperties())); + super(new KafkaPropertiesBeanPostProcessor()); } private final ApplicationContextRunner contextRunnerWithoutEventHubsURL = new ApplicationContextRunner() diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaBinderConfigurationPropertiesBeanPostProcessorTest.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaBinderConfigurationPropertiesBeanPostProcessorTest.java index 54288875b0bce..94b25e481ba1b 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaBinderConfigurationPropertiesBeanPostProcessorTest.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaBinderConfigurationPropertiesBeanPostProcessorTest.java @@ -2,13 +2,12 @@ // Licensed under the MIT License. package com.azure.spring.cloud.autoconfigure.implementation.kafka; -import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; class KafkaBinderConfigurationPropertiesBeanPostProcessorTest extends AbstractKafkaPropertiesBeanPostProcessorTest { KafkaBinderConfigurationPropertiesBeanPostProcessorTest() { - super(new KafkaBinderConfigurationPropertiesBeanPostProcessor(new AzureGlobalProperties())); + super(new KafkaBinderConfigurationPropertiesBeanPostProcessor()); } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaPropertiesBeanPostProcessorTest.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaPropertiesBeanPostProcessorTest.java index da342aaca8950..7f9838bf70149 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaPropertiesBeanPostProcessorTest.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaPropertiesBeanPostProcessorTest.java @@ -2,13 +2,12 @@ // Licensed under the MIT License. package com.azure.spring.cloud.autoconfigure.implementation.kafka; -import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; class KafkaPropertiesBeanPostProcessorTest extends AbstractKafkaPropertiesBeanPostProcessorTest { KafkaPropertiesBeanPostProcessorTest() { - super(new KafkaPropertiesBeanPostProcessor(new AzureGlobalProperties())); + super(new KafkaPropertiesBeanPostProcessor()); } } diff --git a/sdk/spring/spring-cloud-azure-integration-tests/pom.xml b/sdk/spring/spring-cloud-azure-integration-tests/pom.xml index 2d83a9f8e7797..726b650228fbc 100644 --- a/sdk/spring/spring-cloud-azure-integration-tests/pom.xml +++ b/sdk/spring/spring-cloud-azure-integration-tests/pom.xml @@ -104,6 +104,11 @@ spring-cloud-azure-starter-jdbc-mysql 5.20.0-beta.1 + + org.springframework.cloud + spring-cloud-starter-stream-kafka + 4.2.0 + org.springframework.boot spring-boot-starter-test diff --git a/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/eventhubs/binder/EventHubsKafkaBinderIT.java b/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/eventhubs/binder/EventHubsKafkaBinderIT.java new file mode 100644 index 0000000000000..e234d2074ac8f --- /dev/null +++ b/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/eventhubs/binder/EventHubsKafkaBinderIT.java @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.spring.cloud.integration.tests.eventhubs.binder; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.context.ActiveProfiles; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) +@ActiveProfiles("eventhubs-kafka-binder") +class EventHubsKafkaBinderIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsKafkaBinderIT.class); + private static final String MESSAGE = UUID.randomUUID().toString(); + private static final CountDownLatch LATCH = new CountDownLatch(1); + + @Autowired + private Sinks.Many> many; + + @TestConfiguration + static class TestConfig { + + @Bean + public MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } + + @Bean + public Sinks.Many> many() { + return Sinks.many().unicast().onBackpressureBuffer(); + } + + @Bean + public Supplier>> supply(Sinks.Many> many) { + return () -> many.asFlux() + .doOnNext(m -> LOGGER.info("Manually sending message {}", m)) + .doOnError(t -> LOGGER.error("Error encountered", t)); + } + + @Bean + public Consumer> consume() { + return message -> { + LOGGER.info("New message received: '{}'", message.getPayload()); + LATCH.countDown(); + }; + } + } + + @Test + void testSendAndReceiveMessage() throws InterruptedException { + LOGGER.info("EventHubsKafkaBinderIT begin."); + // Wait for Kafka Binder initialization to complete + Thread.sleep(20000); + LOGGER.info("Send a message:" + MESSAGE + "."); + many.emitNext(new GenericMessage<>(MESSAGE), Sinks.EmitFailureHandler.FAIL_FAST); + assertThat(EventHubsKafkaBinderIT.LATCH.await(40, TimeUnit.SECONDS)).isTrue(); + LOGGER.info("EventHubsKafkaBinderIT end."); + } +} diff --git a/sdk/spring/spring-cloud-azure-integration-tests/src/test/resources/application-eventhubs-kafka-binder.yml b/sdk/spring/spring-cloud-azure-integration-tests/src/test/resources/application-eventhubs-kafka-binder.yml new file mode 100644 index 0000000000000..b5bc76a1f527d --- /dev/null +++ b/sdk/spring/spring-cloud-azure-integration-tests/src/test/resources/application-eventhubs-kafka-binder.yml @@ -0,0 +1,20 @@ +spring: + cloud: + azure: + eventhubs: + connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} + function: + definition: consume;supply + stream: + bindings: + consume-in-0: + destination: connnection-string-auth + group: $Default + supply-out-0: + destination: connnection-string-auth + binders: + kafka: + environment: + spring: + main: + sources: com.azure.spring.cloud.autoconfigure.implementation.eventhubs.kafka.AzureEventHubsKafkaAutoConfiguration diff --git a/sdk/spring/spring-cloud-azure-integration-tests/src/test/resources/application-servicebus-jms-connection-string.yml b/sdk/spring/spring-cloud-azure-integration-tests/src/test/resources/application-servicebus-jms-connection-string.yml index c9d1b15743b4c..0d715bfeab7a4 100644 --- a/sdk/spring/spring-cloud-azure-integration-tests/src/test/resources/application-servicebus-jms-connection-string.yml +++ b/sdk/spring/spring-cloud-azure-integration-tests/src/test/resources/application-servicebus-jms-connection-string.yml @@ -1,8 +1,4 @@ spring: - cloud: - azure: - credential: - token-credential-bean-name: integrationTestTokenCredential jms: servicebus: connection-string: ${AZURE_SERVICE_BUS_JMS_CONNECTION_STRING} diff --git a/sdk/spring/spring-cloud-azure-integration-tests/test-resources/eventhubs/test-resources.json b/sdk/spring/spring-cloud-azure-integration-tests/test-resources/eventhubs/test-resources.json index a7f8ea582f887..eb38bb796affb 100644 --- a/sdk/spring/spring-cloud-azure-integration-tests/test-resources/eventhubs/test-resources.json +++ b/sdk/spring/spring-cloud-azure-integration-tests/test-resources/eventhubs/test-resources.json @@ -98,6 +98,31 @@ ], "properties": {} }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs", + "apiVersion": "2022-01-01-preview", + "name": "[concat(variables('eventHubsNamespaceName'), '/connnection-string-auth')]", + "location": "[variables('location')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces', variables('eventHubsNamespaceName'))]" + ], + "properties": { + "messageRetentionInDays": 1, + "partitionCount": 1, + "status": "Active" + } + }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs/consumergroups", + "apiVersion": "2022-01-01-preview", + "name": "[concat(variables('eventHubsNamespaceName'), '/connnection-string-auth/$Default')]", + "location": "[variables('location')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventHubsNamespaceName'), 'connnection-string-auth')]", + "[resourceId('Microsoft.EventHub/namespaces', variables('eventHubsNamespaceName'))]" + ], + "properties": {} + }, { "type": "Microsoft.EventHub/namespaces/eventhubs", "apiVersion": "2022-01-01-preview", @@ -465,6 +490,10 @@ "AZURE_EVENTHUB_NAME_FOR_BINDER_SYNC": { "type": "string", "value": "test-eventhub-sync" + }, + "AZURE_EVENT_HUBS_CONNECTION_STRING": { + "type": "string", + "value": "[listKeys(resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespaceName'), 'RootManageSharedAccessKey'), '2017-04-01').primaryConnectionString]" } } }