diff --git a/conduit/src/main/java/io/rtr/conduit/amqp/AMQPMessageBundle.java b/conduit/src/main/java/io/rtr/conduit/amqp/AMQPMessageBundle.java index e0fd3c6..cf03b08 100644 --- a/conduit/src/main/java/io/rtr/conduit/amqp/AMQPMessageBundle.java +++ b/conduit/src/main/java/io/rtr/conduit/amqp/AMQPMessageBundle.java @@ -4,22 +4,24 @@ import com.rabbitmq.client.Envelope; import io.rtr.conduit.amqp.transport.TransportMessageBundle; -import java.util.Collections; import java.util.HashMap; import java.util.Map; public class AMQPMessageBundle implements TransportMessageBundle { + public static final String CONTENT_TYPE_PLAINTEXT = "text/plain"; + public static final String CONTENT_TYPE_JSON = "application/json"; private final String consumerTag; private final Envelope envelope; private final AMQP.BasicProperties basicProperties; private final byte[] body; private static AMQP.BasicProperties initialProperties() { - return initialProperties(null); + return initialProperties(null, CONTENT_TYPE_PLAINTEXT); } - private static AMQP.BasicProperties initialProperties(Map additionalHeaders) { - Map headers = new HashMap(); + private static AMQP.BasicProperties initialProperties(final Map additionalHeaders, + final String contentType) { + final Map headers = new HashMap<>(); if (additionalHeaders != null) { headers.putAll(additionalHeaders); @@ -32,7 +34,7 @@ private static AMQP.BasicProperties initialProperties(Map additi .deliveryMode(2 /*persistent*/) .priority(0) .headers(headers) - .contentType("text/plain") + .contentType(contentType) .build(); } @@ -48,7 +50,7 @@ public AMQPMessageBundle(String message) { } public AMQPMessageBundle(String message, Map headers) { - this(null, null, initialProperties(headers), message.getBytes()); + this(null, null, initialProperties(headers, CONTENT_TYPE_JSON), message.getBytes()); } private AMQPMessageBundle(final Builder builder) { @@ -83,6 +85,7 @@ public static class Builder { private Envelope envelope; private AMQP.BasicProperties basicProperties; private Map headers = new HashMap<>(); + private String contentType = CONTENT_TYPE_PLAINTEXT; private byte[] body; public Builder consumerTag(final String consumerTag) { @@ -107,6 +110,11 @@ public Builder headers(final Map headers) { return this; } + public Builder contentType(final String contentType) { + this.contentType = contentType; + return this; + } + public Builder header(final String name, final Object value) { if (value == null) { this.headers.remove(name); @@ -127,9 +135,9 @@ public Builder body(final String body) { public AMQPMessageBundle build() { if (basicProperties == null) { - this.basicProperties = initialProperties(headers); - } else if (headers != null) { - throw new IllegalArgumentException("Both basicProperties and headers are set"); + this.basicProperties = initialProperties(headers, contentType); + } else if (!headers.isEmpty() && contentType != null) { + throw new IllegalArgumentException("Cannot combine basicProperties and custom property values"); } return new AMQPMessageBundle(this); } diff --git a/conduit/src/test/java/io/rtr/conduit/amqp/AMQPMessageBundleTest.java b/conduit/src/test/java/io/rtr/conduit/amqp/AMQPMessageBundleTest.java index d8f1d70..7a5251f 100644 --- a/conduit/src/test/java/io/rtr/conduit/amqp/AMQPMessageBundleTest.java +++ b/conduit/src/test/java/io/rtr/conduit/amqp/AMQPMessageBundleTest.java @@ -3,9 +3,11 @@ import com.rabbitmq.client.AMQP; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import static io.rtr.conduit.amqp.AMQPMessageBundle.CONTENT_TYPE_JSON; import static java.util.Collections.singletonMap; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -44,21 +46,51 @@ void buildMessageWithHeaders_populatesPropertiesAndBody() { .header("foo", null) .header("bar", "baz") .header("foo2", 2) + .contentType(CONTENT_TYPE_JSON) .body("A message") .build(); assertThat(messageBundle.getBasicProperties()) .isNotNull() - .extracting(AMQP.BasicProperties::getHeaders) - .satisfies(headers -> assertThat(headers) - .containsEntry("bar", "baz") - .containsEntry("foo2", 2) - .doesNotContainKey("foo")); + .satisfies(props -> { + assertThat(props.getContentType()) + .isEqualTo("application/json"); + assertThat(props.getHeaders()) + .containsEntry("bar", "baz") + .containsEntry("foo2", 2) + .doesNotContainKey("foo"); + }); assertThat(messageBundle.getBody()) .satisfies(bytes -> assertThat(new String(bytes)) .isEqualTo("A message")); } + @Test + void buildMessageWithBasicProperties_populatesPropertiesAndBody() { + final AMQPMessageBundle messageBundle = AMQPMessageBundle.builder() + .basicProperties(new AMQP.BasicProperties.Builder() + .contentType("application/json") + .deliveryMode(2) + .priority(0) + .headers(Collections.singletonMap("conduit-retry-count", 0)) + .build()) + .body("{\"message\":\"A message\"") + .build(); + + assertThat(messageBundle.getBasicProperties()) + .isNotNull() + .satisfies(props -> { + assertThat(props.getContentType()) + .isEqualTo("application/json"); + assertThat(props.getHeaders()) + .hasSize(1) + .containsEntry("conduit-retry-count", 0); + }); + assertThat(messageBundle.getBody()) + .satisfies(bytes -> assertThat(new String(bytes)) + .isEqualTo("{\"message\":\"A message\"")); + } + @Test void settingBothBasicPropertiesAndHeaders_throws() { final AMQPMessageBundle.Builder builder = AMQPMessageBundle.builder() @@ -67,6 +99,6 @@ void settingBothBasicPropertiesAndHeaders_throws() { assertThatThrownBy(builder::build) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Both basicProperties and headers are set"); + .hasMessage("Cannot combine basicProperties and custom property values"); } }