Case Studies in Reakt Reactive Java
This is a presentation of different Reakt features in the context of a real application. I have renamed the class names and such, but the code is from an in-progress microservice. It demonstrates where you would use the Reakt pieces to build a reactive Java application.
This covers usage of Reakt:
- blocking promises
- Promises.all
- invokable promise
- Expected values
- Circuit breakers
- Working with the Reactor
- Working with streams
- Using AsyncSuppliers to create downstream services
- Reakt Guava integration
- Using promise.thenMap
Let's say you have an async microservices application, and you want to write some unit and integration tests. You want the tests to wait until the system starts up. More precisely, you want the system to notify the test when it is done loading.
NOTE: The examples below are written in Kotlin, which is a JVM language from JetBrains (the makers of IntelliJ IDEA). I use Kotlin because the examples are easier to read and take up less space. If you know Java 8, you should be able to follow with no problem. Think of it like pseudo code. Kotlin works well with Java classes and such.
import io.advantageous.qbit.util.PortUtils
import io.advantageous.reakt.AsyncSupplier
import io.advantageous.reakt.promise.Promises
import org.junit.Assert
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
object TestUtils {
val adminPort : AtomicInteger = AtomicInteger(9090)
val servicePort : AtomicInteger = AtomicInteger(8080)
fun loadSystem(): ServicePlatform {
/** Load System. */
val loadPromise = Promises.blockingPromiseNotify(Duration.ofSeconds(4))
val servicePlatform = servicePlatform().withNamespace(Constants.TODO_SERVICE)
.setWaitForStartup(true).setAdminPort(PortUtils.findOpenPortStartAt(adminPort.andIncrement))
.setServicePort(PortUtils.findOpenPortStartAt(servicePort.andIncrement))
Main.run(servicePlatform).invokeWithPromise(loadPromise)
loadPromise.get() //Wait for the system to load before we start.
Assert.assertFalse("system loaded", loadPromise.failure())
return servicePlatform
}
Notice that we create a promise using Promises.blockingPromiseNotify(Duration.ofSeconds(4)). We call loadPromise.get() to wait until the system loads. Blocking promises are good for testing.
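The blocking-wait idea can be sketched in plain Java. This is only an illustration under stated assumptions: it uses the JDK's CompletableFuture as a stand-in for a Reakt blocking promise so that it runs without the library, and startSystem and awaitStartup are hypothetical names, not part of Reakt or QBit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingStartupSketch {

    /** Hypothetical stand-in for the system under test: signals the future from another thread. */
    static void startSystem(CompletableFuture<Boolean> loaded) {
        Thread starter = new Thread(() -> loaded.complete(true));
        starter.setDaemon(true);
        starter.start();
    }

    /** Blocks the calling (test) thread until startup is signalled or the timeout elapses. */
    static boolean awaitStartup(long timeoutSeconds) {
        CompletableFuture<Boolean> loaded = new CompletableFuture<>();
        startSystem(loaded);
        try {
            return loaded.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("system loaded: " + awaitStartup(4));
    }
}
```

The timeout matters in tests: if the system never signals, the test fails after a bounded wait instead of hanging the build.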
Now we can use loadSystem in our tests. Here is a test that does a health check against a running server.
@Test
@Throws(Exception::class)
fun mainHealthCheckTest() {
val servicePlatform = loadSystem()
val httpTextResponse = HttpClientBuilder.httpClientBuilder().setPort(servicePlatform.adminPort)
.buildAndStart().get("/__admin/ok")
assertNotNull(httpTextResponse)
assertEquals("true", httpTextResponse.body())
assertEquals(200, httpTextResponse.code().toLong())
shutdownSystem(servicePlatform)
}
The actual code that loads our system uses an invokable promise.
import io.advantageous.reakt.AsyncSupplier
import io.advantageous.reakt.Callback
import io.advantageous.reakt.Stream
import io.advantageous.reakt.promise.Promise
import io.advantageous.reakt.promise.Promises.*
object Main {
...
var repoServiceQueue: ServiceQueue? = null
private val logger = LoggerFactory.getLogger(Main::class.java)
...
fun main(args: Array<String>) {
run(servicePlatform().withNamespace(TODO_SERVICE)).invoke()
}
...
fun run(servicePlatform: ServicePlatform): Promise<Void> {
return invokablePromise { donePromise ->
val loadCollectionServicePromise = promiseBoolean()
val loadTodoServicePromise = promiseBoolean()
val loadPromise = all(loadCollectionServicePromise,
loadTodoServicePromise)
.thenPromise(donePromise)
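// Each service resolves its own promise; loadPromise fires once both have replied.
createCollectionService(servicePlatform, loadCollectionServicePromise)
createTodoServiceService(servicePlatform, loadTodoServicePromise)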
servicePlatform.start()
}
}
There is a lot going on here. The run method uses Promises.invokablePromise, so nothing happens until the caller invokes the returned promise. The run method then uses Promises.all to chain the promises loadCollectionServicePromise and loadTodoServicePromise together. Promises.all returns a promise that triggers only after all of the promises passed to it have triggered. This way you are notified only when both createCollectionService and createTodoServiceService have replied asynchronously. You don't want to start testing before the system is initialized.
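As a rough analogy for the "all" combinator, here is a minimal sketch using the JDK's CompletableFuture.allOf rather than Reakt's Promises.all, so it runs without the library. The class and method names are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class AllOfSketch {

    /** Returns a future that completes only after both service futures have completed. */
    static CompletableFuture<Void> whenBothLoaded(CompletableFuture<Boolean> collectionLoaded,
                                                  CompletableFuture<Boolean> todoLoaded) {
        return CompletableFuture.allOf(collectionLoaded, todoLoaded);
    }

    /** Records whether the combined future is done after the first and after the second reply. */
    static boolean[] observe() {
        CompletableFuture<Boolean> collectionLoaded = new CompletableFuture<>();
        CompletableFuture<Boolean> todoLoaded = new CompletableFuture<>();
        CompletableFuture<Void> systemLoaded = whenBothLoaded(collectionLoaded, todoLoaded);

        collectionLoaded.complete(true);
        boolean doneAfterFirst = systemLoaded.isDone();   // the todo service is still loading
        todoLoaded.complete(true);
        boolean doneAfterSecond = systemLoaded.isDone();  // both services have replied
        return new boolean[] {doneAfterFirst, doneAfterSecond};
    }

    public static void main(String[] args) {
        boolean[] done = observe();
        System.out.println("after first reply: " + done[0] + ", after second reply: " + done[1]);
    }
}
```

The combined future stays pending after the first reply and completes only with the second, which is the behavior the startup code relies on.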
The project that I am working on uses leader election from Elekt. Elekt, a leadership API library, uses Reakt streams. For testing, we just simulate the Elekt Consul support.
The LeaderElector looks like this:
public interface LeaderElector {
/**
* Attempt to elect this service as leader.
* Returns true if successful, and false if not successful.
* @param endpoint endpoint describes the host and port of the leader.
* @param callback callback
*/
void selfElect(final Endpoint endpoint, final Callback<Boolean> callback);
/**
*
* This will send leadership changes as they occur over the stream.
*
* @param callback callback returns new leader.
*/
void leadershipChangeNotice(final Stream<Endpoint> callback);
/**
*
* This will come back quickly with a new Leader.
* If no Endpoint is returned in the callback then there is no leader.
*
* @param callback callback returns new leader.
*/
void getLeader(final Callback<Endpoint> callback);
}
To simulate that for integration testing, we use this mock LeaderElector.
private fun createLeaderElector(): Supplier<LeaderElector> {
return Supplier {
object : LeaderElector {
override fun leadershipChangeNotice(stream: Stream<Endpoint>?) {
logger.info("Leader notice registered")
val idOfServer = Identity()
stream?.reply(Endpoint(idOfServer.host, idOfServer.servicePort))
val notifier = Thread {
while (true) {
Thread.sleep(1000 * 10)
stream?.reply(Endpoint(idOfServer.host, idOfServer.servicePort))
}
}
notifier.isDaemon = true // must be set before start()
notifier.start()
}
override fun selfElect(endpoint: Endpoint?, callback: Callback<Boolean>?) {
logger.info("Self elect was called")
callback?.resolve(true);
}
override fun getLeader(callback: Callback<Endpoint>?) {
logger.info("Get leader was called")
val idOfServer = Identity()
callback?.resolve(Endpoint(idOfServer.host, idOfServer.servicePort))
}
}
}
}
Notice the calls to stream.reply, which send a stream of leader-election notifications saying that this server has been elected the leader.
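What separates a stream from a one-shot callback is that the same listener can be replied to many times. The following minimal sketch shows that idea with a plain java.util.function.Consumer. Endpoint, MockLeaderElector, and announce are hypothetical stand-ins here, not the Elekt or Reakt types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class LeaderStreamSketch {

    /** Hypothetical value type standing in for Elekt's Endpoint. */
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Mock elector that always reports this server as the leader. */
    static final class MockLeaderElector {
        private final Endpoint self;
        private final List<Consumer<Endpoint>> listeners = new ArrayList<>();

        MockLeaderElector(Endpoint self) {
            this.self = self;
        }

        /** Registers a listener and immediately sends it the current leader. */
        void leadershipChangeNotice(Consumer<Endpoint> listener) {
            listeners.add(listener);
            listener.accept(self);
        }

        /** Re-sends the leader to every listener; a real mock might call this from a timer. */
        void announce() {
            for (Consumer<Endpoint> listener : listeners) {
                listener.accept(self);
            }
        }
    }

    /** Registers one listener, announces twice more, and returns everything the listener saw. */
    static List<String> collectNotices() {
        MockLeaderElector elector = new MockLeaderElector(new Endpoint("localhost", 8080));
        List<String> seen = new ArrayList<>();
        elector.leadershipChangeNotice(endpoint -> seen.add(endpoint.toString()));
        elector.announce();
        elector.announce();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectNotices());
    }
}
```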
It is important to monitor the health of your system, and sometimes it is good not to beat a dead horse. If downstream services are broken, there is no point in using them until they are fixed. In Reakt we use Circuit Breakers and Expected to handle cases where some service is supposed to be there or some value is expected.
Let's demonstrate this with MetricsCollectionServiceImpl.
import io.advantageous.reakt.Breaker
import io.advantageous.reakt.Breaker.*
import io.advantageous.reakt.Expected
import java.util.function.Function
import java.util.function.Supplier
import java.time.Duration.ofMillis as millis
import java.time.Duration.ofSeconds as seconds
/**
* Metrics Collection Service.
* Manages finding leader with LeaderElector.
*/
class MetricsCollectionServiceImpl
/**
* @param mgmt ServiceManagementBundle (from QBit)
* *
* @param leaderElectorSupplier leaderElectorSupplier for connecting to the leader elector.
* *
* @param todoListServiceSupplier todoListServiceSupplier for connecting
* * to the todo service.
* *
* @param metricRepository metric repo for storing metrics
*/
(
/**
* Service management bundle which includes stats collection, Reakt reactor, QBit health management, and more.
*/
private val mgmt: ServiceManagementBundle,
/**
* Supplies a leader elector interface. LeaderElector is from Elekt.
*/
private val leaderElectorSupplier: Supplier<LeaderElector>,
/**
* This is used to create a supplier.
*/
private val todoListServiceSupplier: Function<Endpoint, TodoServiceClient>,
/**
* Metric repository for saving metrics.
*/
private val metricRepository: MetricRepositoryClient)
: MetricsCollectionService {
/**
* The current leaderEndpoint which starts out empty.
*/
private var leaderEndpoint = Expected.empty<Endpoint>()
/**
* The actual todoService wrapped in a Reakt Circuit breaker.
*/
private var todoService = Breaker.opened<TodoServiceClient>()
/**
* The leader elector we are using, wrapped in Reakt Circuit breaker
*/
private var leaderElectorBreaker: Breaker<LeaderElector> = Breaker.opened()
/**
* Call count per second.
*/
private var callCount: Long = 0
init {
/*
* Check circuit breaker health every 10 seconds.
*/
mgmt.reactor().addRepeatingTask(seconds(10)) {
healthCheck()
}
createLeaderElector()
mgmt.reactor().addRepeatingTask(seconds(1)) { throughPut() }
mgmt.reactor().addRepeatingTask(millis(100), { flushService(metricRepository) })
}
Notice that we are using Reakt circuit breakers. Notice also that we use the Reakt Reactor's addRepeatingTask to periodically check the health of our circuit breakers. Reakt's Reactor manages callbacks so they execute on this service's thread, and it also handles callback timeouts and repeating tasks.
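To make the repeating-task idea concrete, here is a minimal sketch of a reactor-like scheduler that the owning thread drives by calling process on each loop iteration. It is a hypothetical simplification, not Reakt's Reactor: MiniReactorSketch and its explicit nowMs parameter are assumptions, chosen so the example is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniReactorSketch {

    /** One repeating task and the next time (in milliseconds) at which it is due. */
    private static final class RepeatingTask {
        final long intervalMs;
        final Runnable task;
        long nextRunMs;

        RepeatingTask(long intervalMs, Runnable task, long nowMs) {
            this.intervalMs = intervalMs;
            this.task = task;
            this.nextRunMs = nowMs + intervalMs;
        }
    }

    private final List<RepeatingTask> tasks = new ArrayList<>();

    /** Registers a task that should run every intervalMs, starting one interval from nowMs. */
    void addRepeatingTask(long intervalMs, Runnable task, long nowMs) {
        tasks.add(new RepeatingTask(intervalMs, task, nowMs));
    }

    /** Called by the owning thread on every loop iteration; runs each task that is due. */
    void process(long nowMs) {
        for (RepeatingTask repeating : tasks) {
            if (nowMs >= repeating.nextRunMs) {
                repeating.task.run();
                repeating.nextRunMs = nowMs + repeating.intervalMs;
            }
        }
    }

    /** Simulates 100 ms ticks up to totalMs and counts how often a 1-second task fires. */
    static int countRuns(long totalMs) {
        MiniReactorSketch reactor = new MiniReactorSketch();
        int[] runs = {0};
        reactor.addRepeatingTask(1000, () -> runs[0]++, 0);
        for (long now = 0; now <= totalMs; now += 100) {
            reactor.process(now);
        }
        return runs[0];
    }

    public static void main(String[] args) {
        System.out.println("runs in 3 seconds: " + countRuns(3000));
    }
}
```

Because the tasks run inside process, they execute on whichever thread owns the loop, so the task bodies need no extra synchronization with the rest of the service.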
Let's look at the healthCheck that runs every 10 seconds to see how circuit breakers work.
private fun healthCheck() {
if (mgmt.isFailing) {
logger.warn("CollectionService Health is suspect")
} else {
logger.debug("CollectionService is Healthy")
}
leaderElectorBreaker.ifBroken {
createLeaderElector()
}
/* If the TODO service is broken, i.e. the circuit is open then do... */
todoService.ifBroken {
/* Check to see if we have a leaderEndpoint. */
leaderEndpoint
.ifPresent { leaderEndpoint ->
this.handleNewLeader(leaderEndpoint)
}
/* If we don't have a leaderEndpoint, then look it up. */
.ifEmpty {
/* Look up the endpoint if the elector is not broken. */
leaderElectorBreaker
.ifOk { elector ->
this.lookupLeader(elector)
}
.ifBroken {
logger.warn("We have no leader and the leader elector is down")
}
}
}
}
The leaderEndpoint is an Expected value, which means it might not exist. The methods ifOk and ifBroken come from the circuit breaker. ifOk means the fuse is not burned out; ifBroken means the fuse blew. As you can see, combining Expected values with services wrapped in Breakers simplifies reasoning about what to do when things go down.
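The decision logic of the health check can be sketched with a tiny hand-rolled breaker and java.util.Optional standing in for Expected. This is an illustration only: BreakerSketch is a hypothetical simplification that mirrors the ifOk and ifBroken style, not the Reakt Breaker class, and the strings it returns are made up for the example.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;

final class BreakerSketch<T> {

    private final T service; // null means the circuit is open (broken)

    private BreakerSketch(T service) {
        this.service = service;
    }

    static <T> BreakerSketch<T> operational(T service) {
        return new BreakerSketch<>(Objects.requireNonNull(service));
    }

    static <T> BreakerSketch<T> broken() {
        return new BreakerSketch<>(null);
    }

    /** Runs the action with the wrapped service only when the circuit is closed. */
    BreakerSketch<T> ifOk(Consumer<T> action) {
        if (service != null) action.accept(service);
        return this;
    }

    /** Runs the action only when the circuit is open. */
    BreakerSketch<T> ifBroken(Runnable action) {
        if (service == null) action.run();
        return this;
    }

    /** Mirrors the health check: what should we do given the state of each dependency? */
    static String decide(BreakerSketch<String> todoService,
                         Optional<String> leaderEndpoint,
                         BreakerSketch<String> elector) {
        StringBuilder decision = new StringBuilder("use todo service");
        todoService.ifBroken(() -> {
            decision.setLength(0);
            if (leaderEndpoint.isPresent()) {
                decision.append("reconnect to ").append(leaderEndpoint.get());
            } else {
                elector.ifOk(e -> decision.append("look up leader via ").append(e))
                       .ifBroken(() -> decision.append("no leader and elector is down"));
            }
        });
        return decision.toString();
    }

    public static void main(String[] args) {
        System.out.println(decide(BreakerSketch.<String>broken(),
                Optional.of("host:8080"), BreakerSketch.<String>broken()));
    }
}
```

Each branch reads as a statement about one dependency, which avoids nested null checks and boolean flags.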
When a fuse opens or breaks, then we can work around it. Here is how we mark a broken breaker.
try {
leaderElectorBreaker = Breaker.operational(leaderElectorSupplier.get())
leaderElectorBreaker
.ifOk { this.lookupLeader(it) }
.ifBroken {
logger.error("Unable to connect to leader supplier")
}
if (leaderElectorBreaker.isOk)
mgmt.increment("leader.elector.create.success")
else
mgmt.increment("leader.elector.create.fail")
leaderElectorBreaker.ifOk {
this.handleElectionStream(it)
}
} catch (ex: Exception) {
mgmt.increment("leader.elector.create.fail.exception")
logger.error("Unable to connect to leader supplier", ex)
leaderElectorBreaker = Breaker.broken<LeaderElector>()
}
Notice the use of Breaker.operational to denote that we have a new service to work with that should work. Then, if the service fails, we mark it as broken with Breaker.broken.
Here is how we handle the election stream that we mocked up earlier.
private fun handleElectionStream(leaderElector: LeaderElector) {
leaderElector.leadershipChangeNotice { result ->
result
.catchError { error -> // Run on this service thread
mgmt.reactor()
.deferRun {
logger.error("Error handling election stream", error)
mgmt.increment("leader.stream.elect.error")
this.leaderElectorBreaker = broken<LeaderElector>()
createLeaderElector()
}
}
.then { endpoint -> // Run on this service thread
mgmt.reactor().deferRun {
mgmt.increment("leader.stream.elect.notify")
logger.info("New Leader Notify {} {}", endpoint.host, endpoint.port)
handleSuccessfulLeaderLookupOrStream(endpoint)
}
}
}
}
Notice that we use reactor.deferRun so we can handle this stream on this service's thread.
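The hand-off behind a deferred run can be sketched as a thread-safe queue that foreign threads write to and only the service thread drains. This is a minimal illustration under that assumption. DeferRunSketch is a hypothetical class, and Reakt's actual implementation may differ.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class DeferRunSketch {

    private final Queue<Runnable> deferred = new ConcurrentLinkedQueue<>();

    /** Safe to call from any thread: it only enqueues the work. */
    void deferRun(Runnable work) {
        deferred.add(work);
    }

    /** Called only by the owning service thread; runs everything queued so far. */
    void process() {
        Runnable work;
        while ((work = deferred.poll()) != null) {
            work.run();
        }
    }

    /** Queues work from a foreign thread and returns the name of the thread that ran it. */
    static String threadThatRanWork() {
        DeferRunSketch reactor = new DeferRunSketch();
        String[] ranOn = new String[1];
        Thread callbackThread = new Thread(
                () -> reactor.deferRun(() -> ranOn[0] = Thread.currentThread().getName()),
                "driver-callback-thread");
        callbackThread.start();
        try {
            callbackThread.join(); // the work is queued but has not run yet
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the callback thread", e);
        }
        reactor.process(); // now the work runs on the caller's thread
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("deferred work ran on: " + threadThatRanWork());
    }
}
```

The work is submitted from driver-callback-thread but executes on the thread that calls process, so service state is only ever touched from one thread.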
Now let's show another example of Promises.all. We have a Cassandra service that wants to write a heap of records to the DB. It wants to write the records in parallel.
/**
* Stores Metric data and results into Cassandra.
*/
internal class CassandraMetricRepository
/**
* @param sessionAsyncSupplier supplier to supply Cassandra session.
* @param serviceMgmt serviceMgmt to manage callbacks and repeating tasks.
* @param promise returns when cassandra initializes.
*
*/
(
/**
* Cassandra Session supplier.
*/
private val sessionAsyncSupplier: AsyncSupplier<Session>,
/**
* QBit serviceMgmt for repeating tasks, stats, time and callbacks that execute on the caller's thread.
*/
private val serviceMgmt: ServiceManagementBundle,
promise: Promise<Boolean>) : MetricRepositoryService {
/**
* generate the sequence for backup.
*/
private val sequenceGen = AtomicLong(2)
/**
* Reference to the Cassandra session, which gets connected asynchronously.
*/
private var sessionBreaker = Breaker.opened<Session>()
/**
* Error counts from Cassandra driver for the last time period.
*/
private val errorCount = AtomicLong()
...
Notice that we create our sessionBreaker, which is our reference to Cassandra, as an opened circuit. We also define a sessionAsyncSupplier. An AsyncSupplier is also from Reakt. It is like a regular Supplier except it is async. We use the reactor to define a repeating task to check the health of the Cassandra connection.
init {
/* Connect the Cassandra session. */
connectSession(promise)
/*
This makes sure we are connected.
It provides circuit breaker if sessionBreaker is down to auto reconnect.
*/
serviceMgmt.reactor().addRepeatingTask(Duration.ofSeconds(5)) { this.cassandraCircuitBreaker() }
}
There we check for the health of our Cassandra session and if it goes down, we try to reconnect just like before.
We use the circuit breaker to do alternative logic if our connection goes down.
override fun recordMetrics(callback: Callback<Boolean>, metrics: List<Metric>) {
sessionBreaker
/* if we are not connected, fail fast. */
.ifBroken { callback.reject("Not connected to Cassandra") }
/* If we are connected then call cassandra. */
.ifOk { session -> doStoreMetrics(session, callback, metrics) }
}
Note the use of ifBroken and ifOk. This way we can control the reconnect. The method doStoreMetrics stores many records to Cassandra asynchronously. Even though it saves the records in parallel, it resolves its caller's callback only after all of the records have been stored.
/**
* Does the low level cassandra storage.
*/
private fun doStoreMetrics(session: Session,
callback: Callback<Boolean>,
metrics: List<Metric>) {
logger.debug("Storing metrics {}", metrics.size)
/* Make many calls to cassandra using its async lib to recordMetrics
each metric. */
val promises = metrics.map({ metric -> doStoreMetric(session, metric) }).toList()
/*
* Create a parent promise to contain all of the promises we
* just created for each metric.
*/
serviceMgmt.reactor().all(promises)
.then {
serviceMgmt.increment("bulk.store.success")
logger.info("metrics were stored {}", metrics.size)
callback.resolve(true)
}
.catchError { error ->
serviceMgmt.increment("bulk.store.error")
logger.error("Problem storing metrics ${metrics.size}", error)
callback.reject(error)
}
}
It does this call coordination by using reactor.all to create a promise that only replies if all of the other promises reply. The method doStoreMetric returns a single promise. We use Kotlin's collection map (similar to Java streams but more concise) to turn the list of metrics into a list of promises, one per call to doStoreMetric, which we then pass to reactor.all to combine all of those promises into a single promise.
The doStoreMetric method uses the Reakt Guava/Cassandra integration to turn a ListenableFuture into a Reakt promise.
import io.advantageous.reakt.guava.Guava.registerCallback
private fun doStoreMetric(session: Session,
metric : Metric): Promise<Boolean> {
val resultSetFuture = session.executeAsync(QueryBuilder.insertInto(METRICS_TABLE)
.value("employeeId", metric.employeeId)
.value("metricType", metric.metricType.name.toLowerCase())
.value("metricName", metric.metricName)
.value("provider", metric.provider)
.value("externalId", metric.externalId)
.value("value", metric.value)
.value("surrogateKey", metric.surrogateKey)
.value("created_at", metric.timestamp))
return createPromiseFromResultSetFutureForStore(resultSetFuture, "Storing Metric")
}
private fun createPromiseFromResultSetFutureForStore(resultSetFuture: ResultSetFuture,
message: String): Promise<Boolean> {
val resultSetPromise = serviceMgmt.reactor().promise<ResultSet>()
val promise = resultSetPromise.thenMap({ it.wasApplied() }).catchError { error ->
if (error is DriverException) {
// No callback is in scope here; just log and count the driver error.
logger.error("Error " + message, error)
errorCount.incrementAndGet()
}
}
registerCallback<ResultSet>(resultSetFuture, resultSetPromise)
return promise
}
Notice we use registerCallback from the Reakt Guava integration to convert the future into a promise. We also use promise.thenMap to convert a Promise<ResultSet> into a Promise<Boolean>.
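The mapping step can be illustrated with the JDK's CompletableFuture.thenApply, which plays a role similar to thenMap. This is a sketch under stated assumptions: FakeResultSet is a hypothetical stand-in for the Cassandra driver's ResultSet, and no Reakt or Guava classes are used, so the example runs on its own.

```java
import java.util.concurrent.CompletableFuture;

final class ThenMapSketch {

    /** Hypothetical stand-in for the Cassandra driver's ResultSet. */
    static final class FakeResultSet {
        private final boolean applied;

        FakeResultSet(boolean applied) {
            this.applied = applied;
        }

        boolean wasApplied() {
            return applied;
        }
    }

    /** Maps a future of a result set to a future of "was the write applied?". */
    static CompletableFuture<Boolean> toStoredFlag(CompletableFuture<FakeResultSet> resultSetFuture) {
        return resultSetFuture.thenApply(FakeResultSet::wasApplied);
    }

    /** Completes the source future and returns what the mapped future yields. */
    static boolean demo(boolean applied) {
        CompletableFuture<FakeResultSet> resultSetFuture = new CompletableFuture<>();
        CompletableFuture<Boolean> stored = toStoredFlag(resultSetFuture);
        resultSetFuture.complete(new FakeResultSet(applied));
        return stored.join();
    }

    public static void main(String[] args) {
        System.out.println("stored: " + demo(true));
    }
}
```

Callers of the mapped future see only a Boolean, so the driver-specific result type does not leak past the repository.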
override fun collectTodo(callback: Callback<Boolean>,
todoList: List<Todo>) {
callCount++
todoRepo.recordTodoList(todoList)
.then { ok ->
todoService
.ifOk { todoService1 ->
doCollectWithCallback(callback, todoList, todoService1)
}
.ifBroken {
mgmt.increment("collect.call.todo.service.broken")
logger.error("Connection to todoService is down.")
mgmt.increment("collect.broken")
}
}
.catchError { error ->
mgmt.setFailing()
logger.error("Connection to cassandra is down.", error)
callback.reject("Connection to cassandra is down. " + error.message, error)
}
.invokeWithReactor(mgmt.reactor())
}
You can invoke invokable promises in the context of a Reactor by using .invokeWithReactor(mgmt.reactor()). This allows the callback handlers from the promises to run in the same thread as the service actor or event loop.
I hope you enjoyed this article. It links back to areas of the Reakt documentation where you can find more details. If you are new to Reakt and want to understand the questions Why Reakt and What is Reakt, I suggest reading this. Also, this interview about Reakt might help.
Java Promises
- Promise
- Promise then*() and catchError()
- Promise thenMap()
- Promise all()
- Promise any()
- Blocking Promise
- Invokable Promise
- Reactor Replay Promises
Reactor, Stream, Results
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Elekt Consul Leadership election
- Elekt Leadership election
- Reactive Microservices