diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index d572a4ffa34..f9ec3c41de1 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -21,6 +21,16 @@ + + com.hootsuite + event-bus-events-java + 20.26.0 + + + com.google.protobuf + protobuf-java + 4.28.2 + org.springframework.boot spring-boot-starter-webflux diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java index 221b8b5ea53..b461ba6cc96 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java @@ -1,6 +1,6 @@ package com.provectus.kafka.ui.serdes.builtin; -import com.google.protobuf.UnknownFieldSet; +import com.hootsuite.eventbus.events.deserialize.EventListDeserializer; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.serde.api.DeserializeResult; import com.provectus.kafka.ui.serde.api.RecordHeaders; @@ -8,6 +8,7 @@ import com.provectus.kafka.ui.serdes.BuiltInSerde; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import lombok.SneakyThrows; public class ProtobufRawSerde implements BuiltInSerde { @@ -48,12 +49,15 @@ public Deserializer deserializer(String topic, Target type) { @Override public DeserializeResult deserialize(RecordHeaders headers, byte[] data) { try { - UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data); - return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of()); + var events = EventListDeserializer.deserialize(data); + var serialized = events.stream().map(e -> + e.getDescriptorForType().getName() + " {\n" + e.toString().replaceAll("(?m)^", " ") + "}" + ).collect(Collectors.joining("\n")); + return new DeserializeResult(serialized, DeserializeResult.Type.STRING, Map.of()); } catch (Exception e) { throw new ValidationException(e.getMessage()); } } }; } -} \ No newline at end of file +} diff --git a/kafka-ui-api/src/main/resources/application.yml b/kafka-ui-api/src/main/resources/application.yml index e8799206132..87f1958be4e 100644 --- a/kafka-ui-api/src/main/resources/application.yml +++ b/kafka-ui-api/src/main/resources/application.yml @@ -1,6 +1,18 @@ auth: type: DISABLED +server: + port: 8080 + +kafka: + clusters: + - name: dev + bootstrapServers: b-1.eventbusmskdevelopment.6x9wvy.c8.kafka.us-east-1.amazonaws.com:9092,b-2.eventbusmskdevelopment.6x9wvy.c8.kafka.us-east-1.amazonaws.com:9092,b-3.eventbusmskdevelopment.6x9wvy.c8.kafka.us-east-1.amazonaws.com:9092 + defaultValueSerde: ProtobufDecodeRaw + - name: staging + bootstrapServers: b-1.eventbusmskstaging.gxfvi6.c2.kafka.us-east-1.amazonaws.com:9092,b-2.eventbusmskstaging.gxfvi6.c2.kafka.us-east-1.amazonaws.com:9092,b-3.eventbusmskstaging.gxfvi6.c2.kafka.us-east-1.amazonaws.com:9092 + defaultValueSerde: ProtobufDecodeRaw + management: endpoint: info: diff --git a/pom.xml b/pom.xml index aa02a56f0e9..b488334fc8c 100644 --- a/pom.xml +++ b/pom.xml @@ -32,11 +32,11 @@ 2.14.0 3.5.0 1.5.5.Final - 1.18.24 + 1.18.30 3.23.3 2.13.9 2.0 - 3.1.3 + 3.1.4 1.0.0 0.1.17 0.1.26