diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java index ac3c2241cfc..c833d9fc726 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java @@ -12,6 +12,7 @@ import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde; import com.provectus.kafka.ui.serdes.builtin.Base64Serde; import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde; +import com.provectus.kafka.ui.serdes.builtin.HexSerde; import com.provectus.kafka.ui.serdes.builtin.Int32Serde; import com.provectus.kafka.ui.serdes.builtin.Int64Serde; import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde; @@ -47,6 +48,7 @@ public SerdesInitializer() { .put(UInt64Serde.name(), UInt64Serde.class) .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class) .put(Base64Serde.name(), Base64Serde.class) + .put(HexSerde.name(), HexSerde.class) .put(UuidBinarySerde.name(), UuidBinarySerde.class) .build(), new CustomSerdeLoader() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java new file mode 100644 index 00000000000..cf1a6b793ff --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java @@ -0,0 +1,80 @@ +package com.provectus.kafka.ui.serdes.builtin; + +import com.provectus.kafka.ui.serde.api.DeserializeResult; +import com.provectus.kafka.ui.serde.api.PropertyResolver; +import com.provectus.kafka.ui.serde.api.SchemaDescription; +import com.provectus.kafka.ui.serdes.BuiltInSerde; +import java.util.HexFormat; +import java.util.Map; +import java.util.Optional; + +public class HexSerde implements BuiltInSerde { + + private HexFormat deserializeHexFormat; + + public static String name() { + return "Hex"; + } + + @Override + public void configure(PropertyResolver serdeProperties, + PropertyResolver kafkaClusterProperties, + PropertyResolver globalProperties) { + String delim = serdeProperties.getProperty("delimiter", String.class).orElse(" "); + boolean uppercase = serdeProperties.getProperty("uppercase", Boolean.class).orElse(true); + deserializeHexFormat = HexFormat.ofDelimiter(delim); + if (uppercase) { + deserializeHexFormat = deserializeHexFormat.withUpperCase(); + } + } + + @Override + public Optional getDescription() { + return Optional.empty(); + } + + @Override + public Optional getSchema(String topic, Target type) { + return Optional.empty(); + } + + @Override + public boolean canDeserialize(String topic, Target type) { + return true; + } + + @Override + public boolean canSerialize(String topic, Target type) { + return true; + } + + @Override + public Serializer serializer(String topic, Target type) { + return input -> { + input = input.trim(); + // it is a hack to provide ability to sent empty array as a key/value + if (input.length() == 0) { + return new byte[] {}; + } + return HexFormat.of().parseHex(prepareInputForParse(input)); + }; + } + + // removing most-common delimiters and prefixes + private static String prepareInputForParse(String input) { + return input + .replaceAll(" ", "") + .replaceAll("#", "") + .replaceAll(":", ""); + } + + @Override + public Deserializer deserializer(String topic, Target type) { + return (headers, data) -> + new DeserializeResult( + deserializeHexFormat.formatHex(data), + DeserializeResult.Type.STRING, + Map.of() + ); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java new file mode 100644 index 00000000000..a318279f564 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java @@ -0,0 +1,84 @@ +package com.provectus.kafka.ui.serdes.builtin; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.serde.api.DeserializeResult; +import com.provectus.kafka.ui.serde.api.Serde; +import com.provectus.kafka.ui.serdes.PropertyResolverImpl; +import com.provectus.kafka.ui.serdes.RecordHeadersImpl; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; + +public class HexSerdeTest { + + private static final byte[] TEST_BYTES = "hello world".getBytes(); + private static final String TEST_BYTES_HEX_ENCODED = "68 65 6C 6C 6F 20 77 6F 72 6C 64"; + + private Serde hexSerde; + + @BeforeEach + void init() { + hexSerde = new HexSerde(); + hexSerde.configure( + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty() + ); + } + + + @ParameterizedTest + @CsvSource({ + "68656C6C6F20776F726C64", // uppercase + "68656c6c6f20776f726c64", // lowercase + "68:65:6c:6c:6f:20:77:6f:72:6c:64", // ':' delim + "68 65 6C 6C 6F 20 77 6F 72 6C 64", // space delim, UC + "68 65 6c 6c 6f 20 77 6f 72 6c 64", // space delim, LC + "#68 #65 #6C #6C #6F #20 #77 #6F #72 #6C #64" // '#' prefix, space delim + }) + void serializesInputAsHexString(String hexString) { + for (Serde.Target type : Serde.Target.values()) { + var serializer = hexSerde.serializer("anyTopic", type); + byte[] bytes = serializer.serialize(hexString); + assertThat(bytes).isEqualTo(TEST_BYTES); + } + } + + @ParameterizedTest + @EnumSource + void serializesEmptyStringAsEmptyBytesArray(Serde.Target type) { + var serializer = hexSerde.serializer("anyTopic", type); + byte[] bytes = serializer.serialize(""); + assertThat(bytes).isEqualTo(new byte[] {}); + } + + @ParameterizedTest + @EnumSource + void deserializesDataAsHexBytes(Serde.Target type) { + var deserializer = hexSerde.deserializer("anyTopic", type); + var result = deserializer.deserialize(new RecordHeadersImpl(), TEST_BYTES); + assertThat(result.getResult()).isEqualTo(TEST_BYTES_HEX_ENCODED); + assertThat(result.getType()).isEqualTo(DeserializeResult.Type.STRING); + assertThat(result.getAdditionalProperties()).isEmpty(); + } + + @ParameterizedTest + @EnumSource + void getSchemaReturnsEmpty(Serde.Target type) { + assertThat(hexSerde.getSchema("anyTopic", type)).isEmpty(); + } + + @ParameterizedTest + @EnumSource + void canDeserializeReturnsTrueForAllInputs(Serde.Target type) { + assertThat(hexSerde.canDeserialize("anyTopic", type)).isTrue(); + } + + @ParameterizedTest + @EnumSource + void canSerializeReturnsTrueForAllInput(Serde.Target type) { + assertThat(hexSerde.canSerialize("anyTopic", type)).isTrue(); + } +}