#221 Added support for Schema Registry with Protobuf #225

Merged: 4 commits, Apr 11, 2021
13 changes: 10 additions & 3 deletions README.md
@@ -16,7 +16,7 @@ This project is a reboot of Kafdrop 2.x, dragged kicking and screaming into the
# Features
* **View Kafka brokers** — topic and partition assignments, and controller status
* **View topics** — partition count, replication status, and custom configuration
* **Browse messages** — JSON, plain text, Avro and Protobuf encoding
* **View consumer groups** — per-partition parked offsets, combined and per-partition lag
* **Create new topics**
* **View ACLs**
@@ -63,13 +63,20 @@ Finally, a default message format (e.g. to deserialize Avro messages) can option
Valid format values are `DEFAULT`, `AVRO`, `PROTOBUF`. This can also be configured at the topic level via a dropdown when viewing messages.

## Configure Protobuf message type
### Option 1: Using Protobuf Descriptor
In the case of a Protobuf message type, the message definition can be compiled and transmitted using a descriptor file.
Thus, for Kafdrop to recognize the message, the application needs access to the descriptor file(s).
Kafdrop allows the user to select a descriptor, as well as to specify the name of one of the message types provided by that descriptor, at runtime.

To configure a folder containing the Protobuf descriptor file(s) (`.desc`), set:
```
--protobufdesc.directory=/var/protobuf_desc
```
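
For reference, a descriptor file can be generated from a `.proto` definition with `protoc`; the file names below are illustrative:
```
protoc --include_imports --descriptor_set_out=/var/protobuf_desc/example.desc example.proto
```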

### Option 2: Using Schema Registry
If no Protobuf descriptor file is supplied, the implementation will attempt to create the Protobuf deserializer using the Schema Registry instead.
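As a sketch, the registry endpoint can be supplied as follows (the URL is an example; this is the property behind the `SCHEMAREGISTRY_CONNECT` environment variable):
```
--schemaregistry.connect=http://localhost:8081
```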

### Defaulting to Protobuf
If preferred, the default message format can be set to Protobuf as follows:
```
--message.format=PROTOBUF
@@ -250,7 +257,7 @@ docker run -d --rm -p 9000:9000 \
|`KAFKA_KEYSTORE` |Private key for mutual TLS authentication (base-64 encoded).
|`SERVER_SERVLET_CONTEXTPATH`|The context path to serve requests on (must end with a `/`). Defaults to `/`.
|`SERVER_PORT` |The web server port to listen on. Defaults to `9000`.
|`SCHEMAREGISTRY_CONNECT` |The endpoint of the Schema Registry for Avro or Protobuf messages.
|`CMD_ARGS` |Command line arguments to Kafdrop, e.g. `--message.format` or `--protobufdesc.directory` or `--server.port`.
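
For illustration, a run wiring these variables together might look like the following; the broker and registry addresses are assumptions:
```
docker run -d --rm -p 9000:9000 \
    -e KAFKA_BROKERCONNECT=kafka:9092 \
    -e SCHEMAREGISTRY_CONNECT=http://schema-registry:8081 \
    -e CMD_ARGS="--message.format=PROTOBUF" \
    obsidiandynamics/kafdrop
```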

##### Advanced configuration
22 changes: 22 additions & 0 deletions pom.xml
@@ -107,6 +107,28 @@
</exclusion>
</exclusions>
</dependency>
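<!-- Confluent Schema Registry client and Protobuf (de)serializer, used to decode Protobuf messages via the registry -->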
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>5.5.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>5.5.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
8 changes: 7 additions & 1 deletion src/main/java/kafdrop/controller/MessageController.java
@@ -263,13 +263,19 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form
final var schemaRegistryAuth = schemaRegistryProperties.getAuth();

deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth);
} else if (format == MessageFormat.PROTOBUF && null != descFile) {
// sanitize the user-supplied descriptor name: strip the .desc extension, then remove any dots and slashes to prevent path traversal
final var descFileName = descFile.replace(".desc", "")
.replaceAll("\\.", "")
.replaceAll("/", "");
final var fullDescFile = protobufProperties.getDirectory() + File.separator + descFileName + ".desc";
deserializer = new ProtobufMessageDeserializer(topicName, fullDescFile, msgTypeName);
} else if (format == MessageFormat.PROTOBUF) {
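// no descriptor file configured; fall back to a Schema Registry based deserializer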
final var schemaRegistryUrl = schemaRegistryProperties.getConnect();
final var schemaRegistryAuth = schemaRegistryProperties.getAuth();

deserializer = new ProtobufSchemaRegistryMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth);
} else if (format == MessageFormat.MSGPACK) {
deserializer = new MsgPackMessageDeserializer();
} else {
36 changes: 36 additions & 0 deletions src/main/java/kafdrop/util/ProtobufSchemaRegistryMessageDeserializer.java
@@ -0,0 +1,36 @@
package kafdrop.util;

import java.nio.ByteBuffer;
import java.util.HashMap;

import com.google.protobuf.Message;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

public class ProtobufSchemaRegistryMessageDeserializer implements MessageDeserializer {

  private final String topicName;
  private final KafkaProtobufDeserializer<Message> deserializer;

  public ProtobufSchemaRegistryMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) {
    this.topicName = topicName;
    this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth);
  }

  @Override
  public String deserializeMessage(ByteBuffer buffer) {
    // Convert the byte buffer to a byte array, then let the Confluent
    // deserializer resolve the schema from the registry and decode the payload.
    final var bytes = ByteUtils.convertToByteArray(buffer);
    return deserializer.deserialize(topicName, bytes).toString();
  }

  private static KafkaProtobufDeserializer<Message> getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) {
    final var config = new HashMap<String, Object>();
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    if (schemaRegistryAuth != null) {
      config.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
      config.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
    }
    final var protobufDeserializer = new KafkaProtobufDeserializer<Message>();
    // false: configure as a value (not key) deserializer
    protobufDeserializer.configure(config, false);
    return protobufDeserializer;
  }
}
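
For illustration, a hypothetical caller could exercise the class as follows; the topic name, registry URL, and the consumed record are assumptions, not part of this change:
```
// "orders" and the registry URL are illustrative; null auth means no basic-auth credentials.
final var deserializer = new ProtobufSchemaRegistryMessageDeserializer(
    "orders", "http://localhost:8081", null);
// record is a ConsumerRecord<?, byte[]> obtained from a Kafka consumer elsewhere.
final String text = deserializer.deserializeMessage(ByteBuffer.wrap(record.value()));
```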