Skip to content

Commit

Permalink
Fixes #6 Now using ActiveMQs own Redelivery Policy
Browse files Browse the repository at this point in the history
  • Loading branch information
mbknor committed Sep 5, 2014
1 parent 870858a commit 5e17ae0
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kjetland.dropwizard.activemq.errors.JsonError;
import io.dropwizard.lifecycle.Managed;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,7 +86,7 @@ public void stop() throws Exception {
log.info("Stopped receiver for " + destination);
}

private void processMessage(Message message) {
private void processMessage(ActiveMQMessageConsumer messageConsumer, Message message) {
String json = null;
try {

Expand Down Expand Up @@ -113,6 +114,12 @@ private void processMessage(Message message) {
} catch (JMSException x) {
throw new RuntimeException(x);
}
} else {
try {
messageConsumer.rollback();
} catch (JMSException e1) {
throw new RuntimeException("Error rollbacking failed message", e1);
}
}
}
}
Expand Down Expand Up @@ -150,7 +157,7 @@ public void run() {
try {

final Destination d = destinationCreator.create(session, destination);
final MessageConsumer messageConsumer = session.createConsumer(d);
final ActiveMQMessageConsumer messageConsumer = (ActiveMQMessageConsumer)session.createConsumer(d);
try {

if (verboseInitLogging) {
Expand Down Expand Up @@ -204,15 +211,15 @@ public void run() {
log.debug("Message-checker-thread stopped");
}

private void runReceiveLoop(MessageConsumer messageConsumer) throws JMSException {
private void runReceiveLoop(ActiveMQMessageConsumer messageConsumer) throws JMSException {
while(!shouldStop.get()) {
if (log.isDebugEnabled()) {
log.debug("Checking for new message");
}
Message message = messageConsumer.receive(200);
errorsInARowCount = 0;
if (message != null) {
processMessage(message);
processMessage(messageConsumer, message);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.kjetland.dropwizard.activemq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Optional;

import static org.junit.Assert.assertEquals;

public class ActiveMQReceiverHandlerReliveryTest {

final String url = "tcp://localhost:31219?" +
"jms.redeliveryPolicy.maximumRedeliveries=3" +
"&jms.redeliveryPolicy.initialRedeliveryDelay=100" +
"&jms.redeliveryPolicy.redeliveryDelay=100";

BrokerService broker;

@Before
public void setUp() throws Exception {
broker = new BrokerService();
// configure the broker
broker.addConnector(url);
broker.start();

errorCount = 0;
okCount = 0;
}

@After
public void tearDown() throws Exception {
broker.stop();
// Just give the broker some time to stop
Thread.sleep(1500);
}

int errorCount;
int okCount;

private void receiveMessage(String message) {

if ( message.equals("fail")) {
errorCount++;
throw new RuntimeException("Error in receiveMessage");
} else {
okCount++;
System.out.println("receiveMessage: " + message);
}
}

public boolean exceptionHandler(String message, Exception exception) {
System.out.println("exceptionHandler: " + message + " - " + exception.getMessage());
return false;
}

@Test
public void testRedeliveryQueue() throws Exception {
doTestRedelivery("queue:someQueue");
}

@Test
public void testRedeliveryTopic() throws Exception {
doTestRedelivery("topic:someTopic");
}

private void doTestRedelivery(String destinationName) throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

ObjectMapper objectMapper = new ObjectMapper();

ActiveMQReceiverHandler<String> h = new ActiveMQReceiverHandler<>(
destinationName,
connectionFactory,
(m)->receiveMessage(m),
String.class,
objectMapper,
(m,e) -> exceptionHandler(m,e),
1);

h.start();

ActiveMQSender sender = new ActiveMQSenderImpl(connectionFactory, objectMapper, destinationName, Optional.<Integer>empty(), false);

sender.sendJson("fail");
sender.sendJson("ok1");
sender.sendJson("ok2");

Thread.sleep(1000);

assertEquals(3+1, errorCount);
assertEquals(2, okCount);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.kjetland.dropwizard.activemq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class ActiveMQReceiverHandlerTest {
Topic destinationTopic;

@Mock
MessageConsumer messageConsumer;
ActiveMQMessageConsumer messageConsumer;

ObjectMapper objectMapper;

Expand Down Expand Up @@ -213,7 +214,7 @@ public void testExceptionInMessageConsumer_ConsumerIsClosed() throws Exception {
(m)->receiveMessage(m),
String.class,
objectMapper,
(m,e) -> exceptionHandler(m,e),
(m,e) -> exceptionHandler(m, e),
1);

h.start();
Expand All @@ -224,7 +225,7 @@ public void testExceptionInMessageConsumer_ConsumerIsClosed() throws Exception {
assertTrue(receivedMessages.contains("b"));
assertTrue(receivedMessages.contains("d"));
assertEquals(3, receivedMessages.size());
assertTrue(receivedExceptions.size()==0);
assertTrue(receivedExceptions.size() == 0);
h.stop();
}
}
14 changes: 14 additions & 0 deletions src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %X{akkaSource} %X{akkaPersistenceRecovering} - %msg%n</pattern>
</encoder>
</appender>

<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>

0 comments on commit 5e17ae0

Please sign in to comment.