Skip to content

Commit

Permalink
Address the comment to remove the resetOffset() function
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Nov 11, 2014
1 parent de3a4c8 commit d6ae94d
Showing 1 changed file with 2 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,19 @@

package org.apache.spark.streaming.kafka

import java.lang.{Integer => JInt}
import java.util.{Map => JMap}

import scala.reflect.ClassTag
import scala.collection.JavaConversions._

import java.lang.{Integer => JInt}
import java.util.{Map => JMap}

import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.ZkUtils

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream


object KafkaUtils {
/**
* Create an input stream that pulls messages from a Kafka Broker.
Expand Down Expand Up @@ -147,23 +144,4 @@ object KafkaUtils {
createStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}

/**
* Delete the consumer group related Zookeeper metadata immediately,
* force consumer to ignore previous consumer offset and directly read data from the beginning
* or end of the partition. The behavior of reading data from the beginning or end of the
* partition also relies on kafka parameter 'auto.offset.reset':
* When 'auto.offset.reset' = 'smallest', directly read data from beginning,
* will re-read the whole partition.
* When 'auto.offset.reset' = 'largest', directly read data from end, ignore old, unwanted data.
* This is default in Kafka 0.8.
*
* To avoid deleting existing Zookeeper metadata in each Receiver when multiple consumers are
* launched, this should be called be createStream().
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
*/
def resetOffset(zkQuorum: String, groupId: String) {
ZkUtils.maybeDeletePath(zkQuorum, s"/consumers/$groupId")
}
}

0 comments on commit d6ae94d

Please sign in to comment.