Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CE-2035: Bug fix setting of AMQPMessageBuilder basicProperties #136

Merged
merged 2 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions conduit/src/main/java/io/rtr/conduit/amqp/AMQPMessageBundle.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@
import com.rabbitmq.client.Envelope;
import io.rtr.conduit.amqp.transport.TransportMessageBundle;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AMQPMessageBundle implements TransportMessageBundle {
public static final String CONTENT_TYPE_PLAINTEXT = "text/plain";
public static final String CONTENT_TYPE_JSON = "application/json";
private final String consumerTag;
private final Envelope envelope;
private final AMQP.BasicProperties basicProperties;
private final byte[] body;

private static AMQP.BasicProperties initialProperties() {
return initialProperties(null);
return initialProperties(null, CONTENT_TYPE_PLAINTEXT);
}

private static AMQP.BasicProperties initialProperties(Map<String, Object> additionalHeaders) {
Map<String, Object> headers = new HashMap<String, Object>();
private static AMQP.BasicProperties initialProperties(final Map<String, Object> additionalHeaders,
final String contentType) {
final Map<String, Object> headers = new HashMap<>();

if (additionalHeaders != null) {
headers.putAll(additionalHeaders);
Expand All @@ -32,7 +34,7 @@ private static AMQP.BasicProperties initialProperties(Map<String, Object> additi
.deliveryMode(2 /*persistent*/)
.priority(0)
.headers(headers)
.contentType("text/plain")
.contentType(contentType)
.build();
}

Expand All @@ -48,7 +50,7 @@ public AMQPMessageBundle(String message) {
}

public AMQPMessageBundle(String message, Map<String, Object> headers) {
this(null, null, initialProperties(headers), message.getBytes());
this(null, null, initialProperties(headers, CONTENT_TYPE_JSON), message.getBytes());
}

private AMQPMessageBundle(final Builder builder) {
Expand Down Expand Up @@ -83,6 +85,7 @@ public static class Builder {
private Envelope envelope;
private AMQP.BasicProperties basicProperties;
private Map<String, Object> headers = new HashMap<>();
private String contentType = CONTENT_TYPE_PLAINTEXT;
private byte[] body;

public Builder consumerTag(final String consumerTag) {
Expand All @@ -107,6 +110,11 @@ public Builder headers(final Map<String, Object> headers) {
return this;
}

public Builder contentType(final String contentType) {
this.contentType = contentType;
return this;
}

public Builder header(final String name, final Object value) {
if (value == null) {
this.headers.remove(name);
Expand All @@ -127,9 +135,9 @@ public Builder body(final String body) {

public AMQPMessageBundle build() {
if (basicProperties == null) {
this.basicProperties = initialProperties(headers);
} else if (headers != null) {
throw new IllegalArgumentException("Both basicProperties and headers are set");
this.basicProperties = initialProperties(headers, contentType);
} else if (!headers.isEmpty() && contentType != null) {
throw new IllegalArgumentException("Cannot combine basicProperties and custom property values");
}
return new AMQPMessageBundle(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import com.rabbitmq.client.AMQP;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static io.rtr.conduit.amqp.AMQPMessageBundle.CONTENT_TYPE_JSON;
import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -44,21 +46,51 @@ void buildMessageWithHeaders_populatesPropertiesAndBody() {
.header("foo", null)
.header("bar", "baz")
.header("foo2", 2)
.contentType(CONTENT_TYPE_JSON)
.body("A message")
.build();

assertThat(messageBundle.getBasicProperties())
.isNotNull()
.extracting(AMQP.BasicProperties::getHeaders)
.satisfies(headers -> assertThat(headers)
.containsEntry("bar", "baz")
.containsEntry("foo2", 2)
.doesNotContainKey("foo"));
.satisfies(props -> {
assertThat(props.getContentType())
.isEqualTo("application/json");
assertThat(props.getHeaders())
.containsEntry("bar", "baz")
.containsEntry("foo2", 2)
.doesNotContainKey("foo");
});
assertThat(messageBundle.getBody())
.satisfies(bytes -> assertThat(new String(bytes))
.isEqualTo("A message"));
}

@Test
void buildMessageWithBasicProperties_populatesPropertiesAndBody() {
final AMQPMessageBundle messageBundle = AMQPMessageBundle.builder()
.basicProperties(new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2)
.priority(0)
.headers(Collections.singletonMap("conduit-retry-count", 0))
.build())
.body("{\"message\":\"A message\"")
.build();

assertThat(messageBundle.getBasicProperties())
.isNotNull()
.satisfies(props -> {
assertThat(props.getContentType())
.isEqualTo("application/json");
assertThat(props.getHeaders())
.hasSize(1)
.containsEntry("conduit-retry-count", 0);
});
assertThat(messageBundle.getBody())
.satisfies(bytes -> assertThat(new String(bytes))
.isEqualTo("{\"message\":\"A message\""));
}

@Test
void settingBothBasicPropertiesAndHeaders_throws() {
final AMQPMessageBundle.Builder builder = AMQPMessageBundle.builder()
Expand All @@ -67,6 +99,6 @@ void settingBothBasicPropertiesAndHeaders_throws() {

assertThatThrownBy(builder::build)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Both basicProperties and headers are set");
.hasMessage("Cannot combine basicProperties and custom property values");
}
}
Loading