Skip to content

Commit

Permalink
CE-2035: Bug fix setting of AMQPMessageBuilder basicProperties (#136)
Browse files Browse the repository at this point in the history
* CE-2035: Fix AMQPMessageBuilder to support building with custom AMQP.BasicProperties

* CE-2035: Extend AMQPMessageBundle builder to support specifying the body content type
  • Loading branch information
jonhanley authored Apr 15, 2024
1 parent a879917 commit d6ab7fd
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 15 deletions.
26 changes: 17 additions & 9 deletions conduit/src/main/java/io/rtr/conduit/amqp/AMQPMessageBundle.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@
import com.rabbitmq.client.Envelope;
import io.rtr.conduit.amqp.transport.TransportMessageBundle;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AMQPMessageBundle implements TransportMessageBundle {
public static final String CONTENT_TYPE_PLAINTEXT = "text/plain";
public static final String CONTENT_TYPE_JSON = "application/json";
private final String consumerTag;
private final Envelope envelope;
private final AMQP.BasicProperties basicProperties;
private final byte[] body;

private static AMQP.BasicProperties initialProperties() {
return initialProperties(null);
return initialProperties(null, CONTENT_TYPE_PLAINTEXT);
}

private static AMQP.BasicProperties initialProperties(Map<String, Object> additionalHeaders) {
Map<String, Object> headers = new HashMap<String, Object>();
private static AMQP.BasicProperties initialProperties(final Map<String, Object> additionalHeaders,
final String contentType) {
final Map<String, Object> headers = new HashMap<>();

if (additionalHeaders != null) {
headers.putAll(additionalHeaders);
Expand All @@ -32,7 +34,7 @@ private static AMQP.BasicProperties initialProperties(Map<String, Object> additi
.deliveryMode(2 /*persistent*/)
.priority(0)
.headers(headers)
.contentType("text/plain")
.contentType(contentType)
.build();
}

Expand All @@ -48,7 +50,7 @@ public AMQPMessageBundle(String message) {
}

public AMQPMessageBundle(String message, Map<String, Object> headers) {
this(null, null, initialProperties(headers), message.getBytes());
this(null, null, initialProperties(headers, CONTENT_TYPE_JSON), message.getBytes());
}

private AMQPMessageBundle(final Builder builder) {
Expand Down Expand Up @@ -83,6 +85,7 @@ public static class Builder {
private Envelope envelope;
private AMQP.BasicProperties basicProperties;
private Map<String, Object> headers = new HashMap<>();
private String contentType = CONTENT_TYPE_PLAINTEXT;
private byte[] body;

public Builder consumerTag(final String consumerTag) {
Expand All @@ -107,6 +110,11 @@ public Builder headers(final Map<String, Object> headers) {
return this;
}

public Builder contentType(final String contentType) {
this.contentType = contentType;
return this;
}

public Builder header(final String name, final Object value) {
if (value == null) {
this.headers.remove(name);
Expand All @@ -127,9 +135,9 @@ public Builder body(final String body) {

public AMQPMessageBundle build() {
if (basicProperties == null) {
this.basicProperties = initialProperties(headers);
} else if (headers != null) {
throw new IllegalArgumentException("Both basicProperties and headers are set");
this.basicProperties = initialProperties(headers, contentType);
} else if (!headers.isEmpty() && contentType != null) {
throw new IllegalArgumentException("Cannot combine basicProperties and custom property values");
}
return new AMQPMessageBundle(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import com.rabbitmq.client.AMQP;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static io.rtr.conduit.amqp.AMQPMessageBundle.CONTENT_TYPE_JSON;
import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -44,21 +46,51 @@ void buildMessageWithHeaders_populatesPropertiesAndBody() {
.header("foo", null)
.header("bar", "baz")
.header("foo2", 2)
.contentType(CONTENT_TYPE_JSON)
.body("A message")
.build();

assertThat(messageBundle.getBasicProperties())
.isNotNull()
.extracting(AMQP.BasicProperties::getHeaders)
.satisfies(headers -> assertThat(headers)
.containsEntry("bar", "baz")
.containsEntry("foo2", 2)
.doesNotContainKey("foo"));
.satisfies(props -> {
assertThat(props.getContentType())
.isEqualTo("application/json");
assertThat(props.getHeaders())
.containsEntry("bar", "baz")
.containsEntry("foo2", 2)
.doesNotContainKey("foo");
});
assertThat(messageBundle.getBody())
.satisfies(bytes -> assertThat(new String(bytes))
.isEqualTo("A message"));
}

@Test
void buildMessageWithBasicProperties_populatesPropertiesAndBody() {
final AMQPMessageBundle messageBundle = AMQPMessageBundle.builder()
.basicProperties(new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2)
.priority(0)
.headers(Collections.singletonMap("conduit-retry-count", 0))
.build())
.body("{\"message\":\"A message\"")
.build();

assertThat(messageBundle.getBasicProperties())
.isNotNull()
.satisfies(props -> {
assertThat(props.getContentType())
.isEqualTo("application/json");
assertThat(props.getHeaders())
.hasSize(1)
.containsEntry("conduit-retry-count", 0);
});
assertThat(messageBundle.getBody())
.satisfies(bytes -> assertThat(new String(bytes))
.isEqualTo("{\"message\":\"A message\""));
}

@Test
void settingBothBasicPropertiesAndHeaders_throws() {
final AMQPMessageBundle.Builder builder = AMQPMessageBundle.builder()
Expand All @@ -67,6 +99,6 @@ void settingBothBasicPropertiesAndHeaders_throws() {

assertThatThrownBy(builder::build)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Both basicProperties and headers are set");
.hasMessage("Cannot combine basicProperties and custom property values");
}
}

0 comments on commit d6ab7fd

Please sign in to comment.