diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamOperations2.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamOperations2.java new file mode 100644 index 0000000000..a60c489c57 --- /dev/null +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamOperations2.java @@ -0,0 +1,97 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.producer; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.lang.Nullable; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; + +import com.rabbitmq.stream.MessageBuilder; + +/** + * Provides methods for sending messages using a RabbitMQ Stream producer, + * returning {@link CompletableFuture}. + * + * @author Gary Russell + * @since 2.4.7 + * + */ +public interface RabbitStreamOperations2 extends AutoCloseable { + + /** + * Send a Spring AMQP message. + * @param message the message. + * @return a future to indicate success/failure. + */ + CompletableFuture send(Message message); + + /** + * Convert to and send a Spring AMQP message. + * @param message the payload. + * @return a future to indicate success/failure. + */ + CompletableFuture convertAndSend(Object message); + + /** + * Convert to and send a Spring AMQP message. If a {@link MessagePostProcessor} is + * provided and returns {@code null}, the message is not sent and the future is + * completed with {@code false}. + * @param message the payload. + * @param mpp a message post processor. + * @return a future to indicate success/failure. + */ + CompletableFuture convertAndSend(Object message, @Nullable MessagePostProcessor mpp); + + /** + * Send a native stream message. + * @param message the message. + * @return a future to indicate success/failure. + * @see #messageBuilder() + */ + CompletableFuture send(com.rabbitmq.stream.Message message); + + /** + * Return the producer's {@link MessageBuilder} to create native stream messages. + * @return the builder. + * @see #send(com.rabbitmq.stream.Message) + */ + MessageBuilder messageBuilder(); + + /** + * Return the message converter. + * @return the converter. + */ + MessageConverter messageConverter(); + + /** + * Return the stream message converter. + * @return the converter; + */ + StreamMessageConverter streamMessageConverter(); + + @Override + default void close() throws AmqpException { + // narrow exception to avoid compiler warning - see + // https://bugs.openjdk.java.net/browse/JDK-8155591 + } + +} diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java index e8c14bc6c6..ca3a89ded3 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java @@ -42,8 +42,9 @@ * * @author Gary Russell * @since 2.4 - * + * @deprecated in favor of {@link RabbitStreamTemplate2}. */ +@Deprecated public class RabbitStreamTemplate implements RabbitStreamOperations, BeanNameAware { protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate2.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate2.java new file mode 100644 index 0000000000..236d067561 --- /dev/null +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate2.java @@ -0,0 +1,225 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.producer; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.amqp.support.converter.SimpleMessageConverter; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.core.log.LogAccessor; +import org.springframework.lang.Nullable; +import org.springframework.rabbit.stream.support.StreamMessageProperties; +import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; +import org.springframework.util.Assert; + +import com.rabbitmq.stream.ConfirmationHandler; +import com.rabbitmq.stream.Constants; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.MessageBuilder; +import com.rabbitmq.stream.Producer; +import com.rabbitmq.stream.ProducerBuilder; + +/** + * Default implementation of {@link RabbitStreamOperations}. + * + * @author Gary Russell + * @since 2.4.7 + * + */ +public class RabbitStreamTemplate2 implements RabbitStreamOperations2, BeanNameAware { + + protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR + + private final Environment environment; + + private final String streamName; + + private MessageConverter messageConverter = new SimpleMessageConverter(); + + private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter(); + + private boolean streamConverterSet; + + private Producer producer; + + private String beanName; + + private ProducerCustomizer producerCustomizer = (name, builder) -> { }; + + /** + * Construct an instance with the provided {@link Environment}. + * @param environment the environment. + * @param streamName the stream name. + */ + public RabbitStreamTemplate2(Environment environment, String streamName) { + Assert.notNull(environment, "'environment' cannot be null"); + Assert.notNull(streamName, "'streamName' cannot be null"); + this.environment = environment; + this.streamName = streamName; + } + + + private synchronized Producer createOrGetProducer() { + if (this.producer == null) { + ProducerBuilder builder = this.environment.producerBuilder(); + builder.stream(this.streamName); + this.producerCustomizer.accept(this.beanName, builder); + this.producer = builder.build(); + if (!this.streamConverterSet) { + ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier( + () -> this.producer.messageBuilder()); + } + } + return this.producer; + } + + @Override + public synchronized void setBeanName(String name) { + this.beanName = name; + } + + /** + * Set a converter for {@link #convertAndSend(Object)} operations. + * @param messageConverter the converter. + */ + public void setMessageConverter(MessageConverter messageConverter) { + Assert.notNull(messageConverter, "'messageConverter' cannot be null"); + this.messageConverter = messageConverter; + } + + /** + * Set a converter to convert from {@link Message} to {@link com.rabbitmq.stream.Message} + * for {@link #send(Message)} and {@link #convertAndSend(Object)} methods. + * @param streamConverter the converter. + */ + public synchronized void setStreamConverter(StreamMessageConverter streamConverter) { + Assert.notNull(streamConverter, "'streamConverter' cannot be null"); + this.streamConverter = streamConverter; + this.streamConverterSet = true; + } + + /** + * Used to customize the {@link ProducerBuilder} before the {@link Producer} is built. + * @param producerCustomizer the customizer; + */ + public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) { + Assert.notNull(producerCustomizer, "'producerCustomizer' cannot be null"); + this.producerCustomizer = producerCustomizer; + } + + @Override + public MessageConverter messageConverter() { + return this.messageConverter; + } + + + @Override + public StreamMessageConverter streamMessageConverter() { + return this.streamConverter; + } + + + @Override + public CompletableFuture send(Message message) { + CompletableFuture future = new CompletableFuture<>(); + createOrGetProducer().send(this.streamConverter.fromMessage(message), handleConfirm(future)); + return future; + } + + @Override + public CompletableFuture convertAndSend(Object message) { + return convertAndSend(message, null); + } + + @Override + public CompletableFuture convertAndSend(Object message, @Nullable MessagePostProcessor mpp) { + Message message2 = this.messageConverter.toMessage(message, new StreamMessageProperties()); + Assert.notNull(message2, "The message converter returned null"); + if (mpp != null) { + message2 = mpp.postProcessMessage(message2); + if (message2 == null) { + this.logger.debug("Message Post Processor returned null, message not sent"); + CompletableFuture future = new CompletableFuture<>(); + future.complete(false); + return future; + } + } + return send(message2); + } + + + @Override + public CompletableFuture send(com.rabbitmq.stream.Message message) { + CompletableFuture future = new CompletableFuture<>(); + createOrGetProducer().send(message, handleConfirm(future)); + return future; + } + + @Override + public MessageBuilder messageBuilder() { + return createOrGetProducer().messageBuilder(); + } + + private ConfirmationHandler handleConfirm(CompletableFuture future) { + return confStatus -> { + if (confStatus.isConfirmed()) { + future.complete(true); + } + else { + int code = confStatus.getCode(); + String errorMessage; + switch (code) { + case Constants.CODE_MESSAGE_ENQUEUEING_FAILED: + errorMessage = "Message Enqueueing Failed"; + break; + case Constants.CODE_PRODUCER_CLOSED: + errorMessage = "Producer Closed"; + break; + case Constants.CODE_PRODUCER_NOT_AVAILABLE: + errorMessage = "Producer Not Available"; + break; + case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT: + errorMessage = "Publish Confirm Timeout"; + break; + default: + errorMessage = "Unknown code: " + code; + break; + } + future.completeExceptionally(new StreamSendException(errorMessage, code)); + } + }; + } + + /** + * {@inheritDoc} + *

+ * Close the underlying producer; a new producer will be created on the next + * operation that requires one. + */ + @Override + public synchronized void close() { + if (this.producer != null) { + this.producer.close(); + this.producer = null; + } + } + +} diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java index b0f914d7ba..652ee1d8f0 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java @@ -42,7 +42,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; -import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate2; import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean; import org.springframework.rabbit.stream.support.StreamMessageProperties; import org.springframework.retry.interceptor.RetryOperationsInterceptor; @@ -70,7 +70,7 @@ public class RabbitListenerTests extends AbstractIntegrationTests { Config config; @Test - void simple(@Autowired RabbitStreamTemplate template) throws Exception { + void simple(@Autowired RabbitStreamTemplate2 template) throws Exception { Future future = template.convertAndSend("foo"); assertThat(future.get(10, TimeUnit.SECONDS)).isTrue(); future = template.convertAndSend("bar", msg -> msg); @@ -247,8 +247,8 @@ RabbitTemplate template(CachingConnectionFactory cf) { } @Bean - RabbitStreamTemplate streamTemplate1(Environment env) { - RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1"); + RabbitStreamTemplate2 streamTemplate1(Environment env) { + RabbitStreamTemplate2 template = new RabbitStreamTemplate2(env, "test.stream.queue1"); template.setProducerCustomizer((name, builder) -> builder.name("test")); return template; } diff --git a/src/reference/asciidoc/stream.adoc b/src/reference/asciidoc/stream.adoc index 5e24395bee..5849eca26e 100644 --- a/src/reference/asciidoc/stream.adoc +++ b/src/reference/asciidoc/stream.adoc @@ -67,6 +67,9 @@ The `ProducerCustomizer` provides a mechanism to customize the producer before i Refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/[Java Client Documentation] about customizing the `Environment` and `Producer`. +IMPORTANT: In version 2.4.7 `RabbitStreamOperations` and `RabbitStreamTemplate` have been deprecated in favor of `RabbitStreamOperations2` and `RabbitStreamTemplate2` respectively; they return `CompletableFuture` instead of `ListenableFuture`. +`RabbitStreamOperations` and `RabbitStreamTemplate` will be removed in 3.0. + ==== Receiving Messages Asynchronous message reception is provided by the `StreamListenerContainer` (and the `StreamRabbitListenerContainerFactory` when using `@RabbitListener`). diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 5b4068dc10..d067dce333 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -33,3 +33,8 @@ See <> for more information. The `AsyncRabbitTemplate` is deprecated in favor of `AsyncRabbitTemplate2` which returns `CompletableFuture` s instead of `ListenableFuture` s. See <> for more information. + +==== Stream Support Changes + +`RabbitStreamOperations` and `RabbitStreamTemplate` have been deprecated in favor of `RabbitStreamOperations2` and `RabbitStreamTemplate2` respectively; they return `CompletableFuture` instead of `ListenableFuture`. +See <> for more information.