Skip to content

Commit

Permalink
Stop query in test.
Browse files Browse the repository at this point in the history
  • Loading branch information
kunalkhamar committed Apr 27, 2017
1 parent 915d67b commit f9342c9
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ class StreamSuite extends StreamTest {
test("batch id is updated correctly in the job description") {
val queryName = "memStream"
@volatile var jobDescription: String = null
def assertContainsNameAndBatch(batch: Integer): Unit = {
def assertDescContainsQueryNameAnd(batch: Integer): Unit = {
// wait for listener event to be processed
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
assert(jobDescription.contains(queryName) && jobDescription.contains(s"batch = $batch"))
Expand All @@ -557,13 +557,14 @@ class StreamSuite extends StreamTest {

input.addData(1)
query.processAllAvailable()
assertContainsNameAndBatch(batch = 0)
assertDescContainsQueryNameAnd(batch = 0)
input.addData(2, 3)
query.processAllAvailable()
assertContainsNameAndBatch(batch = 1)
assertDescContainsQueryNameAnd(batch = 1)
input.addData(4)
query.processAllAvailable()
assertContainsNameAndBatch(batch = 2)
assertDescContainsQueryNameAnd(batch = 2)
query.stop()
}
}

Expand Down

0 comments on commit f9342c9

Please sign in to comment.