diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 719dd0a6a53c2..924a03cd2f0ae 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -19,42 +19,28 @@ package org.apache.spark.streaming.api.python import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections} -import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} -import org.apache.spark.broadcast.Broadcast +import scala.reflect.ClassTag + import org.apache.spark._ -import org.apache.spark.util.Utils -import java.io._ -import scala.Some -import org.apache.spark.streaming.Duration -import scala.util.control.Breaks._ -import org.apache.spark.broadcast.Broadcast -import scala.Some -import org.apache.spark.streaming.Duration import org.apache.spark.rdd.RDD -import org.apache.spark.api.python.PythonRDD - - +import org.apache.spark.api.python._ +import org.apache.spark.broadcast.Broadcast import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.api.java._ -import org.apache.spark.rdd.RDD -import org.apache.spark.api.python._ -import org.apache.spark.api.python.PairwiseRDD - -import scala.reflect.ClassTag class PythonDStream[T: ClassTag]( - parent: DStream[T], - command: Array[Byte], - envVars: JMap[String, String], - pythonIncludes: JList[String], - preservePartitoning: Boolean, - pythonExec: String, - broadcastVars: JList[Broadcast[Array[Byte]]], - accumulator: Accumulator[JList[Array[Byte]]] - ) extends DStream[Array[Byte]](parent.ssc) { + parent: DStream[T], + command: Array[Byte], + envVars: JMap[String, String], + pythonIncludes: JList[String], + preservePartitoning: Boolean, + pythonExec: String, + broadcastVars: JList[Broadcast[Array[Byte]]], + accumulator: Accumulator[JList[Array[Byte]]]) + extends DStream[Array[Byte]](parent.ssc) { override def dependencies = List(parent) @@ -146,8 +132,3 @@ DStream[(Long, Array[Byte])](prev.ssc){ } val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this) } - - - - -