[SPARK-3902] [SPARK-3590] Stabilize AsyncRDDActions and add Java API
This PR adds a Java API for AsyncRDDActions and promotes the API from `Experimental` to stable.

Author: Josh Rosen <joshrosen@apache.org>
Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#2760 from JoshRosen/async-rdd-actions-in-java and squashes the following commits:

0d45fbc [Josh Rosen] Whitespace fix.
ad3ae53 [Josh Rosen] Merge remote-tracking branch 'origin/master' into async-rdd-actions-in-java
c0153a5 [Josh Rosen] Remove unused variable.
e8e2867 [Josh Rosen] Updates based on Marcelo's review feedback
7a1417f [Josh Rosen] Removed unnecessary java.util import.
6f8f6ac [Josh Rosen] Fix import ordering.
ff28e49 [Josh Rosen] Add MiMa excludes and fix a scalastyle error.
346e46e [Josh Rosen] [SPARK-3902] Stabilize AsyncRDDActions; add Java API.
JoshRosen committed Oct 20, 2014
1 parent 7e63bb4 commit d1966f3
Showing 6 changed files with 246 additions and 35 deletions.
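
To illustrate what the new Java API enables, here is a minimal usage sketch; it is not part of this commit, and it assumes an existing, live JavaSparkContext named `jsc`:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

// Start the count without blocking the calling thread.
JavaFutureAction<Long> futureCount = rdd.countAsync();

// Inspect the Spark job(s) backing the asynchronous action.
List<Integer> jobIds = futureCount.jobIds();

// Block for the result via the standard java.util.concurrent.Future contract.
long count = futureCount.get();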
33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java;


import java.util.List;
import java.util.concurrent.Future;

public interface JavaFutureAction<T> extends Future<T> {

/**
* Returns the job IDs run by the underlying async operation.
*
* This returns the current snapshot of the job list. Certain operations may run multiple
* jobs, so multiple calls to this method may return different lists.
*/
List<Integer> jobIds();
}
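
Because `JavaFutureAction` extends `java.util.concurrent.Future`, Java callers get the familiar timeout and cancellation semantics for free. A sketch of one way to use them (the helper method and its name are illustrative, not part of this commit):

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;

// Collect an RDD, giving up (and cancelling the Spark job) after a timeout.
static <T> List<T> collectWithTimeout(JavaRDD<T> rdd, long seconds) throws Exception {
  JavaFutureAction<List<T>> action = rdd.collectAsync();
  try {
    return action.get(seconds, TimeUnit.SECONDS);
  } catch (TimeoutException e) {
    action.cancel(true);  // best-effort; returns false if the action already finished
    throw e;
  }
}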
86 changes: 71 additions & 15 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,20 +17,21 @@

package org.apache.spark

import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try
import java.util.Collections
import java.util.concurrent.TimeUnit

import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaFutureAction
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}

import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.{Failure, Try}

/**
* :: Experimental ::
* A future for the result of an action to support cancellation. This is an extension of the
* Scala Future interface to support cancellation.
*/
@Experimental
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
// documentation (with reference to the word "action").
@@ -69,6 +70,11 @@ trait FutureAction[T] extends Future[T] {
*/
override def isCompleted: Boolean

/**
* Returns whether the action has been cancelled.
*/
def isCancelled: Boolean

/**
* The value of this Future.
*
@@ -96,15 +102,16 @@ trait FutureAction[T] extends Future[T] {


/**
* :: Experimental ::
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
@Experimental
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {

@volatile private var _cancelled: Boolean = false

override def cancel() {
_cancelled = true
jobWaiter.cancel()
}

@@ -143,6 +150,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}

override def isCompleted: Boolean = jobWaiter.jobFinished

override def isCancelled: Boolean = _cancelled

override def value: Option[Try[T]] = {
if (jobWaiter.jobFinished) {
@@ -164,12 +173,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:


/**
* :: Experimental ::
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
@Experimental
class ComplexFutureAction[T] extends FutureAction[T] {

// Pointer to the thread that is executing the action. It is set when the action is run.
@@ -222,7 +229,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
// command need to be in an atomic block.
val job = this.synchronized {
if (!cancelled) {
if (!isCancelled) {
rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
} else {
throw new SparkException("Action has been cancelled")
@@ -243,10 +250,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
}
}

/**
* Returns whether the promise has been cancelled.
*/
def cancelled: Boolean = _cancelled
override def isCancelled: Boolean = _cancelled

@throws(classOf[InterruptedException])
@throws(classOf[scala.concurrent.TimeoutException])
@@ -271,3 +275,55 @@ class ComplexFutureAction[T] extends FutureAction[T] {
def jobIds = jobs

}

private[spark]
class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
extends JavaFutureAction[T] {

import scala.collection.JavaConverters._

override def isCancelled: Boolean = futureAction.isCancelled

override def isDone: Boolean = {
// According to java.util.Future's Javadoc, this returns True if the task was completed,
// whether that completion was due to successful execution, an exception, or a cancellation.
futureAction.isCancelled || futureAction.isCompleted
}

override def jobIds(): java.util.List[java.lang.Integer] = {
Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
}

private def getImpl(timeout: Duration): T = {
// This will throw TimeoutException on timeout:
Await.ready(futureAction, timeout)
futureAction.value.get match {
case scala.util.Success(value) => converter(value)
case Failure(exception) =>
if (isCancelled) {
throw new CancellationException("Job cancelled").initCause(exception)
} else {
// java.util.Future.get() wraps exceptions in ExecutionException
throw new ExecutionException("Exception thrown by job", exception)
}
}
}

override def get(): T = getImpl(Duration.Inf)

override def get(timeout: Long, unit: TimeUnit): T =
getImpl(Duration.fromNanos(unit.toNanos(timeout)))

override def cancel(mayInterruptIfRunning: Boolean): Boolean = synchronized {
if (isDone) {
// According to java.util.Future's Javadoc, this should return false if the task is completed.
false
} else {
// We're limited in terms of the semantics we can provide here; our cancellation is
// asynchronous and doesn't provide a mechanism to not cancel if the job is running.
futureAction.cancel()
true
}
}

}
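
The wrapper maps failures and cancellations onto the exceptions that `java.util.concurrent.Future.get()` is specified to throw. A sketch of what a Java caller sees (not from this commit; the JavaRDD named `rdd` is assumed to exist):

import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import org.apache.spark.api.java.JavaFutureAction;

JavaFutureAction<List<String>> action = rdd.collectAsync();
try {
  List<String> result = action.get();
} catch (CancellationException e) {
  // The action was cancelled before it completed.
} catch (ExecutionException e) {
  // The exception thrown by the Spark job is attached as the cause.
  Throwable jobFailure = e.getCause();
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}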
53 changes: 41 additions & 12 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -21,12 +21,14 @@ import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable, Long => JLong}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -294,8 +296,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Applies a function f to all elements of this RDD.
*/
def foreach(f: VoidFunction[T]) {
val cleanF = rdd.context.clean((x: T) => f.call(x))
rdd.foreach(cleanF)
rdd.foreach(x => f.call(x))
}

/**
@@ -576,16 +577,44 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def name(): String = rdd.name

/**
* :: Experimental ::
* The asynchronous version of the foreach action.
*
* @param f the function to apply to all the elements of the RDD
* @return a FutureAction for the action
* The asynchronous version of `count`, which returns a
* future for counting the number of elements in this RDD.
*/
@Experimental
def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
import org.apache.spark.SparkContext._
rdd.foreachAsync(x => f.call(x))
def countAsync(): JavaFutureAction[JLong] = {
new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
}

/**
* The asynchronous version of `collect`, which returns a future for
* retrieving an array containing all of the elements in this RDD.
*/
def collectAsync(): JavaFutureAction[JList[T]] = {
new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
}

/**
* The asynchronous version of the `take` action, which returns a
* future for retrieving the first `num` elements of this RDD.
*/
def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
}

/**
* The asynchronous version of the `foreach` action, which
* applies a function f to all the elements of this RDD.
*/
def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = {
new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)),
{ x => null.asInstanceOf[Void] })
}

/**
* The asynchronous version of the `foreachPartition` action, which
* applies a function f to each partition of this RDD.
*/
def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)),
{ x => null.asInstanceOf[Void] })
}
}
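
For reference, a sketch of calling one of these new methods from Java (not part of this commit; the JavaRDD<String> named `lines` is assumed to exist):

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.function.VoidFunction;

JavaFutureAction<Void> done = lines.foreachAsync(new VoidFunction<String>() {
  @Override
  public void call(String line) {
    System.out.println(line);
  }
});

// get() returns null on success and throws ExecutionException if the job failed.
done.get();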
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -24,14 +24,11 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag

import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* A set of asynchronous RDD actions available through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
@Experimental
class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {

/**