Commit

Add some comments
jerryshao committed Nov 11, 2014
1 parent 77c3e50 commit 0894aef
Showing 1 changed file with 51 additions and 2 deletions.
@@ -99,7 +99,6 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*/
def createStream(
jssc: JavaStreamingContext,
@@ -145,6 +144,16 @@ object KafkaUtils {
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}

/**
* Create a reliable input stream that pulls messages from a Kafka Broker.
* @param ssc StreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def createReliableStream(
ssc: StreamingContext,
zkQuorum: String,
@@ -159,6 +168,15 @@ object KafkaUtils {
ssc, kafkaParams, topics, storageLevel)
}

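For illustration only (not part of the diff), a minimal usage sketch of the overload above; the quorum, group id, and topic values are hypothetical:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(
  new SparkConf().setAppName("ReliableKafkaDemo"), Seconds(2))

// One consumer thread per partition of the hypothetical "events" topic.
val stream = KafkaUtils.createReliableStream(
  ssc,
  "zk1:2181,zk2:2181",               // Zookeeper quorum
  "demo-consumer-group",             // consumer group id
  Map("events" -> 2),                // topic -> number of threads
  StorageLevel.MEMORY_AND_DISK_SER_2)

stream.map(_._2).print()             // print message values only
ssc.start()
ssc.awaitTermination()
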
/**
* Create a reliable input stream that pulls messages from a Kafka Broker.
* @param ssc StreamingContext object
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
def createReliableStream[
K: ClassTag,
V: ClassTag,
@@ -172,7 +190,16 @@ object KafkaUtils {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)
}

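Continuing the sketch above (same ssc and imports; values still hypothetical), the generic overload can be called with the key/value types and Kafka decoders spelled out explicitly; StringDecoder is kafka.serializer.StringDecoder:

import kafka.serializer.StringDecoder

// Hypothetical kafkaParams; zookeeper.connect and group.id replace the
// zkQuorum/groupId arguments of the simpler overload.
val kafkaParams = Map(
  "zookeeper.connect" -> "zk1:2181,zk2:2181",
  "group.id" -> "demo-consumer-group")

val typedStream = KafkaUtils.createReliableStream[
    String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("events" -> 2), StorageLevel.MEMORY_AND_DISK_SER_2)
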
/**
* Create a reliable Java input stream that pulls messages from a Kafka Broker.
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
*/
def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
groupId: String,
@@ -181,6 +208,15 @@ object KafkaUtils {
createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}

/**
* Create a reliable Java input stream that pulls messages from a Kafka Broker.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*/
def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
@@ -192,6 +228,19 @@ object KafkaUtils {
storageLevel)
}

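The Java-oriented overload above can also be exercised from Scala through a JavaStreamingContext; a sketch under the same hypothetical values, boxing the thread counts as java.lang.Integer as the topics parameter requires:

import java.lang.{Integer => JInt}
import java.util.{HashMap => JHashMap}
import org.apache.spark.streaming.api.java.JavaStreamingContext

val jssc = new JavaStreamingContext(ssc)   // wraps the ssc from the sketch above

val jTopics = new JHashMap[String, JInt]()
jTopics.put("events", JInt.valueOf(2))     // topic -> number of threads

val jStream = KafkaUtils.createReliableStream(
  jssc,
  "zk1:2181,zk2:2181",
  "demo-consumer-group",
  jTopics,
  StorageLevel.MEMORY_AND_DISK_SER_2)
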
/**
* Create a reliable Java input stream that pulls messages from a Kafka Broker.
* @param jssc JavaStreamingContext object
* @param keyTypeClass Key type of RDD
* @param valueTypeClass Value type of RDD
* @param keyDecoderClass Type of kafka key decoder
* @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*/
def createReliableStream[K, V, U <: Decoder[_], T <: Decoder[_]](
jssc: JavaStreamingContext,
keyTypeClass: Class[K],
