diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java index 72571f311b29b..ad3c01b72ccf1 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java @@ -155,8 +155,12 @@ public abstract static class Record { @SchemaFieldNumber("11") public abstract @Nullable String getReplicationGroupMessageId(); + @SuppressWarnings("mutable") + @SchemaFieldNumber("12") + public abstract byte[] getAttachmentBytes(); + public static Builder builder() { - return new AutoValue_Solace_Record.Builder(); + return new AutoValue_Solace_Record.Builder().setAttachmentBytes(new byte[0]); } @AutoValue.Builder @@ -186,6 +190,8 @@ public abstract static class Builder { public abstract Builder setReplicationGroupMessageId( @Nullable String replicationGroupMessageId); + public abstract Builder setAttachmentBytes(byte[] attachmentBytes); + public abstract Record build(); } } @@ -198,21 +204,22 @@ public static class SolaceRecordMapper { return null; } - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ByteArrayOutputStream payloadBytesStream = new ByteArrayOutputStream(); if (msg.getContentLength() != 0) { try { - outputStream.write(msg.getBytes()); + payloadBytesStream.write(msg.getBytes()); } catch (IOException e) { - LOG.error("Could not write Bytes from the BytesXMLMessage to the Solace.record.", e); + LOG.error("Could not write bytes from the BytesXMLMessage to the Solace.record.", e); } } + + ByteArrayOutputStream attachmentBytesStream = new ByteArrayOutputStream(); if (msg.getAttachmentContentLength() != 0) { try { - outputStream.write(msg.getAttachmentByteBuffer().array()); + attachmentBytesStream.write(msg.getAttachmentByteBuffer().array()); } catch (IOException e) { LOG.error( - "Could not AttachmentByteBuffer from the BytesXMLMessage to the" + " Solace.record.", - e); + "Could not AttachmentByteBuffer from the BytesXMLMessage to the Solace.record.", e); } } @@ -227,7 +234,7 @@ public static class SolaceRecordMapper { destBuilder.setType(DestinationType.QUEUE); } else { LOG.error( - "SolaceIO: Unknown destination type for message {}, assuming that {} is a" + " topic", + "SolaceIO: Unknown destination type for message {}, assuming that {} is a topic", msg.getCorrelationId(), originalDestination.getName()); destBuilder.setType(DestinationType.TOPIC); @@ -248,7 +255,8 @@ public static class SolaceRecordMapper { msg.getReplicationGroupMessageId() != null ? msg.getReplicationGroupMessageId().toString() : null) - .setPayload(outputStream.toByteArray()) + .setPayload(payloadBytesStream.toByteArray()) + .setAttachmentBytes(attachmentBytesStream.toByteArray()) .build(); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/SolaceRecordCoder.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/SolaceRecordCoder.java index fcad3b77a0bca..319ec78295340 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/SolaceRecordCoder.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/SolaceRecordCoder.java @@ -69,6 +69,7 @@ public void encode(Record value, @NonNull OutputStream outStream) throws IOExcep LONG_CODER.encode(value.getSenderTimestamp(), outStream); LONG_CODER.encode(value.getSequenceNumber(), outStream); LONG_CODER.encode(value.getTimeToLive(), outStream); + BYTE_CODER.encode(value.getAttachmentBytes(), outStream); } @Override @@ -100,6 +101,7 @@ public Record decode(InputStream inStream) throws IOException { .setSenderTimestamp(LONG_CODER.decode(inStream)) .setSequenceNumber(LONG_CODER.decode(inStream)) .setTimeToLive(LONG_CODER.decode(inStream)) + .setAttachmentBytes(BYTE_CODER.decode(inStream)) .build(); } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceTest.java index 47b065be98a01..2972ddc15cbd5 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceTest.java @@ -18,14 +18,12 @@ package org.apache.beam.sdk.io.solace.data; import java.nio.charset.StandardCharsets; -import java.util.Map; import org.apache.beam.sdk.io.solace.data.Solace.Destination; import org.junit.Assert; import org.junit.Test; public class SolaceTest { - Map properties; Destination destination = Solace.Destination.builder() .setName("some destination") @@ -42,6 +40,8 @@ public class SolaceTest { Long timeToLive = 34567890L; String payloadString = "some payload"; byte[] payload = payloadString.getBytes(StandardCharsets.UTF_8); + String attachmentString = "some attachment"; + byte[] attachment = attachmentString.getBytes(StandardCharsets.UTF_8); @Test public void testRecordEquality() { @@ -58,6 +58,7 @@ public void testRecordEquality() { .setSequenceNumber(sequenceNumber) .setTimeToLive(timeToLive) .setPayload(payload) + .setAttachmentBytes(attachment) .build(); Solace.Record obj2 = @@ -73,6 +74,7 @@ public void testRecordEquality() { .setSequenceNumber(sequenceNumber) .setTimeToLive(timeToLive) .setPayload(payload) + .setAttachmentBytes(attachment) .build(); Solace.Record obj3 = @@ -88,6 +90,7 @@ public void testRecordEquality() { .setSequenceNumber(sequenceNumber) .setTimeToLive(timeToLive) .setPayload(payload) + .setAttachmentBytes(attachment) .build(); Assert.assertEquals(obj1, obj2); @@ -104,6 +107,8 @@ public void testRecordEquality() { Assert.assertEquals(obj1.getSequenceNumber(), sequenceNumber); Assert.assertEquals(obj1.getTimeToLive(), timeToLive); Assert.assertEquals(new String(obj1.getPayload(), StandardCharsets.UTF_8), payloadString); + Assert.assertEquals( + new String(obj1.getAttachmentBytes(), StandardCharsets.UTF_8), attachmentString); } @Test @@ -120,6 +125,7 @@ public void testRecordNullability() { Assert.assertNull(obj.getSenderTimestamp()); Assert.assertNull(obj.getSequenceNumber()); Assert.assertNull(obj.getTimeToLive()); + Assert.assertArrayEquals(obj.getAttachmentBytes(), new byte[0]); Assert.assertEquals(new String(obj.getPayload(), StandardCharsets.UTF_8), payloadString); }