QBit's Reakt Promise, Callback Support

Richard Hightower edited this page Apr 26, 2016 · 2 revisions

QBit supports Reakt Callbacks and Promises (not to mention Reakt's reactor).

Continuing with our RESTful microservice example, we show how you can mix Reakt's promises with the QBit microservices library. Reakt is a reactive Java library.

If you have read through the QBit microservice documentation, you know that you can call QBit services remotely or locally by using client proxies. Now QBit supports Reakt Callbacks and Promises in those client proxies.

Here is an example using our Todo example from before.

Using Reakt Promise in QBit proxy

package com.mammatustech.todo;

import io.advantageous.reakt.promise.Promise;

import java.util.List;

public interface TodoManager {
    Promise<Boolean> add(Todo todo);
    Promise<Boolean> remove(String id);
    Promise<List<Todo>> list();
}

Let's implement our earlier service using a manager class.

TodoManagerImpl

package com.mammatustech.todo;

import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.RequestParam;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.service.stats.StatsCollector;
import io.advantageous.reakt.reactor.Reactor;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;

import static io.advantageous.qbit.annotation.QueueCallbackType.*;

public class TodoManagerImpl  {


    private final Map<String, Todo> todoMap = new TreeMap<>();

    /**
     * Used to manage callbacks and such.
     */
    private final Reactor reactor;

    /**
     * Stats Collector for KPIs.
     */
    private final StatsCollector statsCollector;

    public TodoManagerImpl(Reactor reactor, StatsCollector statsCollector) {
        this.reactor = reactor;
        this.statsCollector = statsCollector;

        /** Send stat count i.am.alive every three seconds.  */
        this.reactor.addRepeatingTask(Duration.ofSeconds(3),
                () -> statsCollector.increment("todoservice.i.am.alive"));

        this.reactor.addRepeatingTask(Duration.ofSeconds(1), statsCollector::clientProxyFlush);
    }


    public void add(final Callback<Boolean> callback, final Todo todo) {

        /** Send KPI add.called every time the add method gets called. */
        statsCollector.increment("todoservice.add.called");
        todoMap.put(todo.getId(), todo);
        callback.accept(true);
    }


    public void remove(final Callback<Boolean> callback, final @RequestParam("id") String id) {

        /** Send KPI remove.called every time the remove method gets called. */
        statsCollector.increment("todoservice.remove.called");
        Todo remove = todoMap.remove(id);
        callback.accept(remove != null);

    }


    public void list(final Callback<ArrayList<Todo>> callback) {

        /** Send KPI list.called every time the list method gets called. */
        statsCollector.increment("todoservice.list.called");

        callback.accept(new ArrayList<>(todoMap.values()));
    }


    @QueueCallback({EMPTY, IDLE, LIMIT})
    public void process() {
        reactor.process();
    }


}

All of the service code from before, minus the RequestMappings, is in this class. We also left in some stats gathering from the StatsD Microservice Monitoring example. Notice that the proxy interface and the service methods do not have to match: the service methods take a Callback as their first argument and return void, while the proxy methods return a Promise. In the service it is typical to use callbacks, but in the client proxies you can use callbacks or promises. Promises give a nice, fluent programming flow.
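
To make the mismatch between the two signatures concrete, here is a minimal, JDK-only sketch of how a result-returning client call can be bridged onto a callback-taking service method. It is not QBit's generated proxy code: `CallbackTodoStore`, `PromiseTodoStore`, and their methods are hypothetical names, and `CompletableFuture` stands in for a Reakt `Promise`.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical service side: results are delivered through a callback argument. */
class CallbackTodoStore {
    private final Map<String, String> todos = new TreeMap<>();

    void add(Consumer<Boolean> callback, String id, String name) {
        todos.put(id, name);
        callback.accept(true);
    }

    void remove(Consumer<Boolean> callback, String id) {
        // Report whether an entry with this id actually existed.
        callback.accept(todos.remove(id) != null);
    }
}

/** Hypothetical client side: the same operations, but each one returns a future. */
class PromiseTodoStore {
    private final CallbackTodoStore target;

    PromiseTodoStore(CallbackTodoStore target) {
        this.target = target;
    }

    CompletableFuture<Boolean> add(String id, String name) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        // The future's complete method plays the role of the callback.
        target.add(result::complete, id, name);
        return result;
    }

    CompletableFuture<Boolean> remove(String id) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        target.remove(result::complete, id);
        return result;
    }
}
```

The caller only ever sees the returned future, so the service is free to keep its callback-first signature.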

Testing an async library can be difficult, but a Reakt blocking promise makes it easier. Before we do that, let's run this service in a service bundle as follows:

Running the TodoManager service in a service bundle.

    @Test
    public void testManager() throws Exception {

        /** Create service bundle . */
        final ServiceBundleBuilder serviceBundleBuilder = serviceBundleBuilder();
        serviceBundleBuilder.getRequestQueueBuilder().setBatchSize(1); //for testing
        final ServiceBundle serviceBundle = serviceBundleBuilder.build();

        /** Create implementation of our manager. */
        final TodoManagerImpl todoManagerImpl = new TodoManagerImpl(Reactor.reactor(), new StatsCollector() {
        });

        /** Add implementation to service bundle. */
        serviceBundle.addServiceObject("todo", todoManagerImpl);
        final TodoManager todoManager = serviceBundle.createLocalProxy(TodoManager.class, "todo");
        serviceBundle.start();
...

Now let's test async adding a Todo to the TodoManager.

Async adding a Todo to the TodoManager test using a blocking promise

        /** Add a Todo. */
        final Promise<Boolean> addPromise = blockingPromise();
        todoManager.add(new Todo("Buy Tesla", "Buy new Tesla", System.currentTimeMillis()))
                .catchError(Throwable::printStackTrace).invokeWithPromise(addPromise);
        assertTrue(addPromise.get());

Notice we use a blocking promise in the test. In the app we would use a replay promise or a callback promise with then handlers and catchError handlers.
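
To illustrate why a blocking promise suits tests, the following JDK-only sketch shows the general idea: the test thread waits on a latch until another thread delivers a result or an error. `BlockingResult` and `BlockingResultDemo` are hypothetical names for illustration and are not part of Reakt; the real `blockingPromise()` may be implemented differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a blocking promise: get() waits until a result arrives. */
class BlockingResult<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<T> value = new AtomicReference<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    void resolve(T result) {
        value.set(result);
        latch.countDown();
    }

    void reject(Throwable cause) {
        error.set(cause);
        latch.countDown();
    }

    T get(long timeout, TimeUnit unit) {
        try {
            if (!latch.await(timeout, unit)) {
                throw new IllegalStateException("no result within timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (error.get() != null) {
            throw new IllegalStateException(error.get());
        }
        return value.get();
    }
}

class BlockingResultDemo {
    /** Resolves the result on a worker thread and blocks the caller until it arrives. */
    static boolean addOnWorkerThread() {
        BlockingResult<Boolean> result = new BlockingResult<>();
        new Thread(() -> result.resolve(true)).start();
        return result.get(5, TimeUnit.SECONDS);
    }
}
```

Blocking like this is convenient in a test, where the calling thread has nothing else to do, but it would stall a service thread, which is why application code registers handlers instead.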

Here is the full test example using blocking promises.

Example of using a Promise based client proxy and blocking promises

package com.mammatustech.todo;

import io.advantageous.qbit.service.ServiceBundle;
import io.advantageous.qbit.service.ServiceBundleBuilder;
import io.advantageous.qbit.service.stats.StatsCollector;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.reactor.Reactor;
import org.junit.Test;

import java.util.List;

import static io.advantageous.qbit.service.ServiceBundleBuilder.*;
import static io.advantageous.reakt.promise.Promises.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TodoManagerTest {


    @Test
    public void testManager() throws Exception {

        /** Create service bundle . */
        final ServiceBundleBuilder serviceBundleBuilder = serviceBundleBuilder();
        serviceBundleBuilder.getRequestQueueBuilder().setBatchSize(1);
        final ServiceBundle serviceBundle = serviceBundleBuilder.build();

        /** Create implementation. */
        final TodoManagerImpl todoManagerImpl = new TodoManagerImpl(Reactor.reactor(), new StatsCollector() {
        });

        /** Add implementation to service bundle. */
        serviceBundle.addServiceObject("todo", todoManagerImpl);
        final TodoManager todoManager = serviceBundle.createLocalProxy(TodoManager.class, "todo");
        serviceBundle.start();

        /** Add a Todo. */
        final Promise<Boolean> addPromise = blockingPromise();
        todoManager.add(new Todo("Buy Tesla", "Buy new Tesla", System.currentTimeMillis()))
                .catchError(Throwable::printStackTrace).invokeWithPromise(addPromise);
        assertTrue(addPromise.get());

        /** Call list to get a list of Todos. */
        final Promise<List<Todo>> listPromise = blockingPromise();
        todoManager.list().invokeWithPromise(listPromise);
        final List<Todo> todos = listPromise.get();
        assertEquals(1, todos.size());
        assertEquals("Buy Tesla", todos.get(0).getName());

        /** Get the id of the Todo to remove it. */
        final String id = todos.get(0).getId();

        /** Remove the todo with the todo id.  */
        final Promise<Boolean> removePromise = blockingPromise();
        todoManager.remove(id).invokeWithPromise(removePromise);
        assertTrue(removePromise.get());

        /** See if the todo was removed.  */
        final Promise<List<Todo>> listPromise2 = blockingPromise();
        todoManager.list().invokeWithPromise(listPromise2);
        final List<Todo> todos2 = listPromise2.get();
        assertEquals(0, todos2.size());

    }
}

QBit generates client proxy stubs for methods that take Callback arguments and for methods that return invokable promises. Invokable promises allow you to write fluent, lambda-friendly code.

Example of fluent, lambda friendly invokable promise code

        employeeService.lookupEmployee("123")
               .then((employee)-> {...}).catchError(...).invoke();
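
The pattern in the snippet above (register handlers first, trigger the call last) can be sketched with plain JDK types. This is only an illustration of the idea, assuming the call is deferred until `invoke()` and that a second `invoke()` is rejected, as the `IllegalStateException` test later on this page suggests. `DeferredCall` and `DeferredCallDemo` are hypothetical names, not Reakt or QBit classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical sketch of an invokable promise: handlers first, invoke() last. */
class DeferredCall<T> {
    private final BiConsumer<Consumer<T>, Consumer<Throwable>> operation;
    private final AtomicBoolean invoked = new AtomicBoolean();
    private Consumer<T> onSuccess = value -> { };
    private Consumer<Throwable> onError = error -> { };

    DeferredCall(BiConsumer<Consumer<T>, Consumer<Throwable>> operation) {
        this.operation = operation;
    }

    DeferredCall<T> then(Consumer<T> handler) {
        this.onSuccess = handler;
        return this;
    }

    DeferredCall<T> catchError(Consumer<Throwable> handler) {
        this.onError = handler;
        return this;
    }

    /** Runs the operation; nothing happens before this is called. */
    void invoke() {
        if (!invoked.compareAndSet(false, true)) {
            throw new IllegalStateException("already invoked");
        }
        operation.accept(onSuccess, onError);
    }
}

class DeferredCallDemo {
    /** Hypothetical lookup that succeeds for any non-null id. */
    static DeferredCall<String> lookupEmployee(String id) {
        return new DeferredCall<>((resolve, reject) -> {
            if (id == null) {
                reject.accept(new IllegalArgumentException("id can't be null"));
            } else {
                resolve.accept("employee-" + id);
            }
        });
    }
}
```

Because the handlers are attached before anything runs, a result cannot arrive before the code that handles it is in place.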

We changed our TodoService to use a TodoManager so we could show using invokable promises in the wild.

TodoService uses TodoManager

package com.mammatustech.todo;

import io.advantageous.qbit.annotation.*;
import io.advantageous.qbit.reactive.Callback;

import java.util.*;

import static io.advantageous.qbit.annotation.QueueCallbackType.EMPTY;
import static io.advantageous.qbit.annotation.QueueCallbackType.IDLE;
import static io.advantageous.qbit.annotation.QueueCallbackType.LIMIT;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;


/**
 * <code>
 *     curl -X POST -H "Content-Type: application/json" \
 *     http://localhost:8888/v1/todo-service/todo \
 *     -d '{"id":"id1", "name":"buy tesla", "description":"daddy wants"}'
 * </code>
 *
 * <code>
 *     curl http://localhost:8888/v1/todo-service/todo
 * </code>
 */
@RequestMapping("/todo-service")
public class TodoService {

    private final TodoManager todoManager;

    public TodoService(final TodoManager todoManager) {
        this.todoManager = todoManager;
    }


    @RequestMapping(value = "/todo", method = RequestMethod.POST)
    public void add(final Callback<Boolean> callback,
                    final Todo todo) {
        todoManager.add(todo)
                .catchError(callback::reject)
                .then(callback::resolve)
                .invoke();
    }



    @RequestMapping(value = "/todo", method = RequestMethod.DELETE)
    public void remove(final Callback<Boolean> callback,
                       final @RequestParam("id") String id) {
        todoManager.remove(id)
                .catchError(callback::reject)
                .then(callback::resolve)
                .invoke();
    }



    @RequestMapping(value = "/todo", method = RequestMethod.GET)
    public void list(final Callback<List<Todo>> callback) {
        todoManager.list()
                .catchError(callback::reject)
                .then(callback::resolve)
                .invoke();
    }



    @QueueCallback({EMPTY, IDLE, LIMIT})
    public void process() {
        flushServiceProxy(todoManager);
    }


}

For completeness, here is our main method, which still has the StatsD code from the last example.

TodoServiceMain

package com.mammatustech.todo;


import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.service.ServiceBundle;
import io.advantageous.qbit.service.stats.StatsCollector;
import io.advantageous.reakt.reactor.Reactor;

import java.net.URI;
import java.util.Objects;

public class TodoServiceMain {


    public static void main(final String... args) throws Exception {

        //To test locally use https://hub.docker.com/r/samuelebistoletti/docker-statsd-influxdb-grafana/
        final URI statsdURI = URI.create("udp://192.168.99.100:8125");

        //For timer
        final Reactor reactor = Reactor.reactor();


        /* Create the ManagedServiceBuilder which manages a clean shutdown, health, stats, etc. */
        final ManagedServiceBuilder managedServiceBuilder =
                ManagedServiceBuilder.managedServiceBuilder()
                        .setRootURI("/v1") //Defaults to services
                        .setPort(8888); //Defaults to 8080 or environment variable PORT

        /** Enable statsD */
        enableStatsD(managedServiceBuilder, statsdURI);
        final StatsCollector statsCollector = managedServiceBuilder.createStatsCollector();


        /** Create todo impl. */
        final TodoManagerImpl impl = new TodoManagerImpl(reactor, statsCollector);


        /** Create service bundle for internal todo manager. */
        final ServiceBundle serviceBundle = managedServiceBuilder.createServiceBundleBuilder().build();
        serviceBundle.addServiceObject("todoManager", impl).startServiceBundle();


        /** Create TodoManager. */
        final TodoManager todoManager = serviceBundle.createLocalProxy(TodoManager.class, "todoManager");

        /** Start the REST/Websocket service. */
        managedServiceBuilder.addEndpointService(new TodoService(todoManager)).getEndpointServerBuilder()
                .build().startServer();

        /* Start the admin builder which exposes health end-points and swagger meta data. */
        managedServiceBuilder.getAdminBuilder().build().startServer();

        System.out.println("Todo Server and Admin Server started");

    }

    /**
     * Enable Stats D.
     *
     * @param host statsD host
     * @param port statsD port
     */
    public static void enableStatsD(ManagedServiceBuilder managedServiceBuilder, String host, int port) {
        if (port < 1) throw new IllegalStateException("StatsD port must be set");
        Objects.requireNonNull(host, "StatsD Host cannot be null");
        if (host.isEmpty()) throw new IllegalStateException("StatsD Host name must not be empty");
        managedServiceBuilder.getStatsDReplicatorBuilder().setHost(host).setPort(port);
        managedServiceBuilder.setEnableStatsD(true);
    }

    /**
     * Enable Stats D.
     *
     * @param uri for statsd
     */
    public static void enableStatsD(ManagedServiceBuilder managedServiceBuilder, URI uri) {
        if (!uri.getScheme().equals("udp")) throw new IllegalStateException("Scheme must be udp");
        enableStatsD(managedServiceBuilder, uri.getHost(), uri.getPort());
    }
}

The main method creates a Manager and creates a Service and wires them together.
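
The StatsD setup in the listing above depends on `java.net.URI` splitting `udp://192.168.99.100:8125` into a scheme, a host, and a port. The short sketch below isolates that step using only the JDK; `StatsdUriSketch` and `hostAndPort` are hypothetical names that mirror the checks in `enableStatsD` and are not part of QBit.

```java
import java.net.URI;

class StatsdUriSketch {
    /** Returns "host:port" for a udp:// URI, applying the same checks as enableStatsD. */
    static String hostAndPort(URI uri) {
        // Comparing from the literal side avoids a NullPointerException for scheme-less URIs.
        if (!"udp".equals(uri.getScheme())) {
            throw new IllegalStateException("Scheme must be udp");
        }
        // getPort() returns -1 when the URI does not specify a port.
        if (uri.getPort() < 1) {
            throw new IllegalStateException("StatsD port must be set");
        }
        return uri.getHost() + ":" + uri.getPort();
    }
}
```

For the URI used in the main method, `hostAndPort` yields `192.168.99.100:8125`; a URI such as `http://localhost:8125` or one without a port is rejected.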

In this example, we showed using Promises in the local client proxy with a service bundle, but you could also use them from a remote proxy exposed via WebSocket.

The next example implements the following interface.

Example interface

    interface ServiceDiscovery {
        Promise<URI> lookupService(URI uri);
    }

We call the interface above through four kinds of proxy: a remote WebSocket RPC proxy, a local service bundle proxy, and two local service queue proxies, one loosely typed (dynamic invocation) and one strongly typed.

Example showing Promises being used in local and remote proxies

package io.advantageous.qbit.vertx;


import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.client.Client;
import io.advantageous.qbit.client.ClientBuilder;
import io.advantageous.qbit.server.EndpointServerBuilder;
import io.advantageous.qbit.server.ServiceEndpointServer;
import io.advantageous.qbit.service.ServiceBuilder;
import io.advantageous.qbit.service.ServiceBundle;
import io.advantageous.qbit.service.ServiceBundleBuilder;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.time.Duration;
import io.advantageous.qbit.util.PortUtils;
import io.advantageous.reakt.promise.Promise;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.*;

public class ReaktInterfaceTest {

    final URI successResult = URI.create("http://localhost:8080/employeeService/");

    ServiceDiscovery serviceDiscovery;
    ServiceDiscovery serviceDiscoveryStrongTyped;
    ServiceDiscovery serviceDiscoveryServiceBundle;
    ServiceDiscovery serviceDiscoveryWebSocket;


    ServiceDiscoveryImpl impl;
    URI empURI;
    CountDownLatch latch;
    AtomicReference<URI> returnValue;
    AtomicReference<Throwable> errorRef;

    int port;
    Client client;
    ServiceEndpointServer server;
    ServiceBundle serviceBundle;
    ServiceQueue serviceQueue;
    ServiceQueue serviceQueue2;

    @Before
    public void before() {

        port = PortUtils.findOpenPortStartAt(9000);


        latch = new CountDownLatch(1);
        returnValue = new AtomicReference<>();
        errorRef = new AtomicReference<>();
        impl = new ServiceDiscoveryImpl();
        empURI = URI.create("marathon://default/employeeService?env=staging");


        server = EndpointServerBuilder.endpointServerBuilder()
                .addService("/myservice", impl)
                .setPort(port).build().startServer();

        Sys.sleep(200);

        client = ClientBuilder.clientBuilder().setPort(port).build().startClient();

        serviceQueue = ServiceBuilder.serviceBuilder().setServiceObject(impl).buildAndStartAll();
        serviceBundle = ServiceBundleBuilder.serviceBundleBuilder().build();
        serviceBundle.addServiceObject("myservice", impl);
        serviceQueue2 = ServiceBuilder.serviceBuilder().setInvokeDynamic(false).setServiceObject(impl)
                .buildAndStartAll();


        serviceDiscoveryServiceBundle = serviceBundle.createLocalProxy(ServiceDiscovery.class, "myservice");
        serviceBundle.start();

        serviceDiscovery = serviceQueue.createProxyWithAutoFlush(ServiceDiscovery.class, Duration.TEN_MILLIS);
        serviceDiscoveryStrongTyped = serviceQueue2.createProxyWithAutoFlush(ServiceDiscovery.class,
                Duration.TEN_MILLIS);

        serviceDiscoveryWebSocket = client.createProxy(ServiceDiscovery.class, "/myservice");
    }

    @After
    public void after() {
        serviceQueue2.stop();
        serviceQueue.stop();
        serviceBundle.stop();
        server.stop();
        client.stop();
    }

    public void await() {
        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    @Test
    public void testServiceWithReturnPromiseSuccess() {
        testSuccess(serviceDiscovery);
        testSuccess(serviceDiscoveryStrongTyped);
        testSuccess(serviceDiscoveryServiceBundle);
        testSuccess(serviceDiscoveryWebSocket);

    }

    private void testSuccess(ServiceDiscovery serviceDiscovery) {
        serviceDiscovery.lookupService(empURI).then(this::handleSuccess)
                .catchError(this::handleError).invoke();
        await();
        assertNotNull("We have a return", returnValue.get());
        assertNull("There were no errors", errorRef.get());
        assertEquals("The result is the expected result", successResult, returnValue.get());
    }


    @Test
    public void testServiceWithReturnPromiseFail() {
        testFail(serviceDiscovery);
        testFail(serviceDiscoveryStrongTyped);
        testFail(serviceDiscoveryServiceBundle);
        testFail(serviceDiscoveryWebSocket);
    }

    private void testFail(ServiceDiscovery serviceDiscovery) {
        serviceDiscovery.lookupService(null).then(this::handleSuccess)
                .catchError(this::handleError).invoke();

        await();
        assertNull("We do not have a return", returnValue.get());
        assertNotNull("There were  errors", errorRef.get());
    }


    @Test(expected = IllegalStateException.class)
    public void testServiceWithReturnPromiseSuccessInvokeTwice() {
        final Promise<URI> promise = serviceDiscovery.lookupService(empURI).then(this::handleSuccess)
                .catchError(this::handleError);
        promise.invoke();
        promise.invoke();
    }

    @Test
    public void testIsInvokable() {
        final Promise<URI> promise = serviceDiscovery.lookupService(empURI).then(this::handleSuccess)
                .catchError(this::handleError);

        assertTrue("Is this an invokable promise", promise.isInvokable());
    }


    private void handleError(Throwable error) {
        errorRef.set(error);
        latch.countDown();
    }

    private void handleSuccess(URI uri) {
        returnValue.set(uri);
        latch.countDown();
    }


    interface ServiceDiscovery {
        Promise<URI> lookupService(URI uri);
    }

    public class ServiceDiscoveryImpl {
        public void lookupService(final io.advantageous.qbit.reactive.Callback<URI> callback, final URI uri) {
            if (uri == null) {
                callback.reject("uri can't be null");
            } else {
                callback.resolve(successResult);
            }
        }
    }
}