Add factory method that allows to resolve/reject a Future #1529

Closed
danieldietrich opened this issue Aug 24, 2016 · 11 comments

Comments

@danieldietrich
Contributor

danieldietrich commented Aug 24, 2016

In ES6 we use Promise like this:

new Promise((resolve, reject) => {
    try {
        resolve(longRunningComputation());
    } catch (err) {
        reject(err);
    }
})
.then(...)
.catch(...);

We aligned to Scala when creating Future (for read operations) and Promise (for write operations). But Promise looks too heavy. It is a wrapper around a Future that provides statements to mutate the underlying Future. Promise is not monadic opposed to Future.

When starting async programming with Scala it was first hard for me to get the difference between Future and Promise and when to use which.

I think we could greatly simplify the API by providing ES6 like factory methods and first deprecate, then remove Promise completely (in the next major release).

We write

return Future.of(resolve -> {
        onComplete(result -> resolve.accept(mapper.apply(result)));
});

instead of

// implementation of Future.mapTry
final Promise<T> promise = Promise.make();
this.onComplete(result -> promise.complete(mapper.apply(result)));
return promise.future();

Note: there will be different factory methods with and without reject function, having BiConsumer, two Consumers or one Consumer, with and without ExecutorService. Examples will follow.
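To make the shape of such a factory concrete, here is a minimal sketch that uses only the JDK. It is not Javaslang code: `ResolvableFutures` is a hypothetical name, `CompletableFuture` stands in for Javaslang's `Future`, and the common `ForkJoinPool` stands in for a default executor. It shows the `BiConsumer` variant with an optional executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical helper (not part of Javaslang): an ES6-style factory built on the JDK.
final class ResolvableFutures {

    // Runs the computation on the given executor and hands it resolve/reject callbacks.
    static <T> CompletableFuture<T> of(Executor executor,
                                       BiConsumer<Consumer<T>, Consumer<Throwable>> computation) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                computation.accept(future::complete, future::completeExceptionally);
            } catch (Throwable x) {
                // An exception thrown by the computation itself also rejects the future.
                future.completeExceptionally(x);
            }
        });
        return future;
    }

    // Overload without an executor; the common pool is an assumption of this sketch.
    static <T> CompletableFuture<T> of(BiConsumer<Consumer<T>, Consumer<Throwable>> computation) {
        return of(ForkJoinPool.commonPool(), computation);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> answer =
                ResolvableFutures.of((resolve, reject) -> resolve.accept(6 * 7));
        System.out.println(answer.join());
    }
}
```

The try/catch mirrors the ES6 snippet at the top: a computation that throws rejects the future instead of leaving it unresolved.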

@danieldietrich danieldietrich added this to the 2.1.0 milestone Aug 24, 2016
@danieldietrich
Contributor Author

danieldietrich commented Aug 25, 2016

Let's dig deeper into the topic on how the new Future.of((resolve, reject) -> ...) API should make use of ExecutorServices.

Example 1: Expensive resolve call, e.g. having a long running operation

// starts a new Thread because longRunningComputation() must not block the current Thread
Future.of(resolve -> resolve.accept(longRunningComputation()));

// uses someExecutor to start a new Thread
Future.of(someExecutor, resolve -> resolve.accept(longRunningComputation()));

Example 2: Cheap resolve call, e.g. for internal Future.map implementation

Future<T> first = Future.of(() -> longRunningComputation());
Function<T, U> mapper = ...;

// creates a new Thread for second Future
Future<U> second = Future.of(resolve ->
        // onComplete reuses Thread of first Future
        first.onComplete(result -> resolve.accept(result.map(mapper)))
);

Here is what happens in detail:

  • Case 1: The first Future is not completed. Then first.onComplete(action) will add action to the internal list of actions. This happens on the Thread of second Future!
  • Case 2: The first Future is completed. Then first.onComplete(action) will immediately perform the action on the Thread of the first Future (we could change that behavior by passing an ExecutorService to the first.onComplete() call.). So we are safe, even if the given mapper Function is expensive.

Summed up, the second Future immediately finishes the computation because first.onComplete() is executed on the Thread of first Future.

If we know that an operation is cheap, we should execute it on the current thread:

Future<U> second = Future.of(SAME_THREAD_EXECUTOR_SERVICE, resolve ->
        first.onComplete(result -> resolve.accept(result.map(mapper)))
);

This is equivalent (more precisely, it will be equivalent once we have finished #1530) to:

Future<U> second = Future.of(SAME_THREAD_EXECUTOR_SERVICE, resolve ->
        first.onComplete(SAME_THREAD_EXECUTOR_SERVICE, result -> resolve.accept(result.map(mapper)))
);

The SAME_THREAD_EXECUTOR_SERVICE is just a dummy for an ExecutorService that does all operations on the same Thread. Where to put this singleton? These are the options:

  • Future.sameThread(<computation>) factory methods. This does not scale well because non-static operations can not leverage the sameThread() methods.
  • Future.of(Future.SAME_THREAD_EXECUTOR_SERVICE, <computation>) singleton. This is better, but the name is ugly. Can we provide a better API?
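As a rough illustration of what a "same thread" executor means, the following JDK-only sketch runs each task directly in the submitting thread. The names `SameThreadDemo`, `SAME_THREAD` and `threadUsedBy` are made up for this example, and it uses the plain `Executor` interface rather than a full `ExecutorService`, which would also need shutdown and submit semantics.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class SameThreadDemo {

    // Assumption: "same thread" means every task runs in the thread that submits it.
    static final Executor SAME_THREAD = Runnable::run;

    // Returns the name of the thread on which the task actually ran.
    static String threadUsedBy(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(threadUsedBy(SAME_THREAD).equals(caller));
    }
}
```

With such an executor no thread is created, so it only suits cheap operations like the `map` wiring above; an expensive computation would block the caller.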

The DEFAULT_EXECUTOR_SERVICE is also located within the Future interface:

public static final ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

I would not create another class for the Javaslang ExecutorServices.

Conclusion

In the case of long running operations, we need to create a new Thread.

The golden rule is:

  • new Threads are created on calls of static Future factory methods
  • Threads are reused by default on Future operations (= instance methods) but an optional executor may be provided

@danieldietrich danieldietrich modified the milestones: 2.2.0, 2.1.0 Aug 25, 2016
@danieldietrich
Contributor Author

danieldietrich commented Aug 25, 2016

Syntax

This is the suggested new API (and also an interesting example for co-variant Consumers!):

interface Future<T> {

    // using Future DEFAULT_EXECUTOR_SERVICE
    static <T> Future<T> of(CheckedConsumer2<Consumer<? extends T>, Consumer<? extends Throwable>> resolveOrReject);
    static <T> Future<T> of(CheckedConsumer<Consumer<? extends T>> resolve, CheckedConsumer<Consumer<? extends Throwable>> reject);
    static <T> Future<T> of(CheckedConsumer<Consumer<? extends T>> resolve);

    // with explicit ExecutorService
    static <T> Future<T> of(ExecutorService executorService, CheckedConsumer2<Consumer<? extends T>, Consumer<? extends Throwable>> resolveOrReject);
    static <T> Future<T> of(ExecutorService executorService, CheckedConsumer<Consumer<? extends T>> resolve, CheckedConsumer<Consumer<? extends Throwable>> reject);
    static <T> Future<T> of(ExecutorService executorService, CheckedConsumer<Consumer<? extends T>> resolve);
}

Note that we do not have CheckedConsumer0..8 yet - but I think there exists an issue for that. (I will provide the link here.)

Semantics (informal)

Without loss of generality, we define the semantics of the following function:

interface Future<T> {

    static <T> Future<T> of(ExecutorService executorService, CheckedConsumer2<Consumer<? extends T>, Consumer<? extends Throwable>> resolveOrReject) {
        final FutureImpl<T> future = new FutureImpl<>(executorService);
        future.run(() -> {
            final Context<T> context = ... // TBD
            final Consumer<T> resolve = result -> context.set(result); // TBD
            final Consumer<Throwable> reject = x -> { throw x; }; // TBD
            resolveOrReject.apply(resolve, reject);
            return context.get();
        });
        return future;
    }
}

@danieldietrich
Contributor Author

The nature of a Promise

One might think we lose the ability to pass around a writable object that is able to mutate an underlying Future, e.g.

Promise<T> giveMeAPromiseOfTypeT() {
    return Promise.make();
}

What we gain here is just the type T. We have to complete that Promise ourselves with an appropriate value.

final Promise<T> promise = giveMeAPromiseOfTypeT();

mightBeInterestedIn(promise.future());

// zZz
promise.complete(foo());

What's wrong with this code?

We pulled the computational logic out of our asynchronous computation context.

If we pass the underlying Future around, we do not even know whether the Future will be completed at all.

We could use a Future instead of a Promise:

final Future<T> future = Future.of(() -> foo());
mightBeInterestedIn(future);

Yes, that's also working. So when do we need to use Promise at all?

Promise is a way (/workaround?) to combine Futures.

Given a Future (that is not completed yet) we might further process the Future value. The only way to do this is to register a completion handler/action, i.e. with one of

  • onComplete(Consumer<Try<T>>)
  • onSuccess(Consumer<T>)
  • onFailure(Consumer<Throwable>).

But these methods take Consumer functions, they do not return a value. If we want to transform the value we need to return a new Future because the value might not be present yet. Currently we can't express this with our old-fashioned Future API. Instead we use a Promise:

<U> Future<U> transform(Future<T> future, Function<T, U> transformation) {
    final Promise<U> promise = Promise.make();
    future.onComplete(result -> promise.complete(transformation.apply(result)));
    return promise.future();
}

It has already been determined which transformation should be applied. Our new Future API leverages this property and brings the computation back into the asynchronous computation context:

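<U> Future<U> transform(Future<T> future, Function<T, U> transformation) {
    return Future.of(resolve ->
            // result is only available inside the completion handler of the source Future
            future.onComplete(result -> resolve.accept(transformation.apply(result)))
    );
}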

Currently I find no example where it is necessary to provide the resolve function to the outside.

@viktorklang

viktorklang commented Aug 25, 2016

Hi @danieldietrich,

Thanks for inviting me here :)

Long brain dump ahead, I hope you can glean some value from it!

We aligned to Scala when creating Future (for read operations) and Promise (for write operations).

Sounds cool :)

But Promise looks too heavy.

What is the definition of heavy here?

It is a wrapper around a Future that provides statements to mutate the underlying Future.

Technically, at least in Scala, it is not a wrapper.

Promise is not monadic opposed to Future.

This is a statement, but comes across as criticism. Should it be monadic, what would the value be? The proposed solution is not monadic either?

When starting async programming with Scala it was first hard for me to get the difference between Future and Promise and when to use which.

Could this be solved with better / more documentation?

I saw a section on scheduling considerations, and I'd like to recommend the following, from my former colleague @havocp, which has been a great guiding principle to avoid rather nasty surprises in runtime behavior: http://blog.ometer.com/2011/07/24/callbacks-synchronous-and-asynchronous/

I think the idea of providing a completion function is an interesting one:

def futureOf[U](f: (Try[U] => Boolean) => Unit)(implicit ec: ExecutionContext): Future[U] = // 1
  Future.successful(()).flatMap[U](_ => { // 2
    val p = Promise[U]() // 3
    f(p.tryComplete _) // 4
    p.future // 5
  })

Lines explained:

1: The supplied function f takes a function of Try[U] which will be used to complete the underlying Promise/Future. It returns Boolean because otherwise there is no way to tell if the completion was successful (Promise.tryComplete). Promise.complete, Promise.(try)success and Promise.(try)failure can all be implemented on top of that.
It needs the implicit ExecutionContext to know where to evaluate f since f could be a "long running", IO-using or otherwise effectful function (since it returns Unit)

2: In Scala 2.12 we could use Future.unit.flatMap instead (cached instance of Future.successful(())), but we need to defer execution to the EC. And we want to have isomorphic behavior to other function application of future, so if f throws exceptions then we need to fail the resulting future, otherwise we risk having it hang around unresolved.

3: We create a new promise of the desired result type since we need to transport the values sent to us to the returned Future

4: We ETA-expand the tryComplete function and pass it into f to let the user code complete the resulting Future

5: we make sure to return the resulting Future to flatMap, carrying the resulting value back to the returned Future of the futureOf method

ALTERNATIVELY:

def future[T]: ((Try[T] => Boolean), Future[T]) = {
  val p = Promise[T]()
  (p.tryComplete _, p.future)
}

The alternative is arguably much simpler.
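For Java readers, the alternative might look roughly like the sketch below. It is an approximation under stated assumptions: `CompleterAndFuture` is a hypothetical name, the JDK's `CompletableFuture` stands in for the Scala Promise/Future pair, and the completer takes a plain value instead of a `Try`, so failures are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

// Hypothetical pair of a write-side function and a read-side future.
final class CompleterAndFuture<T> {

    // Write side: returns true only for the call that actually completed the future.
    final Predicate<T> tryComplete;

    // Read side. Note: CompletableFuture is itself writable, unlike a Scala Future.
    final CompletableFuture<T> future;

    private CompleterAndFuture(CompletableFuture<T> future) {
        this.tryComplete = future::complete;
        this.future = future;
    }

    static <T> CompleterAndFuture<T> make() {
        return new CompleterAndFuture<>(new CompletableFuture<>());
    }

    public static void main(String[] args) {
        CompleterAndFuture<String> pair = CompleterAndFuture.make();
        System.out.println(pair.tryComplete.test("first"));
        System.out.println(pair.tryComplete.test("second"));
        System.out.println(pair.future.join());
    }
}
```

The Boolean result is what lets a caller tell whether its value won, which is the reason given in note 1 for returning Boolean.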

Promise: The Good & the Bad

Good:

  • Communicates intent, Promise carries more important information, most notably: The "owner" of a Promise has promised to complete it. Example: def x(p: Promise[T]) carries more information than: def x(f: Try[T] => Boolean)
  • Gives access to check Promise::isCompleted without having to complete it (compared to Try[T] => Boolean)
  • Through Promise::future there is also possibility to read, not only write (compared to Try[T] => Boolean)
  • Cheap: Allocating a Scala DefaultPromise is a single allocation, with no additional allocation for the Future instance.

Bad:

  • One more thing to learn (besides Future)

So, I think, my current opinion is that what you propose is a nice, complementary way of completing Promises. In my experience, I rarely need to deal with Promises directly, but when I do, having the intent that Promise conveys is really valuable, both to the reader of the code and the writer of the code.

I hope the Future of your day is great!

Cheers,

@viktorklang

Addendum: There are some really interesting news in the Futures for Scala 2.12, which I've described in a blog series here

@danieldietrich
Contributor Author

danieldietrich commented Aug 25, 2016

Hi @viktorklang,

thanks for joining the party and sharing your thoughts! I've followed your links, great stuff (still have to dig deeper regarding space-leaks with ACPS-code - wondering if Java(slang) has the same problem).

I hope the Future of your day is great!

so far so good :)

Your points and the code examples are very helpful. Coming from

// Scala
(T => Boolean, Throwable => Boolean) => Unit
// Java
CheckedConsumer2<Predicate<T>, Predicate<Throwable>>

your deduction helped me understand that what really happens is that we pass a Promise to our Future constructor. Here are the deduction steps:

// Scala
(Try[T] => Boolean) => Unit
// Java
CheckedConsumer<Predicate<Try<T>>>

Communicates intent, Promise carries more important information, most notably: The "owner" of a Promise has promised to complete it. Example: def x(p: Promise[T]) carries more information than: def x(f: Try[T] => Boolean)

This could be addressed in Java by using @FunctionalInterfaces (Java has no function literals in the same manner Scala has):

// Java
@FunctionalInterface
public interface Promise<T> {
    boolean tryComplete(Try<T> result);
}

Now the type of the function underlines its purpose.

Gives access to check Promise::isCompleted without having to complete it (compared to Try[T] => Boolean)

Yep, isCompleted() is no longer available on such a function, and that hurts. This can be solved by passing a real Promise to the Consumer:

// Scala
def futureOf[U](f: Promise[U] => Unit)(implicit ec: ExecutionContext): Future[U] =
  Future.successful(()).flatMap[U](_ => {
    val p = Promise[U]()
    f(p)
    p.future
  })
// Java
static <U> Future<U> of(ExecutorService executorService, CheckedConsumer<Promise<U>> f) {
    return Future.successful(null).flatMap(executorService, ignored -> {
        final Promise<U> p = Promise.make(executorService);
        f.accept(p);
        return p.future();
    });
}

Btw - the trick with the cached instance Future.successful(()) is cool to circumvent an explicit try/catch in order to handle exceptions thrown by f! Need that, too :)

My original example can then be expressed like this:

// Java
return Future.of(promise -> {
        onComplete(result -> promise.trySuccess(mapper.apply(result)));
});

(Please note that the example is not very good because it is the body of the Future.map implementation. The call to onComplete() is a call of the 'outer' Future method. That's confusing here, I know.)

I think having such a Future constructor could be helpful. I agree that removing Promise is not a good idea.

Thanks again, Viktor. I really love your APIs/your code. In Future/Promise it all boils down to compositional calls on top of a thin, hardened core.

@danieldietrich danieldietrich changed the title Make Future resolvable/rejectable and deprecate Promise Make Future resolvable/rejectable and ~~deprecate Promise~~ Aug 25, 2016
@viktorklang

@danieldietrich Thanks for inviting me! I'll play around with the idea of an alternative Future "factory" when I have some time to focus :)

In Future/Promise it all boils down to compositional calls on top of a thin, hardened core.

This. So much.

@danieldietrich
Contributor Author

@viktorklang Hope this might be useful in some way. Looking forward to hearing from the Klang-Meister :)

@viktorklang

@danieldietrich Definitely looks promising—thanks for improving my understanding :-)
Stay awesome!

@danieldietrich danieldietrich changed the title Make Future resolvable/rejectable and ~~deprecate Promise~~ Add factory method that allows to resolve/reject a Future Aug 26, 2016
@danieldietrich
Contributor Author

danieldietrich commented Aug 26, 2016

Idea: If Promise<T> extended Function<Try<T>, Boolean>, then it could be composed better (at least in Java; in Scala, functions are far better integrated into the language). We could deprecate boolean tryComplete(Try<T>) (but would not necessarily have to) and add Boolean apply(Try<T>).

class Promise<T> implements Function1<Try<T>, Boolean> {
    @Override
    public Boolean apply(Try<T> result) {
        return tryComplete(result);
    }
    ...
}

Note: When introducing a rich Predicate1 in Javaslang the above might be replaced with class Promise<T> implements Predicate1<Try<T>>. But maybe this is unnecessary because Project Valhalla will allow us to have primitive generic type parameters!?

Our example then looks like this:

<U> Future<U> map(Function<? super T, ? extends U> mapper) {
    return Future.of(p -> onComplete(p.compose(mapper)));
}

In Scala Future(onComplete(_ compose mapper)) ?

(Hope this is right - writing this from the top of my head while cooking... Needs to be tested...)
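To check the composition idea without Javaslang on the classpath, here is a small JDK-only sketch. `FunctionPromise` is a hypothetical class, `CompletableFuture` stands in for the underlying Future, and `apply` takes a plain value rather than a `Try<T>`, so the types are simpler than in the proposal above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical promise that is itself a function from a value to "did I complete?".
final class FunctionPromise<T> implements Function<T, Boolean> {

    private final CompletableFuture<T> future = new CompletableFuture<>();

    @Override
    public Boolean apply(T value) {
        // true only if this call completed the future
        return future.complete(value);
    }

    CompletableFuture<T> future() {
        return future;
    }

    public static void main(String[] args) {
        FunctionPromise<String> promise = new FunctionPromise<>();
        Function<Integer, String> mapper = n -> "#" + n;

        // compose(mapper) first maps the input, then completes the promise with the result.
        Function<Integer, Boolean> completer = promise.compose(mapper);
        completer.apply(5);
        System.out.println(promise.future().join());
    }
}
```

Because `Function.compose` applies its argument first, `promise.compose(mapper)` yields a function that can be handed directly to a completion handler of the source future.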

@viktorklang Having Promise.apply() would fit well in Scala 2.12 because trySuccess() and tryFailure() are deprecated. Now Promise has only a single complete function. Other Scala types are also Functions, like Map, Set, Seq, ...

@danieldietrich
Copy link
Contributor Author

danieldietrich commented Sep 25, 2017

As Viktor mentioned on his blog, a Promise might not be used by the user as promised, e.g.

Promise<?> promise = Promise.make();
promise.completeWith(promise.future()); // 😱

We removed it in #2093 and introduced the equivalent Future.join():

static <T> Future<T> join(CheckedConsumer<Predicate<Try<? extends T>>> computation) { ... }

As the name suggests, join is used to join other future computations (that were forked before). The computation does not consume any extra threads; it is executed synchronously. Typically the tryComplete handler is used in onComplete handlers of other futures.

Example:

// new API
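Future<String> greeting(Future<String> nameFuture) {
    return Future.join(tryComplete -> {
        // onComplete receives a Try<String>, so map over it instead of concatenating the Try itself
        nameFuture.onComplete(result -> tryComplete.test(result.map(name -> "Hi " + name)));
    });
}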

// old API
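Future<String> greeting(Future<String> nameFuture) {
    final Promise<String> promise = Promise.make();
    // onComplete receives a Try<String>, so map over it instead of concatenating the Try itself
    nameFuture.onComplete(result -> promise.tryComplete(result.map(name -> "Hi " + name)));
    return promise.future();
}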
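The synchronous nature of join can be tried out with a JDK-only approximation. This is a sketch under assumptions, not the Javaslang implementation: `JoinSketch` is a hypothetical class, `CompletableFuture` replaces `Future`, and the completer accepts a plain value instead of a `Try`, so a failed name future is not propagated here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class JoinSketch {

    // Runs the computation synchronously in the calling thread; no executor is involved.
    static <T> CompletableFuture<T> join(Consumer<Predicate<T>> computation) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        try {
            computation.accept(future::complete);
        } catch (Throwable x) {
            // A computation that throws fails the result instead of leaving it pending.
            future.completeExceptionally(x);
        }
        return future;
    }

    // Only wires a callback onto nameFuture; the result completes when the name arrives.
    static CompletableFuture<String> greeting(CompletableFuture<String> nameFuture) {
        return join(tryComplete ->
                nameFuture.thenAccept(name -> tryComplete.test("Hi " + name)));
    }

    public static void main(String[] args) {
        CompletableFuture<String> name = new CompletableFuture<>();
        CompletableFuture<String> hi = greeting(name);
        name.complete("Ada");
        System.out.println(hi.join());
    }
}
```

The computation passed to join does nothing but register a handler, which is why running it on the caller's thread is cheap.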


2 participants