Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposed heartbeat implementation #18

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,37 @@
*/
package org.fusesource.stomp.activemq;

import junit.framework.TestCase;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.fusesource.stomp.jms.StompJmsConnectionFactory;

import javax.jms.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/**
* <p>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class ActiveMQJmsStompTest extends TestCase {
public class ActiveMQJmsStompTest {
BrokerService broker;
int port;

@Override
protected void setUp() throws Exception {
super.setUp();
@Before
public void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
TransportConnector connector = broker.addConnector("stomp://0.0.0.0:0");
Expand All @@ -37,11 +48,10 @@ protected void setUp() throws Exception {
port = connector.getConnectUri().getPort();
}

@Override
protected void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
super.tearDown();
}

protected ConnectionFactory createConnectionFactory() throws Exception {
Expand All @@ -50,6 +60,7 @@ protected ConnectionFactory createConnectionFactory() throws Exception {
return result;
}

@Test
public void testDurableSubs() throws Exception {
Connection connection1 = createConnectionFactory().createConnection();
connection1.setClientID("client1");
Expand Down Expand Up @@ -81,9 +92,85 @@ public void testDurableSubs() throws Exception {
connection2.close();
}

private void assertTextMessageReceived(String expected, MessageConsumer sub) throws JMSException {
@Test
@Ignore("Test added as support for AMQ-4493 - chained request/replies over ActiveMQ. The test *will* fail as of 2013-08-21")
public void testChainedRequestReply() throws Exception {
final String firstTopic = "mytopic";
final String secondTopic = "secondtopic";
// First block sending "1" and expecting "123" as reply.
Connection connectionSource = createConnectionFactory().createConnection();
connectionSource.start();
Session sessionSource = connectionSource.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = sessionSource.createProducer(sessionSource.createTopic(firstTopic));

Destination replyToProducer = sessionSource.createTemporaryQueue();
MessageConsumer sourceReplySubscription = sessionSource.createConsumer(replyToProducer);

// Second block receiving "1" and using "mytopic" to append "3" after it's own "2"
final Connection connectionIntermediate = createConnectionFactory().createConnection();
connectionIntermediate.start();
final Session sessionIntermediate = connectionIntermediate.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumerIntermediate = sessionIntermediate.createConsumer(sessionIntermediate.createTopic(firstTopic));
Thread intermediateThread = new Thread(new Runnable() {

public void run() {
try {
Message message = consumerIntermediate.receive();
System.out.println("Producer -> Intermediate: " + message);
Destination replyToIntermediate = sessionIntermediate.createTemporaryQueue();
MessageProducer intermediateProducer = sessionIntermediate.createProducer(sessionIntermediate.createTopic(secondTopic));
TextMessage secondMessage = sessionIntermediate.createTextMessage(((TextMessage) message).getText() + "2");
intermediateProducer.send(secondMessage);

MessageConsumer intermediateReplyConsumer = sessionIntermediate.createConsumer(replyToIntermediate);
Message finalResponse = intermediateReplyConsumer.receive();
MessageProducer replyProducer = sessionIntermediate.createProducer(message.getJMSDestination());

System.out.println("Intermediate -> Producer: " + finalResponse);
replyProducer.send(finalResponse);
} catch (JMSException e) {
Assert.fail(e.getMessage());
}
}});
intermediateThread.setDaemon(true);
intermediateThread.start();

// Final block receiving "12" appending "3" and replying
final Connection connectionFinal = createConnectionFactory().createConnection();
connectionIntermediate.start();
final Session sessionFinal = connectionIntermediate.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumerFinal = sessionIntermediate.createConsumer(sessionIntermediate.createTopic(secondTopic));
Thread finalThread = new Thread(new Runnable() {

public void run() {
try {
Message message = consumerFinal.receive();
System.out.println("Intermediate -> Final: " + message);
TextMessage finalResponse = sessionFinal.createTextMessage(((TextMessage) message).getText() + "3");

MessageProducer replyProducer = sessionFinal.createProducer(message.getJMSDestination());
System.out.println("Final -> Intermediate: " + finalResponse);
replyProducer.send(finalResponse);
} catch (JMSException e) {
Assert.fail(e.getMessage());
}
}});
finalThread.setDaemon(true);
finalThread.start();

TextMessage sourceTextMessage = sessionSource.createTextMessage("1");
sourceTextMessage.setJMSReplyTo(replyToProducer);
producer.send(sourceTextMessage);

assertTextMessageReceived("123", sourceReplySubscription);
connectionSource.close();
connectionIntermediate.close();
connectionFinal.close();
}

private void assertTextMessageReceived(final String expected, final MessageConsumer sub) throws JMSException {
Message msg = sub.receive(1000*5);
assertNotNull("A message was not received.", msg);
assertEquals(expected, ((TextMessage)msg).getText());
Assert.assertNotNull("A message was not received.", msg);
Assert.assertEquals(expected, ((TextMessage)msg).getText());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public interface Constants {
final AsciiBuffer CONTENT_TYPE = ascii("content-type");
final AsciiBuffer TRANSFORMATION = ascii("transformation");
final AsciiBuffer TRANSFORMATION_ERROR = ascii("transformation-error");
final AsciiBuffer HEARTBEAT = ascii("heart-beat");

/**
* This header is used to instruct ActiveMQ to construct the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class Stomp {

private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("stompjms.thread.keep_alive", ""+1000));
private static final long STACK_SIZE = Long.parseLong(System.getProperty("stompjms.thread.stack_size", ""+1024*512));
public static final String HEARTBEAT_INTERVAL = System.getProperty("stompjms.heartbeat", "0,0");

private static ThreadPoolExecutor blockingThreadPool;
public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
Expand Down Expand Up @@ -161,6 +162,7 @@ public void onTransportConnected() {
if (clientId != null) {
frame.addHeader(CLIENT_ID, StompFrame.encodeHeader(clientId));
}
frame.addHeader(HEARTBEAT, StompFrame.encodeHeader(HEARTBEAT_INTERVAL));
if( customHeaders!=null ) {
for (Object key : customHeaders.keySet()) {
frame.addHeader(StompFrame.encodeHeader(key.toString()), StompFrame.encodeHeader(customHeaders.get(key).toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,52 @@

import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.stomp.client.Callback;
import org.fusesource.stomp.client.CallbackConnection;
import org.fusesource.stomp.client.Promise;
import org.fusesource.stomp.client.ProtocolException;
import org.fusesource.stomp.client.Stomp;
import org.fusesource.stomp.codec.StompFrame;
import org.fusesource.stomp.client.Callback;
import org.fusesource.stomp.client.Promise;
import org.fusesource.stomp.jms.message.StompJmsMessage;
import org.fusesource.stomp.jms.util.StompTranslator;
import static org.fusesource.hawtdispatch.Dispatch.NOOP;
import static org.fusesource.stomp.client.Constants.ABORT;
import static org.fusesource.stomp.client.Constants.ACK;
import static org.fusesource.stomp.client.Constants.ACK_MODE;
import static org.fusesource.stomp.client.Constants.BEGIN;
import static org.fusesource.stomp.client.Constants.COMMIT;
import static org.fusesource.stomp.client.Constants.CONTENT_LENGTH;
import static org.fusesource.stomp.client.Constants.DESTINATION;
import static org.fusesource.stomp.client.Constants.DISCONNECT;
import static org.fusesource.stomp.client.Constants.HOST_ID;
import static org.fusesource.stomp.client.Constants.ID;
import static org.fusesource.stomp.client.Constants.MESSAGE;
import static org.fusesource.stomp.client.Constants.MESSAGE_ID;
import static org.fusesource.stomp.client.Constants.NEWLINE;
import static org.fusesource.stomp.client.Constants.SELECTOR;
import static org.fusesource.stomp.client.Constants.SEND;
import static org.fusesource.stomp.client.Constants.SERVER;
import static org.fusesource.stomp.client.Constants.SESSION;
import static org.fusesource.stomp.client.Constants.SUBSCRIBE;
import static org.fusesource.stomp.client.Constants.SUBSCRIPTION;
import static org.fusesource.stomp.client.Constants.TRANSACTION;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.net.ssl.SSLContext;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.fusesource.stomp.client.Constants.*;
import static org.fusesource.hawtdispatch.Dispatch.*;

public class StompChannel {

private static final StompServerAdaptor STOMP_SERVER_ADAPTORS[] = new StompServerAdaptor[]{
Expand Down Expand Up @@ -64,6 +86,7 @@ public class StompChannel {
StompServerAdaptor serverAdaptor;
String clientId;
private long disconnectTimeout = 10000;
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);

public AsciiBuffer sessionId() {
return sessionId;
Expand All @@ -87,10 +110,20 @@ public StompChannel copy() {

CountDownLatch connectedLatch = new CountDownLatch(1);

private ScheduledFuture<?> heartBeatFuture;

public void connect() throws JMSException {
if (this.connected.compareAndSet(false, true)) {
try {
final Promise<CallbackConnection> future = new Promise<CallbackConnection>();
final Promise<CallbackConnection> future = new Promise<CallbackConnection>() {

@Override
public void onSuccess(final CallbackConnection value) {
rescheduleHeartBeat();
super.onSuccess(value);
}

};
Stomp stomp = new Stomp(brokerURI);
stomp.setLogin(userName);
stomp.setPasscode(password);
Expand Down Expand Up @@ -170,7 +203,7 @@ public void onSuccess(StompFrame value) {
});
}
});

stopHeartbeats();
// Wait for the disconnect to finish..
try {
cd.await(getDisconnectTimeout(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -199,6 +232,7 @@ public void sendMessage(StompJmsMessage copy, AsciiBuffer txid, boolean sync) th
} else {
sendFrame(frame);
}
rescheduleHeartBeat();
} catch (IOException e) {
throw StompJmsExceptionSupport.create(e);
}
Expand Down Expand Up @@ -330,6 +364,7 @@ public void onFailure(Throwable value) {
@Override
public void onSuccess(Void value) {
writeBufferRemaining.getAndAdd(size);
rescheduleHeartBeat();
}
});
}
Expand All @@ -341,6 +376,7 @@ public void onSuccess(Void value) {
@Override
public void onSuccess(Void value) {
writeBufferRemaining.getAndAdd(size);
rescheduleHeartBeat();
super.onSuccess(value);
}
};
Expand Down Expand Up @@ -575,4 +611,32 @@ public SSLContext getSslContext() {
public void setSslContext(SSLContext sslContext) {
this.sslContext = sslContext;
}

private void rescheduleHeartBeat() {
stopHeartbeats();
heartBeatFuture = scheduledThreadPoolExecutor.schedule(new Callable<Void>() {
public Void call() throws Exception {
StompFrame heartbeatFrame = new StompFrame(SEND);
heartbeatFrame.addHeader(DESTINATION, AsciiBuffer.ascii(channelId));
heartbeatFrame.content(NEWLINE);
heartbeatFrame.addContentLengthHeader();
sendFrame(heartbeatFrame);
rescheduleHeartBeat();
return null;
}
},
getSendInterval(Stomp.HEARTBEAT_INTERVAL),
TimeUnit.MILLISECONDS);
}

private long getSendInterval(String heartbeatInterval) {
return Long.parseLong(heartbeatInterval.split(",")[0]);
}

private synchronized void stopHeartbeats() {
if (null != heartBeatFuture && !(heartBeatFuture.isCancelled() || heartBeatFuture.isDone())) {
heartBeatFuture.cancel(false);
heartBeatFuture = null;
}
}
}