Move offset reset to a helper function to let user explicitly delete ZK metadata by calling this API
jerryshao committed Nov 11, 2014
1 parent fac8fd6 commit b2c1430
Showing 2 changed files with 25 additions and 18 deletions.
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.Executors

import kafka.consumer._
import kafka.serializer.Decoder
-import kafka.utils.{ZkUtils, VerifiableProperties}
+import kafka.utils.VerifiableProperties

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
@@ -95,21 +95,7 @@ class KafkaReceiver[
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + zkConnect)

-// When auto.offset.reset is defined as smallest, it is our responsibility to try and whack the
-// consumer group zk node. This parameter is triggered when current offset is out of range,
-// default is 'largest', which means shifting the offset to the end of current queue.
-//
-// The kafka high level consumer doesn't expose setting offsets currently, this is a trick
-// copied from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is
-// set to 'smallest':
-// scalastyle:off
-// https://github.com/apache/kafka/blob/0.8.0/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala:181
-// scalastyle:on
-if (kafkaParams.get("auto.offset.reset") == Some("smallest")) {
-  ZkUtils.maybeDeletePath(zkConnect, "/consumers/" + kafkaParams("group.id"))
-}
-
-val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -17,18 +17,20 @@

package org.apache.spark.streaming.kafka


import scala.reflect.ClassTag
import scala.collection.JavaConversions._

import java.lang.{Integer => JInt}
import java.util.{Map => JMap}

import kafka.serializer.{Decoder, StringDecoder}
+import kafka.utils.ZkUtils

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream


object KafkaUtils {
@@ -145,4 +147,23 @@ object KafkaUtils {
createStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}

+  /**
+   * Delete the consumer group's ZooKeeper metadata immediately, forcing the consumer to ignore
+   * its previously committed offset and read directly from the beginning or end of the
+   * partition. Whether data is read from the beginning or the end is controlled by the Kafka
+   * parameter 'auto.offset.reset':
+   * When 'auto.offset.reset' = 'smallest', read from the beginning and re-read the whole
+   * partition.
+   * When 'auto.offset.reset' = 'largest', read from the end and ignore old, unwanted data.
+   * This is the default in Kafka 0.8.
+   *
+   * To avoid concurrent deletion of the ZooKeeper metadata by each Receiver when multiple
+   * consumers are launched, this should be called before createStream().
+   * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+   * @param groupId The group id for this consumer.
+   */
+  def resetOffset(zkQuorum: String, groupId: String) {
+    ZkUtils.maybeDeletePath(zkQuorum, s"/consumers/$groupId")
+  }
}

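For context, here is a minimal usage sketch of the new helper; it is not part of this commit. The application name, master URL, ZooKeeper quorum, group id, and topic below are placeholder values. The sketch simply follows the scaladoc above: resetOffset() is called before createStream(), so with 'auto.offset.reset' = 'smallest' each run re-reads the topic from the beginning.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ResetOffsetExample {
  def main(args: Array[String]): Unit = {
    val zkQuorum = "localhost:2181"          // placeholder ZooKeeper quorum
    val groupId = "example-consumer-group"   // placeholder consumer group

    // Delete the group's ZooKeeper metadata before any receiver starts, so the
    // stream honours 'auto.offset.reset' instead of a previously stored offset.
    KafkaUtils.resetOffset(zkQuorum, groupId)

    val conf = new SparkConf()
      .setAppName("KafkaResetOffsetExample")
      .setMaster("local[2]")                 // placeholder master for a local run
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map(
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest")     // re-read each partition from the start

    // (key, message) pairs from a placeholder topic with one receiver thread.
    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("example-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Keeping the ZooKeeper cleanup behind an explicit API call, rather than performing it inside every receiver as before, avoids multiple receivers racing to delete the same /consumers/$groupId path when several consumers are launched.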