-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for optional Kafka properties from external file #8743
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,30 +30,31 @@ public class KafkaClientsModule | |
protected void setup(Binder binder) | ||
{ | ||
configBinder(binder).bindConfig(KafkaSecurityConfig.class); | ||
installClientModule(SecurityProtocol.PLAINTEXT, KafkaClientsModule::configurePlainText); | ||
installClientModule(null, KafkaClientsModule::configureDefault); | ||
installClientModule(SecurityProtocol.PLAINTEXT, KafkaClientsModule::configureDefault); | ||
installClientModule(SecurityProtocol.SSL, KafkaClientsModule::configureSsl); | ||
} | ||
|
||
private void installClientModule(SecurityProtocol securityProtocol, Module module) | ||
{ | ||
install(conditionalModule( | ||
KafkaSecurityConfig.class, | ||
config -> config.getSecurityProtocol().equals(securityProtocol), | ||
config -> config.getSecurityProtocol().orElse(null) == securityProtocol, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Applying this fails |
||
module)); | ||
} | ||
|
||
private static void configurePlainText(Binder binder) | ||
private static void configureDefault(Binder binder) | ||
{ | ||
binder.bind(KafkaConsumerFactory.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaProducerFactory.class).to(PlainTextKafkaProducerFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaAdminFactory.class).to(PlainTextKafkaAdminFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaConsumerFactory.class).to(DefaultKafkaConsumerFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaProducerFactory.class).to(DefaultKafkaProducerFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaAdminFactory.class).to(DefaultKafkaAdminFactory.class).in(Scopes.SINGLETON); | ||
} | ||
|
||
private static void configureSsl(Binder binder) | ||
{ | ||
binder.bind(KafkaConsumerFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaProducerFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaProducerFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaAdminFactory.class).annotatedWith(ForKafkaSsl.class).to(PlainTextKafkaAdminFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaConsumerFactory.class).annotatedWith(ForKafkaSsl.class).to(DefaultKafkaConsumerFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaProducerFactory.class).annotatedWith(ForKafkaSsl.class).to(DefaultKafkaProducerFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaAdminFactory.class).annotatedWith(ForKafkaSsl.class).to(DefaultKafkaAdminFactory.class).in(Scopes.SINGLETON); | ||
|
||
binder.bind(KafkaConsumerFactory.class).to(SslKafkaConsumerFactory.class).in(Scopes.SINGLETON); | ||
binder.bind(KafkaProducerFactory.class).to(SslKafkaProducerFactory.class).in(Scopes.SINGLETON); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,8 @@ | |
import org.apache.kafka.common.security.auth.SecurityProtocol; | ||
|
||
import javax.annotation.PostConstruct; | ||
import javax.validation.constraints.NotNull; | ||
|
||
import java.util.Optional; | ||
|
||
import static com.google.common.base.Preconditions.checkState; | ||
import static java.lang.String.format; | ||
|
@@ -27,12 +28,11 @@ | |
|
||
public class KafkaSecurityConfig | ||
{ | ||
private SecurityProtocol securityProtocol = PLAINTEXT; | ||
private SecurityProtocol securityProtocol; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have here as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Setting it as public class TestKafkaSecurityConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(KafkaSecurityConfig.class)
.setSecurityProtocol(null)); The signature |
||
|
||
@NotNull | ||
public SecurityProtocol getSecurityProtocol() | ||
public Optional<SecurityProtocol> getSecurityProtocol() | ||
{ | ||
return securityProtocol; | ||
return Optional.ofNullable(securityProtocol); | ||
} | ||
|
||
@Config("kafka.security-protocol") | ||
|
@@ -47,7 +47,7 @@ public KafkaSecurityConfig setSecurityProtocol(SecurityProtocol securityProtocol | |
public void validate() | ||
{ | ||
checkState( | ||
securityProtocol.equals(PLAINTEXT) || securityProtocol.equals(SSL), | ||
format("Only %s and %s security protocols are supported", PLAINTEXT, SSL)); | ||
securityProtocol == null || securityProtocol.equals(PLAINTEXT) || securityProtocol.equals(SSL), | ||
format("Only %s and %s security protocols are supported. See 'kafka.config.resources' if other security protocols are needed", PLAINTEXT, SSL)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of not specifying any modules, can we delegate to
DefaultText...
for other SecurityProtocol ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @Praveen2112 , thanks for reviewing this PR. Can you kindly provide an example for your request?
This line currently does install the default module when no security protocol are explicitly provided by trino client.