Skip to content

Commit

Permalink
Add a field to Solace.Record mapped from BytesXMLMessage.getAttachmen…
Browse files Browse the repository at this point in the history
…tByteBuffer()
  • Loading branch information
bzablocki committed Jun 7, 2024
1 parent f6833d2 commit df1eb6c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,12 @@ public abstract static class Record {
@SchemaFieldNumber("11")
public abstract @Nullable String getReplicationGroupMessageId();

@SuppressWarnings("mutable")
@SchemaFieldNumber("12")
public abstract byte[] getAttachmentBytes();

public static Builder builder() {
return new AutoValue_Solace_Record.Builder();
return new AutoValue_Solace_Record.Builder().setAttachmentBytes(new byte[0]);
}

@AutoValue.Builder
Expand Down Expand Up @@ -186,6 +190,8 @@ public abstract static class Builder {
public abstract Builder setReplicationGroupMessageId(
@Nullable String replicationGroupMessageId);

public abstract Builder setAttachmentBytes(byte[] attachmentBytes);

public abstract Record build();
}
}
Expand All @@ -198,21 +204,22 @@ public static class SolaceRecordMapper {
return null;
}

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream payloadBytesStream = new ByteArrayOutputStream();
if (msg.getContentLength() != 0) {
try {
outputStream.write(msg.getBytes());
payloadBytesStream.write(msg.getBytes());
} catch (IOException e) {
LOG.error("Could not write Bytes from the BytesXMLMessage to the Solace.record.", e);
LOG.error("Could not write bytes from the BytesXMLMessage to the Solace.record.", e);
}
}

ByteArrayOutputStream attachmentBytesStream = new ByteArrayOutputStream();
if (msg.getAttachmentContentLength() != 0) {
try {
outputStream.write(msg.getAttachmentByteBuffer().array());
attachmentBytesStream.write(msg.getAttachmentByteBuffer().array());
} catch (IOException e) {
LOG.error(
"Could not AttachmentByteBuffer from the BytesXMLMessage to the" + " Solace.record.",
e);
"Could not AttachmentByteBuffer from the BytesXMLMessage to the Solace.record.", e);
}
}

Expand All @@ -227,7 +234,7 @@ public static class SolaceRecordMapper {
destBuilder.setType(DestinationType.QUEUE);
} else {
LOG.error(
"SolaceIO: Unknown destination type for message {}, assuming that {} is a" + " topic",
"SolaceIO: Unknown destination type for message {}, assuming that {} is a topic",
msg.getCorrelationId(),
originalDestination.getName());
destBuilder.setType(DestinationType.TOPIC);
Expand All @@ -248,7 +255,8 @@ public static class SolaceRecordMapper {
msg.getReplicationGroupMessageId() != null
? msg.getReplicationGroupMessageId().toString()
: null)
.setPayload(outputStream.toByteArray())
.setPayload(payloadBytesStream.toByteArray())
.setAttachmentBytes(attachmentBytesStream.toByteArray())
.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void encode(Record value, @NonNull OutputStream outStream) throws IOExcep
LONG_CODER.encode(value.getSenderTimestamp(), outStream);
LONG_CODER.encode(value.getSequenceNumber(), outStream);
LONG_CODER.encode(value.getTimeToLive(), outStream);
BYTE_CODER.encode(value.getAttachmentBytes(), outStream);
}

@Override
Expand Down Expand Up @@ -100,6 +101,7 @@ public Record decode(InputStream inStream) throws IOException {
.setSenderTimestamp(LONG_CODER.decode(inStream))
.setSequenceNumber(LONG_CODER.decode(inStream))
.setTimeToLive(LONG_CODER.decode(inStream))
.setAttachmentBytes(BYTE_CODER.decode(inStream))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
package org.apache.beam.sdk.io.solace.data;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.io.solace.data.Solace.Destination;
import org.junit.Assert;
import org.junit.Test;

public class SolaceTest {

Map<String, Object> properties;
Destination destination =
Solace.Destination.builder()
.setName("some destination")
Expand All @@ -42,6 +40,8 @@ public class SolaceTest {
Long timeToLive = 34567890L;
String payloadString = "some payload";
byte[] payload = payloadString.getBytes(StandardCharsets.UTF_8);
String attachmentString = "some attachment";
byte[] attachment = attachmentString.getBytes(StandardCharsets.UTF_8);

@Test
public void testRecordEquality() {
Expand All @@ -58,6 +58,7 @@ public void testRecordEquality() {
.setSequenceNumber(sequenceNumber)
.setTimeToLive(timeToLive)
.setPayload(payload)
.setAttachmentBytes(attachment)
.build();

Solace.Record obj2 =
Expand All @@ -73,6 +74,7 @@ public void testRecordEquality() {
.setSequenceNumber(sequenceNumber)
.setTimeToLive(timeToLive)
.setPayload(payload)
.setAttachmentBytes(attachment)
.build();

Solace.Record obj3 =
Expand All @@ -88,6 +90,7 @@ public void testRecordEquality() {
.setSequenceNumber(sequenceNumber)
.setTimeToLive(timeToLive)
.setPayload(payload)
.setAttachmentBytes(attachment)
.build();

Assert.assertEquals(obj1, obj2);
Expand All @@ -104,6 +107,8 @@ public void testRecordEquality() {
Assert.assertEquals(obj1.getSequenceNumber(), sequenceNumber);
Assert.assertEquals(obj1.getTimeToLive(), timeToLive);
Assert.assertEquals(new String(obj1.getPayload(), StandardCharsets.UTF_8), payloadString);
Assert.assertEquals(
new String(obj1.getAttachmentBytes(), StandardCharsets.UTF_8), attachmentString);
}

@Test
Expand All @@ -120,6 +125,7 @@ public void testRecordNullability() {
Assert.assertNull(obj.getSenderTimestamp());
Assert.assertNull(obj.getSequenceNumber());
Assert.assertNull(obj.getTimeToLive());
Assert.assertArrayEquals(obj.getAttachmentBytes(), new byte[0]);
Assert.assertEquals(new String(obj.getPayload(), StandardCharsets.UTF_8), payloadString);
}

Expand Down

0 comments on commit df1eb6c

Please sign in to comment.