Skip to content

Commit

Permalink
Merge with jhberges/stompjms branch containing the heartbeat implemen…
Browse files Browse the repository at this point in the history
…tation

See fusesource#18

Usage with -Dstompjms.heartbeat=X,Y
  • Loading branch information
themerius committed Jun 28, 2016
2 parents 28fafb2 + 645b3f8 commit b6311b2
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,37 @@
*/
package org.fusesource.stomp.activemq;

import junit.framework.TestCase;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.fusesource.stomp.jms.StompJmsConnectionFactory;

import javax.jms.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class ActiveMQJmsStompTest extends TestCase {
public class ActiveMQJmsStompTest {
BrokerService broker;
int port;

@Override
protected void setUp() throws Exception {
super.setUp();
@Before
public void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
TransportConnector connector = broker.addConnector("stomp://0.0.0.0:0");
Expand All @@ -37,11 +48,10 @@ protected void setUp() throws Exception {
port = connector.getConnectUri().getPort();
}

@Override
protected void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
super.tearDown();
}

protected ConnectionFactory createConnectionFactory() throws Exception {
Expand All @@ -50,6 +60,7 @@ protected ConnectionFactory createConnectionFactory() throws Exception {
return result;
}

@Test
public void testDurableSubs() throws Exception {
Connection connection1 = createConnectionFactory().createConnection();
connection1.setClientID("client1");
Expand Down Expand Up @@ -113,9 +124,85 @@ public void testQueueSendReceiveSingleConnection() throws Exception {
connection1.close();
}

private void assertTextMessageReceived(String expected, MessageConsumer sub) throws JMSException {
@Test
@Ignore("Test added as support for AMQ-4493 - chained request/replies over ActiveMQ. The test *will* fail as of 2013-08-21")
public void testChainedRequestReply() throws Exception {
final String firstTopic = "mytopic";
final String secondTopic = "secondtopic";
// First block sending "1" and expecting "123" as reply.
Connection connectionSource = createConnectionFactory().createConnection();
connectionSource.start();
Session sessionSource = connectionSource.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = sessionSource.createProducer(sessionSource.createTopic(firstTopic));

Destination replyToProducer = sessionSource.createTemporaryQueue();
MessageConsumer sourceReplySubscription = sessionSource.createConsumer(replyToProducer);

// Second block receiving "1" and using "mytopic" to append "3" after it's own "2"
final Connection connectionIntermediate = createConnectionFactory().createConnection();
connectionIntermediate.start();
final Session sessionIntermediate = connectionIntermediate.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumerIntermediate = sessionIntermediate.createConsumer(sessionIntermediate.createTopic(firstTopic));
Thread intermediateThread = new Thread(new Runnable() {

public void run() {
try {
Message message = consumerIntermediate.receive();
System.out.println("Producer -> Intermediate: " + message);
Destination replyToIntermediate = sessionIntermediate.createTemporaryQueue();
MessageProducer intermediateProducer = sessionIntermediate.createProducer(sessionIntermediate.createTopic(secondTopic));
TextMessage secondMessage = sessionIntermediate.createTextMessage(((TextMessage) message).getText() + "2");
intermediateProducer.send(secondMessage);

MessageConsumer intermediateReplyConsumer = sessionIntermediate.createConsumer(replyToIntermediate);
Message finalResponse = intermediateReplyConsumer.receive();
MessageProducer replyProducer = sessionIntermediate.createProducer(message.getJMSDestination());

System.out.println("Intermediate -> Producer: " + finalResponse);
replyProducer.send(finalResponse);
} catch (JMSException e) {
Assert.fail(e.getMessage());
}
}});
intermediateThread.setDaemon(true);
intermediateThread.start();

// Final block receiving "12" appending "3" and replying
final Connection connectionFinal = createConnectionFactory().createConnection();
connectionIntermediate.start();
final Session sessionFinal = connectionIntermediate.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumerFinal = sessionIntermediate.createConsumer(sessionIntermediate.createTopic(secondTopic));
Thread finalThread = new Thread(new Runnable() {

public void run() {
try {
Message message = consumerFinal.receive();
System.out.println("Intermediate -> Final: " + message);
TextMessage finalResponse = sessionFinal.createTextMessage(((TextMessage) message).getText() + "3");

MessageProducer replyProducer = sessionFinal.createProducer(message.getJMSDestination());
System.out.println("Final -> Intermediate: " + finalResponse);
replyProducer.send(finalResponse);
} catch (JMSException e) {
Assert.fail(e.getMessage());
}
}});
finalThread.setDaemon(true);
finalThread.start();

TextMessage sourceTextMessage = sessionSource.createTextMessage("1");
sourceTextMessage.setJMSReplyTo(replyToProducer);
producer.send(sourceTextMessage);

assertTextMessageReceived("123", sourceReplySubscription);
connectionSource.close();
connectionIntermediate.close();
connectionFinal.close();
}

private void assertTextMessageReceived(final String expected, final MessageConsumer sub) throws JMSException {
Message msg = sub.receive(1000*5);
assertNotNull("A message was not received.", msg);
assertEquals(expected, ((TextMessage)msg).getText());
Assert.assertNotNull("A message was not received.", msg);
Assert.assertEquals(expected, ((TextMessage)msg).getText());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public interface Constants {
final AsciiBuffer CONTENT_TYPE = ascii("content-type");
final AsciiBuffer TRANSFORMATION = ascii("transformation");
final AsciiBuffer TRANSFORMATION_ERROR = ascii("transformation-error");
final AsciiBuffer HEARTBEAT = ascii("heart-beat");

/**
* This header is used to instruct ActiveMQ to construct the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class Stomp {

private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("stompjms.thread.keep_alive", ""+1000));
private static final long STACK_SIZE = Long.parseLong(System.getProperty("stompjms.thread.stack_size", ""+1024*512));
public static final String HEARTBEAT_INTERVAL = System.getProperty("stompjms.heartbeat", "0,0");

private static ThreadPoolExecutor blockingThreadPool;
public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
Expand Down Expand Up @@ -161,6 +162,7 @@ public void onTransportConnected() {
if (clientId != null) {
frame.addHeader(CLIENT_ID, StompFrame.encodeHeader(clientId));
}
frame.addHeader(HEARTBEAT, StompFrame.encodeHeader(HEARTBEAT_INTERVAL));
if( customHeaders!=null ) {
for (Object key : customHeaders.keySet()) {
frame.addHeader(StompFrame.encodeHeader(key.toString()), StompFrame.encodeHeader(customHeaders.get(key).toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@

package org.fusesource.stomp.jms;

import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.stomp.client.Callback;
import org.fusesource.stomp.client.CallbackConnection;
import org.fusesource.stomp.client.Promise;
import org.fusesource.stomp.client.ProtocolException;
import org.fusesource.stomp.client.Stomp;
import org.fusesource.stomp.codec.StompFrame;
import org.fusesource.stomp.jms.message.StompJmsMessage;
import org.fusesource.stomp.jms.util.StompTranslator;
import static org.fusesource.hawtdispatch.Dispatch.NOOP;
import static org.fusesource.stomp.client.Constants.ABORT;
import static org.fusesource.stomp.client.Constants.ACK;
Expand All @@ -23,6 +33,7 @@
import static org.fusesource.stomp.client.Constants.ID;
import static org.fusesource.stomp.client.Constants.MESSAGE;
import static org.fusesource.stomp.client.Constants.MESSAGE_ID;
import static org.fusesource.stomp.client.Constants.NEWLINE;
import static org.fusesource.stomp.client.Constants.SELECTOR;
import static org.fusesource.stomp.client.Constants.SEND;
import static org.fusesource.stomp.client.Constants.SERVER;
Expand All @@ -31,11 +42,18 @@
import static org.fusesource.stomp.client.Constants.SUBSCRIPTION;
import static org.fusesource.stomp.client.Constants.TRANSACTION;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.net.ssl.SSLContext;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -84,6 +102,7 @@ public class StompChannel {
StompServerAdaptor serverAdaptor;
String clientId;
private long disconnectTimeout = 10000;
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);

public AsciiBuffer sessionId() {
return sessionId;
Expand All @@ -107,10 +126,20 @@ public StompChannel copy() {

CountDownLatch connectedLatch = new CountDownLatch(1);

private ScheduledFuture<?> heartBeatFuture;

public void connect() throws JMSException {
if (this.connected.compareAndSet(false, true)) {
try {
final Promise<CallbackConnection> future = new Promise<CallbackConnection>();
final Promise<CallbackConnection> future = new Promise<CallbackConnection>() {

@Override
public void onSuccess(final CallbackConnection value) {
rescheduleHeartBeat();
super.onSuccess(value);
}

};
Stomp stomp = new Stomp(brokerURI);
stomp.setLogin(userName);
stomp.setPasscode(password);
Expand Down Expand Up @@ -196,7 +225,7 @@ public void onSuccess(StompFrame value) {
});
}
});

stopHeartbeats();
// Wait for the disconnect to finish..
try {
cd.await(getDisconnectTimeout(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -225,6 +254,7 @@ public void sendMessage(StompJmsMessage copy, AsciiBuffer txid, boolean sync) th
} else {
sendFrame(frame);
}
rescheduleHeartBeat();
} catch (IOException e) {
throw StompJmsExceptionSupport.create(e);
}
Expand Down Expand Up @@ -357,6 +387,7 @@ public void onFailure(Throwable value) {
@Override
public void onSuccess(Void value) {
writeBufferRemaining.getAndAdd(size);
rescheduleHeartBeat();
}
});
}
Expand All @@ -368,6 +399,7 @@ public void onSuccess(Void value) {
@Override
public void onSuccess(Void value) {
writeBufferRemaining.getAndAdd(size);
rescheduleHeartBeat();
super.onSuccess(value);
}
};
Expand Down Expand Up @@ -604,4 +636,32 @@ public SSLContext getSslContext() {
public void setSslContext(SSLContext sslContext) {
this.sslContext = sslContext;
}

private void rescheduleHeartBeat() {
stopHeartbeats();
heartBeatFuture = scheduledThreadPoolExecutor.schedule(new Callable<Void>() {
public Void call() throws Exception {
StompFrame heartbeatFrame = new StompFrame(SEND);
heartbeatFrame.addHeader(DESTINATION, AsciiBuffer.ascii(channelId));
heartbeatFrame.content(NEWLINE);
heartbeatFrame.addContentLengthHeader();
sendFrame(heartbeatFrame);
rescheduleHeartBeat();
return null;
}
},
getSendInterval(Stomp.HEARTBEAT_INTERVAL),
TimeUnit.MILLISECONDS);
}

private long getSendInterval(String heartbeatInterval) {
return Long.parseLong(heartbeatInterval.split(",")[0]);
}

private synchronized void stopHeartbeats() {
if (null != heartBeatFuture && !(heartBeatFuture.isCancelled() || heartBeatFuture.isDone())) {
heartBeatFuture.cancel(false);
heartBeatFuture = null;
}
}
}

0 comments on commit b6311b2

Please sign in to comment.