From c03682dc3be1123592dc1bf641d1987ef72ca6ef Mon Sep 17 00:00:00 2001 From: fholmqvist Date: Wed, 17 Aug 2016 13:28:42 +0200 Subject: [PATCH 1/7] Update to kafka 0.10.0.1 kafka client. --- app/kafka/manager/utils/zero90/GroupMetadataManager.scala | 5 +++-- build.sbt | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/app/kafka/manager/utils/zero90/GroupMetadataManager.scala b/app/kafka/manager/utils/zero90/GroupMetadataManager.scala index f6b419aa3..e14c2c764 100644 --- a/app/kafka/manager/utils/zero90/GroupMetadataManager.scala +++ b/app/kafka/manager/utils/zero90/GroupMetadataManager.scala @@ -18,9 +18,10 @@ package kafka.manager.utils.zero90 import java.nio.ByteBuffer -import kafka.common.{KafkaException, TopicAndPartition, OffsetAndMetadata} +import kafka.common.{KafkaException, OffsetAndMetadata} import kafka.coordinator.{GroupMetadataKey, GroupTopicPartition, OffsetKey, BaseKey} import org.apache.kafka.clients.consumer.internals.ConsumerProtocol +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.types.Type._ import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct} @@ -199,7 +200,7 @@ object GroupMetadataManager { val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String] val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int] - OffsetKey(version, GroupTopicPartition(group, TopicAndPartition(topic, partition))) + OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition))) } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) { // version 2 refers to offset diff --git a/build.sbt b/build.sbt index 406ec4b91..b9c759c18 100644 --- a/build.sbt +++ b/build.sbt @@ -35,7 +35,7 @@ libraryDependencies ++= Seq( "org.slf4j" % "log4j-over-slf4j" % "1.7.12", "com.adrianhurt" %% "play-bootstrap3" % "0.4.5-P24", "org.clapper" %% "grizzled-slf4j" % "1.0.2", - "org.apache.kafka" %% "kafka" % "0.9.0.1" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), + "org.apache.kafka" %% "kafka" % "0.10.0.1" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), "com.beachape" %% "enumeratum" % "1.4.4", "org.scalatest" %% "scalatest" % "2.2.1" % "test", "org.apache.curator" % "curator-test" % "2.10.0" % "test", From dd92b03c6a0c791c197bb4cb0fa928f5c1b66b51 Mon Sep 17 00:00:00 2001 From: fholmqvist Date: Wed, 17 Aug 2016 13:30:46 +0200 Subject: [PATCH 2/7] Include which cluster the warning is about. --- app/kafka/manager/actor/cluster/KafkaStateActor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 716f72fb6..625e3193b 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -295,7 +295,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext lastUpdateTimeMillis = System.currentTimeMillis() } catch { case e: Exception => - warn("Failed to process a message from offset topic!", e) + warn(s"Failed to process a message from offset topic on cluster ${clusterContext.config.name}!", e) } } } finally { From 5f20535518bd634304092af27c7dfc2d14e72d84 Mon Sep 17 00:00:00 2001 From: fholmqvist Date: Wed, 17 Aug 2016 13:33:13 +0200 Subject: [PATCH 3/7] MinHeaderSize was removed as it was same as MinMessageOverhead --- app/kafka/manager/utils/zero90/LogConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/kafka/manager/utils/zero90/LogConfig.scala b/app/kafka/manager/utils/zero90/LogConfig.scala index e2088c00e..ce8f9d2c1 100644 --- a/app/kafka/manager/utils/zero90/LogConfig.scala +++ b/app/kafka/manager/utils/zero90/LogConfig.scala @@ -133,7 +133,7 @@ object LogConfig extends TopicConfigs { import ConfigDef.Importance._ new ConfigDef() - .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinHeaderSize), MEDIUM, SegmentSizeDoc) + .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinMessageOverhead), MEDIUM, SegmentSizeDoc) .define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(0), MEDIUM, SegmentMsDoc) .define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc) .define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc) From f4d006d2f8ef1d66a53e5970033f47113d02b670 Mon Sep 17 00:00:00 2001 From: fholmqvist Date: Wed, 17 Aug 2016 13:34:24 +0200 Subject: [PATCH 4/7] Enable Kafka 0.10.0.0 and 0.10.0.1. Seems to work ok, needs proper testing. --- app/controllers/Logkafka.scala | 8 ++++++++ app/controllers/Topic.scala | 6 ++++++ app/kafka/manager/actor/cluster/KafkaStateActor.scala | 2 +- app/kafka/manager/model/model.scala | 10 +++++++++- app/kafka/manager/utils/LogkafkaNewConfigs.scala | 4 +++- app/kafka/manager/utils/TopicConfigs.scala | 4 +++- 6 files changed, 30 insertions(+), 4 deletions(-) diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala index 33b4fe2ea..8ee899c6c 100644 --- a/app/controllers/Logkafka.scala +++ b/app/controllers/Logkafka.scala @@ -66,6 +66,10 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana LogkafkaNewConfigs.configMaps(Kafka_0_9_0_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_0_9_0_1_Default = CreateLogkafka("","", LogkafkaNewConfigs.configMaps(Kafka_0_9_0_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_0_10_0_0_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_0_10_0_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_0_10_0_1_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_0_10_0_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) val defaultCreateForm = Form( mapping( @@ -110,6 +114,8 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana case Kafka_0_8_2_2 => (defaultCreateForm.fill(kafka_0_8_2_2_Default), clusterContext) case Kafka_0_9_0_0 => (defaultCreateForm.fill(kafka_0_9_0_0_Default), clusterContext) case Kafka_0_9_0_1 => (defaultCreateForm.fill(kafka_0_9_0_1_Default), clusterContext) + case Kafka_0_10_0_0 => (defaultCreateForm.fill(kafka_0_10_0_0_Default), clusterContext) + case Kafka_0_10_0_1 => (defaultCreateForm.fill(kafka_0_10_0_1_Default), clusterContext) } } } @@ -198,6 +204,8 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana case Kafka_0_8_2_2 => LogkafkaNewConfigs.configNames(Kafka_0_8_2_2).map(n => (n,LKConfig(n,None))).toMap case Kafka_0_9_0_0 => LogkafkaNewConfigs.configNames(Kafka_0_9_0_0).map(n => (n,LKConfig(n,None))).toMap case Kafka_0_9_0_1 => LogkafkaNewConfigs.configNames(Kafka_0_9_0_1).map(n => (n,LKConfig(n,None))).toMap + case Kafka_0_10_0_0 => LogkafkaNewConfigs.configNames(Kafka_0_10_0_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_0_10_0_1 => LogkafkaNewConfigs.configNames(Kafka_0_10_0_1).map(n => (n,LKConfig(n,None))).toMap } val identityOption = li.identityMap.get(log_path) if (identityOption.isDefined) { diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala index f5f3e15bc..178d02da1 100644 --- a/app/controllers/Topic.scala +++ b/app/controllers/Topic.scala @@ -52,6 +52,8 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager val kafka_0_8_2_2_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_2_2).map(n => TConfig(n,None)).toList) val kafka_0_9_0_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_9_0_0).map(n => TConfig(n,None)).toList) val kafka_0_9_0_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_9_0_1).map(n => TConfig(n,None)).toList) + val kafka_0_10_0_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_0_0).map(n => TConfig(n,None)).toList) + val kafka_0_10_0_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_0_1).map(n => TConfig(n,None)).toList) val defaultCreateForm = Form( mapping( @@ -136,6 +138,8 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager case Kafka_0_8_2_2 => (defaultCreateForm.fill(kafka_0_8_2_2_Default), clusterContext) case Kafka_0_9_0_0 => (defaultCreateForm.fill(kafka_0_9_0_0_Default), clusterContext) case Kafka_0_9_0_1 => (defaultCreateForm.fill(kafka_0_9_0_1_Default), clusterContext) + case Kafka_0_10_0_0 => (defaultCreateForm.fill(kafka_0_10_0_0_Default), clusterContext) + case Kafka_0_10_0_1 => (defaultCreateForm.fill(kafka_0_10_0_1_Default), clusterContext) } } } @@ -370,6 +374,8 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager case Kafka_0_8_2_2 => TopicConfigs.configNames(Kafka_0_8_2_2).map(n => (n,TConfig(n,None))).toMap case Kafka_0_9_0_0 => TopicConfigs.configNames(Kafka_0_9_0_0).map(n => (n,TConfig(n,None))).toMap case Kafka_0_9_0_1 => TopicConfigs.configNames(Kafka_0_9_0_1).map(n => (n,TConfig(n,None))).toMap + case Kafka_0_10_0_0 => TopicConfigs.configNames(Kafka_0_10_0_0).map(n => (n,TConfig(n,None))).toMap + case Kafka_0_10_0_1 => TopicConfigs.configNames(Kafka_0_10_0_1).map(n => (n,TConfig(n,None))).toMap } val combinedMap = defaultConfigMap ++ ti.config.toMap.map(tpl => tpl._1 -> TConfig(tpl._1,Option(tpl._2))) (defaultUpdateConfigForm.fill(UpdateTopicConfig(ti.topic,combinedMap.toList.map(_._2),ti.configReadVersion)), diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 625e3193b..6c118cfe8 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -143,7 +143,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath object KafkaManagedOffsetCache { - val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1) + val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1) val ConsumerOffsetTopic = "__consumer_offsets" def isSupported(version: KafkaVersion) : Boolean = { diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index bfcafbc0c..7fa08502f 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -37,6 +37,12 @@ case object Kafka_0_9_0_0 extends KafkaVersion { case object Kafka_0_9_0_1 extends KafkaVersion { override def toString = "0.9.0.1" } +case object Kafka_0_10_0_0 extends KafkaVersion { + override def toString = "0.10.0.0" +} +case object Kafka_0_10_0_1 extends KafkaVersion { + override def toString = "0.10.0.1" +} object KafkaVersion { val supportedVersions: Map[String,KafkaVersion] = Map( @@ -46,7 +52,9 @@ object KafkaVersion { "0.8.2.1" -> Kafka_0_8_2_1, "0.8.2.2" -> Kafka_0_8_2_2, "0.9.0.0" -> Kafka_0_9_0_0, - "0.9.0.1" -> Kafka_0_9_0_1 + "0.9.0.1" -> Kafka_0_9_0_1, + "0.10.0.0" -> Kafka_0_10_0_0, + "0.10.0.1" -> Kafka_0_10_0_1 ) val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)) diff --git a/app/kafka/manager/utils/LogkafkaNewConfigs.scala b/app/kafka/manager/utils/LogkafkaNewConfigs.scala index 014661d45..22eceeea7 100644 --- a/app/kafka/manager/utils/LogkafkaNewConfigs.scala +++ b/app/kafka/manager/utils/LogkafkaNewConfigs.scala @@ -23,7 +23,9 @@ object LogkafkaNewConfigs { Kafka_0_8_2_1 -> logkafka82.LogConfig, Kafka_0_8_2_2 -> logkafka82.LogConfig, Kafka_0_9_0_0 -> logkafka82.LogConfig, - Kafka_0_9_0_1 -> logkafka82.LogConfig + Kafka_0_9_0_1 -> logkafka82.LogConfig, + Kafka_0_10_0_0 -> logkafka82.LogConfig, + Kafka_0_10_0_1 -> logkafka82.LogConfig ) def configNames(version: KafkaVersion) : Set[String] = { diff --git a/app/kafka/manager/utils/TopicConfigs.scala b/app/kafka/manager/utils/TopicConfigs.scala index fad3e22bb..ac3930748 100644 --- a/app/kafka/manager/utils/TopicConfigs.scala +++ b/app/kafka/manager/utils/TopicConfigs.scala @@ -23,7 +23,9 @@ object TopicConfigs { Kafka_0_8_2_1 -> zero82.LogConfig, Kafka_0_8_2_2 -> zero82.LogConfig, Kafka_0_9_0_0 -> zero90.LogConfig, - Kafka_0_9_0_1 -> zero90.LogConfig + Kafka_0_9_0_1 -> zero90.LogConfig, + Kafka_0_10_0_0 -> zero90.LogConfig, + Kafka_0_10_0_1 -> zero90.LogConfig ) def configNames(version: KafkaVersion) : Set[String] = { From d6e351349182f7b65f8750992823168bd6bb6629 Mon Sep 17 00:00:00 2001 From: fholmqvist Date: Wed, 17 Aug 2016 16:03:58 +0200 Subject: [PATCH 5/7] Fix deprecations by using new Java API. --- test/kafka/manager/TestKafkaManager.scala | 2 +- test/kafka/test/SeededBroker.scala | 24 ++++++++++------------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala index 8f884875a..cf09121e5 100644 --- a/test/kafka/manager/TestKafkaManager.scala +++ b/test/kafka/manager/TestKafkaManager.scala @@ -85,7 +85,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { var count = 0 while(!hlShutdown.get()) { simpleProducer.foreach { p => - p.send(s"simple message $count") + p.send(s"simple message $count", null) count+=1 Thread.sleep(500) } diff --git a/test/kafka/test/SeededBroker.scala b/test/kafka/test/SeededBroker.scala index c7707c8ff..bbde4721f 100644 --- a/test/kafka/test/SeededBroker.scala +++ b/test/kafka/test/SeededBroker.scala @@ -12,13 +12,13 @@ import kafka.consumer._ import kafka.manager.model.Kafka_0_8_2_0 import kafka.manager.utils.AdminUtils import kafka.message.{NoCompressionCodec, DefaultCompressionCodec} -import kafka.producer.{KeyedMessage, ProducerConfig, Producer} import kafka.serializer.DefaultDecoder import org.apache.curator.framework.imps.CuratorFrameworkState import org.apache.curator.framework.{CuratorFrameworkFactory, CuratorFramework} import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.curator.test.TestingServer import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer} +import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord} import scala.util.Try @@ -171,27 +171,23 @@ case class SimpleProducer(topic: String, props.put("compression.codec", codec.toString) props.put("producer.type", if(synchronously) "sync" else "async") - props.put("metadata.broker.list", brokerList) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + props.put("batch.num.messages", batchSize.toString) props.put("message.send.max.retries", messageSendMaxRetries.toString) props.put("request.required.acks",requestRequiredAcks.toString) props.put("client.id",clientId.toString) - val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props)) - - def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = { - if (partition == null) { - new KeyedMessage(topic,message) - } else { - new KeyedMessage(topic,partition,message) - } - } + val producer = new KafkaProducer[AnyRef, AnyRef](props) - def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8")) + def send(message: String, partition: Integer): Unit = send(message.getBytes("UTF8"), partition) - def send(message: Array[Byte], partition: Array[Byte]): Unit = { + def send(message: Array[Byte], partition: Integer): Unit = { try { - producer.send(kafkaMesssage(message, partition)) + val future = producer.send(new ProducerRecord[AnyRef,AnyRef](topic, partition, null, message)) + if(synchronously) future.get() } catch { case e: Exception => e.printStackTrace From 091f205679be3c725ad716b04240f535d3599cbb Mon Sep 17 00:00:00 2001 From: fholmqvist Date: Tue, 6 Dec 2016 15:17:15 +0100 Subject: [PATCH 6/7] Add 0.10.1.0 in scala, no new client jar The client jar for 0.10.1.0 isnt backwards compatible. Probably kafka versions should be broken out to plugins with their own jar. --- app/controllers/Logkafka.scala | 4 ++++ app/controllers/Topic.scala | 3 +++ app/kafka/manager/actor/cluster/KafkaStateActor.scala | 2 +- app/kafka/manager/model/model.scala | 7 ++++++- app/kafka/manager/utils/LogkafkaNewConfigs.scala | 3 ++- app/kafka/manager/utils/TopicConfigs.scala | 3 ++- 6 files changed, 18 insertions(+), 4 deletions(-) diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala index 8ee899c6c..39c48e08f 100644 --- a/app/controllers/Logkafka.scala +++ b/app/controllers/Logkafka.scala @@ -70,6 +70,8 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana LogkafkaNewConfigs.configMaps(Kafka_0_10_0_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_0_10_0_1_Default = CreateLogkafka("","", LogkafkaNewConfigs.configMaps(Kafka_0_10_0_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_0_10_1_0_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_0_10_1_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val defaultCreateForm = Form( mapping( @@ -116,6 +118,7 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana case Kafka_0_9_0_1 => (defaultCreateForm.fill(kafka_0_9_0_1_Default), clusterContext) case Kafka_0_10_0_0 => (defaultCreateForm.fill(kafka_0_10_0_0_Default), clusterContext) case Kafka_0_10_0_1 => (defaultCreateForm.fill(kafka_0_10_0_1_Default), clusterContext) + case Kafka_0_10_1_0 => (defaultCreateForm.fill(kafka_0_10_1_0_Default), clusterContext) } } } @@ -206,6 +209,7 @@ class Logkafka (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaMana case Kafka_0_9_0_1 => LogkafkaNewConfigs.configNames(Kafka_0_9_0_1).map(n => (n,LKConfig(n,None))).toMap case Kafka_0_10_0_0 => LogkafkaNewConfigs.configNames(Kafka_0_10_0_0).map(n => (n,LKConfig(n,None))).toMap case Kafka_0_10_0_1 => LogkafkaNewConfigs.configNames(Kafka_0_10_0_1).map(n => (n,LKConfig(n,None))).toMap + case Kafka_0_10_1_0 => LogkafkaNewConfigs.configNames(Kafka_0_10_1_0).map(n => (n,LKConfig(n,None))).toMap } val identityOption = li.identityMap.get(log_path) if (identityOption.isDefined) { diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala index 178d02da1..de21b2909 100644 --- a/app/controllers/Topic.scala +++ b/app/controllers/Topic.scala @@ -54,6 +54,7 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager val kafka_0_9_0_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_9_0_1).map(n => TConfig(n,None)).toList) val kafka_0_10_0_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_0_0).map(n => TConfig(n,None)).toList) val kafka_0_10_0_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_0_1).map(n => TConfig(n,None)).toList) + val kafka_0_10_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_10_1_0).map(n => TConfig(n,None)).toList) val defaultCreateForm = Form( mapping( @@ -140,6 +141,7 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager case Kafka_0_9_0_1 => (defaultCreateForm.fill(kafka_0_9_0_1_Default), clusterContext) case Kafka_0_10_0_0 => (defaultCreateForm.fill(kafka_0_10_0_0_Default), clusterContext) case Kafka_0_10_0_1 => (defaultCreateForm.fill(kafka_0_10_0_1_Default), clusterContext) + case Kafka_0_10_1_0 => (defaultCreateForm.fill(kafka_0_10_1_0_Default), clusterContext) } } } @@ -376,6 +378,7 @@ class Topic (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManager case Kafka_0_9_0_1 => TopicConfigs.configNames(Kafka_0_9_0_1).map(n => (n,TConfig(n,None))).toMap case Kafka_0_10_0_0 => TopicConfigs.configNames(Kafka_0_10_0_0).map(n => (n,TConfig(n,None))).toMap case Kafka_0_10_0_1 => TopicConfigs.configNames(Kafka_0_10_0_1).map(n => (n,TConfig(n,None))).toMap + case Kafka_0_10_1_0 => TopicConfigs.configNames(Kafka_0_10_1_0).map(n => (n,TConfig(n,None))).toMap } val combinedMap = defaultConfigMap ++ ti.config.toMap.map(tpl => tpl._1 -> TConfig(tpl._1,Option(tpl._2))) (defaultUpdateConfigForm.fill(UpdateTopicConfig(ti.topic,combinedMap.toList.map(_._2),ti.configReadVersion)), diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 6c118cfe8..fd337254a 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -143,7 +143,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath object KafkaManagedOffsetCache { - val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1) + val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0) val ConsumerOffsetTopic = "__consumer_offsets" def isSupported(version: KafkaVersion) : Boolean = { diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index bff293e95..1c7dd5e27 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -44,6 +44,10 @@ case object Kafka_0_10_0_1 extends KafkaVersion { override def toString = "0.10.0.1" } +case object Kafka_0_10_1_0 extends KafkaVersion { + override def toString = "0.10.1.0" +} + object KafkaVersion { val supportedVersions: Map[String,KafkaVersion] = Map( "0.8.1.1" -> Kafka_0_8_1_1, @@ -54,7 +58,8 @@ object KafkaVersion { "0.9.0.0" -> Kafka_0_9_0_0, "0.9.0.1" -> Kafka_0_9_0_1, "0.10.0.0" -> Kafka_0_10_0_0, - "0.10.0.1" -> Kafka_0_10_0_1 + "0.10.0.1" -> Kafka_0_10_0_1, + "0.10.1.0" -> Kafka_0_10_1_0 ) val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)) diff --git a/app/kafka/manager/utils/LogkafkaNewConfigs.scala b/app/kafka/manager/utils/LogkafkaNewConfigs.scala index 22eceeea7..1398e7a80 100644 --- a/app/kafka/manager/utils/LogkafkaNewConfigs.scala +++ b/app/kafka/manager/utils/LogkafkaNewConfigs.scala @@ -25,7 +25,8 @@ object LogkafkaNewConfigs { Kafka_0_9_0_0 -> logkafka82.LogConfig, Kafka_0_9_0_1 -> logkafka82.LogConfig, Kafka_0_10_0_0 -> logkafka82.LogConfig, - Kafka_0_10_0_1 -> logkafka82.LogConfig + Kafka_0_10_0_1 -> logkafka82.LogConfig, + Kafka_0_10_1_0 -> logkafka82.LogConfig ) def configNames(version: KafkaVersion) : Set[String] = { diff --git a/app/kafka/manager/utils/TopicConfigs.scala b/app/kafka/manager/utils/TopicConfigs.scala index ac3930748..07d110a33 100644 --- a/app/kafka/manager/utils/TopicConfigs.scala +++ b/app/kafka/manager/utils/TopicConfigs.scala @@ -25,7 +25,8 @@ object TopicConfigs { Kafka_0_9_0_0 -> zero90.LogConfig, Kafka_0_9_0_1 -> zero90.LogConfig, Kafka_0_10_0_0 -> zero90.LogConfig, - Kafka_0_10_0_1 -> zero90.LogConfig + Kafka_0_10_0_1 -> zero90.LogConfig, + Kafka_0_10_1_0 -> zero90.LogConfig ) def configNames(version: KafkaVersion) : Set[String] = { From 180ee24169d9182503d3ad7f104bb5d8a71fad36 Mon Sep 17 00:00:00 2001 From: Frank Lyaruu Date: Tue, 6 Dec 2016 17:26:14 +0100 Subject: [PATCH 7/7] Fix incompatible 0.10.1 message format. You only notice when there are 0.10.1 clients talking to the cluster, which will lead to "Unknown version 1 for group metadata message" errors. --- .../utils/zero90/GroupMetadataManager.scala | 71 ++++++++++++++++++- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/app/kafka/manager/utils/zero90/GroupMetadataManager.scala b/app/kafka/manager/utils/zero90/GroupMetadataManager.scala index e14c2c764..802ebf6fa 100644 --- a/app/kafka/manager/utils/zero90/GroupMetadataManager.scala +++ b/app/kafka/manager/utils/zero90/GroupMetadataManager.scala @@ -61,12 +61,23 @@ object GroupMetadataManager { private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") - private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING), + private val MEMBER_METADATA_V0 = new Schema( + new Field("member_id", STRING), new Field("client_id", STRING), new Field("client_host", STRING), new Field("session_timeout", INT32), new Field("subscription", BYTES), new Field("assignment", BYTES)) + + private val MEMBER_METADATA_V1 = new Schema( + new Field("member_id", STRING), + new Field("client_id", STRING), + new Field("client_host", STRING), + new Field("session_timeout", INT32), + new Field("rebalance_timeout", INT32), + new Field("subscription", BYTES), + new Field("assignment", BYTES)) + private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id") private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id") private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host") @@ -74,18 +85,41 @@ object GroupMetadataManager { private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription") private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment") + private val MEMBER_METADATA_MEMBER_ID_V1 = MEMBER_METADATA_V1.get("member_id") + private val MEMBER_METADATA_CLIENT_ID_V1 = MEMBER_METADATA_V1.get("client_id") + private val MEMBER_METADATA_CLIENT_HOST_V1 = MEMBER_METADATA_V1.get("client_host") + private val MEMBER_METADATA_SESSION_TIMEOUT_V1 = MEMBER_METADATA_V1.get("session_timeout") + private val MEMBER_METADATA_REBALANCE_TIMEOUT_V1 = MEMBER_METADATA_V1.get("rebalance_timeout") + private val MEMBER_METADATA_SUBSCRIPTION_V1 = MEMBER_METADATA_V1.get("subscription") + private val MEMBER_METADATA_ASSIGNMENT_V1 = MEMBER_METADATA_V1.get("assignment") + - private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING), + private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema( + new Field("protocol_type", STRING), new Field("generation", INT32), new Field("protocol", STRING), new Field("leader", STRING), new Field("members", new ArrayOf(MEMBER_METADATA_V0))) + + private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema( + new Field("protocol_type", STRING), + new Field("generation", INT32), + new Field("protocol", NULLABLE_STRING), + new Field("leader", NULLABLE_STRING), + new Field("members", new ArrayOf(MEMBER_METADATA_V1))) + private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type") private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation") private val GROUP_METADATA_PROTOCOL_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol") private val GROUP_METADATA_LEADER_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("leader") private val GROUP_METADATA_MEMBERS_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("members") + private val GROUP_METADATA_PROTOCOL_TYPE_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("protocol_type") + private val GROUP_METADATA_GENERATION_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("generation") + private val GROUP_METADATA_PROTOCOL_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("protocol") + private val GROUP_METADATA_LEADER_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("leader") + private val GROUP_METADATA_MEMBERS_V1 = GROUP_METADATA_VALUE_SCHEMA_V1.get("members") + // map of versions to key schemas as data types private val MESSAGE_TYPE_SCHEMAS = Map( 0 -> OFFSET_COMMIT_KEY_SCHEMA, @@ -99,7 +133,7 @@ object GroupMetadataManager { private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort // map of version of group metadata value schemas - private val GROUP_VALUE_SCHEMAS = Map(0 -> GROUP_METADATA_VALUE_SCHEMA_V0) + private val GROUP_VALUE_SCHEMAS = Map(0 -> GROUP_METADATA_VALUE_SCHEMA_V0,1 -> GROUP_METADATA_VALUE_SCHEMA_V1) private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 0.toShort private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION) @@ -290,6 +324,37 @@ object GroupMetadataManager { group.add(memberId, member) } group + } else if (version == 1){ + val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V1).asInstanceOf[String] + + val generationId = value.get(GROUP_METADATA_GENERATION_V1).asInstanceOf[Int] + val leaderId = value.get(GROUP_METADATA_LEADER_V1).asInstanceOf[String] + val protocol = value.get(GROUP_METADATA_PROTOCOL_V1).asInstanceOf[String] + val group = new GroupMetadata(groupId, protocolType, generationId, leaderId, protocol) + + value.getArray(GROUP_METADATA_MEMBERS_V1).foreach { + case memberMetadataObj => + val memberMetadata = memberMetadataObj.asInstanceOf[Struct] + val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V1).asInstanceOf[String] + val clientId = memberMetadata.get(MEMBER_METADATA_CLIENT_ID_V1).asInstanceOf[String] + val clientHost = memberMetadata.get(MEMBER_METADATA_CLIENT_HOST_V1).asInstanceOf[String] + //val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int] + val subscription = ConsumerProtocol.deserializeSubscription(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V1).asInstanceOf[ByteBuffer]) + val assignment = ConsumerProtocol.deserializeAssignment(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V1).asInstanceOf[ByteBuffer]) + + import collection.JavaConverters._ + val member = new MemberMetadata( + memberId + , groupId + , clientId + , clientHost + //, sessionTimeout + , List((group.protocol, subscription.topics().asScala.toSet)) + , assignment.partitions().asScala.map(tp => tp.topic() -> tp.partition()).toSet + ) + group.add(memberId, member) + } + group } else { throw new IllegalStateException("Unknown group metadata message version") }