Skip to content

Commit

Permalink
CE-1925: Update AMQPPublishProperties class to enable setting of time…
Browse files Browse the repository at this point in the history
…out and confirmEnabled properties (#135)

* CE-1925: Update AMQPPublishProperties to allow setting of timeout and confirmEnabled properties when overriding during a message publish. Also added builder to AMQPMessageBundle class

* CE-1925: Remove unnecessary public modifiers on Jupiter tests. Resolve issue with nested AMQPTransportTest

* CE-1925: Resolve "Refactor the code of the lambda to have only one invocation possibly throwing a runtime exception" issues in unit tests

* CE-1925: Remove public constructor from AMQPPublishProperties. Enforces builder usage. Limited parameter constructor could also result in inadvertent overriding of constructed publisher config at publish time
  • Loading branch information
jonhanley authored Mar 13, 2024
1 parent 3a6a105 commit 2b953d5
Show file tree
Hide file tree
Showing 16 changed files with 351 additions and 166 deletions.
7 changes: 7 additions & 0 deletions conduit/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

<properties>
<junit-bom.version>5.8.2</junit-bom.version>
<assertj.version>3.25.3</assertj.version>
<logback.version>1.2.11</logback.version>
<slf4j.version>1.7.35</slf4j.version>
<testcontainers-bom.version>1.16.3</testcontainers-bom.version>
Expand Down Expand Up @@ -65,6 +66,12 @@
<version>4.3.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<!-- For verifying with RabbitMQ -->
<dependency>
<groupId>org.testcontainers</groupId>
Expand Down
77 changes: 73 additions & 4 deletions conduit/src/main/java/io/rtr/conduit/amqp/AMQPMessageBundle.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import com.rabbitmq.client.Envelope;
import io.rtr.conduit.amqp.transport.TransportMessageBundle;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AMQPMessageBundle implements TransportMessageBundle {
private String consumerTag;
private Envelope envelope;
private AMQP.BasicProperties basicProperties;
private byte[] body;
private final String consumerTag;
private final Envelope envelope;
private final AMQP.BasicProperties basicProperties;
private final byte[] body;

private static AMQP.BasicProperties initialProperties() {
return initialProperties(null);
Expand Down Expand Up @@ -50,6 +51,13 @@ public AMQPMessageBundle(String message, Map<String, Object> headers) {
this(null, null, initialProperties(headers), message.getBytes());
}

private AMQPMessageBundle(final Builder builder) {
this.consumerTag = builder.consumerTag;
this.envelope = builder.envelope;
this.basicProperties = builder.basicProperties;
this.body = builder.body;
}

public String getConsumerTag() {
return consumerTag;
}
Expand All @@ -65,4 +73,65 @@ public AMQP.BasicProperties getBasicProperties() {
public byte[] getBody() {
return body;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private String consumerTag;
private Envelope envelope;
private AMQP.BasicProperties basicProperties;
private Map<String, Object> headers = new HashMap<>();
private byte[] body;

public Builder consumerTag(final String consumerTag) {
this.consumerTag = consumerTag;
return this;
}

public Builder envelope(final Envelope envelope) {
this.envelope = envelope;
return this;
}

public Builder basicProperties(final AMQP.BasicProperties basicProperties) {
this.basicProperties = basicProperties;
return this;
}

public Builder headers(final Map<String, Object> headers) {
if (headers != null) {
this.headers = new HashMap<>(headers);
}
return this;
}

public Builder header(final String name, final Object value) {
if (value == null) {
this.headers.remove(name);
} else {
this.headers.put(name, value);
}
return this;
}

public Builder body(final byte[] body) {
this.body = body;
return this;
}

public Builder body(final String body) {
return body(body.getBytes());
}

public AMQPMessageBundle build() {
if (basicProperties == null) {
this.basicProperties = initialProperties(headers);
} else if (headers != null) {
throw new IllegalArgumentException("Both basicProperties and headers are set");
}
return new AMQPMessageBundle(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,16 @@
import io.rtr.conduit.amqp.transport.TransportPublishProperties;

public class AMQPPublishProperties implements TransportPublishProperties {
private String exchange;
private String routingKey;
private long timeout;
private boolean confirmEnabled;

AMQPPublishProperties(String exchange, String routingKey, long timeout, boolean confirmEnabled) {
this.exchange = exchange;
this.routingKey = routingKey;
this.timeout = timeout;
this.confirmEnabled = confirmEnabled;
}
private final String exchange;
private final String routingKey;
private final long timeout;
private final boolean confirmEnabled;

public AMQPPublishProperties(String exchange, String routingKey) {
this(exchange, routingKey, 100, false);
private AMQPPublishProperties(final Builder builder) {
this.exchange = builder.exchange;
this.routingKey = builder.routingKey;
this.timeout = builder.timeout;
this.confirmEnabled = builder.confirmEnabled;
}

public String getExchange() {
Expand All @@ -34,4 +30,46 @@ public long getTimeout() {
public boolean isConfirmEnabled() {
return confirmEnabled;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private String exchange;
private String routingKey;
private long timeout;
private boolean confirmEnabled;

public Builder exchange(final String exchange) {
this.exchange = exchange;
return this;
}

public Builder routingKey(final String routingKey) {
this.routingKey = routingKey;
return this;
}

public Builder timeout(final long timeout) {
this.timeout = timeout;
return this;
}

public Builder confirmEnabled(final boolean confirmEnabled) {
this.confirmEnabled = confirmEnabled;
return this;
}

public Builder of(final AMQPPublishProperties base) {
return exchange(base.getExchange())
.routingKey(base.getRoutingKey())
.timeout(base.getTimeout())
.confirmEnabled(base.isConfirmEnabled());
}

public AMQPPublishProperties build() {
return new AMQPPublishProperties(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ protected AMQPConnectionProperties buildConnectionProperties() {

@Override
protected AMQPPublishProperties buildPublishProperties() {
return new AMQPPublishProperties(exchange, routingKey, publishTimeout, confirmEnabled);
return AMQPPublishProperties.builder()
.exchange(exchange)
.routingKey(routingKey)
.timeout(publishTimeout)
.confirmEnabled(confirmEnabled)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package io.rtr.conduit.amqp;

import com.rabbitmq.client.AMQP;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class AMQPMessageBundleTest {
class AMQPMessageBundleTest {

@Test
public void testMessageBundleHeaders() {
void testMessageBundleHeaders() {
// create message with default headers
AMQPMessageBundle bundle1 = new AMQPMessageBundle("test");
Map<String, Object> headers1 = new HashMap<>(bundle1.getBasicProperties().getHeaders());
Expand All @@ -28,7 +33,40 @@ public void testMessageBundleHeaders() {
AMQPMessageBundle bundle2 = new AMQPMessageBundle("test", headers2);
headers2 = new HashMap<>(bundle2.getBasicProperties().getHeaders());

assertTrue(headers2.size() > 0);
assertFalse(headers2.isEmpty());
assertEquals(headers1, headers2);
}

@Test
void buildMessageWithHeaders_populatesPropertiesAndBody() {
final AMQPMessageBundle messageBundle = AMQPMessageBundle.builder()
.header("foo", 1)
.header("foo", null)
.header("bar", "baz")
.header("foo2", 2)
.body("A message")
.build();

assertThat(messageBundle.getBasicProperties())
.isNotNull()
.extracting(AMQP.BasicProperties::getHeaders)
.satisfies(headers -> assertThat(headers)
.containsEntry("bar", "baz")
.containsEntry("foo2", 2)
.doesNotContainKey("foo"));
assertThat(messageBundle.getBody())
.satisfies(bytes -> assertThat(new String(bytes))
.isEqualTo("A message"));
}

@Test
void settingBothBasicPropertiesAndHeaders_throws() {
final AMQPMessageBundle.Builder builder = AMQPMessageBundle.builder()
.basicProperties(new AMQP.BasicProperties())
.headers(singletonMap("foo", 1));

assertThatThrownBy(builder::build)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Both basicProperties and headers are set");
}
}
Loading

0 comments on commit 2b953d5

Please sign in to comment.