Skip to content

Commit

Permalink
BE: Implement Hex serde (#4074)
Browse files Browse the repository at this point in the history
Co-authored-by: iliax <ikuramshin@provectus.com>
  • Loading branch information
iliax and iliax authored Aug 1, 2023
1 parent 0b99f74 commit 2db8959
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
import com.provectus.kafka.ui.serdes.builtin.HexSerde;
import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
Expand Down Expand Up @@ -47,6 +48,7 @@ public SerdesInitializer() {
.put(UInt64Serde.name(), UInt64Serde.class)
.put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
.put(Base64Serde.name(), Base64Serde.class)
.put(HexSerde.name(), HexSerde.class)
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
.build(),
new CustomSerdeLoader()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.provectus.kafka.ui.serdes.builtin;

import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.HexFormat;
import java.util.Map;
import java.util.Optional;

public class HexSerde implements BuiltInSerde {

private HexFormat deserializeHexFormat;

public static String name() {
return "Hex";
}

@Override
public void configure(PropertyResolver serdeProperties,
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
String delim = serdeProperties.getProperty("delimiter", String.class).orElse(" ");
boolean uppercase = serdeProperties.getProperty("uppercase", Boolean.class).orElse(true);
deserializeHexFormat = HexFormat.ofDelimiter(delim);
if (uppercase) {
deserializeHexFormat = deserializeHexFormat.withUpperCase();
}
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}

@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}

@Override
public boolean canDeserialize(String topic, Target type) {
return true;
}

@Override
public boolean canSerialize(String topic, Target type) {
return true;
}

@Override
public Serializer serializer(String topic, Target type) {
return input -> {
input = input.trim();
// it is a hack to provide ability to sent empty array as a key/value
if (input.length() == 0) {
return new byte[] {};
}
return HexFormat.of().parseHex(prepareInputForParse(input));
};
}

// removing most-common delimiters and prefixes
private static String prepareInputForParse(String input) {
return input
.replaceAll(" ", "")
.replaceAll("#", "")
.replaceAll(":", "");
}

@Override
public Deserializer deserializer(String topic, Target type) {
return (headers, data) ->
new DeserializeResult(
deserializeHexFormat.formatHex(data),
DeserializeResult.Type.STRING,
Map.of()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.provectus.kafka.ui.serdes.builtin;

import static org.assertj.core.api.Assertions.assertThat;

import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
import com.provectus.kafka.ui.serdes.RecordHeadersImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;

public class HexSerdeTest {

private static final byte[] TEST_BYTES = "hello world".getBytes();
private static final String TEST_BYTES_HEX_ENCODED = "68 65 6C 6C 6F 20 77 6F 72 6C 64";

private Serde hexSerde;

@BeforeEach
void init() {
hexSerde = new HexSerde();
hexSerde.configure(
PropertyResolverImpl.empty(),
PropertyResolverImpl.empty(),
PropertyResolverImpl.empty()
);
}


@ParameterizedTest
@CsvSource({
"68656C6C6F20776F726C64", // uppercase
"68656c6c6f20776f726c64", // lowercase
"68:65:6c:6c:6f:20:77:6f:72:6c:64", // ':' delim
"68 65 6C 6C 6F 20 77 6F 72 6C 64", // space delim, UC
"68 65 6c 6c 6f 20 77 6f 72 6c 64", // space delim, LC
"#68 #65 #6C #6C #6F #20 #77 #6F #72 #6C #64" // '#' prefix, space delim
})
void serializesInputAsHexString(String hexString) {
for (Serde.Target type : Serde.Target.values()) {
var serializer = hexSerde.serializer("anyTopic", type);
byte[] bytes = serializer.serialize(hexString);
assertThat(bytes).isEqualTo(TEST_BYTES);
}
}

@ParameterizedTest
@EnumSource
void serializesEmptyStringAsEmptyBytesArray(Serde.Target type) {
var serializer = hexSerde.serializer("anyTopic", type);
byte[] bytes = serializer.serialize("");
assertThat(bytes).isEqualTo(new byte[] {});
}

@ParameterizedTest
@EnumSource
void deserializesDataAsHexBytes(Serde.Target type) {
var deserializer = hexSerde.deserializer("anyTopic", type);
var result = deserializer.deserialize(new RecordHeadersImpl(), TEST_BYTES);
assertThat(result.getResult()).isEqualTo(TEST_BYTES_HEX_ENCODED);
assertThat(result.getType()).isEqualTo(DeserializeResult.Type.STRING);
assertThat(result.getAdditionalProperties()).isEmpty();
}

@ParameterizedTest
@EnumSource
void getSchemaReturnsEmpty(Serde.Target type) {
assertThat(hexSerde.getSchema("anyTopic", type)).isEmpty();
}

@ParameterizedTest
@EnumSource
void canDeserializeReturnsTrueForAllInputs(Serde.Target type) {
assertThat(hexSerde.canDeserialize("anyTopic", type)).isTrue();
}

@ParameterizedTest
@EnumSource
void canSerializeReturnsTrueForAllInput(Serde.Target type) {
assertThat(hexSerde.canSerialize("anyTopic", type)).isTrue();
}
}

0 comments on commit 2db8959

Please sign in to comment.