Skip to content

Commit

Permalink
GH-1181: Fix memory leak with user correlation
Browse files Browse the repository at this point in the history
Resolves #1181
  • Loading branch information
garyrussell authored and artembilan committed Apr 9, 2020
1 parent ed41369 commit 15f1ed5
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1929,25 +1929,19 @@ private Message doSendAndReceiveWithDirect(String exchange, String routingKey, M
private Message doSendAndReceiveAsListener(final String exchange, final String routingKey, final Message message,
final CorrelationData correlationData, Channel channel) throws Exception { // NOSONAR
final PendingReply pendingReply = new PendingReply();
String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
String messageTag = null;
if (this.userCorrelationId) {
String correlationId;
if (this.correlationKey != null) {
correlationId = (String) message.getMessageProperties().getHeaders().get(this.correlationKey);
messageTag = (String) message.getMessageProperties().getHeaders().get(this.correlationKey);
}
else {
correlationId = message.getMessageProperties().getCorrelationId();
}
if (correlationId == null) {
this.replyHolder.put(messageTag, pendingReply);
}
else {
this.replyHolder.put(correlationId, pendingReply);
messageTag = message.getMessageProperties().getCorrelationId();
}
}
else {
this.replyHolder.put(messageTag, pendingReply);
if (messageTag == null) {
messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
}
this.replyHolder.put(messageTag, pendingReply);
saveAndSetProperties(message, pendingReply, messageTag);

if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -659,6 +659,7 @@ public void testSendInExternalTransactionWithRollback() throws Exception {
assertThat(result).isEqualTo(null);
}

@SuppressWarnings("unchecked")
@Test
public void testAtomicSendAndReceive() throws Exception {
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
Expand Down Expand Up @@ -689,10 +690,12 @@ public void testAtomicSendAndReceive() throws Exception {
// Message was consumed so nothing left on queue
reply = template.receive();
assertThat(reply).isEqualTo(null);
assertThat(TestUtils.getPropertyValue(template, "replyHolder", Map.class)).hasSize(0);
template.stop();
cachingConnectionFactory.destroy();
}

@SuppressWarnings("unchecked")
@Test
public void testAtomicSendAndReceiveUserCorrelation() throws Exception {
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
Expand Down Expand Up @@ -732,6 +735,7 @@ public void testAtomicSendAndReceiveUserCorrelation() throws Exception {
// Message was consumed so nothing left on queue
reply = template.receive();
assertThat(reply).isEqualTo(null);
assertThat(TestUtils.getPropertyValue(template, "replyHolder", Map.class)).hasSize(0);
template.stop();
container.stop();
cachingConnectionFactory.destroy();
Expand Down Expand Up @@ -1330,6 +1334,7 @@ public void testSendAndReceiveNeverFastWitReplyQueue() {
sendAndReceiveFastGuts(true, true, false);
}

@SuppressWarnings("unchecked")
private void sendAndReceiveFastGuts(boolean tempQueue, boolean setDirectReplyToExplicitly, boolean expectUsedTemp) {
RabbitTemplate template = createSendAndReceiveRabbitTemplate(this.connectionFactory);
try {
Expand Down Expand Up @@ -1368,6 +1373,7 @@ public Message handleMessage(Message message) {
else {
assertThat(replyToWas.get()).startsWith(Address.AMQ_RABBITMQ_REPLY_TO);
}
assertThat(TestUtils.getPropertyValue(template, "replyHolder", Map.class)).hasSize(0);
}
catch (Exception e) {
assertThat(e.getCause().getCause().getMessage()).contains("404");
Expand All @@ -1378,6 +1384,7 @@ public Message handleMessage(Message message) {
}
}

@SuppressWarnings("unchecked")
@Test
public void testReplyCompressionWithContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
Expand Down Expand Up @@ -1406,6 +1413,7 @@ public String handleMessage(String message) {
GUnzipPostProcessor unzipper = new GUnzipPostProcessor();
reply = unzipper.postProcessMessage(reply);
assertThat(new String(reply.getBody())).isEqualTo("FOO");
assertThat(TestUtils.getPropertyValue(template, "replyHolder", Map.class)).hasSize(0);
}
finally {
template.stop();
Expand All @@ -1420,7 +1428,7 @@ protected RabbitTemplate createSendAndReceiveRabbitTemplate(ConnectionFactory co
}

@Test
public void testDegugLogOnPassiveDeclaration() {
public void testDebugLogOnPassiveDeclaration() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
Log logger = spy(TestUtils.getPropertyValue(connectionFactory, "logger", Log.class));
doReturn(true).when(logger).isDebugEnabled();
Expand Down

0 comments on commit 15f1ed5

Please sign in to comment.