Skip to content

Commit

Permalink
Handle port collisions in flume polling test
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Aug 6, 2014
1 parent 63bdb1f commit af3ddc9
Showing 1 changed file with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,37 @@ class FlumePollingStreamSuite extends TestSuiteBase {
val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch
val channelCapacity = 5000
val maxAttempts = 5

test("flume polling test") {
testMultipleTimes(testFlumePolling)
}

test("flume polling test multiple hosts") {
testMultipleTimes(testFlumePollingMultipleHost)
}

/**
* Run the given test until no more java.net.BindException's are thrown.
* Do this only up to a certain attempt limit.
*/
private def testMultipleTimes(test: () => Unit): Unit = {
var testPassed = false
var attempt = 0
while (!testPassed && attempt < maxAttempts) {
try {
test()
testPassed = true
} catch {
case e: java.net.BindException =>
logError("Exception when running flume polling test", e)
attempt += 1
}
}
assert(testPassed, s"Test failed after $attempt attempts!")
}

private def testFlumePolling(): Unit = {
val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
Expand Down Expand Up @@ -80,7 +109,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
channel.stop()
}

test("flume polling test multiple hosts") {
private def testFlumePollingMultipleHost(): Unit = {
val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
Expand Down

0 comments on commit af3ddc9

Please sign in to comment.