Skip to content

Commit

Permalink
GH-1480: Switch to CompletableFuture in s-r-stream
Browse files Browse the repository at this point in the history
Resolves #1480
  • Loading branch information
garyrussell authored and artembilan committed Jul 28, 2022
1 parent 9e04fb1 commit 67bfec9
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.rabbit.stream.producer;

import java.util.concurrent.CompletableFuture;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;

import com.rabbitmq.stream.MessageBuilder;

/**
* Provides methods for sending messages using a RabbitMQ Stream producer,
* returning {@link CompletableFuture}.
*
* @author Gary Russell
* @since 2.4.7
*
*/
public interface RabbitStreamOperations2 extends AutoCloseable {

/**
* Send a Spring AMQP message.
* @param message the message.
* @return a future to indicate success/failure.
*/
CompletableFuture<Boolean> send(Message message);

/**
* Convert to and send a Spring AMQP message.
* @param message the payload.
* @return a future to indicate success/failure.
*/
CompletableFuture<Boolean> convertAndSend(Object message);

/**
* Convert to and send a Spring AMQP message. If a {@link MessagePostProcessor} is
* provided and returns {@code null}, the message is not sent and the future is
* completed with {@code false}.
* @param message the payload.
* @param mpp a message post processor.
* @return a future to indicate success/failure.
*/
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

/**
* Send a native stream message.
* @param message the message.
* @return a future to indicate success/failure.
* @see #messageBuilder()
*/
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

/**
* Return the producer's {@link MessageBuilder} to create native stream messages.
* @return the builder.
* @see #send(com.rabbitmq.stream.Message)
*/
MessageBuilder messageBuilder();

/**
* Return the message converter.
* @return the converter.
*/
MessageConverter messageConverter();

/**
* Return the stream message converter.
* @return the converter;
*/
StreamMessageConverter streamMessageConverter();

@Override
default void close() throws AmqpException {
// narrow exception to avoid compiler warning - see
// https://bugs.openjdk.java.net/browse/JDK-8155591
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@
*
* @author Gary Russell
* @since 2.4
*
* @deprecated in favor of {@link RabbitStreamTemplate2}.
*/
@Deprecated
public class RabbitStreamTemplate implements RabbitStreamOperations, BeanNameAware {

protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.rabbit.stream.producer;

import java.util.concurrent.CompletableFuture;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.util.Assert;

import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.ProducerBuilder;

/**
* Default implementation of {@link RabbitStreamOperations}.
*
* @author Gary Russell
* @since 2.4.7
*
*/
public class RabbitStreamTemplate2 implements RabbitStreamOperations2, BeanNameAware {

protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR

private final Environment environment;

private final String streamName;

private MessageConverter messageConverter = new SimpleMessageConverter();

private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter();

private boolean streamConverterSet;

private Producer producer;

private String beanName;

private ProducerCustomizer producerCustomizer = (name, builder) -> { };

/**
* Construct an instance with the provided {@link Environment}.
* @param environment the environment.
* @param streamName the stream name.
*/
public RabbitStreamTemplate2(Environment environment, String streamName) {
Assert.notNull(environment, "'environment' cannot be null");
Assert.notNull(streamName, "'streamName' cannot be null");
this.environment = environment;
this.streamName = streamName;
}


private synchronized Producer createOrGetProducer() {
if (this.producer == null) {
ProducerBuilder builder = this.environment.producerBuilder();
builder.stream(this.streamName);
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (!this.streamConverterSet) {
((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
() -> this.producer.messageBuilder());
}
}
return this.producer;
}

@Override
public synchronized void setBeanName(String name) {
this.beanName = name;
}

/**
* Set a converter for {@link #convertAndSend(Object)} operations.
* @param messageConverter the converter.
*/
public void setMessageConverter(MessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' cannot be null");
this.messageConverter = messageConverter;
}

/**
* Set a converter to convert from {@link Message} to {@link com.rabbitmq.stream.Message}
* for {@link #send(Message)} and {@link #convertAndSend(Object)} methods.
* @param streamConverter the converter.
*/
public synchronized void setStreamConverter(StreamMessageConverter streamConverter) {
Assert.notNull(streamConverter, "'streamConverter' cannot be null");
this.streamConverter = streamConverter;
this.streamConverterSet = true;
}

/**
* Used to customize the {@link ProducerBuilder} before the {@link Producer} is built.
* @param producerCustomizer the customizer;
*/
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
Assert.notNull(producerCustomizer, "'producerCustomizer' cannot be null");
this.producerCustomizer = producerCustomizer;
}

@Override
public MessageConverter messageConverter() {
return this.messageConverter;
}


@Override
public StreamMessageConverter streamMessageConverter() {
return this.streamConverter;
}


@Override
public CompletableFuture<Boolean> send(Message message) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
createOrGetProducer().send(this.streamConverter.fromMessage(message), handleConfirm(future));
return future;
}

@Override
public CompletableFuture<Boolean> convertAndSend(Object message) {
return convertAndSend(message, null);
}

@Override
public CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp) {
Message message2 = this.messageConverter.toMessage(message, new StreamMessageProperties());
Assert.notNull(message2, "The message converter returned null");
if (mpp != null) {
message2 = mpp.postProcessMessage(message2);
if (message2 == null) {
this.logger.debug("Message Post Processor returned null, message not sent");
CompletableFuture<Boolean> future = new CompletableFuture<>();
future.complete(false);
return future;
}
}
return send(message2);
}


@Override
public CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
createOrGetProducer().send(message, handleConfirm(future));
return future;
}

@Override
public MessageBuilder messageBuilder() {
return createOrGetProducer().messageBuilder();
}

private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future) {
return confStatus -> {
if (confStatus.isConfirmed()) {
future.complete(true);
}
else {
int code = confStatus.getCode();
String errorMessage;
switch (code) {
case Constants.CODE_MESSAGE_ENQUEUEING_FAILED:
errorMessage = "Message Enqueueing Failed";
break;
case Constants.CODE_PRODUCER_CLOSED:
errorMessage = "Producer Closed";
break;
case Constants.CODE_PRODUCER_NOT_AVAILABLE:
errorMessage = "Producer Not Available";
break;
case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT:
errorMessage = "Publish Confirm Timeout";
break;
default:
errorMessage = "Unknown code: " + code;
break;
}
future.completeExceptionally(new StreamSendException(errorMessage, code));
}
};
}

/**
* {@inheritDoc}
* <p>
* <b>Close the underlying producer; a new producer will be created on the next
* operation that requires one.</b>
*/
@Override
public synchronized void close() {
if (this.producer != null) {
this.producer.close();
this.producer = null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate2;
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
Expand Down Expand Up @@ -70,7 +70,7 @@ public class RabbitListenerTests extends AbstractIntegrationTests {
Config config;

@Test
void simple(@Autowired RabbitStreamTemplate template) throws Exception {
void simple(@Autowired RabbitStreamTemplate2 template) throws Exception {
Future<Boolean> future = template.convertAndSend("foo");
assertThat(future.get(10, TimeUnit.SECONDS)).isTrue();
future = template.convertAndSend("bar", msg -> msg);
Expand Down Expand Up @@ -247,8 +247,8 @@ RabbitTemplate template(CachingConnectionFactory cf) {
}

@Bean
RabbitStreamTemplate streamTemplate1(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
RabbitStreamTemplate2 streamTemplate1(Environment env) {
RabbitStreamTemplate2 template = new RabbitStreamTemplate2(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/stream.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ The `ProducerCustomizer` provides a mechanism to customize the producer before i

Refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/[Java Client Documentation] about customizing the `Environment` and `Producer`.

IMPORTANT: In version 2.4.7 `RabbitStreamOperations` and `RabbitStreamTemplate` have been deprecated in favor of `RabbitStreamOperations2` and `RabbitStreamTemplate2` respectively; they return `CompletableFuture` instead of `ListenableFuture`.
`RabbitStreamOperations` and `RabbitStreamTemplate` will be removed in 3.0.

==== Receiving Messages

Asynchronous message reception is provided by the `StreamListenerContainer` (and the `StreamRabbitListenerContainerFactory` when using `@RabbitListener`).
Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ See <<json-message-converter>> for more information.

The `AsyncRabbitTemplate` is deprecated in favor of `AsyncRabbitTemplate2` which returns `CompletableFuture` s instead of `ListenableFuture` s.
See <<async-template>> for more information.

==== Stream Support Changes

`RabbitStreamOperations` and `RabbitStreamTemplate` have been deprecated in favor of `RabbitStreamOperations2` and `RabbitStreamTemplate2` respectively; they return `CompletableFuture` instead of `ListenableFuture`.
See <<stream-support>> for more information.

0 comments on commit 67bfec9

Please sign in to comment.