Distributed Queue
IMPORTANT - We recommend that you do NOT use ZooKeeper for Queues. Please see Tech Note 4 for details.
An implementation of the Distributed Queue ZK recipe. Items put into the queue are guaranteed to be ordered (by means of ZK's PERSISTENT_SEQUENTIAL node). If a single consumer takes items out of the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate a single consumer.
- QueueBuilder
- QueueConsumer
- QueueSerializer
- DistributedQueue
public static QueueBuilder builder(CuratorFramework client,
QueueConsumer consumer,
QueueSerializer serializer,
java.lang.String queuePath)
Parameters:
client - the curator client
consumer - functor to receive messages
serializer - serializer to use for items
queuePath - path to store queue
QueueBuilder<MessageType> builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedQueue<MessageType> queue = builder.build();
The queue must be started via the start() method. Call close() when you are done with the queue.
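The serializer passed to the builder converts queue items to and from bytes. The following is a minimal sketch of a UTF-8 string serializer, assuming QueueSerializer declares a serialize/deserialize pair of this shape. The class name Utf8StringSerializer is hypothetical, and the class is written without the Curator dependency so it compiles on its own.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical serializer for String items. In a Curator project this class
// would be declared as "implements QueueSerializer<String>" (assumption: the
// interface exposes these two methods).
final class Utf8StringSerializer {
    // Convert an item into the bytes that will be stored in the queue node.
    public byte[] serialize(String item) {
        return item.getBytes(StandardCharsets.UTF_8);
    }

    // Rebuild the item from the bytes read back from the queue node.
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Whatever encoding is chosen, deserialize should be the exact inverse of serialize, since producers and consumers may run in different processes.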
To add messages to the queue:
queue.put(aMessage);
Your consumer (QueueConsumer.consumeMessage()) will get called as messages arrive.
In the general usage case, the message is removed from the queue prior to the consumer being called. A more atomic mode is provided that removes the item from the queue only after the consumer successfully returns. To enable this mode, call the lockPath() method of the QueueBuilder. This uses a lock to make the message recoverable: a lock is held while the message is being processed, which prevents other processes from taking the message. The message is not removed from the queue until the consumer functor returns. Thus, if there is a failure or the process dies, the message will be sent to another process. There is a small performance penalty for this behavior, however.
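The difference between the two modes comes down to when the item is removed relative to the consumer call. The sketch below is an in-memory illustration only: it uses no ZooKeeper and no lock, and the class and method names (RemovalTimingSketch, deliverRemoveFirst, deliverRemoveAfterSuccess) are hypothetical, not part of Curator.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// In-memory stand-in for a queue. It models only WHEN an item is removed,
// not the ZooKeeper nodes or the lock that protects the item.
final class RemovalTimingSketch {
    private final Deque<String> items = new ArrayDeque<>();

    void put(String message) {
        items.addLast(message);
    }

    int size() {
        return items.size();
    }

    // Default mode: the item is removed first, then handed to the consumer.
    // If the consumer throws, the item is already gone.
    boolean deliverRemoveFirst(Consumer<String> consumer) {
        String message = items.pollFirst();
        if (message == null) {
            return false;
        }
        try {
            consumer.accept(message);
            return true;
        } catch (RuntimeException e) {
            return false; // message lost
        }
    }

    // lockPath-style mode: the item stays queued until the consumer returns
    // normally, so a failed attempt leaves it available for redelivery.
    boolean deliverRemoveAfterSuccess(Consumer<String> consumer) {
        String message = items.peekFirst();
        if (message == null) {
            return false;
        }
        try {
            consumer.accept(message);
        } catch (RuntimeException e) {
            return false; // message still queued
        }
        items.removeFirst();
        return true;
    }
}
```

In the real recipe the lock is what keeps a second process from picking up the same item while the first is still working on it; this sketch leaves that part out.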
The Distributed queue writes messages using this format:
OFFSET | SIZE | DESCRIPTION |
0 | 4 | Format version. Currently 0x00010001 |
4 | 1 | Opcode: 0x01 = message, 0x02 = End of data |
5 | 4 | Message byte length |
9 | n | Message: serialized message bytes |
9 + n | ... | Next set of opcode-size-bytes until end of data |
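The layout in the table can be sketched as a small encoder and decoder. This is an illustration of the format as described, not Curator's own code: the class name QueueWireFormat is hypothetical, and big-endian byte order for the 4-byte fields is an assumption, since the table does not state it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reads and writes the layout from the table.
final class QueueWireFormat {
    static final int VERSION = 0x00010001;
    static final byte OP_MESSAGE = 0x01;
    static final byte OP_END_OF_DATA = 0x02;

    static byte[] encode(List<byte[]> messages) {
        int size = 4 + 1; // format version + end-of-data opcode
        for (byte[] m : messages) {
            size += 1 + 4 + m.length; // opcode + length + body
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default (assumption)
        buf.putInt(VERSION);
        for (byte[] m : messages) {
            buf.put(OP_MESSAGE);
            buf.putInt(m.length);
            buf.put(m);
        }
        buf.put(OP_END_OF_DATA);
        return buf.array();
    }

    static List<byte[]> decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int version = buf.getInt();
        if (version != VERSION) {
            throw new IllegalArgumentException("Unexpected format version: 0x" + Integer.toHexString(version));
        }
        List<byte[]> messages = new ArrayList<>();
        while (true) {
            byte opcode = buf.get();
            if (opcode == OP_END_OF_DATA) {
                return messages;
            }
            if (opcode != OP_MESSAGE) {
                throw new IllegalArgumentException("Unknown opcode: " + opcode);
            }
            int length = buf.getInt();
            if (length < 0 || length > buf.remaining()) {
                throw new IllegalArgumentException("Bad message length: " + length);
            }
            byte[] body = new byte[length];
            buf.get(body);
            messages.add(body);
        }
    }
}
```

A single two-byte message encodes to 12 bytes under this layout: 4 for the version, 1 for the opcode, 4 for the length, 2 for the body, and 1 for the end-of-data opcode.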
The QueueConsumer interface extends ConnectionStateListener. When the queue is started, it adds the listener to the Curator instance. Users of the DistributedQueue must pay attention to any connection state changes.
If the SUSPENDED state is reported, the instance must assume that, until it receives a RECONNECTED state, the queue is no longer being updated. If the LOST state is reported, the queue is permanently down.
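The state rules above can be captured in a small tracker that a consumer's state-change callback updates. This is a sketch under stated assumptions: QueueAvailabilityTracker and its nested enums are hypothetical stand-ins, and State mirrors only the three connection states named in the text rather than Curator's own enum.

```java
// Hypothetical tracker for the rules described above. A real consumer would
// call onStateChanged from its connection state callback.
final class QueueAvailabilityTracker {
    // Stand-in for the connection states named in the text.
    enum State { SUSPENDED, RECONNECTED, LOST }

    enum Availability { UPDATING, PAUSED, DOWN }

    private Availability availability = Availability.UPDATING;

    synchronized void onStateChanged(State newState) {
        if (availability == Availability.DOWN) {
            return; // LOST is treated as permanent
        }
        switch (newState) {
            case SUSPENDED:
                availability = Availability.PAUSED; // queue not updated until RECONNECTED
                break;
            case RECONNECTED:
                availability = Availability.UPDATING;
                break;
            case LOST:
                availability = Availability.DOWN;
                break;
        }
    }

    synchronized Availability availability() {
        return availability;
    }
}
```

Application code can check availability() before assuming that queued messages are still being delivered.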