diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 3efe6a0a5ae7b..4b2ea45fb81d0 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -30,9 +30,6 @@ import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
-
-import org.apache.spark.util.Utils
-
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
@@ -42,11 +39,8 @@ import org.apache.spark.streaming.receiver.Receiver
 
 import org.jboss.netty.channel.ChannelPipelineFactory
 import org.jboss.netty.channel.Channels
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.ChannelFactory
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
 import org.jboss.netty.handler.codec.compression._
-import org.jboss.netty.handler.execution.ExecutionHandler
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
@@ -73,14 +67,47 @@ class SparkFlumeEvent() extends Externalizable {
 
   /* De-serialize from bytes. */
   def readExternal(in: ObjectInput) {
-    val (headers, bodyBuff) = EventTransformer.readExternal(in)
+    val bodyLength = in.readInt()
+    val bodyBuff = new Array[Byte](bodyLength)
+    in.readFully(bodyBuff)
+
+    val numHeaders = in.readInt()
+    val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+    for (i <- 0 until numHeaders) {
+      val keyLength = in.readInt()
+      val keyBuff = new Array[Byte](keyLength)
+      in.readFully(keyBuff)
+      val key: String = Utils.deserialize(keyBuff)
+
+      val valLength = in.readInt()
+      val valBuff = new Array[Byte](valLength)
+      in.readFully(valBuff)
+      val value: String = Utils.deserialize(valBuff)
+
+      headers.put(key, value)
+    }
+
     event.setBody(ByteBuffer.wrap(bodyBuff))
     event.setHeaders(headers)
   }
 
   /* Serialize to bytes. */
   def writeExternal(out: ObjectOutput) {
-    EventTransformer.writeExternal(out, event.getHeaders, event.getBody.array())
+    val body = event.getBody.array()
+    out.writeInt(body.length)
+    out.write(body)
+
+    val numHeaders = event.getHeaders.size()
+    out.writeInt(numHeaders)
+    for ((k, v) <- event.getHeaders) {
+      val keyBuff = Utils.serialize(k.toString)
+      out.writeInt(keyBuff.length)
+      out.write(keyBuff)
+      val valBuff = Utils.serialize(v.toString)
+      out.writeInt(valBuff.length)
+      out.write(valBuff)
+    }
   }
 }
 
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 01de8781615a7..c93b7fee09f59 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -17,12 +17,11 @@
 package org.apache.spark.streaming.flume
 
-import java.io.{ObjectOutput, ObjectInput, Externalizable}
 import java.net.InetSocketAddress
-import java.nio.ByteBuffer
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -53,9 +52,9 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
     val maxBatchSize: Int,
     val parallelism: Int,
     storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[SparkFlumePollingEvent](_ssc) {
+  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
 
-  override def getReceiver(): Receiver[SparkFlumePollingEvent] = {
+  override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
 }
@@ -65,7 +64,7 @@ private[streaming] class FlumePollingReceiver(
     maxBatchSize: Int,
     parallelism: Int,
     storageLevel: StorageLevel
-  ) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {
+  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val channelFactoryExecutor =
     Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
@@ -104,12 +103,13 @@ private[streaming] class FlumePollingReceiver(
             "Received batch of " + events.size() + " events with sequence number: " + seq)
           try {
             // Convert each Flume event to a serializable SparkPollingEvent
+            val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
             var j = 0
             while (j < events.size()) {
-              store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
-              logDebug("Stored events with seq:" + seq)
+              buffer += sparkSinkEventToSparkFlumeEvent(events(j))
              j += 1
            }
+            store(buffer)
            logDebug("Sending ack for sequence number: " + seq)
            // Send an ack to Flume so that Flume discards the events from its channels.
            client.ack(seq)
@@ -152,6 +152,18 @@ private[streaming] class FlumePollingReceiver(
     })
     channelFactory.releaseExternalResources()
   }
+
+  /**
+   * Utility method to convert [[SparkSinkEvent]] to [[SparkFlumeEvent]]
+   * @param event - Event to convert to SparkFlumeEvent
+   * @return - The SparkFlumeEvent generated from the SparkSinkEvent
+   */
+  private def sparkSinkEventToSparkFlumeEvent(event: SparkSinkEvent): SparkFlumeEvent = {
+    val sparkFlumeEvent = new SparkFlumeEvent()
+    sparkFlumeEvent.event.setBody(event.getBody)
+    sparkFlumeEvent.event.setHeaders(event.getHeaders)
+    sparkFlumeEvent
+  }
 }
 
 /**
@@ -162,36 +174,5 @@ private[streaming] class FlumePollingReceiver(
 private class FlumeConnection(val transceiver: NettyTransceiver,
   val client: SparkFlumeProtocol.Callback)
-
-/**
- * Companion object of [[SparkFlumePollingEvent]]
- */
-private[streaming] object SparkFlumePollingEvent {
-  def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
-    val event = new SparkFlumePollingEvent()
-    event.event = in
-    event
-  }
-}
-
-/*
- * Unfortunately Avro does not allow including pre-compiled classes - so even though
- * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
- * around that to make it externalizable.
- */
-class SparkFlumePollingEvent extends Externalizable with Logging {
-  var event: SparkSinkEvent = new SparkSinkEvent()
-
-  /* De-serialize from bytes. */
-  def readExternal(in: ObjectInput) {
-    val (headers, bodyBuff) = EventTransformer.readExternal(in)
-    event.setBody(ByteBuffer.wrap(bodyBuff))
-    event.setHeaders(headers)
-  }
-
-  /* Serialize to bytes. */
-  def writeExternal(out: ObjectOutput) {
-    EventTransformer.writeExternal(out, event.getHeaders, event.getBody.array())
-  }
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index c754fe33738b8..4b732c1592ab2 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -117,7 +117,7 @@ object FlumeUtils {
    * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
    * This stream will poll the sink for data and will pull events as they are available.
    * This stream will use a batch size of 1000 events and run 5 threads to pull data.
-   * @param host Address of the host on which the Spark Sink is running
+   * @param hostname Address of the host on which the Spark Sink is running
    * @param port Port of the host at which the Spark Sink is listening
    * @param storageLevel Storage level to use for storing the received objects
    */
@@ -127,7 +127,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
   }
 
@@ -143,7 +143,7 @@ object FlumeUtils {
       ssc: StreamingContext,
       addresses: Seq[InetSocketAddress],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(ssc, addresses, storageLevel,
       DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
   }
@@ -166,8 +166,8 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
-    new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 
@@ -183,7 +183,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
   }
 
@@ -201,7 +201,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
   }
 
@@ -217,7 +217,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       addresses: Array[InetSocketAddress],
       storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc, addresses, storageLevel,
       DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
   }
 
@@ -240,7 +240,7 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
   }
 }
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
index a30157b94b972..79c5b91654b42 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
@@ -32,13 +32,13 @@ public void testFlumeStream() {
     InetSocketAddress[] addresses = new InetSocketAddress[] {
       new InetSocketAddress("localhost", 12345)
     };
-    JavaReceiverInputDStream<SparkFlumePollingEvent> test1 =
+    JavaReceiverInputDStream<SparkFlumeEvent> test1 =
       FlumeUtils.createPollingStream(ssc, "localhost", 12345);
-    JavaReceiverInputDStream<SparkFlumePollingEvent> test2 = FlumeUtils.createPollingStream(
+    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
       ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumePollingEvent> test3 = FlumeUtils.createPollingStream(
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
       ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumePollingEvent> test4 = FlumeUtils.createPollingStream(
+    JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
       ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
   }
 }
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 3ff0cca523928..ec06b841d2321 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -42,10 +42,11 @@ import org.apache.spark.streaming.flume.sink._
   test("flume polling test") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), StorageLevel.MEMORY_AND_DISK, 100, 1)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
+        StorageLevel.MEMORY_AND_DISK, 100, 1)
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
 
@@ -75,10 +76,10 @@ import org.apache.spark.streaming.flume.sink._
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
-    val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
+    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
       FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, 100, 5)
-    val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
-      with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
 
@@ -115,7 +116,7 @@ import org.apache.spark.streaming.flume.sink._
   }
 
   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-    outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
+    outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]]) {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val executor = Executors.newCachedThreadPool()
     val executorCompletion = new ExecutorCompletionService[Void](executor)
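
Usage sketch (illustrative, not part of the patch): after this change an application consumes the polling stream as ReceiverInputDStream[SparkFlumeEvent] instead of SparkFlumePollingEvent. The host, port, batch size, and parallelism below are placeholder values; the createPollingStream overload used is the Seq[InetSocketAddress] variant defined in FlumeUtils.scala above.

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object FlumePollingStreamExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("FlumePollingStreamExample")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Poll the Spark Sink assumed to be running at localhost:9999 (placeholder address),
    // pulling at most 100 events per batch with a single polling thread.
    val stream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", 9999)),
        StorageLevel.MEMORY_AND_DISK_SER_2, 100, 1)

    // Each record wraps an AvroFlumeEvent; the event body is exposed as a ByteBuffer.
    stream.map(e => new String(e.event.getBody.array())).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The same stream is available to Java callers through the JavaReceiverInputDStream overloads exercised in JavaFlumePollingStreamSuite.java above.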