Skip to content

Commit

Permalink
Improve rabbit template
Browse files Browse the repository at this point in the history
* Remove double synchronized

* Polish `receiveForReply`

Change `receiveMessage` from method argument to local variable since it's always passed as null

* Remove exception casting in `consumerDelivery`

Since `doExecute()` eventually will still wrap all exceptions using `RabbitExceptionTranslator.convertRabbitAccessException(e)`

As now exceptions are being re-thrown move consumer cancelling logic inside finally block

* Guard `setReplyAddress()`

Use same lock as `evaluateFastReplyTo()` is guarded via `RabbitTemplate` lock in `doSendAndReceive()`

* Remove locking on `publisherConfirmChannels` map

When getting unconfirmed messages count/correlation data remove locking on entire map
since concurrent map iterator's are thread safe and underlying Channel's implementation (`PublisherCallbackChannelImpl`) already use
synchronization in expire and `getPendingConfirmsCount()`

* Extract common consumer canceling logic

* Access `directReplyToContainers` map only via lock

* remove double checking for null container

* change copyright

* Polishing - PR comments

* Revert "Access directReplyToContainers map only via lock"

This reverts commit 489df24
  • Loading branch information
marknorkin authored and artembilan committed Feb 7, 2018
1 parent 18b2090 commit 84ca73e
Showing 1 changed file with 37 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -135,6 +135,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Ernest Sadykov
* @author Mark Norkin
* @since 1.0
*/
public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, MessageListener,
Expand Down Expand Up @@ -341,7 +342,7 @@ public String getEncoding() {
* exchange and routing key.
* @param replyAddress the replyAddress to set
*/
public void setReplyAddress(String replyAddress) {
public synchronized void setReplyAddress(String replyAddress) {
this.replyAddress = replyAddress;
this.evaluatedFastReplyTo = false;
}
Expand Down Expand Up @@ -726,13 +727,11 @@ public Collection<String> expectedQueueNames() {
*/
public Collection<CorrelationData> getUnconfirmed(long age) {
Set<CorrelationData> unconfirmed = new HashSet<>();
synchronized (this.publisherConfirmChannels) {
long cutoffTime = System.currentTimeMillis() - age;
for (Channel channel : this.publisherConfirmChannels.keySet()) {
Collection<PendingConfirm> confirms = ((PublisherCallbackChannel) channel).expire(this, cutoffTime);
for (PendingConfirm confirm : confirms) {
unconfirmed.add(confirm.getCorrelationData());
}
long cutoffTime = System.currentTimeMillis() - age;
for (Channel channel : this.publisherConfirmChannels.keySet()) {
Collection<PendingConfirm> confirms = ((PublisherCallbackChannel) channel).expire(this, cutoffTime);
for (PendingConfirm confirm : confirms) {
unconfirmed.add(confirm.getCorrelationData());
}
}
return unconfirmed.size() > 0 ? unconfirmed : null;
Expand All @@ -744,12 +743,10 @@ public Collection<CorrelationData> getUnconfirmed(long age) {
* @since 2.0
*/
public int getUnconfirmedCount() {
synchronized (this.publisherConfirmChannels) {
return this.publisherConfirmChannels.keySet()
.stream()
.mapToInt(channel -> ((PublisherCallbackChannel) channel).getPendingConfirmsCount(this))
.sum();
}
return this.publisherConfirmChannels.keySet()
.stream()
.mapToInt(channel -> ((PublisherCallbackChannel) channel).getPendingConfirmsCount(this))
.sum();
}

@Override
Expand Down Expand Up @@ -788,11 +785,9 @@ protected void doStop() {
@Override
public boolean isRunning() {
synchronized (this.directReplyToContainers) {
synchronized (this.directReplyToContainers) {
return this.directReplyToContainers.values()
.stream()
.anyMatch(AbstractMessageListenerContainer::isRunning);
}
return this.directReplyToContainers.values()
.stream()
.anyMatch(AbstractMessageListenerContainer::isRunning);
}
}

Expand Down Expand Up @@ -1138,19 +1133,18 @@ public <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<
private <R, S> boolean doReceiveAndReply(final String queueName, final ReceiveAndReplyCallback<R, S> callback,
final ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
return execute(channel -> {
Message receiveMessage = null;

receiveMessage = receiveForReply(queueName, channel, receiveMessage);
Message receiveMessage = receiveForReply(queueName, channel);
if (receiveMessage != null) {
return sendReply(callback, replyToAddressCallback, channel, receiveMessage);
}
return false;
}, obtainTargetConnectionFactory(this.receiveConnectionFactorySelectorExpression, queueName));
}

private Message receiveForReply(final String queueName, Channel channel, Message receiveMessage) throws Exception {
private Message receiveForReply(final String queueName, Channel channel) throws Exception {
boolean channelTransacted = isChannelTransacted();
boolean channelLocallyTransacted = isChannelLocallyTransacted(channel);
Message receiveMessage = null;
if (RabbitTemplate.this.receiveTimeout == 0) {
GetResponse response = channel.basicGet(queueName, !channelTransacted);
// Response can be null in the case that there is no message on the queue.
Expand Down Expand Up @@ -1189,7 +1183,7 @@ else if (channelTransacted) {

private Delivery consumeDelivery(Channel channel, String queueName, long timeoutMillis) throws Exception {
Delivery delivery = null;
Throwable exception = null;
RuntimeException exception = null;
CompletableFuture<Delivery> future = new CompletableFuture<>();
DefaultConsumer consumer = createConsumer(queueName, channel, future,
timeoutMillis < 0 ? DEFAULT_CONSUME_TIMEOUT : timeoutMillis);
Expand All @@ -1202,34 +1196,20 @@ private Delivery consumeDelivery(Channel channel, String queueName, long timeout
}
}
catch (ExecutionException e) {
this.logger.error("Consumer failed to receive message: " + consumer, e.getCause());
exception = e.getCause();
Throwable cause = e.getCause();
this.logger.error("Consumer failed to receive message: " + consumer, cause);
exception = RabbitExceptionTranslator.convertRabbitAccessException(cause);
throw exception;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (TimeoutException e) {
// no result in time
}
try {
finally {
if (exception == null || !(exception instanceof ConsumerCancelledException)) {
channel.basicCancel(consumer.getConsumerTag());
}
}
catch (Exception e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Failed to cancel consumer: " + consumer, e);
}
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
}
else if (exception instanceof Error) {
throw (Error) exception;
}
else {
throw new AmqpException(exception);
cancelConsumerQuietly(channel, consumer);
}
}
return delivery;
Expand Down Expand Up @@ -1606,16 +1586,23 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
}
finally {
this.replyHolder.remove(messageTag);
try {
channel.basicCancel(consumerTag);
}
catch (Exception e) {
}
cancelConsumerQuietly(channel, consumer);
}
return reply;
}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}

private void cancelConsumerQuietly(Channel channel, DefaultConsumer consumer) {
try {
channel.basicCancel(consumer.getConsumerTag());
}
catch (Exception e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Failed to cancel consumer: " + consumer, e);
}
}
}

protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message,
final CorrelationData correlationData) {
Assert.state(this.isListener, () -> "RabbitTemplate is not configured as MessageListener - "
Expand Down

0 comments on commit 84ca73e

Please sign in to comment.