From 9570bec0d54537e51623b2b5777895c209dd706a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 4 Aug 2015 20:25:42 +0800 Subject: [PATCH] Fix the variable name and check null in finally --- .../spark/streaming/mqtt/MQTTStreamSuite.scala | 16 ++++++++-------- .../spark/streaming/mqtt/MQTTTestUtils.scala | 8 +++++--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 879be775ea9f9..a6a9249db8ed7 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -35,12 +35,12 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter private val topic = "def" private var ssc: StreamingContext = _ - private var MQTTTestUtils: MQTTTestUtils = _ + private var mqttTestUtils: MQTTTestUtils = _ before { ssc = new StreamingContext(master, framework, batchDuration) - MQTTTestUtils = new MQTTTestUtils - MQTTTestUtils.setup() + mqttTestUtils = new MQTTTestUtils + mqttTestUtils.setup() } after { @@ -48,15 +48,15 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter ssc.stop() ssc = null } - if (MQTTTestUtils != null) { - MQTTTestUtils.teardown() - MQTTTestUtils = null + if (mqttTestUtils != null) { + mqttTestUtils.teardown() + mqttTestUtils = null } } test("mqtt input stream") { val sendMessage = "MQTT demo for spark streaming" - val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, + val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + mqttTestUtils.brokerUri, topic, StorageLevel.MEMORY_ONLY) @volatile var receiveMessage: List[String] = List() @@ -71,7 +71,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter // Retry it because we don't know when the receiver will start. eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { - MQTTTestUtils.publishData(topic, sendMessage) + mqttTestUtils.publishData(topic, sendMessage) assert(sendMessage.equals(receiveMessage(0))) } ssc.stop() diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 47cc9af497778..1a371b7008824 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -100,9 +100,11 @@ private class MQTTTestUtils extends Logging { } } } finally { - client.disconnect() - client.close() - client = null + if (client != null) { + client.disconnect() + client.close() + client = null + } } }