Skip to content

Commit

Permalink
feat(kafka-runner): add consumer on GlobalStateProcessor to be notifi…
Browse files Browse the repository at this point in the history
…ed about every change
  • Loading branch information
tchiotludo committed Feb 3, 2022
1 parent cc0bba2 commit 7f36cb9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.Stores;

Expand Down Expand Up @@ -138,7 +139,7 @@ public StreamsBuilder topology() {
JsonSerde.of(Executor.class)
),
kafkaAdminService.getTopicName(Executor.class),
Consumed.with(Serdes.String(), JsonSerde.of(Executor.class)),
Consumed.with(Serdes.String(), JsonSerde.of(Executor.class)).withName("GlobalStore.Executor"),
() -> new GlobalStateProcessor<>(STATESTORE_EXECUTOR)
);

Expand All @@ -150,7 +151,7 @@ public StreamsBuilder topology() {
JsonSerde.of(Trigger.class)
),
kafkaAdminService.getTopicName(Trigger.class),
Consumed.with(Serdes.String(), JsonSerde.of(Trigger.class)),
Consumed.with(Serdes.String(), JsonSerde.of(Trigger.class)).withName("GlobalStore.Trigger"),
() -> new GlobalStateLockProcessor<>(STATESTORE_TRIGGER, triggerLock)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public StreamsBuilder topology() {
JsonSerde.of(WorkerInstance.class)
),
kafkaAdminService.getTopicName(KafkaStreamSourceService.TOPIC_EXECUTOR_WORKERINSTANCE),
Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("GlobalStore.ExecutorWorkerInstace"),
Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("GlobalStore.ExecutorWorkerInstance"),
() -> new GlobalStateProcessor<>(WORKERINSTANCE_STATE_STORE_NAME)
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,36 @@
package io.kestra.runner.kafka.streams;

import com.google.common.collect.Streams;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

@Slf4j
public class GlobalStateProcessor <T> implements Processor<String, T> {
private final String storeName;
private final Consumer<List<T>> consumer;
private KeyValueStore<String, T> store;

public GlobalStateProcessor(String storeName) {
this(storeName, null);
}

public GlobalStateProcessor(String storeName, Consumer<List<T>> consumer) {
this.storeName = storeName;
this.consumer = consumer;
}

@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
this.store = (KeyValueStore<String, T>) context.getStateStore(this.storeName);
this.store = context.getStateStore(this.storeName);

this.send();
}

@Override
Expand All @@ -27,6 +40,17 @@ public void process(String key, T value) {
} else {
this.store.put(key, value);
}

this.send();
}

@SuppressWarnings("UnstableApiUsage")
private void send() {
if (consumer != null) {
try (KeyValueIterator<String, T> all = this.store.all()) {
consumer.accept(Streams.stream(all).map(e -> e.value).collect(Collectors.toList()));
}
}
}

@Override
Expand Down

0 comments on commit 7f36cb9

Please sign in to comment.