Introduce Kafka as backend store implementation #359

Merged 2 commits on Oct 20, 2021
10 changes: 9 additions & 1 deletion pom.xml
@@ -440,13 +440,21 @@
<aws.java.sdk.version>2.16.31</aws.java.sdk.version>
<gcp.java.sdk.version>19.2.1</gcp.java.sdk.version>
<ksqldb.version>0.17.0</ksqldb.version>
<typesafe.version>1.4.0</typesafe.version>
<lombok.version>1.18.22</lombok.version>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.4.0</version>
<version>${typesafe.version}</version>
</dependency>
<dependency>
<groupId>com.hubspot.jinjava</groupId>
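The Lombok dependency added here is provided-scope, so it is compile-time only; it is what enables the @SneakyThrows annotation used by the new KafkaBackend further down. A minimal sketch of the behavior, with illustrative names:

import lombok.SneakyThrows;

public class SneakyThrowsExample {
  @SneakyThrows // Lombok rethrows the checked IOException without it appearing in the signature
  void mayFail() {
    throw new java.io.IOException("checked, but not declared");
  }
}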
34 changes: 34 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
@@ -531,4 +531,38 @@ public List<String> getDlqTopicsDenyList() {
public boolean areMultipleContextPerDirEnabled() {
return config.getBoolean(JULIE_ENABLE_MULTIPLE_CONTEXT_PER_DIR);
}

public String getJulieKafkaConfigTopic() {
return config.getString(JULIE_KAFKA_CONFIG_TOPIC);
}

public String getJulieInstanceId() {
if (!julieInstanceId.isEmpty()) {
return julieInstanceId;
}
try {
julieInstanceId = config.getString(JULIE_INSTANCE_ID);
} catch (ConfigException.Missing | ConfigException.WrongType errorType) {
generateRandomJulieInstanceId();
}
return julieInstanceId;
}

private String julieInstanceId = "";
private static final int defaultJulieInstanceIDLength = 10;

private void generateRandomJulieInstanceId() {
if (julieInstanceId.isEmpty()) {
int leftLimit = 97; // letter 'a'
int rightLimit = 122; // letter 'z'
Random random = new Random();

julieInstanceId =
random
.ints(leftLimit, rightLimit + 1)
.limit(defaultJulieInstanceIDLength)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
}
}
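Note that getJulieInstanceId falls back to a randomly generated ten-letter id when julie.instance.id is missing or mistyped. A minimal sketch of the two new settings expressed with the Typesafe config library used here (class name and values are illustrative):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class JulieKafkaSettingsExample {
  public static void main(String[] args) {
    // Keys match JULIE_KAFKA_CONFIG_TOPIC and JULIE_INSTANCE_ID in Constants.java below.
    Config config =
        ConfigFactory.parseString(
            "julie.kafka.config.topic = \"_julieops_state\"\n"
                + "julie.instance.id = \"julie-prod-1\"");
    System.out.println(config.getString("julie.kafka.config.topic")); // _julieops_state
    System.out.println(config.getString("julie.instance.id")); // julie-prod-1
  }
}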
6 changes: 6 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Constants.java
@@ -31,6 +31,9 @@ public class Constants {
public static final String GCP_STATE_PROCESSOR_CLASS =
"com.purbon.kafka.topology.backend.GCPBackend";

public static final String KAFKA_STATE_PROCESSOR_CLASS =
"com.purbon.kafka.topology.backend.KafkaBackend";

public static final String REDIS_HOST_CONFIG = "topology.builder.redis.host";
public static final String REDIS_PORT_CONFIG = "topology.builder.redis.port";

@@ -131,4 +134,7 @@ public class Constants {
public static final String SSL_KEY_PASSWORD = "ssl.key.password";

public static final String JULIE_ROLES = "julie.roles";

public static final String JULIE_KAFKA_CONFIG_TOPIC = "julie.kafka.config.topic";
public static final String JULIE_INSTANCE_ID = "julie.instance.id";
Review comment (PR author): This could be improved by allowing an ENV override; one possible shape is sketched below.

}
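One possible shape for the ENV override suggested in the review comment above; the helper and environment-variable names are illustrative, not part of this PR:

import com.typesafe.config.Config;

public class EnvOverrideSketch {
  // Hypothetical helper: prefer an environment variable, fall back to the config key.
  static String julieKafkaConfigTopic(Config config) {
    String fromEnv = System.getenv("JULIE_KAFKA_CONFIG_TOPIC");
    return fromEnv != null ? fromEnv : config.getString("julie.kafka.config.topic");
  }
}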
2 changes: 2 additions & 0 deletions src/main/java/com/purbon/kafka/topology/JulieOps.java
@@ -298,6 +298,8 @@ private static BackendController buildBackendController(Configuration config) th
backend = new S3Backend();
} else if (backendClass.equalsIgnoreCase(GCP_STATE_PROCESSOR_CLASS)) {
backend = new GCPBackend();
} else if (backendClass.equalsIgnoreCase(KAFKA_STATE_PROCESSOR_CLASS)) {
backend = new KafkaBackend();
} else {
throw new IOException(backendClass + " Unknown state processor provided.");
}
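With this branch in place, the Kafka backend is selected the same way as the S3 and GCP ones, by pointing the state-processor property at the new class. A hedged sketch; the property key name is assumed to follow the existing backends and is not shown in this diff:

import java.util.Properties;

public class SelectKafkaBackendSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Key name assumed; the value is KAFKA_STATE_PROCESSOR_CLASS from Constants.java.
    props.put(
        "topology.builder.state.processor.class",
        "com.purbon.kafka.topology.backend.KafkaBackend");
  }
}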
src/main/java/com/purbon/kafka/topology/api/adminclient/TopologyBuilderAdminClient.java
@@ -185,7 +185,7 @@ private Config getActualTopicConfig(String topic)

public void createTopic(Topic topic, String fullTopicName) throws IOException {
NewTopic newTopic =
new NewTopic(fullTopicName, topic.partitionsCountOptional(), topic.replicationFactor())
new NewTopic(fullTopicName, topic.getPartitionCount(), topic.replicationFactor())
.configs(topic.getRawConfig());
try {
createAllTopics(Collections.singleton(newTopic));
@@ -197,6 +197,11 @@ public void createTopic(Topic topic, String fullTopicName) throws IOException {
}
}

public void createTopic(String topicName) throws IOException {
Topic topic = new Topic();
createTopic(topic, topicName);
}

private void createAllTopics(Collection<NewTopic> newTopics)
throws ExecutionException, InterruptedException {
adminClient.createTopics(newTopics).all().get();
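A short usage sketch for the new overload: it wraps a default Topic, so partition count and replication factor fall back to the defaults (the admin client type and topic name are assumptions for illustration):

import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
import java.io.IOException;

public class CreateTopicSketch {
  static void createStateTopic(TopologyBuilderAdminClient adminClient) throws IOException {
    // Topic name is illustrative; partitions and replication factor use Topic defaults.
    adminClient.createTopic("_julieops_state");
  }
}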
115 changes: 115 additions & 0 deletions src/main/java/com/purbon/kafka/topology/backend/KafkaBackend.java
@@ -0,0 +1,115 @@
package com.purbon.kafka.topology.backend;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.backend.kafka.KafkaBackendConsumer;
import com.purbon.kafka.topology.backend.kafka.KafkaBackendProducer;
import com.purbon.kafka.topology.backend.kafka.RecordReceivedCallback;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class KafkaBackend implements Backend, RecordReceivedCallback {

private static final Logger LOGGER = LogManager.getLogger(KafkaBackend.class);

private boolean isCompleted;

private KafkaBackendConsumer consumer;
private KafkaBackendProducer producer;

private AtomicReference<BackendState> latest;
private String instanceId;
private Thread thread;

public KafkaBackend() {
isCompleted = false;
}

private static class JulieKafkaConsumerThread implements Runnable {
private KafkaBackend callback;
private KafkaBackendConsumer consumer;

public JulieKafkaConsumerThread(KafkaBackend callback, KafkaBackendConsumer consumer) {
this.callback = callback;
this.consumer = consumer;
}

public void run() {
consumer.start();
try {
consumer.retrieve(callback);
} catch (WakeupException ex) {
LOGGER.trace(ex);
}
}
}

@SneakyThrows
@Override
public void configure(Configuration config) {
instanceId = config.getJulieInstanceId();
latest = new AtomicReference<>();

consumer = new KafkaBackendConsumer(config);
consumer.configure();

var topics = consumer.listTopics();
if (!topics.containsKey(config.getJulieKafkaConfigTopic())) {
throw new IOException(
"The internal julie kafka configuration topic topic "
+ config.getJulieKafkaConfigTopic()
+ " should exist in the cluster");
}
producer = new KafkaBackendProducer(config);
producer.configure();

thread = new Thread(new JulieKafkaConsumerThread(this, consumer), "kafkaJulieConsumer");
thread.start();
waitForCompletion();
}

  public synchronized void waitForCompletion() throws InterruptedException {
    // Blocks the configure() caller until the consumer thread signals its first poll.
    while (!isCompleted) {
      wait(30000);
    }
  }

public synchronized void complete() {
isCompleted = true;
notify();
}

@Override
public void save(BackendState state) throws IOException {
producer.save(state);
}

  @Override
  public BackendState load() throws IOException {
    // latest is nulled out by close(); its value may also still be null before the
    // first record for this instance id arrives.
    return latest == null ? new BackendState() : latest.get();
  }

@Override
public void close() {
consumer.stop();
producer.stop();
try {
thread.join();
} catch (InterruptedException e) {
LOGGER.error(e);
}
latest = null;
thread = null;
}

@Override
public void apply(ConsumerRecord<String, BackendState> record) {
if (instanceId.equals(record.key()) && latest != null) {
latest.set(record.value());
}
}
}
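A hedged end-to-end sketch of the backend lifecycle wired above; constructing the Configuration is assumed to happen elsewhere:

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.backend.BackendState;
import com.purbon.kafka.topology.backend.KafkaBackend;

public class KafkaBackendLifecycleSketch {
  static void run(Configuration config) throws Exception {
    KafkaBackend backend = new KafkaBackend();
    backend.configure(config); // starts the consumer thread; blocks until the first poll
    BackendState state = backend.load(); // latest state received for this instance id
    backend.save(state); // publish the state back to the julie config topic
    backend.close(); // stop consumer and producer, then join the consumer thread
  }
}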
src/main/java/com/purbon/kafka/topology/backend/kafka/JsonDeserializer.java
@@ -0,0 +1,34 @@
package com.purbon.kafka.topology.backend.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.purbon.kafka.topology.backend.BackendState;
import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Kafka instantiates deserializers reflectively through the no-arg constructor, so this
// implementation is bound directly to BackendState; the tClass hint is unused at runtime.
public class JsonDeserializer<T> implements Deserializer<BackendState> {

private final ObjectMapper objectMapper = new ObjectMapper();

private Class<T> tClass;

public JsonDeserializer() {}

public JsonDeserializer(Class<T> tClass) {
this.tClass = tClass;
}

@Override
public BackendState deserialize(String s, byte[] bytes) {
if (bytes == null) {
return null;
}
BackendState data;
try {
data = objectMapper.readValue(bytes, BackendState.class);
} catch (IOException e) {
throw new SerializationException(e);
}
return data;
}
}
src/main/java/com/purbon/kafka/topology/backend/kafka/JsonSerializer.java
@@ -0,0 +1,32 @@
package com.purbon.kafka.topology.backend.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map<String, ?> props, boolean isKey) {
// nothing to do
}

@Override
public byte[] serialize(String topic, T data) {
if (data == null) return null;

try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}

@Override
public void close() {
// nothing to do
}
}
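The serializer/deserializer pair round-trips a BackendState through JSON; a small sketch, assuming BackendState is Jackson-serializable with a no-arg constructor:

import com.purbon.kafka.topology.backend.BackendState;
import com.purbon.kafka.topology.backend.kafka.JsonDeserializer;
import com.purbon.kafka.topology.backend.kafka.JsonSerializer;

public class JsonRoundTripSketch {
  public static void main(String[] args) {
    JsonSerializer<BackendState> serializer = new JsonSerializer<>();
    JsonDeserializer<BackendState> deserializer = new JsonDeserializer<>(BackendState.class);
    byte[] bytes = serializer.serialize("julie-state-topic", new BackendState());
    BackendState state = deserializer.deserialize("julie-state-topic", bytes);
    System.out.println(state != null); // true
  }
}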
src/main/java/com/purbon/kafka/topology/backend/kafka/KafkaBackendConsumer.java
@@ -0,0 +1,71 @@
package com.purbon.kafka.topology.backend.kafka;

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.backend.BackendState;
import com.purbon.kafka.topology.backend.KafkaBackend;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.Serdes;

public class KafkaBackendConsumer {

private Configuration config;
private KafkaConsumer<String, BackendState> consumer;

private AtomicBoolean running;

public KafkaBackendConsumer(Configuration config) {
this.config = config;
this.running = new AtomicBoolean(false);
}

public void configure() {
Properties consumerProperties = config.asProperties();
consumerProperties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass());
var serde = new JsonDeserializer<>(BackendState.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serde.getClass());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Suffix the group id with a timestamp so every run joins a fresh consumer group and,
    // combined with auto.offset.reset=earliest, replays the state topic from the beginning.
    consumerProperties.put(
        GROUP_ID_CONFIG,
        consumerProperties.get(GROUP_ID_CONFIG).toString() + System.currentTimeMillis());
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singletonList(config.getJulieKafkaConfigTopic()));
}

  public void retrieve(KafkaBackend callback) {
    while (running.get()) {
      ConsumerRecords<String, BackendState> records = consumer.poll(Duration.ofSeconds(1));
      // Signal the backend after each poll; the first call unblocks waitForCompletion().
      callback.complete();
      for (ConsumerRecord<String, BackendState> record : records) {
        callback.apply(record);
      }
      consumer.commitAsync();
    }
  }

public void stop() {
running.set(false);
consumer.wakeup();
}

public void start() {
running.set(true);
}

public Map<String, List<PartitionInfo>> listTopics() {
return consumer.listTopics();
}
}
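Design note: because configure() appends a timestamp to the consumer group id and sets auto.offset.reset to earliest, every run joins a fresh group and replays the whole state topic; KafkaBackend.apply then keeps only the newest record whose key matches this instance id.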