Skip to content

Commit

Permalink
Don't check serializability of DStream transforms.
Browse files Browse the repository at this point in the history
Since the DStream is reachable from within these closures, they aren't
checkable by the straightforward technique of passing them to the
closure serializer.
  • Loading branch information
willb committed Mar 20, 2014
1 parent 4ecf841 commit d6e8dd6
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,15 @@ abstract class DStream[T: ClassTag] (
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
}

/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
val cleanedF = context.sparkContext.clean(transformFunc)
val cleanedF = context.sparkContext.clean(transformFunc, false)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
Expand All @@ -556,7 +556,7 @@ abstract class DStream[T: ClassTag] (
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V] = {
val cleanedF = ssc.sparkContext.clean(transformFunc)
val cleanedF = ssc.sparkContext.clean(transformFunc, false)
transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
}

Expand All @@ -567,7 +567,7 @@ abstract class DStream[T: ClassTag] (
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V] = {
val cleanedF = ssc.sparkContext.clean(transformFunc)
val cleanedF = ssc.sparkContext.clean(transformFunc, false)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 2)
val rdd1 = rdds(0).asInstanceOf[RDD[T]]
Expand Down

0 comments on commit d6e8dd6

Please sign in to comment.