From 6d6776a45f30e3594a15bda2582f99819c28a583 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 8 May 2014 23:16:56 -0700
Subject: [PATCH] SPARK-1729. Make Flume pull data from source, rather than the current push model

Currently Spark uses Flume's internal Avro protocol to ingest data from Flume.
If the executor running the receiver fails, the receiver has to be restarted on
the same node to keep receiving data.

This commit adds a new sink that can be deployed to a Flume agent, and a new
DStream (also included in this commit) that polls that sink for data. With this
model, data can be pulled into Spark from Flume even if the receiver is
restarted on a new node. It also allows the receiver to receive data on
multiple threads for better performance.
---
 external/flume-sink/pom.xml                   |  82 ++++
 .../flume-sink/src/main/avro/sparkflume.avdl  |  40 ++
 .../apache/spark/flume/sink/SparkSink.scala   | 365 ++++++++++++++++++
 external/flume/pom.xml                        |   4 +
 .../streaming/flume/EventTransformer.scala    |  70 ++++
 .../streaming/flume/FlumeInputDStream.scala   |  38 +-
 .../flume/FlumePollingInputDStream.scala      | 140 +++++++
 .../spark/streaming/flume/FlumeUtils.scala    |  52 ++-
 .../flume/FlumePollingReceiverSuite.scala     | 104 +++++
 pom.xml                                       |   1 +
 project/SparkBuild.scala                      |  26 +-
 project/plugins.sbt                           |   4 +
 12 files changed, 885 insertions(+), 41 deletions(-)
 create mode 100644 external/flume-sink/pom.xml
 create mode 100644 external/flume-sink/src/main/avro/sparkflume.avdl
 create mode 100644 external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
 create mode 100644 external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
 create mode 100644 external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
 create mode 100644 external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala

diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
new file mode 100644
index 0000000000000..40d7e09edaf16
--- /dev/null
+++ b/external/flume-sink/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-streaming-flume-sink_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External Flume Sink</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <version>1.4.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>libthrift</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>1.4.0</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>1.7.3</version>
+        <configuration>
+          <stringType>String</stringType>
+          <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
new file mode 100644
index 0000000000000..9dcc709de079a
--- /dev/null
+++ b/external/flume-sink/src/main/avro/sparkflume.avdl
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@namespace("org.apache.spark.flume")
+
+protocol SparkFlumeProtocol {
+
+  record SparkSinkEvent {
+    map<string, string> headers;
+    bytes body;
+  }
+
+  record EventBatch {
+    string sequenceNumber;
+    array<SparkSinkEvent> eventBatch;
+  }
+
+  EventBatch getEventBatch (int n);
+
+  void ack (string sequenceNumber);
+
+  void nack (string sequenceNumber);
+
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
new file mode 100644
index 0000000000000..6243463a475b6
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.flume.sink
+
+import org.apache.flume.sink.AbstractSink
+import java.util.concurrent.locks.ReentrantLock
+import org.apache.flume.Sink.Status
+import org.apache.spark.flume.{SparkSinkEvent, EventBatch, SparkFlumeProtocol}
+import scala.util.control.Breaks
+import java.nio.ByteBuffer
+import org.apache.flume.{FlumeException, Context}
+import org.slf4j.LoggerFactory
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.commons.lang.RandomStringUtils
+import java.util.concurrent._
+import java.util
+import org.apache.flume.conf.{ConfigurationException, Configurable}
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.avro.ipc.{NettyTransceiver, NettyServer}
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import java.net.InetSocketAddress
+
+class SparkSink() extends AbstractSink with Configurable {
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+  private val lock = new ReentrantLock()
+  private val blockingCondition = lock.newCondition()
+
+  // This sink will not persist sequence numbers and reuses them if it gets restarted.
+  // So it is possible to commit a transaction which may have been meant for the sink before the
+  // restart. Since the new txn may not have the same sequence number we must guard against
+  // accidentally committing a new transaction. To reduce the probability of that happening a
+  // random string is prepended to the sequence number.
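+  // (e.g. with a random prefix of "k3JqPz9x", successive batches get sequence numbers
+  // "k3JqPz9x1", "k3JqPz9x2", ... -- the prefix stays fixed while the counter increments)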
+ // Does not change for life of sink + private val seqBase = RandomStringUtils.randomAlphanumeric(8) + // Incremented for each transaction + private val seqNum = new AtomicLong(0) + + private var transactionExecutorOpt: Option[ExecutorService] = None + + private var numProcessors: Integer = SparkSinkConfig.DEFAULT_PROCESSOR_COUNT + private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT + + private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]() + + private var processorFactory: Option[SparkHandlerFactory] = None + private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME + private var port: Int = 0 + private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS + private var serverOpt: Option[NettyServer] = None + private var running = false + + override def start() { + transactionExecutorOpt = Option(Executors.newFixedThreadPool(numProcessors, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Spark Sink, " + getName + " Processor Thread - %d").build())) + + processorFactory = Option(new SparkHandlerFactory(numProcessors)) + + val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler()) + + serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port), + new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat( + "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " Boss-%d").build), + Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder().setNameFormat( + "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " I/O Worker-%d").build)))) + + serverOpt.map(server => server.start()) + lock.lock() + try { + running = true + } finally { + lock.unlock() + } + super.start() + } + + override def stop() { + lock.lock() + try { + running = false + transactionExecutorOpt.map(executor => executor.shutdownNow()) + blockingCondition.signalAll() + } finally { + lock.unlock() + } + } + + override def configure(ctx: Context) { + import SparkSinkConfig._ + hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME) + val portOpt = Option(ctx.getInteger(CONF_PORT)) + if(portOpt.isDefined) { + port = portOpt.get + } else { + throw new ConfigurationException("The Port to bind must be specified") + } + numProcessors = ctx.getInteger(PROCESSOR_COUNT, DEFAULT_PROCESSOR_COUNT) + transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT) + maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS) + } + + override def process(): Status = { + // This method is called in a loop by the Flume framework - block it until the sink is + // stopped to save CPU resources + lock.lock() + try { + while(running) { + blockingCondition.await() + } + } finally { + lock.unlock() + } + Status.BACKOFF + } + + private class AvroCallbackHandler() extends SparkFlumeProtocol { + + override def getEventBatch(n: Int): EventBatch = { + val processor = processorFactory.get.checkOut(n) + transactionExecutorOpt.map(executor => executor.submit(processor)) + // Wait until a batch is available - can be null if some error was thrown + val eventBatch = Option(processor.eventQueue.take()) + if (eventBatch.isDefined) { + val eventsToBeSent = eventBatch.get + processorMap.put(eventsToBeSent.getSequenceNumber, processor) + if (LOG.isDebugEnabled) { + LOG.debug("Sent " + eventsToBeSent.getEventBatch.size() + + " events with sequence number: " + eventsToBeSent.getSequenceNumber) + } + eventsToBeSent + } else { + 
throw new FlumeException("Error while trying to retrieve events from the channel.") + } + } + + override def ack(sequenceNumber: CharSequence): Void = { + completeTransaction(sequenceNumber, success = true) + null + } + + override def nack(sequenceNumber: CharSequence): Void = { + completeTransaction(sequenceNumber, success = false) + LOG.info("Spark failed to commit transaction. Will reattempt events.") + null + } + + def completeTransaction(sequenceNumber: CharSequence, success: Boolean) { + val processorOpt = Option(processorMap.remove(sequenceNumber)) + if (processorOpt.isDefined) { + val processor = processorOpt.get + processor.resultQueueUpdateLock.lock() + try { + // Is the sequence number the same as the one the processor is processing? If not, + // don't update { + if (processor.eventBatch.getSequenceNumber.equals(sequenceNumber)) { + processor.resultQueue.put(success) + } + } finally { + processor.resultQueueUpdateLock.unlock() + } + } + } + } + + // Flume forces transactions to be thread-local (horrible, I know!) + // So the sink basically spawns a new thread to pull the events out within a transaction. + // The thread fills in the event batch object that is set before the thread is scheduled. + // After filling it in, the thread waits on a condition - which is released only + // when the success message comes back for the specific sequence number for that event batch. + /** + * This class represents a transaction on the Flume channel. This class runs a separate thread + * which owns the transaction. It is blocked until the success call for that transaction comes + * back. + * @param maxBatchSize + */ + private class TransactionProcessor(var maxBatchSize: Int) extends Callable[Void] { + // Must be set to a new event batch before scheduling this!! + val eventBatch = new EventBatch("", new util.LinkedList[SparkSinkEvent]) + val eventQueue = new SynchronousQueue[EventBatch]() + val resultQueue = new SynchronousQueue[Boolean]() + val resultQueueUpdateLock = new ReentrantLock() + + object Zero { + val zero = "0" // Oh, I miss static finals + } + + + override def call(): Void = { + val tx = getChannel.getTransaction + tx.begin() + try { + eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet()) + val events = eventBatch.getEventBatch + events.clear() + val loop = new Breaks + loop.breakable { + for (i <- 0 until maxBatchSize) { + val eventOpt = Option(getChannel.take()) + + eventOpt.map(event => { + events.add(new SparkSinkEvent(toCharSequenceMap(event + .getHeaders), + ByteBuffer.wrap(event.getBody))) + }) + if (eventOpt.isEmpty) { + loop.break() + } + } + } + // Make the data available to the sender thread + eventQueue.put(eventBatch) + + // Wait till timeout for the ack/nack + val maybeResult = Option(resultQueue.poll(transactionTimeout, TimeUnit.SECONDS)) + // There is a race condition here. + // 1. This times out. + // 2. The result is empty, so timeout exception is thrown. + // 3. The ack comes in before the finally block is entered + // 4. The thread with the ack has a handle to this processor, + // and another thread has the same processor checked out + // (since the finally block was executed and the processor checked back in) + // 5. The thread with the ack now updates the result queue, + // so the processor thinks it is the ack for the current batch. + // To avoid this - update the sequence number to "0" (with or without a result - does not + // matter). 
+ // In the ack method, check if the seq number is the same as the processor's - + // if they are then update the result queue. Now if the + // processor updates the seq number first - the ack/nack never updates the result. If the + // ack/nack updates the + // result after the timeout but before the seq number is updated to "0" it does not + // matter - the processor would + // still timeout and the result is cleared before reusing the processor. + // Unfortunately, this needs to be done from within a lock + // to make sure that the new sequence number is actually visible to the ack thread + // (happens-before) + resultQueueUpdateLock.lock() + try { + eventBatch.setSequenceNumber(Zero.zero) + } finally { + resultQueueUpdateLock.unlock() + } + eventBatch.getEventBatch.clear() + // If the batch failed on spark side, throw a FlumeException + maybeResult.map(success => + if (!success) { + throw new + FlumeException("Spark could not accept events. The transaction will be retried.") + } + ) + // If the operation timed out, throw a TimeoutException + if (maybeResult.isEmpty) { + throw new TimeoutException("Spark did not respond within the timeout period of " + + transactionTimeout + "seconds. Transaction will be retried") + } + null + } catch { + case e: Throwable => + try { + LOG.warn("Error while attempting to remove events from the channel.", e) + tx.rollback() + } catch { + case e1: Throwable => LOG.error( + "Rollback failed while attempting to rollback due to commit failure.", e1) + } + null // No point rethrowing the exception + } finally { + // Must *always* release the caller thread + eventQueue.put(null) + // In the case of success coming after the timeout, but before resetting the seq number + // remove the event from the map and then clear the value + resultQueue.clear() + processorMap.remove(eventBatch.getSequenceNumber) + processorFactory.get.checkIn(this) + tx.close() + } + } + + def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence, + CharSequence] = { + val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size()) + charSeqMap.putAll(inMap) + charSeqMap + } + } + + private class SparkHandlerFactory(val maxInstances: Int) { + val queue = new scala.collection.mutable.Queue[TransactionProcessor] + val queueModificationLock = new ReentrantLock() + var currentSize = 0 + val waitForCheckIn = queueModificationLock.newCondition() + + def checkOut(n: Int): TransactionProcessor = { + def getProcessor = { + val processor = queue.dequeue() + processor.maxBatchSize = n + processor + } + queueModificationLock.lock() + try { + if (queue.size > 0) { + getProcessor + } + else { + if (currentSize < maxInstances) { + currentSize += 1 + new TransactionProcessor(n) + } else { + // No events in queue and cannot initialize more! 
+ // Since currentSize never reduces, queue size increasing is the only hope + while (queue.size == 0 && currentSize >= maxInstances) { + waitForCheckIn.await() + } + getProcessor + } + } + } finally { + queueModificationLock.unlock() + } + } + + def checkIn(processor: TransactionProcessor) { + queueModificationLock.lock() + try { + queue.enqueue(processor) + waitForCheckIn.signal() + } finally { + queueModificationLock.unlock() + } + } + } +} + +object SparkSinkConfig { + val PROCESSOR_COUNT = "processorCount" + val DEFAULT_PROCESSOR_COUNT = 10 + + val CONF_TRANSACTION_TIMEOUT = "timeout" + val DEFAULT_TRANSACTION_TIMEOUT = 60 + + val CONF_HOSTNAME = "hostname" + val DEFAULT_HOSTNAME = "0.0.0.0" + + val CONF_PORT = "port" + + val CONF_MAX_THREADS = "maxThreads" + val DEFAULT_MAX_THREADS = 5 +} diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 6aec215687fe0..93d8ec02ac69e 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -74,6 +74,10 @@ junit-interface test + + org.apache.spark + spark-streaming-flume-sink_2.10 + target/scala-${scala.binary.version}/classes diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala new file mode 100644 index 0000000000000..91f6171d57368 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import java.io.{ObjectOutput, ObjectInput} +import org.apache.spark.util.Utils +import scala.collection.JavaConversions._ +import org.apache.spark.Logging + +/** + * A simple object that provides the implementation of readExternal and writeExternal for both + * the wrapper classes for Flume-style Events. 
+ */ +object EventTransformer extends Logging { + def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence], + Array[Byte]) = { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.read(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.read(keyBuff) + val key: String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.read(valBuff) + val value: String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + (headers, bodyBuff) + } + + def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence], + body: Array[Byte]) { + out.writeInt(body.length) + out.write(body) + val numHeaders = headers.size() + out.writeInt(numHeaders) + for ((k,v) <- headers) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index df7605fe579f8..78715226ab402 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -30,7 +30,6 @@ import org.apache.flume.source.avro.Status import org.apache.avro.ipc.specific.SpecificResponder import org.apache.avro.ipc.NettyServer -import org.apache.spark.util.Utils import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ @@ -61,47 +60,14 @@ class SparkFlumeEvent() extends Externalizable { /* De-serialize from bytes. */ def readExternal(in: ObjectInput) { - val bodyLength = in.readInt() - val bodyBuff = new Array[Byte](bodyLength) - in.read(bodyBuff) - - val numHeaders = in.readInt() - val headers = new java.util.HashMap[CharSequence, CharSequence] - - for (i <- 0 until numHeaders) { - val keyLength = in.readInt() - val keyBuff = new Array[Byte](keyLength) - in.read(keyBuff) - val key : String = Utils.deserialize(keyBuff) - - val valLength = in.readInt() - val valBuff = new Array[Byte](valLength) - in.read(valBuff) - val value : String = Utils.deserialize(valBuff) - - headers.put(key, value) - } - + val (headers, bodyBuff) = EventTransformer.readExternal(in) event.setBody(ByteBuffer.wrap(bodyBuff)) event.setHeaders(headers) } /* Serialize to bytes. 
*/ def writeExternal(out: ObjectOutput) { - val body = event.getBody.array() - out.writeInt(body.length) - out.write(body) - - val numHeaders = event.getHeaders.size() - out.writeInt(numHeaders) - for ((k, v) <- event.getHeaders) { - val keyBuff = Utils.serialize(k.toString) - out.writeInt(keyBuff.length) - out.write(keyBuff) - val valBuff = Utils.serialize(v.toString) - out.writeInt(valBuff.length) - out.write(valBuff) - } + EventTransformer.writeExternal(out, event.getHeaders, event.getBody.array()) } } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala new file mode 100644 index 0000000000000..71b0f72f85f53 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.flume + +import scala.reflect.ClassTag +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.Logging +import java.net.InetSocketAddress +import java.util.concurrent.{TimeUnit, Executors} +import org.apache.avro.ipc.NettyTransceiver +import org.apache.avro.ipc.specific.SpecificRequestor +import org.apache.spark.flume.{SparkSinkEvent, SparkFlumeProtocol} +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory +import com.google.common.util.concurrent.ThreadFactoryBuilder +import java.io.{ObjectOutput, ObjectInput, Externalizable} +import java.nio.ByteBuffer +import scala.collection.JavaConversions._ + +class FlumePollingInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + val host: String, + val port: Int, + val maxBatchSize: Int, + val parallelism: Int, + storageLevel: StorageLevel +) extends ReceiverInputDStream[SparkPollingEvent](ssc_) { + /** + * Gets the receiver object that will be sent to the worker nodes + * to receive data. This method needs to defined by any specific implementation + * of a NetworkInputDStream. 
+ */ + override def getReceiver(): Receiver[SparkPollingEvent] = { + new FlumePollingReceiver(host, port, maxBatchSize, parallelism, storageLevel) + } +} + +private[streaming] class FlumePollingReceiver( + host: String, + port: Int, + maxBatchSize: Int, + parallelism: Int, + storageLevel: StorageLevel +) extends Receiver[SparkPollingEvent](storageLevel) with Logging { + + lazy val channelFactory = + new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(), + Executors.newSingleThreadExecutor()) + lazy val transceiver = new NettyTransceiver(new InetSocketAddress(host, port), channelFactory) + lazy val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver) + lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build()) + + override def onStart(): Unit = { + val dataReceiver = new Runnable { + override def run(): Unit = { + while (true) { + val batch = client.getEventBatch(maxBatchSize) + val seq = batch.getSequenceNumber + val events: java.util.List[SparkSinkEvent] = batch.getEventBatch + logDebug("Received batch of " + events.size() + " events with sequence number: " + seq) + try { + events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event))) + client.ack(seq) + } catch { + case e: Throwable => + client.nack(seq) + TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds. + logWarning("Error while attempting to store events", e) + } + } + } + } + for (i <- 0 until parallelism) { + logInfo("Starting Flume Polling Receiver worker threads starting..") + receiverExecutor.submit(dataReceiver) + } + } + + override def store(dataItem: SparkPollingEvent) { + // Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized + // This takes a performance hit, since the parallelism is useful only for pulling data now. + this.synchronized { + super.store(dataItem) + } + } + + override def onStop(): Unit = { + logInfo("Shutting down Flume Polling Receiver") + receiverExecutor.shutdownNow() + transceiver.close() + channelFactory.releaseExternalResources() + } +} + +private[streaming] object SparkPollingEvent { + def fromSparkSinkEvent(in: SparkSinkEvent): SparkPollingEvent = { + val event = new SparkPollingEvent() + event.event = in + event + } +} +/* + * Unfortunately Avro does not allow including pre-compiled classes - so even though + * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper + * around that to make it externalizable. + */ +class SparkPollingEvent() extends Externalizable with Logging { + var event : SparkSinkEvent = new SparkSinkEvent() + + /* De-serialize from bytes. */ + def readExternal(in: ObjectInput) { + val (headers, bodyBuff) = EventTransformer.readExternal(in) + event.setBody(ByteBuffer.wrap(bodyBuff)) + event.setHeaders(headers) + } + + /* Serialize to bytes. 
*/ + def writeExternal(out: ObjectOutput) { + EventTransformer.writeExternal(out, event.getHeaders, event.getBody.array()) + } +} + + diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 499f3560ef768..f7d9bd3c6e2ab 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream} -import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream object FlumeUtils { /** @@ -68,4 +68,52 @@ object FlumeUtils { ): JavaReceiverInputDStream[SparkFlumeEvent] = { createStream(jssc.ssc, hostname, port, storageLevel) } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * @param host The host on which the Flume agent is running + * @param port The port the Spark Sink is accepting connections on + * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a + * single RPC call + * @param parallelism Number of concurrent requests this stream should send to the sink. Note + * that having a higher number of requests concurrently being pulled will + * result in this stream using more threads + * @param storageLevel Storage level to use for storing the received objects + */ + def createPollingStream ( + ssc: StreamingContext, + host: String, + port: Int, + maxBatchSize: Int = 100, + parallelism: Int = 5, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): ReceiverInputDStream[SparkPollingEvent] = { + new FlumePollingInputDStream[SparkPollingEvent](ssc, host, port, maxBatchSize, + parallelism, storageLevel) + } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * @param host The host on which the Flume agent is running + * @param port The port the Spark Sink is accepting connections on + * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a + * single RPC call + * @param parallelism Number of concurrent requests this stream should send to the sink. 
Note + * that having a higher number of requests concurrently being pulled will + * result in this stream using more threads + * @param storageLevel Storage level to use for storing the received objects + */ + def createJavaPollingStream ( + ssc: StreamingContext, + host: String, + port: Int, + maxBatchSize: Int = 100, + parallelism: Int = 5, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): JavaReceiverInputDStream[SparkPollingEvent] = { + new FlumePollingInputDStream[SparkPollingEvent](ssc, host, port, maxBatchSize, + parallelism, storageLevel) + } } diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala new file mode 100644 index 0000000000000..579f0b1091df3 --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.streaming.flume + +import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext} +import org.apache.spark.storage.StorageLevel +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import org.apache.spark.streaming.util.ManualClock +import java.nio.charset.Charset +import org.apache.flume.channel.MemoryChannel +import org.apache.flume.Context +import org.apache.flume.conf.Configurables +import org.apache.spark.flume.sink.{SparkSinkConfig, SparkSink} +import scala.collection.JavaConversions._ +import org.apache.flume.event.EventBuilder +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +class FlumePollingReceiverSuite extends TestSuiteBase { + + val testPort = 9999 + + test("flume polling test") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(conf, batchDuration) + val flumeStream: ReceiverInputDStream[SparkPollingEvent] = + FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 5, + StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]] + with SynchronizedBuffer[Seq[SparkPollingEvent]] + val outputStream = new TestOutputStream(flumeStream, outputBuffer) + outputStream.register() + + // Start the channel and sink. 
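+    // (The MemoryChannel configured below stands in for the Flume agent's channel: the
+    // SparkSink is attached to it, and the polling stream created above pulls batches from
+    // that sink on localhost:testPort.)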
+ val context = new Context() + context.put("capacity", "5000") + context.put("transactionCapacity", "1000") + context.put("keep-alive", "0") + val channel = new MemoryChannel() + Configurables.configure(channel, context) + + val sink = new SparkSink() + context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") + context.put(SparkSinkConfig.CONF_PORT, String.valueOf(testPort)) + Configurables.configure(sink, context) + sink.setChannel(channel) + sink.start() + ssc.start() + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5) + for (i <- 0 until 5) { + val tx = channel.getTransaction + tx.begin() + for (j <- 0 until input.size) { + channel.put(EventBuilder.withBody( + (String.valueOf(i) + input(j)).getBytes("utf-8"), + Map[String, String]("test-" + input(j).toString -> "header"))) + } + tx.commit() + tx.close() + Thread.sleep(500) // Allow some time for the events to reach + clock.addToTime(batchDuration.milliseconds) + } + val startTime = System.currentTimeMillis() + while (outputBuffer.size < 5 && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + outputBuffer.size) + Thread.sleep(100) + } + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + val decoder = Charset.forName("UTF-8").newDecoder() + + assert(outputBuffer.size === 5) + var counter = 0 + for (i <- 0 until outputBuffer.size; + j <- 0 until outputBuffer(i).size) { + counter += 1 + val eventToVerify = outputBuffer(i)(j).event + val str = decoder.decode(eventToVerify.getBody) + assert(str.toString === (String.valueOf(i) + input(j))) + assert(eventToVerify.getHeaders.get("test-" + input(j).toString) === "header") + } + } + +} diff --git a/pom.xml b/pom.xml index 86264d1132ec4..cf7bf3d2ee1ec 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ external/twitter external/kafka external/flume + external/flume-sink external/zeromq external/mqtt examples diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 29dcd8678b476..df21813ff983a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -16,10 +16,15 @@ */ import sbt._ +import sbt.ClasspathDependency import sbt.Classpaths.publishTask +import sbt.ExclusionRule import sbt.Keys._ +import sbt.Task import sbtassembly.Plugin._ import AssemblyKeys._ +import sbtavro.SbtAvro._ +import scala.Some import scala.util.Properties import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings} import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact @@ -140,8 +145,11 @@ object SparkBuild extends Build { lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings) .dependsOn(streaming % "compile->compile;test->test") + lazy val externalFlumeSink = Project("external-flume-sink", file("external/flume-sink"), settings = flumeSinkSettings) + lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings) - .dependsOn(streaming % "compile->compile;test->test") + .dependsOn(streaming % "compile->compile;test->test").dependsOn(externalFlumeSink) + lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings) .dependsOn(streaming % "compile->compile;test->test") @@ -149,8 +157,8 @@ object SparkBuild extends Build { lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings) 
.dependsOn(streaming % "compile->compile;test->test") - lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) - lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) + lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalFlumeSink, externalZeromq, externalMqtt) + lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalFlumeSink, externalZeromq, externalMqtt) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) .dependsOn(core, mllib, graphx, bagel, streaming, hive) dependsOn(allExternal: _*) @@ -622,6 +630,18 @@ object SparkBuild extends Build { ) ) + def flumeSinkSettings() = { + sharedSettings ++ Seq( + name := "spark-streaming-flume-sink", + previousArtifact := sparkPreviousArtifact("spark-streaming-flume-sink"), + libraryDependencies ++= Seq( + "org.apache.flume" % "flume-ng-sdk" % "1.4.0" % "compile" + excludeAll(excludeJBossNetty, excludeThrift), + "org.apache.flume" % "flume-ng-core" % "1.4.0" % "compile" + excludeAll(excludeJBossNetty, excludeThrift) + ) + ) ++ sbtavro.SbtAvro.avroSettings + } def zeromqSettings() = sharedSettings ++ Seq( name := "spark-streaming-zeromq", previousArtifact := sparkPreviousArtifact("spark-streaming-zeromq"), diff --git a/project/plugins.sbt b/project/plugins.sbt index 0cd16fd5bedd4..eadf71707ba19 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,6 +4,8 @@ resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline. resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" +resolvers += "sbt-plugins" at "http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases" + addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") @@ -24,3 +26,5 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6") addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.0") addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.0") + +addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")
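
A minimal usage sketch for the API added by this patch, assuming the sink above is deployed
inside a Flume agent; the agent/channel names, the host "flume-host", and port 9999 are
placeholders, not values taken from the patch:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingExample {
  def main(args: Array[String]): Unit = {
    // Corresponding Flume agent configuration (sketch; agent and channel names are placeholders):
    //   agent1.sinks.spark.type = org.apache.spark.flume.sink.SparkSink
    //   agent1.sinks.spark.hostname = 0.0.0.0
    //   agent1.sinks.spark.port = 9999
    //   agent1.sinks.spark.channel = memoryChannel
    val conf = new SparkConf().setAppName("FlumePollingExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Poll the SparkSink running inside the Flume agent on flume-host:9999, pulling up to
    // 100 events per RPC call with 5 concurrent requests, as per createPollingStream above.
    val stream = FlumeUtils.createPollingStream(
      ssc, "flume-host", 9999, maxBatchSize = 100, parallelism = 5,
      storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)

    // Each record is a SparkPollingEvent wrapping the Flume event's headers and body.
    stream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()

    ssc.start()
    ssc.awaitTermination()
  }
}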