diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 6249a3197d07d..7da8eb3e35912 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -31,6 +31,15 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
 * is rolled back.
 */
+// Flume forces transactions to be thread-local, so each transaction *must* be committed or
+// rolled back from the thread it was originally created in. Each getEvents call from Spark
+// therefore creates a TransactionProcessor which runs in a new thread, in which the transaction
+// is created and events are pulled off the channel. Once the events are sent to Spark,
+// that thread is blocked and the TransactionProcessor is saved in a map,
+// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
+// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
+// unblocked, at which point the transaction is committed or rolled back.
+
 private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
   val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
   val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
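The comment above carries the core idea of the change, so here is a minimal, self-contained Scala sketch of the thread-confinement pattern it describes. None of these names (FakeTransaction, Processor, ThreadConfinedTxnSketch, the ack/nack helpers) are the real Spark or Flume classes; the Flume channel and transaction are replaced with a stub so the sketch runs on its own, and only the begin/commit/rollback-on-one-thread and "park until ACK, NACK, or timeout" behaviour is kept.

    import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}

    // Hypothetical stand-in for a Flume transaction; only thread confinement matters here.
    class FakeTransaction {
      def begin(): Unit = println(s"begin on ${Thread.currentThread().getName}")
      def commit(): Unit = println(s"commit on ${Thread.currentThread().getName}")
      def rollback(): Unit = println(s"rollback on ${Thread.currentThread().getName}")
    }

    object ThreadConfinedTxnSketch {
      private val executor = Executors.newFixedThreadPool(2)
      // seqNum -> processor, so a later ACK or NACK can find the parked worker.
      private val processors = new ConcurrentHashMap[String, Processor]()

      class Processor(seqNum: String, timeoutSecs: Long) extends Runnable {
        private val latch = new CountDownLatch(1)
        @volatile private var acked = false

        def ack(): Unit = { acked = true; latch.countDown() }
        def nack(): Unit = { acked = false; latch.countDown() }

        override def run(): Unit = {
          val txn = new FakeTransaction             // created on this worker thread
          txn.begin()                               // events would be pulled off the channel here
          val gotResponse = latch.await(timeoutSecs, TimeUnit.SECONDS)
          if (gotResponse && acked) txn.commit()    // ACK: commit from the same thread
          else txn.rollback()                       // NACK or timeout: roll back from the same thread
          processors.remove(seqNum)
        }
      }

      def getEventBatch(seqNum: String): Unit = {
        val p = new Processor(seqNum, timeoutSecs = 5)
        processors.put(seqNum, p)
        executor.submit(p)
      }

      def ack(seqNum: String): Unit = Option(processors.get(seqNum)).foreach(_.ack())
      def nack(seqNum: String): Unit = Option(processors.get(seqNum)).foreach(_.nack())

      def main(args: Array[String]): Unit = {
        getEventBatch("batch-1")
        Thread.sleep(200)
        ack("batch-1")    // the commit happens on the worker thread that began the transaction
        executor.shutdown()
      }
    }

The map keyed by sequence number mirrors what the comment says about saving the TransactionProcessor until a response arrives; the latch-with-timeout stands in for the "blocked until ACK/NACK or timeout" step.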
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index d5afde0fae19d..7b735133e3d14 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -36,15 +36,23 @@ import org.apache.flume.sink.AbstractSink
 * if an ACK is not received from Spark within that time
 * threads - Number of threads to use to receive requests from Spark (Default: 10)
 *
+ * This sink is unlike other Flume sinks in the sense that it does not push data;
+ * instead, the process method in this sink simply blocks the SinkRunner the first time it is
+ * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
+ *
+ * Each time a getEventBatch call comes in, the sink creates a transaction and reads events
+ * from the channel. When enough events are read, the events are sent to the Spark receiver and
+ * the thread itself is blocked and a reference to it is saved off.
+ *
+ * When the ack for that batch is received,
+ * the thread which created the transaction is retrieved and it commits the transaction with the
+ * channel from the same thread it was originally created in (since Flume transactions are
+ * thread-local). If a nack is received instead, the sink rolls back the transaction. If no ack
+ * is received within the specified timeout, the transaction is rolled back too. If an ack comes
+ * after that, it is simply ignored and the events are re-sent.
+ *
 */
-// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
-// rolled back from the thread it was originally created in. So each getEvents call from Spark
-// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
-// and events are pulled off the channel. Once the events are sent to spark,
-// that thread is blocked and the TransactionProcessor is saved in a map,
-// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
-// When the response comes, the TransactionProcessor is retrieved and then unblocked,
-// at which point the transaction is committed or rolled back.
+
 private[flume] class SparkSink extends AbstractSink with Logging with Configurable {
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index 6f4e50b0f1d63..b9e3c786ebb3b 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -213,6 +213,13 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
     charSeqMap
   }
 
+  /**
+   * When the thread is started, it puts as many events as the batch size (or fewer, if enough
+   * events aren't available) into the eventBatch object and lets any threads waiting on the
+   * [[getEventBatch]] method proceed. This thread then waits for an ack or nack to come in,
+   * or for the specified timeout to expire, and commits or rolls back the transaction
+   * accordingly.
+   */
   override def call(): Void = {
     populateEvents()
     processAckOrNack()
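As a companion to the doc comment added to call() above, here is a small runnable sketch of the "fill up to batchSize events, or fewer if the channel runs dry, then let the getEventBatch caller proceed" handoff. The names are made up for illustration: PopulateEventsSketch is not part of the patch, a LinkedBlockingQueue stands in for the Flume channel, and a Promise stands in for the sink's own signalling between the worker thread and the waiting caller.

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    object PopulateEventsSketch {
      // Hypothetical "channel": a plain queue pre-loaded with fewer events than the batch size.
      val channel = new LinkedBlockingQueue[String]()
      (1 to 7).foreach(i => channel.put(s"event-$i"))

      // Worker side: drain at most batchSize events, stopping early if the channel runs dry,
      // then complete the promise so the thread blocked in the getEventBatch-style caller proceeds.
      def populateEvents(batchSize: Int, batchPromise: Promise[Seq[String]]): Unit = {
        val batch = Iterator
          .continually(channel.poll(100, TimeUnit.MILLISECONDS))
          .takeWhile(_ != null)
          .take(batchSize)
          .toSeq
        batchPromise.success(batch)
      }

      def main(args: Array[String]): Unit = {
        val batchPromise = Promise[Seq[String]]()
        val worker = new Thread(new Runnable {
          override def run(): Unit = populateEvents(batchSize = 10, batchPromise)
        })
        worker.start()
        // Caller side: blocks until the worker has filled the batch, or as much of it as it could.
        val batch = Await.result(batchPromise.future, 5.seconds)
        println(s"got ${batch.size} events (batch size or fewer): $batch")
        worker.join()
      }
    }

After the handoff, the real TransactionProcessor stays parked on its own thread waiting for the ack, nack, or timeout, which is the part sketched after the first file above.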
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 739398c7ad92d..47071d0cc4714 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -35,16 +35,20 @@ import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
 import org.apache.spark.streaming.flume.sink._
 
- class FlumePollingStreamSuite extends TestSuiteBase {
+class FlumePollingStreamSuite extends TestSuiteBase {
   val testPort = 9999
+  val batchCount = 5
+  val eventsPerBatch = 100
+  val totalEventsPerChannel = batchCount * eventsPerBatch
+  val channelCapacity = 5000
 
   test("flume polling test") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
       FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
-        StorageLevel.MEMORY_AND_DISK, 100, 1)
+        StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
@@ -52,7 +56,7 @@ import org.apache.spark.streaming.flume.sink._
 
     // Start the channel and sink.
     val context = new Context()
-    context.put("capacity", "5000")
+    context.put("capacity", channelCapacity.toString)
     context.put("transactionCapacity", "1000")
     context.put("keep-alive", "0")
     val channel = new MemoryChannel()
@@ -77,7 +81,8 @@ import org.apache.spark.streaming.flume.sink._
     val ssc = new StreamingContext(conf, batchDuration)
     val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
     val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, 100, 5)
+      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
+        eventsPerBatch, 5)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
@@ -85,7 +90,7 @@ import org.apache.spark.streaming.flume.sink._
 
     // Start the channel and sink.
     val context = new Context()
-    context.put("capacity", "5000")
+    context.put("capacity", channelCapacity.toString)
     context.put("transactionCapacity", "1000")
     context.put("keep-alive", "0")
     val channel = new MemoryChannel()
@@ -127,7 +132,7 @@ import org.apache.spark.streaming.flume.sink._
       executorCompletion.take()
     }
     val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < 5 &&
+    while (outputBuffer.size < batchCount * channels.size &&
       System.currentTimeMillis() - startTime < 15000) {
       logInfo("output.size = " + outputBuffer.size)
       Thread.sleep(100)
@@ -138,9 +143,9 @@ import org.apache.spark.streaming.flume.sink._
     ssc.stop()
 
     val flattenedBuffer = outputBuffer.flatten
-    assert(flattenedBuffer.size === 25 * channels.size)
+    assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
     var counter = 0
-    for (k <- 0 until channels.size; i <- 0 until 25) {
+    for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
       val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
         String.valueOf(i)).getBytes("utf-8"),
         Map[String, String]("test-" + i.toString -> "header"))
@@ -157,7 +162,7 @@ import org.apache.spark.streaming.flume.sink._
         j += 1
       }
     }
-    assert(counter === 25 * channels.size)
+    assert(counter === totalEventsPerChannel * channels.size)
   }
 
   def assertChannelIsEmpty(channel: MemoryChannel) = {
@@ -170,10 +175,10 @@ import org.apache.spark.streaming.flume.sink._
   private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
     override def call(): Void = {
       var t = 0
-      for (i <- 0 until 5) {
+      for (i <- 0 until batchCount) {
        val tx = channel.getTransaction
         tx.begin()
-        for (j <- 0 until 5) {
+        for (j <- 0 until eventsPerBatch) {
         channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
           "utf-8"), Map[String, String]("test-" + t.toString -> "header")))
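Not part of the patch, just a sketch of the accounting the revised test relies on: batchCount, eventsPerBatch, and the derived totalEventsPerChannel replace the previously hard-coded literals, so the submit loop and the assertions can no longer drift apart. PollingTestCountsSketch is a made-up name, the "channel" below is only a buffer of strings rather than a Flume MemoryChannel, and the event bodies just follow the same "channelName - counter" scheme the suite uses.

    import scala.collection.mutable.ArrayBuffer

    object PollingTestCountsSketch {
      val batchCount = 5
      val eventsPerBatch = 100
      val totalEventsPerChannel = batchCount * eventsPerBatch

      def main(args: Array[String]): Unit = {
        val channels = Seq("channel-0", "channel-1")
        val received = ArrayBuffer[String]()

        // Stand-in for the TxnSubmitter loop: each channel submits batchCount batches of
        // eventsPerBatch events, with a running counter in the event body.
        for (name <- channels; i <- 0 until batchCount; j <- 0 until eventsPerBatch) {
          received += s"$name - ${i * eventsPerBatch + j}"
        }

        // The same checks the suite performs, expressed against the derived constant.
        assert(received.size == totalEventsPerChannel * channels.size)
        for (name <- channels; t <- 0 until totalEventsPerChannel) {
          assert(received.contains(s"$name - $t"))
        }
        println(s"verified ${received.size} events")
      }
    }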