Skip to content

Commit

Permalink
Changes to align with Kafka 0.8
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Nov 11, 2014
1 parent 300887b commit fac8fd6
Showing 1 changed file with 13 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ import java.util.concurrent.Executors

import kafka.consumer._
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._
import kafka.utils.{ZkUtils, VerifiableProperties}

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -97,10 +95,18 @@ class KafkaReceiver[
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + zkConnect)

// When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
// When auto.offset.reset is set to 'smallest', it is our responsibility to try and delete the
// consumer group zk node. This parameter takes effect when the current offset is out of range;
// its default is 'largest', which shifts the offset to the end of the current queue.
//
// The kafka high level consumer doesn't expose setting offsets currently, this is a trick
// copied from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is
// set to 'smallest':
// scalastyle:off
// https://github.com/apache/kafka/blob/0.8.0/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala:181
// scalastyle:on
if (kafkaParams.get("auto.offset.reset") == Some("smallest")) {
ZkUtils.maybeDeletePath(zkConnect, "/consumers/" + kafkaParams("group.id"))
}

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
Expand Down Expand Up @@ -139,26 +145,4 @@ class KafkaReceiver[
}
}
}

// It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
// is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
// The kafka high level consumer doesn't expose setting offsets currently, so this is a trick
// copied from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
// 'smallest'/'largest':
// scalastyle:off
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
// scalastyle:on
//
// Recursively deletes the "/consumers/<groupId>" node using a short-lived ZkClient. The cleanup
// is best-effort: any non-fatal failure (connecting, deleting or closing) is logged as a warning
// and not rethrown. Fatal errors (e.g. OutOfMemoryError) are allowed to propagate.
//
// zkUrl   - Zookeeper connection string handed to ZkClient.
// groupId - consumer group whose node under "/consumers/" is removed.
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String): Unit = {
  val dir = "/consumers/" + groupId
  logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
  // Both timeouts passed to ZkClient are 30 seconds, expressed in milliseconds.
  val timeoutMs = 30 * 1000
  try {
    // Create the client inside the try so that a connection failure is handled like any other
    // cleanup failure instead of escaping this best-effort method.
    val zk = new ZkClient(zkUrl, timeoutMs, timeoutMs, ZKStringSerializer)
    try {
      zk.deleteRecursive(dir)
    } finally {
      // Always release the Zookeeper connection, even if the delete failed.
      zk.close()
    }
  } catch {
    // NonFatal (fully qualified, so no new import is required) lets VM errors and thread
    // interrupts propagate rather than being swallowed by a blanket Throwable catch.
    case scala.util.control.NonFatal(e) =>
      logWarning("Error cleaning up temporary Zookeeper data", e)
  }
}
}

0 comments on commit fac8fd6

Please sign in to comment.