Skip to content

Promises.all

Richard Hightower edited this page Sep 9, 2016 · 13 revisions

Promises support "all" functionality: you can create a promise that waits until every promise passed to it has returned asynchronously.

`Promises.all` example using the Reakt Guava library with Cassandra

Promises.all(
        // Save the Todo item in two tables; don't respond until
        // both calls come back from Cassandra.
        // First call to Cassandra: insert into the "Todo" table.
        futureToPromise(
                session.executeAsync(insertInto("Todo")
                        .value("id", todo.getId())
                        .value("updatedTime", todo.getUpdatedTime())
                        .value("createdTime", todo.getCreatedTime())
                        .value("name", todo.getName())
                        .value("description", todo.getDescription())))
                .catchError(error -> recordCassandraError("add.todo", error))
                .thenSafe(resultSet -> handleResultFromAdd(resultSet, "add.todo")),
        // Second call to Cassandra: insert into the "TodoLookup" table.
        futureToPromise(
                session.executeAsync(insertInto("TodoLookup")
                        .value("id", todo.getId())
                        .value("updatedTime", todo.getUpdatedTime())))
                .catchError(error -> recordCassandraError("add.lookup", error))
                .thenSafe(resultSet -> handleResultFromAdd(resultSet, "add.lookup"))
        ) // Closes Promises.all(...); the handlers below are attached to the combined promise.
        .catchError(returnPromise::reject)      // An error on the combined promise rejects returnPromise.
        .then(v -> returnPromise.resolve(true)) // Otherwise returnPromise is resolved with true.
        .invoke();

Promises.all example

        /* Service used to load employees. */
        EmployeeService employeeService = ...

        /* One pending promise per employee we expect back. */
        final Promise<Employee> firstEmployee = Promises.promise();
        final Promise<Employee> secondEmployee = Promises.promise();

        /* Aggregate promise built over both employee promises. */
        final Promise<Void> bothLoaded = Promises.all(firstEmployee, secondEmployee);

        bothLoaded.then(ignored -> System.out.println("All DONE!"));

        /* Nothing has been requested yet. */
        assertFalse("Not done yet", bothLoaded.complete());

        /* Request the first employee. */
        employeeService.loadEmployee("1", firstEmployee);

        /* Only one of the two loads has been requested, so the aggregate is still pending. */
        assertFalse("Still not done yet", bothLoaded.complete());

        /* Request the second employee. */
        employeeService.loadEmployee("2", secondEmployee);

        /* Wait some time. */
        //...

        assertTrue(bothLoaded.complete());
        assertTrue(bothLoaded.success());

There are three types of `all` promises: callback, blocking, and replay callback. (A replay callback is a callback that gets replayed in the caller's thread.)

API for creating `all` promises

...
public interface Promises ...{


    /**
     * Creates a promise over the given promises by wrapping them in an {@code AllPromise}.
     * All of the supplied promises must complete.
     *
     * @param promises the promises that must all complete
     * @return the containing promise (an {@code AllPromise} exposed as {@code Promise<Void>})
     */
    static Promise<Void> all(Promise<?>... promises) {
        return new AllPromise(promises);
    }

    /**
     * Creates a blocking promise over the given promises by wrapping them in an
     * {@code AllBlockingPromise}. All of the supplied promises must complete.
     *
     * <p>NOTE(review): which calls on the returned promise actually block is decided by
     * {@code AllBlockingPromise}, which is not shown here; confirm against that class.
     *
     * @param promises the promises that must all complete
     * @return the containing promise, which is blocking
     */
    static Promise<Void> allBlocking(Promise<?>... promises) {
        return new AllBlockingPromise(promises);
    }


    /**
     * Creates a replay promise over the given promises by wrapping them in an
     * {@code AllReplayPromise}. All of the supplied promises must complete.
     *
     * @param timeout  timeout handed to {@code AllReplayPromise}; presumably how long the
     *                 promise may stay pending before it times out (TODO confirm)
     * @param time     time value handed to {@code AllReplayPromise}; the overload without this
     *                 argument passes {@code System.currentTimeMillis()}, so this is presumably
     *                 the start time in epoch milliseconds (TODO confirm)
     * @param promises the promises that must all complete
     * @return a replay promise, so the promise can be replayed in the caller's thread
     */
    static ReplayPromise<Void> allReplay(final Duration timeout, long time, Promise<?>... promises) {
        return new AllReplayPromise(timeout, time, promises);
    }

    /**
     * Creates a replay promise over the given promises, using the current time.
     * All of the supplied promises must complete.
     *
     * <p>Delegates to {@code allReplay(Duration, long, Promise...)}, passing
     * {@code System.currentTimeMillis()} as the time argument.
     *
     * @param timeout  timeout forwarded to the three-argument overload
     * @param promises the promises that must all complete
     * @return a replay promise, so the promise can be replayed in the caller's thread
     */
    static ReplayPromise<Void> allReplay(final Duration timeout, Promise<?>... promises) {
        return Promises.allReplay(timeout, System.currentTimeMillis(), promises);
    }
...

Given this test service.

    /**
     * Test service whose methods reply to, or reject, an {@code Employee} callback in
     * different ways: on the calling thread, from a new thread, after a delay, and with errors.
     */
    public static class TestService {

        /**
         * Replies on the calling thread with {@code new Employee("Rick")}.
         *
         * @param callback callback that receives the employee
         */
        public void simple(Callback<Employee> callback) {
            callback.reply(new Employee("Rick"));
        }

        /**
         * Starts a new thread that replies with {@code new Employee("Rick")}.
         *
         * @param callback callback that receives the employee
         */
        public void async(final Callback<Employee> callback) {
            new Thread(() -> callback.reply(new Employee("Rick"))).start();
        }

        /**
         * Starts a new thread that sleeps for 10 seconds and then replies with
         * {@code new Employee("Rick")}. If the sleep is interrupted, the reply is still sent.
         *
         * @param callback callback that receives the employee after the delay
         */
        public void asyncTimeout(final Callback<Employee> callback) {
            new Thread(() -> {
                try {
                    Thread.sleep(10_000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag that catching the exception cleared,
                    // so the interruption stays visible to anything that runs afterwards.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                callback.reply(new Employee("Rick"));
            }).start();
        }

        /**
         * Starts a new thread that rejects the callback with the message {@code "Rick"}.
         *
         * @param callback callback to reject
         */
        public void asyncError(final Callback<Employee> callback) {
            new Thread(() -> callback.reject("Rick")).start();
        }

        /**
         * Rejects the callback on the calling thread with the message {@code "Error"}.
         *
         * @param callback callback to reject
         */
        public void error(Callback<Employee> callback) {
            callback.reject("Error");
        }

        /**
         * Rejects the callback on the calling thread with an
         * {@code IllegalStateException("Error")}.
         *
         * @param callback callback to reject
         */
        public void exception(Callback<Employee> callback) {
            callback.reject(new IllegalStateException("Error"));
        }
    }

We can have these three example tests.

Blocking example

import io.advantageous.reakt.Callback;
import io.advantageous.reakt.Expected;
import io.advantageous.reakt.Promises.promise;
    /**
     * Checks that a promise from {@code Promises.allBlocking} is not reported complete
     * before both wrapped promises have been handed to the service, and that it reports
     * success afterwards.
     */
    @Test
    public void testAllBlocking() throws Exception {

        final TestService service = new TestService();

        final Promise<Employee> first = Promises.promise();
        final Promise<Employee> second = Promises.promise();

        /* Blocking aggregate over both promises. */
        final Promise<Void> allDone = Promises.allBlocking(first, second);

        /* Nothing has been sent to the service yet. */
        assertFalse(allDone.complete());

        service.async(first);

        /* The second promise has not been sent to the service yet. */
        assertFalse(allDone.complete());

        service.async(second);

        /* NOTE(review): no explicit wait here; presumably success() waits on a blocking promise. Confirm. */
        assertTrue(allDone.success());
    }

Callback example

    /**
     * Checks that a promise from {@code Promises.all} stays incomplete until both wrapped
     * promises have been replied to, and is complete and successful afterwards.
     */
    @Test
    public void testAll() throws Exception {

        /* Service under test. */
        final TestService service = new TestService();

        /* One pending promise per expected employee. */
        final Promise<Employee> firstEmployee = Promises.promise();
        final Promise<Employee> secondEmployee = Promises.promise();

        /* Aggregate promise built over both employee promises. */
        final Promise<Void> allEmployees = Promises.all(firstEmployee, secondEmployee);

        allEmployees.then(ignored -> System.out.println("DONE!"));

        assertFalse("Not done yet", allEmployees.complete());

        /* First service call. */
        service.simple(firstEmployee);

        /* Only one of the two promises has been passed to the service, so still pending. */
        assertFalse(allEmployees.complete());

        /* Second service call. */
        service.simple(secondEmployee);

        /* Wait some time. */
        //...

        assertTrue(allEmployees.complete());
        assertTrue(allEmployees.success());
    }

Replay example

    /**
     * Checks that a replay promise from {@code Promises.allReplay} is incomplete before both
     * wrapped promises have been handed to the service, then polls it with
     * {@code check(...)} until it reports complete, and finally asserts success.
     */
    @Test
    public void testAllReplay() throws Exception {

        final TestService service = new TestService();

        final Promise<Employee> first = Promises.promise();
        final Promise<Employee> second = Promises.promise();

        final Duration timeout = Duration.ofMillis(1000);
        final ReplayPromise<Void> replayAll = Promises.allReplay(timeout, first, second);

        assertFalse(replayAll.complete());

        service.async(first);

        assertFalse(replayAll.complete());

        service.async(second);

        /* Poll at most 10 times, sleeping 10 ms between checks, stopping early once complete. */
        int attempt = 0;
        while (attempt < 10) {
            replayAll.check(System.currentTimeMillis());
            if (replayAll.complete()) {
                break;
            }
            Thread.sleep(10);
            attempt++;
        }

        assertTrue(replayAll.complete());
        assertTrue(replayAll.success());
    }