Skip to content

Commit

Permalink
bump kafka to v3.8.0 and fix compilation errors (#514)
Browse files Browse the repository at this point in the history
  • Loading branch information
francescopellegrini authored Aug 18, 2024
1 parent ceacce0 commit a72c04c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.github.embeddedkafka.ops

import java.nio.file.Path

import kafka.server.{KafkaConfig, KafkaServer}
import io.github.embeddedkafka.{
EmbeddedK,
Expand All @@ -10,6 +9,15 @@ import io.github.embeddedkafka.{
EmbeddedZ
}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{
ServerConfigs,
ServerLogConfigs,
ZkConfigs
}
import org.apache.kafka.storage.internals.log.CleanerConfig

import scala.jdk.CollectionConverters._

Expand All @@ -31,19 +39,19 @@ trait KafkaOps {
val listener = s"${SecurityProtocol.PLAINTEXT}://localhost:$kafkaPort"

val brokerProperties = Map[String, Object](
KafkaConfig.ZkConnectProp -> zkAddress,
KafkaConfig.BrokerIdProp -> brokerId.toString,
KafkaConfig.ListenersProp -> listener,
KafkaConfig.AdvertisedListenersProp -> listener,
KafkaConfig.AutoCreateTopicsEnableProp -> autoCreateTopics.toString,
KafkaConfig.LogDirProp -> kafkaLogDir.toAbsolutePath.toString,
KafkaConfig.LogFlushIntervalMessagesProp -> 1.toString,
KafkaConfig.OffsetsTopicReplicationFactorProp -> 1.toString,
KafkaConfig.OffsetsTopicPartitionsProp -> 1.toString,
KafkaConfig.TransactionsTopicReplicationFactorProp -> 1.toString,
KafkaConfig.TransactionsTopicMinISRProp -> 1.toString,
ZkConfigs.ZK_CONNECT_CONFIG -> zkAddress,
ServerConfigs.BROKER_ID_CONFIG -> brokerId.toString,
SocketServerConfigs.LISTENERS_CONFIG -> listener,
SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG -> listener,
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> autoCreateTopics.toString,
ServerLogConfigs.LOG_DIRS_CONFIG -> kafkaLogDir.toAbsolutePath.toString,
ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG -> 1.toString,
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString,
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> 1.toString,
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString,
TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG -> 1.toString,
// The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory
KafkaConfig.LogCleanerDedupeBufferSizeProp -> logCleanerDedupeBufferSize.toString
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> logCleanerDedupeBufferSize.toString
) ++ customBrokerProperties

val broker = new KafkaServer(new KafkaConfig(brokerProperties.asJava))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.github.embeddedkafka

import kafka.server.KafkaConfig
import io.github.embeddedkafka.EmbeddedKafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs}

import scala.language.postfixOps
import scala.util.Random
Expand All @@ -16,8 +16,8 @@ class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport {
"allow pass additional producer parameters" in {
val customBrokerConfig =
Map(
KafkaConfig.ReplicaFetchMaxBytesProp -> s"$ThreeMegabytes",
KafkaConfig.MessageMaxBytesProp -> s"$ThreeMegabytes"
ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG -> s"$ThreeMegabytes",
ServerConfigs.MESSAGE_MAX_BYTES_CONFIG -> s"$ThreeMegabytes"
)

val customProducerConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package io.github.embeddedkafka

import java.util.Collections
import java.util.concurrent.TimeoutException
import kafka.server.KafkaConfig
import kafka.zk.KafkaZkClient
import io.github.embeddedkafka.EmbeddedKafka._
import io.github.embeddedkafka.serializers.{
Expand All @@ -18,6 +17,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.storage.internals.log.CleanerConfig
import org.apache.zookeeper.client.ZKClientConfig
import org.scalatest.concurrent.JavaFutures
import org.scalatest.time.{Milliseconds, Seconds, Span}
Expand Down Expand Up @@ -159,7 +159,7 @@ class EmbeddedKafkaMethodsSpec
"create a topic with a custom configuration" in {
implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
customBrokerProperties = Map(
KafkaConfig.LogCleanerDedupeBufferSizeProp -> 2000000.toString
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> 2000000.toString
)
)
val topic = "test_custom_topic"
Expand Down Expand Up @@ -220,7 +220,7 @@ class EmbeddedKafkaMethodsSpec
"either delete of mark for deletion a list of topics" in {
implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
customBrokerProperties = Map(
KafkaConfig.LogCleanerDedupeBufferSizeProp -> 2000000.toString
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> 2000000.toString
)
)
val topics = List("test_topic_deletion_1", "test_topic_deletion_2")
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ object Dependencies {
val Scala3 = "3.3.3"
val Scala213 = "2.13.14"
val Scala212 = "2.12.19"
val Kafka = "3.7.1"
val Kafka = "3.8.0"
val Slf4j = "1.7.36"
val ScalaTest = "3.2.19"
}
Expand Down

0 comments on commit a72c04c

Please sign in to comment.