Releases: mp911de/spinach
0.3
SocketAddressSupplier API
The SocketAddressSupplier API allows to control the used
connection points/SocketAddresses for initial connection and reconnection.
The use case for controlling the source and sequence of
connection points are failover and cluster node discovery.
The API exposes a factory for SocketAddressSupplier
s which accepts
a RedisURI
. By default, multiple addresses are utilized in with RoundRobin.
The predefined methods can be found in the SocketAddressSupplierFactory.Factories
enum:
ROUND_ROBIN
: Cyclic use of multiple addresses specified by theRedisURI
HELLO_CLUSTER
: UsesROUND_ROBIN
for the initial connect. Once a connection is
established the mechanism obtains the cluster nodes using theHELLO
command AT STARTUP. Periodical scheduling/updating is currently not implemented.
This can be however achieved by implementing an own factory
that calls thereloadNodes()
method periodically/on demand.
New mechanisms can be implemented by implementing the
SocketAddressSupplier
/SocketAddressSupplierFactory
interfaces.
Submit your mechanism by opening a Pull-Request if you want to contribute to spinach.
DisqueClient client = new DisqueClient();
DisqueConnection<String, String> connection = client.connect(new Utf8StringCodec(), disqueURI,
SocketAddressSupplierFactory.Factories.ROUND_ROBIN);
DisqueConnection<String, String> connection = client.connect(new Utf8StringCodec(), disqueURI,
SocketAddressSupplierFactory.Factories.HELLO_CLUSTER);
QueueListener API
Spinach allows a push pattern to obtain jobs from Disque. The QueueListener API
introduces a listener/notification style by utilizing rx-java Observables.
The QueueListener API exposes an Observable<Job>
that emits jobs.
The jobs can be processed asynchronously by applying transformations
or doOnNext
steps. Each Observable<Job>
allocates at the time
of subscription resources. The resources are released when unsubscribing
from the subscription.
QueueListenerFactory<String, String> queueListenerFactory = QueueListenerFactory.create(disqueURI,
new Utf8StringCodec(), "queue"); // 1
Observable<Job<String, String>> getjobs = queueListenerFactory.getjobs(); // 2
Subscription subscribe = getjobs.doOnNext(new Action1<Job<String, String>>() { // 3
@Override
public void call(Job<String, String> job) {
// process the job
System.out.println(job.getId());
}
}).doOnNext(new Action1<Job<String, String>>() {
@Override
public void call(Job<String, String> job) {
// ack the job (different connection)
connection.sync().ackjob(job.getId());
}
}).subscribe(); // 4
- The
QueueListenerFactory
is set up. You can reuse an existingDisqueClient
to reuse thread pools. - Create an
Observable<Job>
by calling thegetjobs
method. This call
just creates the observable, but no resources are allocated. - Apply transformations or set up callbacks to process the jobs
- Finally subscribe and open the connection/listener process
To be honest, the QueueListener API is just a side-product of the
recommendation to keep track of the producer node id when receiving jobs.
A client can optimize locality to connect directly the node that produces
most of the received jobs. The QueueListenerFactory can initiate locality tracking
for a particular observable and can enable periodically scheduled checks to
switch to the node that produced the most received jobs.
Locality tracking
The node switch can also be triggered directly on a QueueListenerFactory for all
active listeners with enabled locality tracking.
QueueListenerFactory<String, String> queueListenerFactory = QueueListenerFactory.create(disqueURI,
new Utf8StringCodec(), "queue"); // 1
Observable<Job<String, String>> getjobs = queueListenerFactory
.withLocalityTracking() // 2
.withNodeSwitching(2, TimeUnit.MINUTES) // 3
.getjobs(); // 4
Subscription subscribe = getjobs.doOnNext(new Action1<Job<String, String>>() { // 5
@Override
public void call(Job<String, String> job) {
// process the job
System.out.println(job.getId());
}
}).doOnNext(new Action1<Job<String, String>>() {
@Override
public void call(Job<String, String> job) {
// ack the job (different connection)
connection.sync().ackjob(job.getId());
}
}).subscribe(); // 6
queueListenerFactory.switchNodes(); // 7
- The
QueueListenerFactory
is set up. You can reuse an existingDisqueClient
to reuse thread pools. - Enable locality tracking for this particular
Observable<Job>
- Enable node switching. The check whether node switching is necessary and
the actual reconnect happens every 2 minutes. - Create an
Observable<Job>
by calling thegetjobs
method. This call
just creates the observable, but no resources are allocated. - Apply transformations or set up callbacks to process the jobs
- Finally subscribe and open the connection/listener process
- Initiate a programmatic node switch check
The QueueListener API does not emit a terminal event.
Please note this API is experimental and may change.
Upgrade to lettuce 3.4
With this upgrade you get all the features provided within lettuce 3.4.
Some of the highlights, also available for spinach are:
- Reusable ClientResources
- EventBus and Client Events
- Command Latency Metrics
Read more: https://github.com/mp911de/lettuce/releases/tag/3.4.Final
Updated dependencies
netty 4.0.28.Final -> 4.0.34.Final
Enhancements
- Keep track of nodeId's that were obtained by GETJOB/Implement QueueListener API #8
- Support a pluggable reconnect mechanism #9
- Upgrade to lettuce 3.4 #10
- Getjob NOHANG, WITHARGUMENTS #14 #15 (thanks to @macobo)
- Switch nodes when received LEAVING in QueueListener API #17
- Adopt new Disque Job ID format #18
- Implement PAUSE command #19
- Implement QSTAT command #20
Fixes
For complete information on spinach see the website:
0.2
This release addresses minor issues and allows to use lettuce 3.3.Final. It adds support for more Disque commands and eases creation of a DisqueURI.
Enhancements
- Provide a Reactive API #4 (shipped with 0.1.1)
- Implement CLUSTER commands #5 (shipped with 0.1.1)
- Implement JSCAN command #6 (shipped with 0.1.1)
- Implement NACK command #11
- Upgraded lettuce from 3.2.Final to 3.3.Final #13 (Thanks to @pulse00)
Other Changes
- Bump rxjava to 1.0.14
- Polishing and sugar for DisqueURI
spinach requires a minimum of Java 8 to build and Java 6 run. It is tested continuously against Disque unstable.
For complete information on spinach see the website:
0.1.1
This release fixes a reconnection bug. spinach allows to specify multiple connection points. The first working connection point will allow a connection but if the connection is down, only the connection point of the broken connection will be used for reconnecting. So the reconnect sticks to the broken host/connection point without trying the other specified connection points. The current release addresses this issue by using round-robin and utilizes all provided connection points for a reconnect.
Fixes
- Reconnect sticks to the last known connection point and does not use additional connection points #7
spinach requires a minimum of Java 8 to build and Java 6 run. It is tested continuously against Disque.
For complete information on spinach see the website:
0.1 / Initial Release
Initial Release