An implementation of a JMS consumer that processes message sequentially if the messages have the same key but concurrently if they have different keys.
##Description of Solution
There are two main classes to reading messages concurrently while keeping the order of message per key:
*SequenceManager *AbstractKeySequenceMessageListener
The SequenceManager is responsible from reading the queues sequentially and assigning a different JMS session (from a pool) per message. A different session is given per message so that when a message is acknowledged, it does not acknowledge any other message that has already been read.
The AbstractKeySequenceMessageListener is responsible of keeping track of order of the messages with the same key.
###Algorithm
The flow of message processing is as follows:
#####Detail of Flow
Considering a MQueue with the following messages (and respective keys):
The SequenceManager will read each of the messages sequentially in a separate session, extract the key from the message and delegate them to the AbstractKeySequenceMessageListener. The Listener will then check if an internal queue already exists for the key of the message. If there is, then the message is added at the end of that queue. If not, then a new internal queue is created and the message is added to that queue.
Step 1
Step 2
Step N
When a message is added to the respective internal queue, the listener will create a new thread that will wait for the current message to be the first in the internal queue and perform the task specified on the doTask method of implemented on the Listener.
Once the task for the first element of the internal queue is completed the message will be acknowledge, the JMS session committed and remove the element from the internal queue.
If the queue is empty, meaning no other message with the same key has been received since the completion of the task, then the internal queue deleted to save memory.
To use the SequenceManager the following implementation needs to be done:
1. Implement a class extending AbstractKeySequenceMessageListener
public class MyMessageListener extends AbstractKeySequenceMessageListener {
public MyMessageListener(int maxNbThreads) {
super(maxNbThreads);
}
@Override
public void doTask(Message message) {
// perform the task for the message
}
@Override
public void onMessageError(Message message, Exception e) {
// error handling code
}
}
2. Implement a class implementing MessageKeyExtractor
public class MyMessageKeyExtractor implements MessageKeyExtractor{
@Override
public String extractKey(Message message) throws JMSException {
String myKey = "";
//Extract key from message
return myKey;
}
}
3. Create a JMS Connection Factory
MQConnectionFactory connectionFactory = new MQConnectionFactory();
connectionFactory.setHostName(MQ_HOST_NAME);
connectionFactory.setPort(MQ_PORT);
connectionFactory.setQueueManager(MQ_MANAGER);
connectionFactory.setChannel(MQ_CHANNEL);
connectionFactory.setTransportType(MQ_TRANSPORT_TYPE);
4. Instantiate the Sequence Manager
new SequenceManager(QUEUE_NAME, connectionFactory, new MyMessageListener(100), new MyMessageKeyExtractor()))
##Performance
Performance statistics using Java SE client:
- 5% of the messages have the same key
- 100 maximum threads are used on the sequential-concurrent and concurrent
*messages with the same keys are produced one after the other (1.0, 1.1, 1.3. ... 2.1, 2.2, 2.3, ...) **messages with the same keys are produced after each batch (1.0, 2.0, 3.0, ..., 1.1, 2.1, 3.1, ...)