From 2920d54921288dc5d1dbac74d20bfb93a0b4b250 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Tue, 8 Feb 2022 13:18:06 +0100 Subject: [PATCH] chore(kafka-runner): refactor the SafeKeyValueStore to handle prefixScan --- .../runner/kafka/KafkaFlowExecutor.java | 2 +- .../runner/kafka/KafkaFlowListeners.java | 4 +- .../kafka/services/SafeKeyValueStore.java | 65 ++++++++++++------- .../streams/FlowWithTriggerTransformer.java | 2 +- .../kafka/services/SafeKeyValueStoreTest.java | 2 +- 5 files changed, 47 insertions(+), 28 deletions(-) diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowExecutor.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowExecutor.java index 5bc7b4dcfb..1c6d59bc57 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowExecutor.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowExecutor.java @@ -27,7 +27,7 @@ public Flow findById(String namespace, String id, Optional revision, St .orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "'")); } else { return flowService.keepLastVersion( - this.store.toStream(), + this.store.all(), namespace, id ); diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java index f6ebe3349a..edf8c82b14 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java @@ -1,8 +1,6 @@ package io.kestra.runner.kafka; import io.kestra.runner.kafka.services.*; -import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; @@ -113,7 +111,7 @@ public List flows() { synchronized (this) { if (this.flows == null) { this.flows = this.store - .toStream() + .all() .filter(flow -> flow != null && !flow.isDeleted()) .collect(Collectors.toList()); } diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/SafeKeyValueStore.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/SafeKeyValueStore.java index bcd18e099f..53cdd20189 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/SafeKeyValueStore.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/SafeKeyValueStore.java @@ -3,12 +3,15 @@ import com.google.common.collect.Streams; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Stream; @Slf4j @@ -32,37 +35,55 @@ public Optional get(K key) { } } - @SuppressWarnings("UnstableApiUsage") - public Stream toStream() { + public Stream all() { KeyValueIterator all = this.store.all(); + return toStream(all, kvKeyValue -> kvKeyValue.value); + } + + public Stream prefix(String prefix) { + KeyValueIterator all = this.store.prefixScan(prefix, new StringSerializer()); + + return toStream(all, kvKeyValue -> kvKeyValue.value); + } + + public Stream> prefixWithKey(String prefix) { + KeyValueIterator all = this.store.prefixScan(prefix, new StringSerializer()); + + return toStream(all, kvKeyValue -> new KeyValue<>(kvKeyValue.key, kvKeyValue.value)); + } + + @SuppressWarnings("UnstableApiUsage") + private Stream toStream(KeyValueIterator all, Function, R> function) { return Streams - .stream(new Iterator() { - private V next; + .stream(new Iterator() { + private R next; @Override public boolean hasNext() { - boolean seek = true; - while (seek) { - try { - next = all.next().value; - - return true; - } catch (SerializationException e) { - if (log.isTraceEnabled()) { - log.trace("Exception on store {}", name, e); - } - } catch (NoSuchElementException e) { - seek = false; - } - } - - all.close(); - return false; + boolean seek = true; + while (seek) { + try { + KeyValue rawNext = all.next(); + + next = function.apply(rawNext); + + return true; + } catch (SerializationException e) { + if (log.isTraceEnabled()) { + log.trace("Exception on store {}", name, e); + } + } catch (NoSuchElementException e) { + seek = false; + } + } + + all.close(); + return false; } @Override - public V next() { + public R next() { return this.next; } }) diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/FlowWithTriggerTransformer.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/FlowWithTriggerTransformer.java index 7187e9237a..66a98ea166 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/FlowWithTriggerTransformer.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/FlowWithTriggerTransformer.java @@ -34,7 +34,7 @@ public void init(final ProcessorContext context) { @Override public Iterable> transform(String key, Executor value) { List allFlows = flowService - .keepLastVersion(flowStore.toStream().map(ValueAndTimestamp::value)) + .keepLastVersion(flowStore.all().map(ValueAndTimestamp::value)) .collect(Collectors.toList()); return flowService.flowWithFlowTrigger(allFlows.stream()) diff --git a/runner-kafka/src/test/java/io/kestra/runner/kafka/services/SafeKeyValueStoreTest.java b/runner-kafka/src/test/java/io/kestra/runner/kafka/services/SafeKeyValueStoreTest.java index 7cb0f1cadb..3d4dc13909 100644 --- a/runner-kafka/src/test/java/io/kestra/runner/kafka/services/SafeKeyValueStoreTest.java +++ b/runner-kafka/src/test/java/io/kestra/runner/kafka/services/SafeKeyValueStoreTest.java @@ -86,7 +86,7 @@ void test() throws JsonProcessingException { Optional validOne = safeKeyValueStore.get("validOne2"); assertThat(validOne.isEmpty(), is(false)); - List list = safeKeyValueStore.toStream().filter(flow -> !flow.isDeleted()).collect(Collectors.toList()); + List list = safeKeyValueStore.all().filter(flow -> !flow.isDeleted()).collect(Collectors.toList()); assertThat(list.size(), is(3)); } } \ No newline at end of file