Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kafka connection string auth configuration condition and improve BPP #43854

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions sdk/spring/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module.

#### Bugs Fixed
- Fix bug: Registered the empty value for ineligible definition, it causes NPE when sending message via bean `StreamBridge`. [#43366](https://github.com/Azure/azure-sdk-for-java/issues/43366).
- Fix bug: Not working when using Spring Kafka and Kafka Binder via connection string auth [#43853](https://github.com/Azure/azure-sdk-for-java/issues/43853).

### Spring Messaging Azure Service Bus
This section includes changes in the `spring-messaging-azure-servicebus` module.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
package com.azure.spring.cloud.autoconfigure.implementation.context;

import com.azure.spring.cloud.core.implementation.factory.AbstractAzureServiceClientBuilderFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;

/**
* {@code @Configuration} class that registers a {@link AzureServiceClientBuilderFactoryPostProcessor}
Expand All @@ -16,15 +14,13 @@
* @since 5.19.0
*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
class AzureServiceClientBuilderFactoryConfiguration {

/**
* The BeanPostProcessor to apply the default token credential and resolver to all service client builder factories.
* @return the BPP.
*/
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
static AzureServiceClientBuilderFactoryPostProcessor builderFactoryBeanPostProcessor() {
return new AzureServiceClientBuilderFactoryPostProcessor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.KafkaTemplate;

Expand All @@ -36,7 +36,6 @@
@ConditionalOnClass(KafkaTemplate.class)
@ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter({ AzureEventHubsAutoConfiguration.class, AzureEventHubsResourceManagerAutoConfiguration.class })
@Import(KafkaPropertiesConfiguration.class)
public class AzureEventHubsKafkaAutoConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(AzureEventHubsKafkaAutoConfiguration.class);
Expand All @@ -56,4 +55,10 @@ StaticConnectionStringProvider<AzureServiceType.EventHubs> eventHubsKafkaConnect

return new StaticConnectionStringProvider<>(AzureServiceType.EVENT_HUBS, connectionString);
}

@Bean
@ConditionalOnBean(value = AzureServiceType.EventHubs.class, parameterizedContainer = ServiceConnectionStringProvider.class)
static KafkaPropertiesBeanPostProcessor kafkaPropertiesBeanPostProcessor() {
return new KafkaPropertiesBeanPostProcessor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.ResolvableType;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -22,16 +27,12 @@
/**
* {@link BeanPostProcessor} for {@link KafkaProperties} to configure connection string credentials.
*/
class KafkaPropertiesBeanPostProcessor implements BeanPostProcessor {
class KafkaPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPropertiesBeanPostProcessor.class);
private static final String SASL_CONFIG_VALUE = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";%s";

private final ServiceConnectionStringProvider<AzureServiceType.EventHubs> connectionStringProvider;

KafkaPropertiesBeanPostProcessor(ServiceConnectionStringProvider<AzureServiceType.EventHubs> connectionStringProvider) {
this.connectionStringProvider = connectionStringProvider;
}
private ApplicationContext applicationContext;

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Expand All @@ -43,8 +44,15 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
+ " instead, which can be set as spring.kafka.boostrap-servers=EventHubsNamespacesFQDN:9093.");

KafkaProperties kafkaProperties = (KafkaProperties) bean;
String connectionString = connectionStringProvider.getConnectionString();
ResolvableType provider = ResolvableType.forClassWithGenerics(ServiceConnectionStringProvider.class, AzureServiceType.EventHubs.class);
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.EventHubs>> beanProvider = applicationContext.getBeanProvider(provider);

ServiceConnectionStringProvider<AzureServiceType.EventHubs> connectionStringProvider = beanProvider.getIfAvailable();
if (connectionStringProvider == null) {
moarychan marked this conversation as resolved.
Show resolved Hide resolved
throw new NoSuchBeanDefinitionException("Not found ServiceConnectionStringProvider<AzureServiceType.EventHubs> bean.");
}

String connectionString = connectionStringProvider.getConnectionString();
String bootstrapServer = new EventHubsConnectionString(connectionString).getFullyQualifiedNamespace() + ":9093";
kafkaProperties.setBootstrapServers(new ArrayList<>(Collections.singletonList(bootstrapServer)));
kafkaProperties.getProperties().put(SECURITY_PROTOCOL_CONFIG, SASL_SSL.name());
Expand All @@ -55,4 +63,8 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
return bean;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package com.azure.spring.cloud.autoconfigure.implementation.jdbc;

import com.azure.identity.extensions.implementation.template.AzureAuthenticationTemplate;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand All @@ -13,7 +12,6 @@
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;


/**
Expand All @@ -23,15 +21,13 @@
* @since 4.5.0
*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@ConditionalOnBean(DataSourceProperties.class)
@ConditionalOnClass(AzureAuthenticationTemplate.class)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
public class AzureJdbcAutoConfiguration {

@Bean
@ConditionalOnMissingBean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
static JdbcPropertiesBeanPostProcessor jdbcConfigurationPropertiesBeanPostProcessor() {
return new JdbcPropertiesBeanPostProcessor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,27 @@
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.ResolvableType;
import org.springframework.util.StringUtils;

class AzureServiceBusJmsPropertiesBeanPostProcessor implements BeanPostProcessor {
class AzureServiceBusJmsPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {

private final ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders;
private ApplicationContext applicationContext;

AzureServiceBusJmsPropertiesBeanPostProcessor(ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders) {
this.connectionStringProviders = connectionStringProviders;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AzureServiceBusJmsProperties) {
AzureServiceBusJmsProperties jmsProperties = (AzureServiceBusJmsProperties) bean;
if (!StringUtils.hasText(jmsProperties.getConnectionString())) {
ResolvableType providerType = ResolvableType.forClassWithGenerics(ServiceConnectionStringProvider.class, AzureServiceType.ServiceBus.class);
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders = applicationContext.getBeanProvider(providerType);
connectionStringProviders.ifAvailable(provider -> jmsProperties.setConnectionString(provider.getConnectionString()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@

import com.azure.spring.cloud.autoconfigure.implementation.condition.ConditionalOnMissingProperty;
import com.azure.spring.cloud.autoconfigure.implementation.jms.properties.AzureServiceBusJmsProperties;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.service.AzureServiceType;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;

/**
* {@code @Configuration} class that registers a {@link AzureServiceBusJmsPropertiesBeanPostProcessor}
Expand All @@ -21,15 +16,12 @@
* @since 5.19.0
*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
class ServiceBusJmsPropertiesConfiguration {

@Bean
@ConditionalOnMissingBean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@ConditionalOnMissingProperty(prefix = "spring.jms.servicebus", name = "connection-string")
static AzureServiceBusJmsPropertiesBeanPostProcessor azureServiceBusJmsPropertiesBeanPostProcessor(
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders) {
return new AzureServiceBusJmsPropertiesBeanPostProcessor(connectionStringProviders);
static AzureServiceBusJmsPropertiesBeanPostProcessor azureServiceBusJmsPropertiesBeanPostProcessor() {
return new AzureServiceBusJmsPropertiesBeanPostProcessor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.InvocationTargetException;
Expand All @@ -39,14 +41,15 @@
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
import static org.springframework.util.StringUtils.delimitedListToStringArray;

abstract class AbstractKafkaPropertiesBeanPostProcessor<T> implements BeanPostProcessor {
abstract class AbstractKafkaPropertiesBeanPostProcessor<T> implements BeanPostProcessor, ApplicationContextAware {

static final String SECURITY_PROTOCOL_CONFIG_SASL = SASL_SSL.name();
static final String SASL_MECHANISM_OAUTH = OAUTHBEARER_MECHANISM;
static final String AZURE_CONFIGURED_JAAS_OPTIONS_KEY = "azure.configured";
static final String AZURE_CONFIGURED_JAAS_OPTIONS_VALUE = "true";
static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH =
KafkaOAuth2AuthenticateCallbackHandler.class.getName();
protected ApplicationContext applicationContext;
protected static final PropertyMapper PROPERTY_MAPPER = new PropertyMapper();
private static final Map<String, String> KAFKA_OAUTH_CONFIGS;
private static final String LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE = "OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.";
Expand All @@ -63,16 +66,13 @@ abstract class AbstractKafkaPropertiesBeanPostProcessor<T> implements BeanPostPr
KAFKA_OAUTH_CONFIGS = Collections.unmodifiableMap(configs);
}

private final AzureGlobalProperties azureGlobalProperties;

AbstractKafkaPropertiesBeanPostProcessor(AzureGlobalProperties azureGlobalProperties) {
this.azureGlobalProperties = azureGlobalProperties;
}
private AzureGlobalProperties azureGlobalProperties;

@SuppressWarnings("unchecked")
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (needsPostProcess(bean)) {
azureGlobalProperties = applicationContext.getBean(AzureGlobalProperties.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to handle the case when the bean is not present?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the getBean will throw exception when not found the global properties.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the other two BPP won't throw any exceptions, so do we need to throw the exception here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compared with the previous implementation, here the global properties bean should be available, can we assume it's always exist? if not, then exception should be thrown.

T properties = (T) bean;

replaceAzurePropertiesWithJaas(getMergedProducerProperties(properties), getRawProducerProperties(properties));
Expand Down Expand Up @@ -129,6 +129,11 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro

protected abstract Logger getLogger();

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

/**
* Process Kafka Spring properties for any customized operations.
* @param properties the Kafka Spring properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@
// Licensed under the MIT License.
package com.azure.spring.cloud.autoconfigure.implementation.kafka;

import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.kafka.core.KafkaTemplate;

import static com.azure.spring.cloud.autoconfigure.implementation.context.AzureContextUtils.PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME;
Expand All @@ -22,14 +19,12 @@
* @since 4.3.0
*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@ConditionalOnClass(KafkaTemplate.class)
@ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true)
public class AzureEventHubsKafkaOAuth2AutoConfiguration {

@Bean(PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
static BeanPostProcessor kafkaPropertiesBeanPostProcessor(AzureGlobalProperties properties) {
return new KafkaPropertiesBeanPostProcessor(properties);
static BeanPostProcessor kafkaPropertiesBeanPostProcessor() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean there will be two kafka properties bpp in the context?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, one for jaas based on conn string, another for jaas based on OAuth2.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are they exclusive? Will they be in the context at the same time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are logically mutually exclusive, but they do not affect each other; oauth2's BPP will be executed first (postProcessBeforeInitialization), and conn string's BPP will be executed later (postProcessAfterInitialization). When both exist at the same time, oauth2 BPP will not make any changes, and conn string BPP will continue to execute and use conn string for auth.

return new KafkaPropertiesBeanPostProcessor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
package com.azure.spring.cloud.autoconfigure.implementation.kafka;

import com.azure.spring.cloud.autoconfigure.implementation.context.AzureGlobalPropertiesAutoConfiguration;
import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Role;

/**
* Configuration for OAuth2 support on Spring Cloud Stream Kafka Binder. Provide Azure Identity-based
Expand All @@ -19,15 +16,12 @@
* @since 4.3.0
*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@ConditionalOnClass(KafkaMessageChannelBinder.class)
@Import(AzureGlobalPropertiesAutoConfiguration.class)
public class AzureKafkaSpringCloudStreamConfiguration {

@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
static KafkaBinderConfigurationPropertiesBeanPostProcessor kafkaBinderConfigurationPropertiesBeanPostProcessor(
AzureGlobalProperties azureGlobalProperties) {
return new KafkaBinderConfigurationPropertiesBeanPostProcessor(azureGlobalProperties);
static KafkaBinderConfigurationPropertiesBeanPostProcessor kafkaBinderConfigurationPropertiesBeanPostProcessor() {
return new KafkaBinderConfigurationPropertiesBeanPostProcessor();
}
}
Loading
Loading