From a72c04c513fb3adfcf7af66cdf4cba998b75296d Mon Sep 17 00:00:00 2001 From: Francesco Pellegrini Date: Sun, 18 Aug 2024 10:07:27 +0200 Subject: [PATCH] bump kafka to v3.8.0 and fix compilation errors (#514) --- .../github/embeddedkafka/ops/kafkaOps.scala | 34 ++++++++++++------- .../EmbeddedKafkaCustomConfigSpec.scala | 6 ++-- .../EmbeddedKafkaMethodsSpec.scala | 6 ++-- project/Dependencies.scala | 2 +- 4 files changed, 28 insertions(+), 20 deletions(-) diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala index f8a6c1f8..1a7e5e3d 100644 --- a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala +++ b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala @@ -1,7 +1,6 @@ package io.github.embeddedkafka.ops import java.nio.file.Path - import kafka.server.{KafkaConfig, KafkaServer} import io.github.embeddedkafka.{ EmbeddedK, @@ -10,6 +9,15 @@ import io.github.embeddedkafka.{ EmbeddedZ } import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.network.SocketServerConfigs +import org.apache.kafka.server.config.{ + ServerConfigs, + ServerLogConfigs, + ZkConfigs +} +import org.apache.kafka.storage.internals.log.CleanerConfig import scala.jdk.CollectionConverters._ @@ -31,19 +39,19 @@ trait KafkaOps { val listener = s"${SecurityProtocol.PLAINTEXT}://localhost:$kafkaPort" val brokerProperties = Map[String, Object]( - KafkaConfig.ZkConnectProp -> zkAddress, - KafkaConfig.BrokerIdProp -> brokerId.toString, - KafkaConfig.ListenersProp -> listener, - KafkaConfig.AdvertisedListenersProp -> listener, - KafkaConfig.AutoCreateTopicsEnableProp -> autoCreateTopics.toString, - KafkaConfig.LogDirProp -> kafkaLogDir.toAbsolutePath.toString, - KafkaConfig.LogFlushIntervalMessagesProp -> 1.toString, - KafkaConfig.OffsetsTopicReplicationFactorProp -> 1.toString, - KafkaConfig.OffsetsTopicPartitionsProp -> 1.toString, - KafkaConfig.TransactionsTopicReplicationFactorProp -> 1.toString, - KafkaConfig.TransactionsTopicMinISRProp -> 1.toString, + ZkConfigs.ZK_CONNECT_CONFIG -> zkAddress, + ServerConfigs.BROKER_ID_CONFIG -> brokerId.toString, + SocketServerConfigs.LISTENERS_CONFIG -> listener, + SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG -> listener, + ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> autoCreateTopics.toString, + ServerLogConfigs.LOG_DIRS_CONFIG -> kafkaLogDir.toAbsolutePath.toString, + ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG -> 1.toString, + GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString, + GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> 1.toString, + TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString, + TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG -> 1.toString, // The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory - KafkaConfig.LogCleanerDedupeBufferSizeProp -> logCleanerDedupeBufferSize.toString + CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> logCleanerDedupeBufferSize.toString ) ++ customBrokerProperties val broker = new KafkaServer(new KafkaConfig(brokerProperties.asJava)) diff --git a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala index 729ed33b..996b4ee8 100644 --- a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala +++ b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala @@ -1,9 +1,9 @@ package io.github.embeddedkafka -import kafka.server.KafkaConfig import io.github.embeddedkafka.EmbeddedKafka._ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs} import scala.language.postfixOps import scala.util.Random @@ -16,8 +16,8 @@ class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport { "allow pass additional producer parameters" in { val customBrokerConfig = Map( - KafkaConfig.ReplicaFetchMaxBytesProp -> s"$ThreeMegabytes", - KafkaConfig.MessageMaxBytesProp -> s"$ThreeMegabytes" + ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG -> s"$ThreeMegabytes", + ServerConfigs.MESSAGE_MAX_BYTES_CONFIG -> s"$ThreeMegabytes" ) val customProducerConfig = diff --git a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaMethodsSpec.scala b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaMethodsSpec.scala index b1a60932..0b86f5e3 100644 --- a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaMethodsSpec.scala +++ b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaMethodsSpec.scala @@ -2,7 +2,6 @@ package io.github.embeddedkafka import java.util.Collections import java.util.concurrent.TimeoutException -import kafka.server.KafkaConfig import kafka.zk.KafkaZkClient import io.github.embeddedkafka.EmbeddedKafka._ import io.github.embeddedkafka.serializers.{ @@ -18,6 +17,7 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.serialization._ import org.apache.kafka.common.utils.Time +import org.apache.kafka.storage.internals.log.CleanerConfig import org.apache.zookeeper.client.ZKClientConfig import org.scalatest.concurrent.JavaFutures import org.scalatest.time.{Milliseconds, Seconds, Span} @@ -159,7 +159,7 @@ class EmbeddedKafkaMethodsSpec "create a topic with a custom configuration" in { implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig( customBrokerProperties = Map( - KafkaConfig.LogCleanerDedupeBufferSizeProp -> 2000000.toString + CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> 2000000.toString ) ) val topic = "test_custom_topic" @@ -220,7 +220,7 @@ class EmbeddedKafkaMethodsSpec "either delete of mark for deletion a list of topics" in { implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig( customBrokerProperties = Map( - KafkaConfig.LogCleanerDedupeBufferSizeProp -> 2000000.toString + CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> 2000000.toString ) ) val topics = List("test_topic_deletion_1", "test_topic_deletion_2") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 93fd6e45..efab18de 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,7 +6,7 @@ object Dependencies { val Scala3 = "3.3.3" val Scala213 = "2.13.14" val Scala212 = "2.12.19" - val Kafka = "3.7.1" + val Kafka = "3.8.0" val Slf4j = "1.7.36" val ScalaTest = "3.2.19" }