Skip to content

Commit

Permalink
feat(core): allow to configure (de)serialiser on consumers/producers (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
gdufrene authored and tchiotludo committed Jun 26, 2023
1 parent 127713d commit 22cc053
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions src/main/java/org/akhq/modules/KafkaModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.akhq.configs.Connection;
import org.akhq.configs.Default;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.codehaus.httpcache4j.uri.URIBuilder;
Expand Down Expand Up @@ -126,15 +128,7 @@ public AdminClient getAdminClient(String clusterId) throws InvalidClusterExcepti
}

/**
 * Returns a raw byte-array consumer for the given cluster, using the
 * cluster's configured consumer properties with no per-call overrides.
 *
 * @param clusterId id of the configured cluster to connect to
 * @return a new {@link KafkaConsumer} (caller is responsible for closing it)
 * @throws InvalidClusterException if no cluster with this id is configured
 */
public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId) throws InvalidClusterException {
    // Delegate to the overload so deserializer-override logic lives in one place.
    return getConsumer(clusterId, new Properties());
}

public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId, Properties properties) throws InvalidClusterException {
Expand All @@ -145,11 +139,16 @@ public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId, Properties pr
Properties props = this.getConsumerProperties(clusterId);
props.putAll(properties);

return new KafkaConsumer<>(
props,
new ByteArrayDeserializer(),
new ByteArrayDeserializer()
);
if (props.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) &&
props.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) ) {
return new KafkaConsumer<>(props);
} else {
return new KafkaConsumer<>(
props,
new ByteArrayDeserializer(),
new ByteArrayDeserializer()
);
}
}

private final Map<String, KafkaProducer<byte[], byte[]>> producers = new HashMap<>();
Expand All @@ -160,11 +159,18 @@ public KafkaProducer<byte[], byte[]> getProducer(String clusterId) throws Invali
}

if (!this.producers.containsKey(clusterId)) {
this.producers.put(clusterId, new KafkaProducer<>(
this.getProducerProperties(clusterId),
new ByteArraySerializer(),
new ByteArraySerializer()
));
Properties props = this.getProducerProperties(clusterId);

if (props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) &&
props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) ) {
this.producers.put(clusterId, new KafkaProducer<>(props));
} else {
this.producers.put(clusterId, new KafkaProducer<>(
props,
new ByteArraySerializer(),
new ByteArraySerializer()
));
}
}

return this.producers.get(clusterId);
Expand Down

0 comments on commit 22cc053

Please sign in to comment.