Skip to content

Commit

Permalink
Cleaned up tests a bit. Added some docs in multiple places.
Browse files Browse the repository at this point in the history
  • Loading branch information
harishreedharan committed Jul 24, 2014
1 parent 65b76b4 commit 73d6f6d
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
* @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
* is rolled back.
*/
// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
// rolled back from the thread it was originally created in. So each getEvents call from Spark
// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
// and events are pulled off the channel. Once the events are sent to spark,
// that thread is blocked and the TransactionProcessor is saved in a map,
// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
// unblocked, at which point the transaction is committed or rolled back.

private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,23 @@ import org.apache.flume.sink.AbstractSink
* if an ACK is not received from Spark within that time
* threads - Number of threads to use to receive requests from Spark (Default: 10)
*
* This sink is unlike other Flume sinks in the sense that it does not push data,
* instead the process method in this sink simply blocks the SinkRunner the first time it is
* called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
*
* Each time a getEventBatch call comes, creates a transaction and reads events
* from the channel. When enough events are read, the events are sent to the Spark receiver and
* the thread itself is blocked and a reference to it saved off.
*
* When the ack for that batch is received,
* the thread which created the transaction is is retrieved and it commits the transaction with the
* channel from the same thread it was originally created in (since Flume transactions are
* thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
* is received within the specified timeout, the transaction is rolled back too. If an ack comes
* after that, it is simply ignored and the events get re-sent.
*
*/
// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
// rolled back from the thread it was originally created in. So each getEvents call from Spark
// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
// and events are pulled off the channel. Once the events are sent to spark,
// that thread is blocked and the TransactionProcessor is saved in a map,
// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
// When the response comes, the TransactionProcessor is retrieved and then unblocked,
// at which point the transaction is committed or rolled back.

private[flume]
class SparkSink extends AbstractSink with Logging with Configurable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
charSeqMap
}

/**
* When the thread is started it sets as many events as the batch size or less (if enough
* events aren't available) into the eventBatch and object and lets any threads waiting on the
* [[getEventBatch]] method to proceed. Then this thread waits for acks or nacks to come in,
* or for a specified timeout and commits or rolls back the transaction.
* @return
*/
override def call(): Void = {
populateEvents()
processAckOrNack()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,28 @@ import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
import org.apache.spark.streaming.flume.sink._

class FlumePollingStreamSuite extends TestSuiteBase {
class FlumePollingStreamSuite extends TestSuiteBase {

val testPort = 9999
val batchCount = 5
val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch
val channelCapacity = 5000

test("flume polling test") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
StorageLevel.MEMORY_AND_DISK, 100, 1)
StorageLevel.MEMORY_AND_DISK, eventsPerBatch, 1)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()

// Start the channel and sink.
val context = new Context()
context.put("capacity", "5000")
context.put("capacity", channelCapacity.toString)
context.put("transactionCapacity", "1000")
context.put("keep-alive", "0")
val channel = new MemoryChannel()
Expand All @@ -77,15 +81,16 @@ import org.apache.spark.streaming.flume.sink._
val ssc = new StreamingContext(conf, batchDuration)
val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, 100, 5)
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
eventsPerBatch, 5)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()

// Start the channel and sink.
val context = new Context()
context.put("capacity", "5000")
context.put("capacity", channelCapacity.toString)
context.put("transactionCapacity", "1000")
context.put("keep-alive", "0")
val channel = new MemoryChannel()
Expand Down Expand Up @@ -127,7 +132,7 @@ import org.apache.spark.streaming.flume.sink._
executorCompletion.take()
}
val startTime = System.currentTimeMillis()
while (outputBuffer.size < 5 &&
while (outputBuffer.size < batchCount * channels.size &&
System.currentTimeMillis() - startTime < 15000) {
logInfo("output.size = " + outputBuffer.size)
Thread.sleep(100)
Expand All @@ -138,9 +143,9 @@ import org.apache.spark.streaming.flume.sink._
ssc.stop()

val flattenedBuffer = outputBuffer.flatten
assert(flattenedBuffer.size === 25 * channels.size)
assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
var counter = 0
for (k <- 0 until channels.size; i <- 0 until 25) {
for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
String.valueOf(i)).getBytes("utf-8"),
Map[String, String]("test-" + i.toString -> "header"))
Expand All @@ -157,7 +162,7 @@ import org.apache.spark.streaming.flume.sink._
j += 1
}
}
assert(counter === 25 * channels.size)
assert(counter === totalEventsPerChannel * channels.size)
}

def assertChannelIsEmpty(channel: MemoryChannel) = {
Expand All @@ -170,10 +175,10 @@ import org.apache.spark.streaming.flume.sink._
private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
override def call(): Void = {
var t = 0
for (i <- 0 until 5) {
for (i <- 0 until batchCount) {
val tx = channel.getTransaction
tx.begin()
for (j <- 0 until 5) {
for (j <- 0 until eventsPerBatch) {
channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
"utf-8"),
Map[String, String]("test-" + t.toString -> "header")))
Expand Down

0 comments on commit 73d6f6d

Please sign in to comment.