KAFKA-8326: Introduce List Serde #6592

Merged on May 13, 2021 (49 commits)
Changes from 48 commits
Commits
131b12a
Add List serializer, desearializer, serde
yeralin Apr 17, 2019
146c06a
Fix formatting and unused imports
yeralin Apr 17, 2019
6f18568
Delete ListSerde, and use WrapperSerde instead
May 6, 2019
0fa8495
Prevent possible NPE when data is null or empty
May 6, 2019
dae39d7
Use try-with-resources to utilize auto stream closing
May 6, 2019
36d206d
Remove comparator parameter from ListDeserializer constructor
May 7, 2019
c93d91e
Implement a test case for ListSerde
May 13, 2019
aaf22ce
Close de/serializers and propagate parameters in configure methods
May 24, 2019
b097996
Initialize ArrayList with size for better performance
May 24, 2019
89b22b5
Add another test case for ListSerde
Jun 10, 2019
9dee55c
Add support for fixed/variable size entries encoding
Jun 21, 2019
bd0ec99
Force user to pass list class to listSerde
Jun 21, 2019
9152216
Introduce default zero-arg constructors
Jul 16, 2019
204d1e7
Introduce 4 new configuration parameters for ListSerde
Jul 16, 2019
2a0149a
Update configuration strategy for ListSerde
Jul 24, 2019
c8b9f1a
Update the code due to review changes
yeralin Jul 31, 2019
90cc373
Set all new config definitions of Type.CLASS
yeralin Aug 5, 2019
6e61cd1
Set default values for newly introduced properties to null
yeralin Aug 12, 2019
a884d8b
Suppress unchecked warnings
yeralin Aug 12, 2019
fe95b01
Allow import of "org.apache.kafka.clients" package in serialization p…
yeralin Sep 9, 2019
7353b65
Fix spotbug warning
yeralin Sep 13, 2019
7530669
Generify List class
yeralin Sep 30, 2019
17cb8e7
Remove deprecated import
yeralin Oct 1, 2019
85a791b
Use mkMap and mkEntry to populate fixedLengthDeserializers map
yeralin Oct 30, 2019
11d09d2
Move SuppressWarnings statement on a method level
yeralin Oct 30, 2019
71bdf49
Update UUID fixed size to 36 bytes
yeralin Oct 30, 2019
097e780
Add more test cases to cover other types of fixed sizes
yeralin Oct 30, 2019
a4d4d89
Rearrange defines
yeralin Nov 12, 2019
bd9740f
Modify interface definitions
yeralin Nov 12, 2019
dafda14
Make more descriptive docs
yeralin Nov 12, 2019
bac39aa
Update the code due to review comments
yeralin Jan 27, 2020
8edaacc
Refactor configure methods for list (de)serializers
yeralin Feb 13, 2020
f6326f1
Introduce list (de)serializers configuration tests
yeralin Feb 13, 2020
dea18d0
Refactor getters
yeralin Jul 13, 2020
eb0db3a
Add null-index-list and negative-size serialization strategies functi…
yeralin Jul 13, 2020
44494eb
Add test coverage for serialization strategies functionality
yeralin Jul 13, 2020
569969e
Update existing test cases
yeralin Jul 13, 2020
da88bb7
Migrate to Junit 5 Jupiter
Jan 20, 2021
614aa89
Fix merge conflict
Feb 12, 2021
b982f36
Remove extraneous configurations
Apr 26, 2021
9e02a90
Rename constants
Apr 26, 2021
15eb0d8
Use primitives' BYTES constants
Apr 26, 2021
cdbbbd5
Validate passed constructor parameters
Apr 26, 2021
e8a5e59
Throw when trying to configure initialized list
Apr 27, 2021
819e186
Refactor list (de)serializer tests
Apr 27, 2021
f58bfce
Review refactoring
May 4, 2021
b4989fe
Review changes
May 11, 2021
e7c7789
Add documentation changes
May 12, 2021
37569c9
Review changes
May 12, 2021
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -189,6 +189,7 @@
</subpackage>

<subpackage name="serialization">
<allow pkg="org.apache.kafka.clients" />
<allow class="org.apache.kafka.common.errors.SerializationException" />
<allow class="org.apache.kafka.common.header.Headers" />
</subpackage>
@@ -118,6 +118,26 @@ public class CommonClientConfigs {
+ "elapses the client will resend the request if necessary or fail the request if "
+ "retries are exhausted.";

public static final String DEFAULT_LIST_KEY_SERDE_INNER_CLASS = "default.list.key.serde.inner";
public static final String DEFAULT_LIST_KEY_SERDE_INNER_CLASS_DOC = "Default inner class of list serde for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
+ "This configuration will be read if and only if <code>default.key.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>.";

public static final String DEFAULT_LIST_VALUE_SERDE_INNER_CLASS = "default.list.value.serde.inner";
public static final String DEFAULT_LIST_VALUE_SERDE_INNER_CLASS_DOC = "Default inner class of list serde for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
+ "This configuration will be read if and only if <code>default.value.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>.";

public static final String DEFAULT_LIST_KEY_SERDE_TYPE_CLASS = "default.list.key.serde.type";
public static final String DEFAULT_LIST_KEY_SERDE_TYPE_CLASS_DOC = "Default class for key that implements the <code>java.util.List</code> interface. "
+ "This configuration will be read if and only if <code>default.key.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>. "
+ "Note that when the list serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
+ DEFAULT_LIST_KEY_SERDE_INNER_CLASS + "'";

public static final String DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS = "default.list.value.serde.type";
public static final String DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS_DOC = "Default class for value that implements the <code>java.util.List</code> interface. "
+ "This configuration will be read if and only if <code>default.value.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>. "
+ "Note that when the list serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
+ DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + "'";

public static final String GROUP_ID_CONFIG = "group.id";
public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
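
The four properties added in this hunk are read only when the default key or value serde is set to the list serde. The following is a minimal sketch, using only the JDK, of how a client might populate them and how a value given either as a class name or as a `Class` object could be resolved, loosely mirroring the `instanceof` checks in `ListDeserializer`. The class `ListSerdeConfigSketch`, its helpers, and the example values are hypothetical, and the final `List` assignability check is an extra introduced for illustration rather than something this PR performs.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; it is not part of this PR.
final class ListSerdeConfigSketch {

    // Property names copied from the CommonClientConfigs constants in the hunk above.
    static final String LIST_KEY_SERDE_INNER = "default.list.key.serde.inner";
    static final String LIST_KEY_SERDE_TYPE = "default.list.key.serde.type";
    static final String LIST_VALUE_SERDE_INNER = "default.list.value.serde.inner";
    static final String LIST_VALUE_SERDE_TYPE = "default.list.value.serde.type";

    // Example values only: a type may be given as a class name or as a Class object.
    static Map<String, Object> exampleConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(LIST_KEY_SERDE_TYPE, "java.util.ArrayList");
        configs.put(LIST_KEY_SERDE_INNER, "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        configs.put(LIST_VALUE_SERDE_TYPE, LinkedList.class);
        configs.put(LIST_VALUE_SERDE_INNER, "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return configs;
    }

    // Resolves the list type for the key or the value side of the configuration.
    static Class<?> resolveListClass(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? LIST_KEY_SERDE_TYPE : LIST_VALUE_SERDE_TYPE;
        Object classOrName = configs.get(propertyName);
        if (classOrName == null) {
            throw new IllegalArgumentException("Missing \"" + propertyName + "\"");
        }
        Class<?> resolved;
        if (classOrName instanceof String) {
            try {
                resolved = Class.forName((String) classOrName);
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("Class \"" + classOrName + "\" could not be found", e);
            }
        } else if (classOrName instanceof Class) {
            resolved = (Class<?>) classOrName;
        } else {
            throw new IllegalArgumentException("\"" + propertyName + "\" must be a class or a class name");
        }
        // Assumption of this sketch only: reject types that are not lists.
        if (!List.class.isAssignableFrom(resolved)) {
            throw new IllegalArgumentException(resolved.getName() + " does not implement java.util.List");
        }
        return resolved;
    }
}
```

Calling `resolveListClass(exampleConfigs(), true)` resolves the key-side list type from its class name, while passing `false` returns the `Class` object stored for the value side.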

@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;

import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serdes.ListSerde;
import org.apache.kafka.common.utils.Utils;

public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {

private static final Map<Class<? extends Deserializer<?>>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap(
mkEntry(ShortDeserializer.class, Short.BYTES),
mkEntry(IntegerDeserializer.class, Integer.BYTES),
mkEntry(FloatDeserializer.class, Float.BYTES),
mkEntry(LongDeserializer.class, Long.BYTES),
mkEntry(DoubleDeserializer.class, Double.BYTES),
mkEntry(UUIDDeserializer.class, 36)
);

private Deserializer<Inner> inner;
private Class<?> listClass;
private Integer primitiveSize;

public ListDeserializer() {}

public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> inner) {
if (listClass == null || inner == null) {
throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization");
}
this.listClass = listClass;
this.inner = inner;
this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass());
}

public Deserializer<Inner> innerDeserializer() {
return inner;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
if (listClass != null || inner != null) {
throw new ConfigException("List deserializer was already initialized using a non-default constructor");
}
configureListClass(configs, isKey);
configureInnerSerde(configs, isKey);
}

private void configureListClass(Map<String, ?> configs, boolean isKey) {
String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS;
final Object listClassOrName = configs.get(listTypePropertyName);
if (listClassOrName == null) {
throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config.");
}
try {
if (listClassOrName instanceof String) {
listClass = Utils.loadClass((String) listClassOrName, Object.class);
} else if (listClassOrName instanceof Class) {
listClass = (Class<?>) listClassOrName;
} else {
throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property.");
}
} catch (final ClassNotFoundException e) {
throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found.");
}
}

@SuppressWarnings("unchecked")
private void configureInnerSerde(Map<String, ?> configs, boolean isKey) {
String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName);
if (innerSerdeClassOrName == null) {
throw new ConfigException("Not able to determine the inner serde class because it was neither passed via the constructor nor set in the config.");
}
try {
if (innerSerdeClassOrName instanceof String) {
inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).deserializer();
} else if (innerSerdeClassOrName instanceof Class) {
inner = (Deserializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).deserializer();
} else {
throw new KafkaException("Could not determine the inner serde class instance using \"" + innerSerdePropertyName + "\" property.");
}
inner.configure(configs, isKey);
primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass());
} catch (final ClassNotFoundException e) {
throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Deserializer's inner serde class \"" + innerSerdeClassOrName + "\" could not be found.");
}
}

@SuppressWarnings("unchecked")
private List<Inner> createListInstance(int listSize) {
try {
Constructor<List<Inner>> listConstructor;
try {
listConstructor = (Constructor<List<Inner>>) listClass.getConstructor(Integer.TYPE);
return listConstructor.newInstance(listSize);
} catch (NoSuchMethodException e) {
listConstructor = (Constructor<List<Inner>>) listClass.getConstructor();
return listConstructor.newInstance();
}
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException |
IllegalArgumentException | InvocationTargetException e) {
throw new KafkaException("Could not construct a list instance of \"" + listClass.getCanonicalName() + "\"", e);
}
}

private SerializationStrategy parseSerializationStrategyFlag(final int serializationStrategyFlag) throws IOException {
if (serializationStrategyFlag < 0 || serializationStrategyFlag >= SerializationStrategy.VALUES.length) {
throw new SerializationException("Invalid serialization strategy flag value");
}
return SerializationStrategy.VALUES[serializationStrategyFlag];
}

private List<Integer> deserializeNullIndexList(final DataInputStream dis) throws IOException {
int nullIndexListSize = dis.readInt();
List<Integer> nullIndexList = new ArrayList<>(nullIndexListSize);
while (nullIndexListSize != 0) {
nullIndexList.add(dis.readInt());
nullIndexListSize--;
}
return nullIndexList;
}

@Override
public List<Inner> deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
SerializationStrategy serStrategy = parseSerializationStrategyFlag(dis.readByte());
List<Integer> nullIndexList = null;
if (serStrategy == SerializationStrategy.CONSTANT_SIZE) {
// In CONSTANT_SIZE strategy, indexes of null entries are decoded from a null index list
nullIndexList = deserializeNullIndexList(dis);
}
final int size = dis.readInt();
List<Inner> deserializedList = createListInstance(size);
for (int i = 0; i < size; i++) {
int entrySize = -1;
if (serStrategy == SerializationStrategy.CONSTANT_SIZE) {
if (nullIndexList.contains(i)) {
// Null entries occupy no bytes in CONSTANT_SIZE, so skip the payload read
deserializedList.add(null);
continue;
}
entrySize = primitiveSize;
} else if (serStrategy == SerializationStrategy.VARIABLE_SIZE) {
entrySize = dis.readInt();
if (entrySize == ListSerde.NULL_ENTRY_VALUE) {
// A null marker is not followed by a payload
deserializedList.add(null);
continue;
}
}
byte[] payload = new byte[entrySize];
if (dis.read(payload) == -1) {
throw new SerializationException("End of the stream was reached prematurely");
}
deserializedList.add(inner.deserialize(topic, payload));
}
return deserializedList;
} catch (IOException e) {
throw new KafkaException("Unable to deserialize into a List", e);
}
}

@Override
public void close() {
if (inner != null) {
inner.close();
}
}

}
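
To make the CONSTANT_SIZE layout read by `deserialize` easier to follow, here is a minimal JDK-only sketch that decodes a list of nullable integers laid out as: one strategy flag byte, a null index list (count followed by indexes), the list size, then one fixed-width payload per non-null entry. The class `ConstantSizeDecodeSketch` is hypothetical, the flag value `0` for CONSTANT_SIZE is an assumption (the enum is defined outside this diff), and `readInt` stands in for the inner deserializer. The sketch also keeps null indexes in a `HashSet` instead of the `List` used in the PR, which is a substitution made here to avoid a linear `contains` per entry.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; it is not part of this PR.
final class ConstantSizeDecodeSketch {

    // Assumed flag value for the CONSTANT_SIZE strategy.
    static final int CONSTANT_SIZE_FLAG = 0;

    static List<Integer> decodeIntList(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int flag = in.readByte();
            if (flag != CONSTANT_SIZE_FLAG) {
                throw new IllegalArgumentException("Unexpected strategy flag: " + flag);
            }
            // Null index list: a count followed by that many indexes.
            int nullCount = in.readInt();
            Set<Integer> nullIndexes = new HashSet<>();
            for (int i = 0; i < nullCount; i++) {
                nullIndexes.add(in.readInt());
            }
            int size = in.readInt();
            List<Integer> result = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                if (nullIndexes.contains(i)) {
                    // Null entries carry no payload bytes.
                    result.add(null);
                    continue;
                }
                // Fixed-width entry: four big-endian bytes per integer.
                result.add(in.readInt());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hand-built bytes for the list [7, null, 9] in the layout described above.
    static byte[] sampleBytes() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeByte(CONSTANT_SIZE_FLAG);
            out.writeInt(1); // one null entry
            out.writeInt(1); // located at index 1
            out.writeInt(3); // list size, nulls included
            out.writeInt(7);
            out.writeInt(9);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }
}
```

Decoding `sampleBytes()` yields a three-element list whose middle entry is null, which shows why the loop must skip the payload read for indexes found in the null index list.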
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.serialization;

import java.util.ArrayList;
import java.util.Iterator;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;

public class ListSerializer<Inner> implements Serializer<List<Inner>> {

private static final List<Class<? extends Serializer<?>>> FIXED_LENGTH_SERIALIZERS = Arrays.asList(
ShortSerializer.class,
IntegerSerializer.class,
FloatSerializer.class,
LongSerializer.class,
DoubleSerializer.class,
UUIDSerializer.class);

private Serializer<Inner> inner;
private SerializationStrategy serStrategy;

public ListSerializer() {}

public ListSerializer(Serializer<Inner> inner) {
if (inner == null) {
throw new IllegalArgumentException("ListSerializer requires \"serializer\" parameter to be provided during initialization");
}
this.inner = inner;
this.serStrategy = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()) ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
}

public Serializer<Inner> getInnerSerializer() {
return inner;
}

@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
if (inner != null) {
throw new ConfigException("List serializer was already initialized using a non-default constructor");
}
final String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName);
if (innerSerdeClassOrName == null) {
throw new ConfigException("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config.");
}
try {
if (innerSerdeClassOrName instanceof String) {
inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).serializer();
} else if (innerSerdeClassOrName instanceof Class) {
inner = (Serializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).serializer();
} else {
throw new KafkaException("Could not create a serializer class instance using \"" + innerSerdePropertyName + "\" property.");
}
inner.configure(configs, isKey);
serStrategy = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()) ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
} catch (final ClassNotFoundException e) {
throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could not be found.");
}
}

private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException {
int i = 0;
List<Integer> nullIndexList = new ArrayList<>();
for (Iterator<Inner> it = data.listIterator(); it.hasNext(); i++) {
if (it.next() == null) {
nullIndexList.add(i);
}
}
out.writeInt(nullIndexList.size());
for (int nullIndex : nullIndexList) {
out.writeInt(nullIndex);
}
}

@Override
public byte[] serialize(String topic, List<Inner> data) {
if (data == null) {
return null;
}
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
out.writeByte(serStrategy.ordinal()); // write serialization strategy flag
if (serStrategy == SerializationStrategy.CONSTANT_SIZE) {
// In CONSTANT_SIZE strategy, indexes of null entries are encoded in a null index list
serializeNullIndexList(out, data);
}
final int size = data.size();
out.writeInt(size);
for (Inner entry : data) {
if (entry == null) {
if (serStrategy == SerializationStrategy.VARIABLE_SIZE) {
out.writeInt(Serdes.ListSerde.NULL_ENTRY_VALUE);
}
} else {
final byte[] bytes = inner.serialize(topic, entry);
if (serStrategy == SerializationStrategy.VARIABLE_SIZE) {
out.writeInt(bytes.length);
}
out.write(bytes);
}
}
return baos.toByteArray();
} catch (IOException e) {
throw new KafkaException("Failed to serialize List", e);
}
}

@Override
public void close() {
if (inner != null) {
inner.close();
}
}

}
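
The VARIABLE_SIZE path of `serialize` can be illustrated with a small JDK-only round trip over nullable strings: one strategy flag byte, the list size, then for each entry either a length prefix followed by the payload, or a null marker with no payload. The class `VariableSizeRoundTripSketch` is hypothetical; the flag value `1` and the null marker `-1` are assumptions, since `SerializationStrategy` and `NULL_ENTRY_VALUE` are defined outside this diff. UTF-8 encoding stands in for the inner serializer, and `readFully` is used here in place of the PR's `read` call so that zero-length payloads decode cleanly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; it is not part of this PR.
final class VariableSizeRoundTripSketch {

    static final int VARIABLE_SIZE_FLAG = 1; // assumed flag value
    static final int NULL_ENTRY = -1;        // assumed null marker

    static byte[] encode(List<String> data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeByte(VARIABLE_SIZE_FLAG);
            out.writeInt(data.size());
            for (String entry : data) {
                if (entry == null) {
                    // A null entry is just the marker, with no payload after it.
                    out.writeInt(NULL_ENTRY);
                } else {
                    byte[] bytes = entry.getBytes(StandardCharsets.UTF_8);
                    out.writeInt(bytes.length);
                    out.write(bytes);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    static List<String> decode(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int flag = in.readByte();
            if (flag != VARIABLE_SIZE_FLAG) {
                throw new IllegalArgumentException("Unexpected strategy flag: " + flag);
            }
            int size = in.readInt();
            List<String> result = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                int entrySize = in.readInt();
                if (entrySize == NULL_ENTRY) {
                    result.add(null);
                    continue;
                }
                byte[] payload = new byte[entrySize];
                in.readFully(payload); // throws EOFException if the data is truncated
                result.add(new String(payload, StandardCharsets.UTF_8));
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a list holding `"a"`, `null`, `""` and `"kafka"`, the encoded form is 27 bytes: 1 for the flag, 4 for the size, 16 for the four length prefixes, and 6 for the payloads.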