diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java index 38bd111d9496c..272cbad52a74c 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java @@ -115,7 +115,6 @@ private void configureInnerSerde(Map configs, boolean isKey) { } } - @SuppressWarnings("unchecked") private List createListInstance(int listSize) { try { @@ -165,17 +164,10 @@ public List deserialize(String topic, byte[] data) { final int size = dis.readInt(); List deserializedList = createListInstance(size); for (int i = 0; i < size; i++) { - int entrySize = -1; - if (serStrategy == SerializationStrategy.CONSTANT_SIZE) { - if (nullIndexList.contains(i)) { - deserializedList.add(null); - } - entrySize = primitiveSize; - } else if (serStrategy == SerializationStrategy.VARIABLE_SIZE) { - entrySize = dis.readInt(); - if (entrySize == ListSerde.NULL_ENTRY_VALUE) { - deserializedList.add(null); - } + int entrySize = serStrategy == SerializationStrategy.CONSTANT_SIZE ? primitiveSize : dis.readInt(); + if (entrySize == ListSerde.NULL_ENTRY_VALUE || (nullIndexList != null && nullIndexList.contains(i))) { + deserializedList.add(null); + continue; } byte[] payload = new byte[entrySize]; if (dis.read(payload) == -1) { diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 57389bdc615d0..3ae4573d2071f 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -150,6 +150,11 @@

Streams API (meaning: use broker default replication factor). The replication.factor value of -1 requires broker version 2.4 or newer.

+

The new serde type was introduced ListSerde:

+
    +
  • Added class ListSerde to (de)serialize List-based objects
  • +
  • Introduced ListSerializer and ListDeserializer to power the new functionality
  • +

Streams API changes in 2.8.0

@@ -1104,12 +1109,6 @@

  • JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000)
  • -

    New serde type ListSerde:

    -
      -
    • added class ListSerde to (de)serialize List-based objects
    • -
    • introduced ListSerializer and ListDeserializer to power the new functionality
    • -
    -