Skip to content

Commit

Permalink
[SPARK-6765] Fix test code style for streaming.
Browse files Browse the repository at this point in the history
So we can turn style checker on for test code.

Author: Reynold Xin <rxin@databricks.com>

Closes apache#5409 from rxin/test-style-streaming and squashes the following commits:

7aea69b [Reynold Xin] [SPARK-6765] Fix test code style for streaming.
  • Loading branch information
rxin committed Apr 8, 2015
1 parent 8d2a36c commit 15e0d2b
Show file tree
Hide file tree
Showing 19 changed files with 115 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.flume

import java.net.InetSocketAddress
Expand Down Expand Up @@ -213,7 +212,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
assert(counter === totalEventsPerChannel * channels.size)
}

def assertChannelIsEmpty(channel: MemoryChannel) = {
def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
queueRemaining.setAccessible(true)
val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
}

/** Class to create socket channel with compression */
private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
private class CompressionChannelFactory(compressionLevel: Int)
extends NioClientSocketChannelFactory {

override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
val encoder = new ZlibEncoder(compressionLevel)
pipeline.addFirst("deflater", encoder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
msgTopic.publish(message)
} catch {
case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
// wait for Spark streaming to consume something from the message queue
Thread.sleep(50)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ class BasicOperationsSuite extends TestSuiteBase {
test("flatMapValues") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
(s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10)),
(s: DStream[String]) => {
s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10))
},
Seq( Seq(("a", 2), ("a", 12), ("b", 1), ("b", 11)), Seq(("", 2), ("", 12)), Seq() ),
true
)
Expand Down Expand Up @@ -474,7 +476,7 @@ class BasicOperationsSuite extends TestSuiteBase {
stream.foreachRDD(_ => {}) // Dummy output stream
ssc.start()
Thread.sleep(2000)
def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
def getInputFromSlice(fromMillis: Long, toMillis: Long): Set[Int] = {
stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CheckpointSuite extends TestSuiteBase {

var ssc: StreamingContext = null

override def batchDuration = Milliseconds(500)
override def batchDuration: Duration = Milliseconds(500)

override def beforeFunction() {
super.beforeFunction()
Expand Down Expand Up @@ -72,7 +72,7 @@ class CheckpointSuite extends TestSuiteBase {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some((values.sum + state.getOrElse(0)))
Some(values.sum + state.getOrElse(0))
}
st.map(x => (x, 1))
.updateStateByKey(updateFunc)
Expand Down Expand Up @@ -199,7 +199,12 @@ class CheckpointSuite extends TestSuiteBase {
testCheckpointedOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ),
(s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() ),
Seq(
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq(),
Seq(("a", 2), ("b", 1)),
Seq(("", 2)), Seq() ),
3
)
}
Expand All @@ -212,7 +217,8 @@ class CheckpointSuite extends TestSuiteBase {
val n = 10
val w = 4
val input = (1 to n).map(_ => Seq("a")).toSeq
val output = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
val output = Seq(
Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
val operation = (st: DStream[String]) => {
st.map(x => (x, 1))
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
Expand All @@ -236,7 +242,13 @@ class CheckpointSuite extends TestSuiteBase {
classOf[TextOutputFormat[Text, IntWritable]])
output
},
Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
Seq(
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq(),
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq()),
3
)
} finally {
Expand All @@ -259,7 +271,13 @@ class CheckpointSuite extends TestSuiteBase {
classOf[NewTextOutputFormat[Text, IntWritable]])
output
},
Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
Seq(
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq(),
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq()),
3
)
} finally {
Expand Down Expand Up @@ -298,7 +316,13 @@ class CheckpointSuite extends TestSuiteBase {
output
}
},
Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
Seq(
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq(),
Seq(("a", 2), ("b", 1)),
Seq(("", 2)),
Seq()),
3
)
} finally {
Expand Down Expand Up @@ -533,7 +557,8 @@ class CheckpointSuite extends TestSuiteBase {
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] =
{
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
for (i <- 1 to numBatches.toInt) {
Expand All @@ -543,7 +568,7 @@ class CheckpointSuite extends TestSuiteBase {
logInfo("Manual clock after advancing = " + clock.getTimeMillis())
Thread.sleep(batchDuration.milliseconds)

val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
}.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
outputStream.output.map(_.flatten)
Expand All @@ -552,4 +577,4 @@ class CheckpointSuite extends TestSuiteBase {

private object CheckpointSuite extends Serializable {
var batchThreeShouldBlockIndefinitely: Boolean = true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class FailureSuite extends TestSuiteBase with Logging {
val directory = Utils.createTempDir()
val numBatches = 30

override def batchDuration = Milliseconds(1000)
override def batchDuration: Duration = Milliseconds(1000)

override def useManualClock = false
override def useManualClock: Boolean = false

override def afterFunction() {
Utils.deleteRecursively(directory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
"localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
def output: ArrayBuffer[String] = outputBuffer.flatMap(x => x)
outputStream.register()
ssc.start()

Expand Down Expand Up @@ -164,7 +164,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
val outputStream = new TestOutputStream(countStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
outputStream.register()
ssc.start()

Expand Down Expand Up @@ -196,15 +196,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val queueStream = ssc.queueStream(queue, oneAtATime = true)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(queueStream, outputBuffer)
def output = outputBuffer.filter(_.size > 0)
def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
outputStream.register()
ssc.start()

// Setup data queued into the stream
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
//Thread.sleep(1000)

val inputIterator = input.toIterator
for (i <- 0 until input.size) {
// Enqueue more than 1 item per tick but they should dequeue one at a time
Expand Down Expand Up @@ -239,7 +239,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(queueStream, outputBuffer)
def output = outputBuffer.filter(_.size > 0)
def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
outputStream.register()
ssc.start()

Expand Down Expand Up @@ -352,7 +352,8 @@ class TestServer(portToBind: Int = 0) extends Logging {
logInfo("New connection")
try {
clientSocket.setTcpNoDelay(true)
val outputStream = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream))
val outputStream = new BufferedWriter(
new OutputStreamWriter(clientSocket.getOutputStream))

while(clientSocket.isConnected) {
val msg = queue.poll(100, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -384,7 +385,7 @@ class TestServer(portToBind: Int = 0) extends Logging {

def stop() { servingThread.interrupt() }

def port = serverSocket.getLocalPort
def port: Int = serverSocket.getLocalPort
}

/** This is a receiver to test multiple threads inserting data using block generator */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
testBlockStoring(handler) { case (data, blockIds, storeResults) =>
// Verify the data in block manager is correct
val storedData = blockIds.flatMap { blockId =>
blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
}.toList
storedData shouldEqual data

Expand All @@ -120,7 +120,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
testBlockStoring(handler) { case (data, blockIds, storeResults) =>
// Verify the data in block manager is correct
val storedData = blockIds.flatMap { blockId =>
blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
}.toList
storedData shouldEqual data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ class ReceivedBlockTrackerSuite
* Get all the data written in the given write ahead log files. By default, it will read all
* files in the test log directory.
*/
def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles): Seq[ReceivedBlockTrackerLogEvent] = {
def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles)
: Seq[ReceivedBlockTrackerLogEvent] = {
logFiles.flatMap {
file => new WriteAheadLogReader(file, hadoopConf).toSeq
}.map { byteBuffer =>
Expand All @@ -244,7 +245,8 @@ class ReceivedBlockTrackerSuite
}

/** Create batch allocation object from the given info */
def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo]): BatchAllocationEvent = {
def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo])
: BatchAllocationEvent = {
BatchAllocationEvent(time, AllocatedBlocks(Map((streamId -> blockInfos))))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
val errors = new ArrayBuffer[Throwable]

/** Check if all data structures are clean */
def isAllEmpty = {
def isAllEmpty: Boolean = {
singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
arrayBuffers.isEmpty && errors.isEmpty
}
Expand All @@ -320,24 +320,21 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
def pushBytes(
bytes: ByteBuffer,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
optionalBlockId: Option[StreamBlockId]) {
byteBuffers += bytes
}

def pushIterator(
iterator: Iterator[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
optionalBlockId: Option[StreamBlockId]) {
iterators += iterator
}

def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
optionalBlockId: Option[StreamBlockId]) {
arrayBuffers += arrayBuffer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
SlowTestReceiver.receivedAllRecords = false
//Create test receiver that sleeps in onStop()
// Create test receiver that sleeps in onStop()
val totalNumRecords = 15
val recordsPerSecond = 1
val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond))
Expand Down Expand Up @@ -370,7 +370,8 @@ object TestReceiver {
}

/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {

var receivingThreadOption: Option[Thread] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {

// To make sure that the processing start and end times in collected
// information are different for successive batches
override def batchDuration = Milliseconds(100)
override def actuallyWait = true
override def batchDuration: Duration = Milliseconds(100)
override def actuallyWait: Boolean = true

test("batch info reporting") {
val ssc = setupStreams(input, operation)
Expand Down
Loading

0 comments on commit 15e0d2b

Please sign in to comment.