Kafka streaming producer added. (apache#66)
Signed-off-by: Rostyslav Sotnychenko <rsotnychenko@maprtech.com>

(cherry picked from commit de237dc)
mgorbov authored and Mikhail Gorbov committed Jan 2, 2018
1 parent a18469d commit 317ca9e
Showing 18 changed files with 1,112 additions and 0 deletions.
16 changes: 16 additions & 0 deletions build/dev-build.sh
@@ -0,0 +1,16 @@
#!/usr/bin/env bash

PROFILES="-Pyarn -Phadoop-provided -Pscala-2.11 -Phive -Phive-thriftserver -DskipTests"

if [[ -z "$1" ]]; then
./build/mvn ${PROFILES} clean install
else
./build/mvn ${PROFILES} -pl :$1 clean install
if [ $? -ne 0 ]; then exit 1; fi
./build/mvn ${PROFILES} -pl :spark-assembly_2.11 clean package
fi

if [ $? -ne 0 ]; then exit 1; fi

scp -r assembly/target/scala-2.11/jars mapr@node1:/opt/mapr/spark/spark-2.0.1/jars
if [ $? -ne 0 ]; then exit 1; fi
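Usage note (inferred from the script itself, since the commit ships no accompanying docs): run ./build/dev-build.sh with no arguments to build and install the whole tree, or pass a module artifactId, e.g. ./build/dev-build.sh spark-streaming-kafka-producer_2.11, to rebuild just that module and then repackage the assembly; either way the script finishes by scp-ing the assembled jars to the hard-coded mapr@node1 test host.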
6 changes: 6 additions & 0 deletions examples/pom.xml
@@ -127,6 +127,12 @@
<artifactId>parquet-hadoop-bundle</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-producer_${scala.binary.version}</artifactId>
<scope>provided</scope>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
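As with the other Spark modules referenced from the examples POM, the new artifact is declared with provided scope, presumably because the spark-streaming-kafka-producer jar is expected on the cluster classpath (dev-build.sh above copies the assembled jars onto the node) rather than bundled into the examples jar.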
93 changes: 93 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.streaming

import java.util.{ Map => JMap }

import org.apache.kafka.common.serialization.Serializer

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}

class ItemJsonSerializer extends Serializer[Item] {
override def configure(configs: JMap[String, _], isKey: Boolean): Unit = { /* NOP */ }

override def serialize(topic: String, data: Item): Array[Byte] = data.toString.getBytes

override def close(): Unit = { /* NOP */ }
}

case class Item(id: Int, value: Int) {
override def toString: String = s"""{"id":"$id","value":"$value"}"""
}

/**
* Produces messages to Kafka.
* Usage: KafkaProducerExample <kafkaBrokers> <topics> <numMessages>
* <kafkaBrokers> is a list of one or more kafka brokers
* <topics> is a list of one or more kafka topics
* <numMessages> is the number of messages that the kafka producer should send
*
* Example:
* `$ bin/run-example \
* org.apache.spark.examples.streaming.KafkaProducerExample broker1,broker2 \
* topic1,topic2 10`
*/

// scalastyle:off println
object KafkaProducerExample extends App {
import org.apache.spark.streaming.kafka.producer._

if (args.length < 3) {
System.err.println(s"""
|Usage: KafkaProducerExample <kafkaBrokers> <topics> <numMessages>
| <kafkaBrokers> is a list of one or more kafka brokers
| <topics> is a list of one or more kafka topics
| <numMessages> is the number of messages that the kafka producer
| should send
""".stripMargin)
System.exit(1)
}

val Array(kafkaBrokers, topics, numMessages) = args

val batchTime = Seconds(2)

val sparkConf = new SparkConf()
.set("spark.executor.memory", "1g")
.set("spark.driver.memory", "1g")
.setAppName(getClass.getCanonicalName)
val ssc = new StreamingContext(sparkConf, batchTime)

val producerConf = new ProducerConf(
bootstrapServers = kafkaBrokers.split(",").toList)

val items = (0 until numMessages.toInt).map(i => Item(i, i))
val defaultRDD: RDD[Item] = ssc.sparkContext.parallelize(items)
val dStream: DStream[Item] = new ConstantInputDStream[Item](ssc, defaultRDD)

dStream.sendToKafka[ItemJsonSerializer](topics, producerConf)
dStream.count().print()

ssc.start()
ssc.awaitTermination()

ssc.stop(stopSparkContext = true, stopGracefully = true)
}
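For reference, the serializer above simply emits the case class's JSON-style toString as bytes (platform default charset), so a quick REPL check along these lines (a sketch, not part of the commit) shows exactly what lands on the topic:

// Sketch: what ItemJsonSerializer produces for a sample record.
val serializer = new ItemJsonSerializer
val bytes = serializer.serialize("topic1", Item(7, 42))
new String(bytes) // {"id":"7","value":"42"}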
139 changes: 139 additions & 0 deletions external/kafka-producer/pom.xml
@@ -0,0 +1,139 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.1.0-mapr-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-producer_2.11</artifactId>
<properties>
<sbt.project.name>streaming-kafka-producer</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project External Kafka Producer v09</name>
<url>http://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.6</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.9.0.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0-mapr-1607</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
<version>3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
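Worth noting: kafka_2.11 comes from the stock 0.9.0.0 release, while kafka-clients is pinned to the MapR build 0.9.0.0-mapr-1607, consistent with the fork this commit targets (the parent version is 2.1.0-mapr-SNAPSHOT). Both declare the same exclusion set so that ZooKeeper, the legacy Sun JMX jars, jopt-simple, and slf4j-simple do not leak onto the classpath from two paths.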
5 changes: 5 additions & 0 deletions external/kafka-producer/src/main/resources/log4j.properties
@@ -0,0 +1,5 @@
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
103 changes: 103 additions & 0 deletions external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/KafkaRDDWriter.scala
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.kafka.producer

import scala.reflect.{classTag, ClassTag}
import scala.util.{Failure, Success, Try}

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.Serializer

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging

class KafkaRDDWriter[
K: ClassTag,
V: ClassTag,
KS <: Serializer[K] : ClassTag,
VS <: Serializer[V] : ClassTag](
val topic: String,
val producerConf: ProducerConf) extends Serializable with Logging {

private var producer: KafkaProducer[K, V] = null

private val callback: KafkaCallback = new KafkaCallback()

private def initializeProducer(producerConf: ProducerConf): KafkaProducer[K, V] = {
val conf = producerConf
.withKeySerializer(classTag[KS].runtimeClass.getName)
.withValueSerializer(classTag[VS].runtimeClass.getName)
Try(new KafkaProducer[K, V](conf.asJMap())) match {
case Success(_producer) => _producer
case Failure(kafkaException) =>
throw new SparkException("Failed to instantiate Kafka producer due to: ", kafkaException)
}
}

def sendV(taskContext: TaskContext, records: Iterator[V]): Unit = {
sendRecords[V](taskContext, records, toValueRecord)
}

def sendKV(taskContext: TaskContext, records: Iterator[(K, V)]): Unit = {
sendRecords[(K, V)](taskContext, records, kv => toKeyValueRecord(kv._1, kv._2))
}

private def sendRecords[R](
taskContext: TaskContext,
records: Iterator[R],
recordMapper: R => ProducerRecord[K, V]): Unit = {
if (records.isEmpty) {
logDebug(s"No data to send: rdd-partition=${taskContext.partitionId()}")
return
}
try {
producer = initializeProducer(producerConf)

taskContext.addTaskCompletionListener((taskContext: TaskContext) => {
log.debug(s"Task completed: topic=$topic, rdd-partition=${taskContext.partitionId()}." +
s" Closing producer")
Option(producer).foreach(_.close())
})

log.debug(s"Sending data: topic=$topic, rdd-partition=${taskContext.partitionId()}")
records.map(recordMapper).foreach(producer.send(_, callback))
producer.flush()
} catch {
case kex: KafkaException => throw new SparkException(kex.getMessage)
} finally {
Option(producer).foreach(_.close())
}
}

private def toValueRecord(value: V): ProducerRecord[K, V] = {
new ProducerRecord[K, V](topic, value)
}

private def toKeyValueRecord(key: K, value: V): ProducerRecord[K, V] = {
new ProducerRecord[K, V](topic, key, value)
}
}

class KafkaCallback extends Callback with Serializable {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) {
throw new SparkException("Failed to send data due to: ", exception)
}
}
}
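The rendered diff stops before the file that actually pins sendToKafka onto DStream, but given KafkaRDDWriter's sendV entry point the wiring is presumably an implicit class along these lines; the object and class names here are assumptions, not the committed code:

package org.apache.spark.streaming.kafka.producer

import scala.reflect.ClassTag

import org.apache.kafka.common.serialization.Serializer

import org.apache.spark.streaming.dstream.DStream

// Hypothetical wiring: one KafkaRDDWriter per stream, run against every
// partition of each batch RDD. The value serializer is reused for the
// (unused) key slot so the producer config is always complete.
object KafkaProducerImplicits {
  implicit class DStreamKafkaWriter[V: ClassTag](dStream: DStream[V]) {
    def sendToKafka[VS <: Serializer[V] : ClassTag](
        topic: String,
        producerConf: ProducerConf): Unit = {
      val writer = new KafkaRDDWriter[V, V, VS, VS](topic, producerConf)
      dStream.foreachRDD { rdd =>
        rdd.sparkContext.runJob(rdd, writer.sendV _)
      }
    }
  }
}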