diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala index e7c9c47c960fa..201e02f4ee12e 100644 --- a/project/MimaBuild.scala +++ b/project/MimaBuild.scala @@ -58,17 +58,18 @@ object MimaBuild { SparkBuild.SPARK_VERSION match { case v if v.startsWith("1.0") => Seq( - excludePackage("org.apache.spark.api.java"), - excludePackage("org.apache.spark.streaming.api.java"), - excludePackage("org.apache.spark.mllib") - ) ++ - excludeSparkClass("rdd.ClassTags") ++ - excludeSparkClass("util.XORShiftRandom") ++ - excludeSparkClass("mllib.recommendation.MFDataGenerator") ++ - excludeSparkClass("mllib.optimization.SquaredGradient") ++ - excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++ - excludeSparkClass("mllib.regression.LassoWithSGD") ++ - excludeSparkClass("mllib.regression.LinearRegressionWithSGD") + excludePackage("org.apache.spark.api.java"), + excludePackage("org.apache.spark.streaming.api.java"), + excludePackage("org.apache.spark.mllib") + ) ++ + excludeSparkClass("rdd.ClassTags") ++ + excludeSparkClass("util.XORShiftRandom") ++ + excludeSparkClass("mllib.recommendation.MFDataGenerator") ++ + excludeSparkClass("mllib.optimization.SquaredGradient") ++ + excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++ + excludeSparkClass("mllib.regression.LassoWithSGD") ++ + excludeSparkClass("mllib.regression.LinearRegressionWithSGD") + excludeSparkClass("streaming.dstream.NetworkReceiver") case _ => Seq() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index cd5bec117eaca..92d885c4bc5a5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -109,7 +109,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { while(!hasTimedOut && jobScheduler.networkInputTracker.hasMoreReceivedBlockIds) { Thread.sleep(pollTime) } - logInfo("Waited for all received blocsk to be consumed for job generation") + logInfo("Waited for all received blocks to be consumed for job generation") // Stop generating jobs val stopTime = timer.stop(false) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 1700431adface..04e0a6a283cfb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -96,7 +96,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // Stop everything else listenerBus.stop() ssc.env.actorSystem.stop(eventActor) - eventActor == null + eventActor = null logInfo("Stopped JobScheduler") } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index 69c78701949ef..067e804202236 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -17,16 +17,14 @@ package org.apache.spark.streaming.scheduler -import scala.collection.mutable.{HashMap, SynchronizedMap, Queue} +import scala.collection.mutable.{HashMap, Queue, SynchronizedMap} import akka.actor._ - -import org.apache.spark.{SparkException, Logging, SparkEnv} +import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.SparkContext._ import org.apache.spark.storage.BlockId -import org.apache.spark.streaming.{Time, StreamingContext} -import org.apache.spark.streaming.dstream.NetworkReceiver -import org.apache.spark.streaming.dstream.StopReceiver +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver} import org.apache.spark.util.AkkaUtils private[streaming] sealed trait NetworkInputTrackerMessage diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index ba6d6b36f7154..9cc27ef7f03b5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -17,15 +17,16 @@ package org.apache.spark.streaming -import org.scalatest.{FunSuite, BeforeAndAfter} -import org.scalatest.exceptions.TestFailedDueToTimeoutException -import org.scalatest.concurrent.Timeouts -import org.scalatest.time.SpanSugar._ -import org.apache.spark.{Logging, SparkException, SparkConf, SparkContext} -import org.apache.spark.util.{Utils, MetadataCleaner} -import org.apache.spark.streaming.dstream.{NetworkReceiver, DStream} import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.{DStream, NetworkReceiver} +import org.apache.spark.util.{MetadataCleaner, Utils} +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Timeouts +import org.scalatest.exceptions.TestFailedDueToTimeoutException +import org.scalatest.time.SpanSugar._ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging { @@ -296,4 +297,4 @@ class TestReceiver extends NetworkReceiver[Int] { object TestReceiver { val counter = new AtomicInteger(1) -} \ No newline at end of file +}