Skip to content

Commit

Permalink
Fix serialization for list type headers (#31618)
Browse files Browse the repository at this point in the history
  • Loading branch information
Amar3tto authored Jun 20, 2024
1 parent cfbcdf0 commit 38e6e6c
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand Down Expand Up @@ -73,35 +76,45 @@ private static GetResponse serializableDeliveryOf(GetResponse processed) {
envelope, nextProperties, processed.getBody(), processed.getMessageCount());
}

private static Map<String, Object> serializableHeaders(Map<String, Object> headers) {
@VisibleForTesting
static Map<String, Object> serializableHeaders(Map<String, Object> headers) {
Map<String, Object> returned = new HashMap<>();
if (headers != null) {
for (Map.Entry<String, Object> h : headers.entrySet()) {
Object value = h.getValue();
if (!(value instanceof Serializable)) {
try {
if (value instanceof LongString) {
LongString longString = (LongString) value;
byte[] bytes = longString.getBytes();
String s = new String(bytes, StandardCharsets.UTF_8);
value = s;
} else {
throw new RuntimeException(String.format("no transformation defined for %s", value));
}
} catch (Throwable t) {
throw new UnsupportedOperationException(
String.format(
"can't make unserializable value %s a serializable value (which is mandatory for Apache Beam dataflow implementation)",
value),
t);
}
if (value instanceof List<?>) {
// Transformation for List type headers
value =
((List<?>) value)
.stream().map(RabbitMqMessage::getTransformedValue).collect(Collectors.toList());
} else if (!(value instanceof Serializable)) {
value = getTransformedValue(value);
}
returned.put(h.getKey(), value);
}
}
return returned;
}

private static Object getTransformedValue(Object value) {
try {
if (value instanceof LongString) {
LongString longString = (LongString) value;
byte[] bytes = longString.getBytes();
value = new String(bytes, StandardCharsets.UTF_8);
} else {
throw new RuntimeException(String.format("No transformation defined for %s", value));
}
} catch (Throwable t) {
throw new UnsupportedOperationException(
String.format(
"Can't make unserializable value %s a serializable value (which is mandatory for Apache Beam dataflow implementation)",
value),
t);
}
return value;
}

private final @Nullable String routingKey;
private final byte[] body;
private final String contentType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.rabbitmq;

import static org.junit.Assert.assertEquals;

import com.rabbitmq.client.LongString;
import com.rabbitmq.client.impl.LongStringHelper;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Test of {@link RabbitMqMessage}. */
@RunWith(JUnit4.class)
public class RabbitMqMessageTest implements Serializable {

@Test(expected = UnsupportedOperationException.class)
public void testSerializableHeadersThrowsIfValueIsNotSerializable() {
Map<String, Object> rawHeaders = new HashMap<>();
Object notSerializableObject = Optional.of(new Object());
rawHeaders.put("key1", notSerializableObject);
RabbitMqMessage.serializableHeaders(rawHeaders);
}

@Test
public void testSerializableHeadersWithLongStringValues() {
Map<String, Object> rawHeaders = new HashMap<>();
String key1 = "key1", key2 = "key2", value1 = "value1", value2 = "value2";
rawHeaders.put(key1, LongStringHelper.asLongString(value1));
rawHeaders.put(key2, LongStringHelper.asLongString(value2.getBytes(StandardCharsets.UTF_8)));

Map<String, Object> serializedHeaders = RabbitMqMessage.serializableHeaders(rawHeaders);

assertEquals(value1, serializedHeaders.get(key1));
assertEquals(value2, serializedHeaders.get(key2));
}

@Test
public void testSerializableHeadersWithListValue() {
Map<String, Object> rawHeaders = new HashMap<>();
List<String> expectedSerializedList = Lists.newArrayList("value1", "value2");
List<LongString> rawList =
expectedSerializedList.stream()
.map(LongStringHelper::asLongString)
.collect(Collectors.toList());
String key1 = "key1";
rawHeaders.put(key1, rawList);

Map<String, Object> serializedHeaders = RabbitMqMessage.serializableHeaders(rawHeaders);

assertEquals(expectedSerializedList, serializedHeaders.get(key1));
}
}

0 comments on commit 38e6e6c

Please sign in to comment.