Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CE-1925: Update AMQPPublishProperties class to enable setting of timeout and confirmEnabled properties #135

Merged
merged 4 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions conduit/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

<properties>
<junit-bom.version>5.8.2</junit-bom.version>
<assertj.version>3.25.3</assertj.version>
<logback.version>1.2.11</logback.version>
<slf4j.version>1.7.35</slf4j.version>
<testcontainers-bom.version>1.16.3</testcontainers-bom.version>
Expand Down Expand Up @@ -65,6 +66,12 @@
<version>4.3.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<!-- For verifying with RabbitMQ -->
<dependency>
<groupId>org.testcontainers</groupId>
Expand Down
77 changes: 73 additions & 4 deletions conduit/src/main/java/io/rtr/conduit/amqp/AMQPMessageBundle.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import com.rabbitmq.client.Envelope;
import io.rtr.conduit.amqp.transport.TransportMessageBundle;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AMQPMessageBundle implements TransportMessageBundle {
private String consumerTag;
private Envelope envelope;
private AMQP.BasicProperties basicProperties;
private byte[] body;
private final String consumerTag;
private final Envelope envelope;
private final AMQP.BasicProperties basicProperties;
private final byte[] body;

private static AMQP.BasicProperties initialProperties() {
return initialProperties(null);
Expand Down Expand Up @@ -50,6 +51,13 @@ public AMQPMessageBundle(String message, Map<String, Object> headers) {
this(null, null, initialProperties(headers), message.getBytes());
}

private AMQPMessageBundle(final Builder builder) {
this.consumerTag = builder.consumerTag;
this.envelope = builder.envelope;
this.basicProperties = builder.basicProperties;
this.body = builder.body;
}

public String getConsumerTag() {
return consumerTag;
}
Expand All @@ -65,4 +73,65 @@ public AMQP.BasicProperties getBasicProperties() {
public byte[] getBody() {
return body;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private String consumerTag;
private Envelope envelope;
private AMQP.BasicProperties basicProperties;
private Map<String, Object> headers = new HashMap<>();
private byte[] body;

public Builder consumerTag(final String consumerTag) {
this.consumerTag = consumerTag;
return this;
}

public Builder envelope(final Envelope envelope) {
this.envelope = envelope;
return this;
}

public Builder basicProperties(final AMQP.BasicProperties basicProperties) {
this.basicProperties = basicProperties;
return this;
}

public Builder headers(final Map<String, Object> headers) {
if (headers != null) {
this.headers = new HashMap<>(headers);
}
return this;
}

public Builder header(final String name, final Object value) {
if (value == null) {
this.headers.remove(name);
} else {
this.headers.put(name, value);
}
return this;
}

public Builder body(final byte[] body) {
this.body = body;
return this;
}

public Builder body(final String body) {
return body(body.getBytes());
}

public AMQPMessageBundle build() {
if (basicProperties == null) {
this.basicProperties = initialProperties(headers);
} else if (headers != null) {
throw new IllegalArgumentException("Both basicProperties and headers are set");
}
return new AMQPMessageBundle(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@
import io.rtr.conduit.amqp.transport.TransportPublishProperties;

public class AMQPPublishProperties implements TransportPublishProperties {
private String exchange;
private String routingKey;
private long timeout;
private boolean confirmEnabled;
private final String exchange;
private final String routingKey;
private final long timeout;
private final boolean confirmEnabled;

AMQPPublishProperties(String exchange, String routingKey, long timeout, boolean confirmEnabled) {
public AMQPPublishProperties(String exchange, String routingKey) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this one be private now? We could force folks to move to the builder when upgrading

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opted to go with a non-breaking change, influence mostly by this being a public repo. But happy to change.

On review I'm actually thinking it might be a good idea for this to catch people to make them think about if their publisher config is being overridden here

this.exchange = exchange;
this.routingKey = routingKey;
this.timeout = timeout;
this.confirmEnabled = confirmEnabled;
this.timeout = 100;
this.confirmEnabled = false;
}

public AMQPPublishProperties(String exchange, String routingKey) {
this(exchange, routingKey, 100, false);
private AMQPPublishProperties(final Builder builder) {
this.exchange = builder.exchange;
this.routingKey = builder.routingKey;
this.timeout = builder.timeout;
this.confirmEnabled = builder.confirmEnabled;
}

public String getExchange() {
Expand All @@ -34,4 +37,46 @@ public long getTimeout() {
public boolean isConfirmEnabled() {
return confirmEnabled;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private String exchange;
private String routingKey;
private long timeout;
private boolean confirmEnabled;

public Builder exchange(final String exchange) {
this.exchange = exchange;
return this;
}

public Builder routingKey(final String routingKey) {
this.routingKey = routingKey;
return this;
}

public Builder timeout(final long timeout) {
this.timeout = timeout;
return this;
}

public Builder confirmEnabled(final boolean confirmEnabled) {
this.confirmEnabled = confirmEnabled;
return this;
}

public Builder of(final AMQPPublishProperties base) {
return exchange(base.getExchange())
.routingKey(base.getRoutingKey())
.timeout(base.getTimeout())
.confirmEnabled(base.isConfirmEnabled());
}

public AMQPPublishProperties build() {
return new AMQPPublishProperties(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ protected AMQPConnectionProperties buildConnectionProperties() {

@Override
protected AMQPPublishProperties buildPublishProperties() {
return new AMQPPublishProperties(exchange, routingKey, publishTimeout, confirmEnabled);
return AMQPPublishProperties.builder()
.exchange(exchange)
.routingKey(routingKey)
.timeout(publishTimeout)
.confirmEnabled(confirmEnabled)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package io.rtr.conduit.amqp;

import com.rabbitmq.client.AMQP;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class AMQPMessageBundleTest {
class AMQPMessageBundleTest {

@Test
public void testMessageBundleHeaders() {
void testMessageBundleHeaders() {
// create message with default headers
AMQPMessageBundle bundle1 = new AMQPMessageBundle("test");
Map<String, Object> headers1 = new HashMap<>(bundle1.getBasicProperties().getHeaders());
Expand All @@ -28,7 +33,40 @@ public void testMessageBundleHeaders() {
AMQPMessageBundle bundle2 = new AMQPMessageBundle("test", headers2);
headers2 = new HashMap<>(bundle2.getBasicProperties().getHeaders());

assertTrue(headers2.size() > 0);
assertFalse(headers2.isEmpty());
assertEquals(headers1, headers2);
}

@Test
void buildMessageWithHeaders_populatesPropertiesAndBody() {
final AMQPMessageBundle messageBundle = AMQPMessageBundle.builder()
.header("foo", 1)
.header("foo", null)
.header("bar", "baz")
.header("foo2", 2)
.body("A message")
.build();

assertThat(messageBundle.getBasicProperties())
.isNotNull()
.extracting(AMQP.BasicProperties::getHeaders)
.satisfies(headers -> assertThat(headers)
.containsEntry("bar", "baz")
.containsEntry("foo2", 2)
.doesNotContainKey("foo"));
assertThat(messageBundle.getBody())
.satisfies(bytes -> assertThat(new String(bytes))
.isEqualTo("A message"));
}

@Test
void settingBothBasicPropertiesAndHeaders_throws() {
final AMQPMessageBundle.Builder builder = AMQPMessageBundle.builder()
.basicProperties(new AMQP.BasicProperties())
.headers(singletonMap("foo", 1));

assertThatThrownBy(builder::build)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Both basicProperties and headers are set");
}
}
Loading
Loading