Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify kafka SimpleImpl w/ Serdes #4282

Merged
merged 1 commit into from
Aug 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 35 additions & 63 deletions extensions/kafka/src/main/java/io/deephaven/kafka/SimpleImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,24 @@
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.ShortSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.jetbrains.annotations.NotNull;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

Expand Down Expand Up @@ -248,24 +238,6 @@ KeyOrValueSerializer<?> getKeyOrValueSerializer(@NotNull Table t, @NotNull Strin
}
}

private static class SerDeser<T> {
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;

public SerDeser(Serializer<T> serializer, Deserializer<T> deserializer) {
this.serializer = Objects.requireNonNull(serializer);
this.deserializer = Objects.requireNonNull(deserializer);
}

public Serializer<T> serializer() {
return serializer;
}

public Deserializer<T> deserializer() {
return deserializer;
}
}

@VisibleForTesting
static final Map<String, Type<?>> DESER_NAME_TO_TYPE = Map.of(
ShortDeserializer.class.getName(), Type.shortType(),
Expand All @@ -280,108 +252,108 @@ public Deserializer<T> deserializer() {

@VisibleForTesting
static Optional<Serializer<?>> serializer(Type<?> type) {
return Optional.ofNullable(type.walk(SerDeserVisitor.INSTANCE)).map(SerDeser::serializer);
return Optional.ofNullable(type.walk(SerDeserVisitor.INSTANCE)).map(Serde::serializer);
}

@VisibleForTesting
static Optional<Deserializer<?>> deserializer(Type<?> type) {
return Optional.ofNullable(type.walk(SerDeserVisitor.INSTANCE)).map(SerDeser::deserializer);
return Optional.ofNullable(type.walk(SerDeserVisitor.INSTANCE)).map(Serde::deserializer);
}

/**
* The visitor pattern with SerDeser ensures that whenever a new type is added, it is added both for serialization
* and deserialization at the same time.
*/
private enum SerDeserVisitor implements
Type.Visitor<SerDeser<?>>,
PrimitiveType.Visitor<SerDeser<?>>,
GenericType.Visitor<SerDeser<?>> {
Type.Visitor<Serde<?>>,
PrimitiveType.Visitor<Serde<?>>,
GenericType.Visitor<Serde<?>> {
INSTANCE;

@Override
public SerDeser<?> visit(PrimitiveType<?> primitiveType) {
return primitiveType.walk((PrimitiveType.Visitor<SerDeser<?>>) this);
public Serde<?> visit(PrimitiveType<?> primitiveType) {
return primitiveType.walk((PrimitiveType.Visitor<Serde<?>>) this);
}

@Override
public SerDeser<?> visit(GenericType<?> genericType) {
return genericType.walk((GenericType.Visitor<SerDeser<?>>) this);
public Serde<?> visit(GenericType<?> genericType) {
return genericType.walk((GenericType.Visitor<Serde<?>>) this);
}

@Override
public SerDeser<?> visit(BooleanType booleanType) {
public Serde<?> visit(BooleanType booleanType) {
return null;
}

@Override
public SerDeser<?> visit(ByteType byteType) {
public Serde<?> visit(ByteType byteType) {
return null;
}

@Override
public SerDeser<?> visit(CharType charType) {
public Serde<?> visit(CharType charType) {
return null;
}

@Override
public SerDeser<?> visit(ShortType shortType) {
return new SerDeser<>(new ShortSerializer(), new ShortDeserializer());
public Serde<?> visit(ShortType shortType) {
return Serdes.Short();
}

@Override
public SerDeser<?> visit(IntType intType) {
return new SerDeser<>(new IntegerSerializer(), new IntegerDeserializer());
public Serde<?> visit(IntType intType) {
return Serdes.Integer();
}

@Override
public SerDeser<?> visit(LongType longType) {
return new SerDeser<>(new LongSerializer(), new LongDeserializer());
public Serde<?> visit(LongType longType) {
return Serdes.Long();
}

@Override
public SerDeser<?> visit(FloatType floatType) {
return new SerDeser<>(new FloatSerializer(), new FloatDeserializer());
public Serde<?> visit(FloatType floatType) {
return Serdes.Float();
}

@Override
public SerDeser<?> visit(DoubleType doubleType) {
return new SerDeser<>(new DoubleSerializer(), new DoubleDeserializer());
public Serde<?> visit(DoubleType doubleType) {
return Serdes.Double();
}

@Override
public SerDeser<?> visit(BoxedType<?> boxedType) {
return boxedType.primitiveType().walk((PrimitiveType.Visitor<SerDeser<?>>) this);
public Serde<?> visit(BoxedType<?> boxedType) {
return boxedType.primitiveType().walk((PrimitiveType.Visitor<Serde<?>>) this);
}

@Override
public SerDeser<?> visit(StringType stringType) {
return new SerDeser<>(new StringSerializer(), new StringDeserializer());
public Serde<?> visit(StringType stringType) {
return Serdes.String();
}

@Override
public SerDeser<?> visit(InstantType instantType) {
public Serde<?> visit(InstantType instantType) {
return null;
}

@Override
public SerDeser<?> visit(ArrayType<?, ?> arrayType) {
public Serde<?> visit(ArrayType<?, ?> arrayType) {
// we could walk ArrayType, but byteType().arrayType() is the only array type deserializer we support
if (Type.byteType().arrayType().equals(arrayType)) {
return new SerDeser<>(new ByteArraySerializer(), new ByteArrayDeserializer());
return Serdes.ByteArray();
}
return null;
}

@Override
public SerDeser<?> visit(CustomType<?> customType) {
public Serde<?> visit(CustomType<?> customType) {
if (customType.clazz() == UUID.class) {
return new SerDeser<>(new UUIDSerializer(), new UUIDDeserializer());
return Serdes.UUID();
}
if (customType.clazz() == ByteBuffer.class) {
return new SerDeser<>(new ByteBufferSerializer(), new ByteBufferDeserializer());
return Serdes.ByteBuffer();
}
if (customType.clazz() == Bytes.class) {
return new SerDeser<>(new BytesSerializer(), new BytesDeserializer());
return Serdes.Bytes();
}
return null;
}
Expand Down
Loading