diff --git a/README.md b/README.md index 3185f19..9f48e86 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,11 @@ KAFKA_MAX_BLOCK_MS=10000 A full list of available configurations can be found in the [official kafka docs](https://kafka.apache.org/documentation/#producerconfigs). + +| :warning: WARNING | +|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Because some environments have difficulties with empty string variables, a workaround for `SSL_ENDPOINT_IDENTIFICATION_ALGORITHM` was implemented. To disable the host name verification set the value to `disabled`. The module will transfer the value to an empty string when creating the kafka client. | + ### Kafka client using secure connection As mentioned above the kafka client can be configured by passing parameters to the start command. To make kafka open a SSL/TLS secured connection you can add the following parameters: diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java index 58cb06b..3b67b8d 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java 
@@ -2,100 +2,102 @@ import java.util.HashMap; import java.util.Map; - import org.keycloak.Config.Scope; public class KafkaProducerConfig { - // https://kafka.apache.org/documentation/#producerconfigs + // https://kafka.apache.org/documentation/#producerconfigs - public static Map init(Scope scope) { - Map propertyMap = new HashMap<>(); - KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values(); + public static Map init(Scope scope) { + Map propertyMap = new HashMap<>(); + KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values(); - for (KafkaProducerProperty property : producerProperties) { - String propertyEnv = System.getenv("KAFKA_" + property.name()); + for (KafkaProducerProperty property : producerProperties) { + String propertyEnv = System.getenv("KAFKA_" + property.name()); - if (property.getName() != null && scope.get(property.getName(), propertyEnv) != null) { - propertyMap.put(property.getName(), scope.get(property.getName(), propertyEnv)); - } - } + if (property == KafkaProducerProperty.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM && + ("disabled").equalsIgnoreCase(scope.get(property.getName(), propertyEnv))) { + propertyMap.put(property.getName(), ""); + } else if (property.getName() != null && scope.get(property.getName(), propertyEnv) != null) { + propertyMap.put(property.getName(), scope.get(property.getName(), propertyEnv)); + } + } - return propertyMap; - } + return propertyMap; + } - enum KafkaProducerProperty { - ACKS("acks"), // - BUFFER_MEMORY("buffer.memory"), // - COMPRESSION_TYPE("compression.type"), // - RETRIES("retries"), // - SSL_KEY_PASSWORD("ssl.key.password"), // - SSL_KEYSTORE_CERTIFICATE_CHAIN("ssl.keystore.certificate.chain"), // - SSL_KEYSTORE_LOCATION("ssl.keystore.location"), // - SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), // - SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), // - SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), // - BATCH_SIZE("batch.size"), // - 
CLIENT_DNS_LOOKUP("client.dns.lookup"), // - CONNECTION_MAX_IDLE_MS("connections.max.idle.ms"), // - DELIVERY_TIMEOUT_MS("delivery.timeout.ms"), // - LINGER_MS("linger.ms"), // - MAX_BLOCK_MS("max.block.ms"), // - MAX_REQUEST_SIZE("max.request.size"), // - PARTITIONER_CLASS("partitioner.class"), // - RECEIVE_BUFFER_BYTES("receive.buffer.bytes"), // - REQUEST_TIMEOUT_MS("request.timeout.ms"), // - SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"), // - SASL_JAAS_CONFIG("sasl.jaas.config"), // - SASL_KERBEROS_SERVICE_NAME("sasl.kerberos.service.name"), // - SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"), // - SASL_LOGIN_CLASS("sasl.login.class"), // - SASL_MECHANISM("sasl.mechanism"), // - SECURITY_PROTOCOL("security.protocol"), // - SEND_BUFFER_BYTES("send.buffer.bytes"), // - SSL_ENABLED_PROTOCOLS("ssl.enabled.protocols"), // - SSL_KEYSTORE_TYPE("ssl.keystore.type"), // - SSL_PROTOCOL("ssl.protocol"), // - SSL_PROVIDER("ssl.provider"), // - SSL_TRUSTSTORE_TYPE("ssl.truststore.type"), // - ENABLE_IDEMPOTENCE("enable.idempotence"), // - INTERCEPTOR_CLASS("interceptor.classes"), // - MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION("max.in.flight.requests.per.connection"), // - METADATA_MAX_AGE_MS("metadata.max.age.ms"), // - METADATA_MAX_IDLE_MS("metadata.max.idle.ms"), // - METRIC_REPORTERS("metric.reporters"), // - METRIC_NUM_SAMPLES("metrics.num.samples"), // - METRICS_RECORDING_LEVEL("metrics.recording.level"), // - METRICS_SAMPLE_WINDOW_MS("metrics.sample.window.ms"), // - RECONNECT_BACKOFF_MAX_MS("reconnect.backoff.max.ms"), // - RECONNECT_BACKOFF_MS("reconnect.backoff.ms"), // - RETRY_BACKOFF_MS("retry.backoff.ms"), // - SASL_KERBEROS_KINIT_CMD("sasl.kerberos.kinit.cmd"), // - SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN("sasl.kerberos.min.time.before.relogin"), // - SASL_KERBEROS_TICKET_RENEW_JITTER("sasl.kerberos.ticket.renew.jitter"), // - SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR("sasl.kerberos.ticket.renew.window.factor"), // - 
SASL_LOGIN_REFRESH_BUFFER_SECONDS("sasl.login.refresh.buffer.seconds"), // - SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS("sasl.login.refresh.min.period.seconds"), // - SASL_LOGIN_REFRESH_WINDOW_FACTOR("sasl.login.refresh.window.factor"), // - SASL_LOGIN_REFRESH_WINDOW_JITTER("sasl.login.refresh.window.jitter"), // - SECURITY_PROVIDERS("security.providers"), // - SSL_CIPHER_SUITES("ssl.cipher.suites"), // - SSL_ENDPOINT_IDENTIFICATION_ALGORITHM("ssl.endpoint.identification.algorithm"), // - SSL_KEYMANAGER_ALGORITHM("ssl.keymanager.algorithm"), // - SSL_SECURE_RANDOM_IMPLEMENTATION("ssl.secure.random.implementation"), // - SSL_TRUSTMANAGER_ALGORITHM("ssl.trustmanager.algorithm"), // - TRANSACTION_TIMEOUT_MS("transaction.timeout.ms"), // - TRANSACTION_ID("transactional.id"); + enum KafkaProducerProperty { + ACKS("acks"), // + BUFFER_MEMORY("buffer.memory"), // + COMPRESSION_TYPE("compression.type"), // + RETRIES("retries"), // + SSL_KEY_PASSWORD("ssl.key.password"), // + SSL_KEYSTORE_CERTIFICATE_CHAIN("ssl.keystore.certificate.chain"), // + SSL_KEYSTORE_LOCATION("ssl.keystore.location"), // + SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), // + SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), // + SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), // + BATCH_SIZE("batch.size"), // + CLIENT_DNS_LOOKUP("client.dns.lookup"), // + CONNECTION_MAX_IDLE_MS("connections.max.idle.ms"), // + DELIVERY_TIMEOUT_MS("delivery.timeout.ms"), // + LINGER_MS("linger.ms"), // + MAX_BLOCK_MS("max.block.ms"), // + MAX_REQUEST_SIZE("max.request.size"), // + PARTITIONER_CLASS("partitioner.class"), // + RECEIVE_BUFFER_BYTES("receive.buffer.bytes"), // + REQUEST_TIMEOUT_MS("request.timeout.ms"), // + SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"), // + SASL_JAAS_CONFIG("sasl.jaas.config"), // + SASL_KERBEROS_SERVICE_NAME("sasl.kerberos.service.name"), // + SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"), // + SASL_LOGIN_CLASS("sasl.login.class"), 
// + SASL_MECHANISM("sasl.mechanism"), // + SECURITY_PROTOCOL("security.protocol"), // + SEND_BUFFER_BYTES("send.buffer.bytes"), // + SSL_ENABLED_PROTOCOLS("ssl.enabled.protocols"), // + SSL_KEYSTORE_TYPE("ssl.keystore.type"), // + SSL_PROTOCOL("ssl.protocol"), // + SSL_PROVIDER("ssl.provider"), // + SSL_TRUSTSTORE_TYPE("ssl.truststore.type"), // + ENABLE_IDEMPOTENCE("enable.idempotence"), // + INTERCEPTOR_CLASS("interceptor.classes"), // + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION("max.in.flight.requests.per.connection"), // + METADATA_MAX_AGE_MS("metadata.max.age.ms"), // + METADATA_MAX_IDLE_MS("metadata.max.idle.ms"), // + METRIC_REPORTERS("metric.reporters"), // + METRIC_NUM_SAMPLES("metrics.num.samples"), // + METRICS_RECORDING_LEVEL("metrics.recording.level"), // + METRICS_SAMPLE_WINDOW_MS("metrics.sample.window.ms"), // + RECONNECT_BACKOFF_MAX_MS("reconnect.backoff.max.ms"), // + RECONNECT_BACKOFF_MS("reconnect.backoff.ms"), // + RETRY_BACKOFF_MS("retry.backoff.ms"), // + SASL_KERBEROS_KINIT_CMD("sasl.kerberos.kinit.cmd"), // + SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN("sasl.kerberos.min.time.before.relogin"), // + SASL_KERBEROS_TICKET_RENEW_JITTER("sasl.kerberos.ticket.renew.jitter"), // + SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR("sasl.kerberos.ticket.renew.window.factor"), // + SASL_LOGIN_REFRESH_BUFFER_SECONDS("sasl.login.refresh.buffer.seconds"), // + SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS("sasl.login.refresh.min.period.seconds"), // + SASL_LOGIN_REFRESH_WINDOW_FACTOR("sasl.login.refresh.window.factor"), // + SASL_LOGIN_REFRESH_WINDOW_JITTER("sasl.login.refresh.window.jitter"), // + SECURITY_PROVIDERS("security.providers"), // + SSL_CIPHER_SUITES("ssl.cipher.suites"), // + SSL_ENDPOINT_IDENTIFICATION_ALGORITHM("ssl.endpoint.identification.algorithm"), // + SSL_KEYMANAGER_ALGORITHM("ssl.keymanager.algorithm"), // + SSL_SECURE_RANDOM_IMPLEMENTATION("ssl.secure.random.implementation"), // + SSL_TRUSTMANAGER_ALGORITHM("ssl.trustmanager.algorithm"), // + 
TRANSACTION_TIMEOUT_MS("transaction.timeout.ms"), // + TRANSACTION_ID("transactional.id"); - private String name; + private String name; - private KafkaProducerProperty(String name) { - this.name = name; - } + private KafkaProducerProperty(String name) { + this.name = name; + } - public String getName() { - return name; - } - } + public String getName() { + return name; + } + } }
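Review bot: The `disabled` branch added to `init` switches off TLS host name verification silently: it stores `""` for `ssl.endpoint.identification.algorithm` and logs nothing, so a deployment that no longer matches the broker certificate against its host name looks the same at runtime as a verified one. I would emit a warning in that branch, for example `log.warn("Kafka TLS host name verification is disabled via ssl.endpoint.identification.algorithm");`; no logger is visible in this hunk, so one would have to be added to the class. The loop also evaluates `scope.get(property.getName(), propertyEnv)` up to three times per property, and reading it once into a local, `String value = scope.get(property.getName(), propertyEnv);`, keeps the comparison and the `put` on the same value. A short comment on the branch, such as `// "disabled" stands in for the empty string, which some environments cannot pass; an empty value turns host name verification off`, would record why the sentinel exists.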