diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index f12995a357275..e942053c6ede5 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -53,7 +53,9 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag]( storageLevel: StorageLevel ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) { - protected[streaming] override val customScopeName: Option[String] = Some(s"flume polling stream [$id]") + protected[streaming] override val customScopeName: Option[String] = { + Some(s"flume polling stream [$id]") + } override def getReceiver(): Receiver[SparkFlumeEvent] = { new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index b386a237e2e4c..bc9093f0b8a44 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -65,7 +65,9 @@ class DirectKafkaInputDStream[ val maxRetries = context.sparkContext.getConf.getInt( "spark.streaming.kafka.maxRetries", 1) - protected[streaming] override val customScopeName: Option[String] = Some(s"kafka direct stream [$id]") + protected[streaming] override val customScopeName: Option[String] = { + Some(s"kafka direct stream [$id]") + } protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData