Skip to content

Commit

Permalink
chore(kafka-runner): refactor the SafeKeyValueStore to handle prefixScan
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Feb 8, 2022
1 parent a7ae312 commit 2920d54
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Flow findById(String namespace, String id, Optional<Integer> revision, St
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "'"));
} else {
return flowService.keepLastVersion(
this.store.toStream(),
this.store.all(),
namespace,
id
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.kestra.runner.kafka;

import io.kestra.runner.kafka.services.*;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -113,7 +111,7 @@ public List<Flow> flows() {
synchronized (this) {
if (this.flows == null) {
this.flows = this.store
.toStream()
.all()
.filter(flow -> flow != null && !flow.isDeleted())
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import com.google.common.collect.Streams;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

@Slf4j
Expand All @@ -32,37 +35,55 @@ public Optional<V> get(K key) {
}
}

@SuppressWarnings("UnstableApiUsage")
public Stream<V> toStream() {
public Stream<V> all() {
KeyValueIterator<K, V> all = this.store.all();

return toStream(all, kvKeyValue -> kvKeyValue.value);
}

public Stream<V> prefix(String prefix) {
KeyValueIterator<K, V> all = this.store.prefixScan(prefix, new StringSerializer());

return toStream(all, kvKeyValue -> kvKeyValue.value);
}

public Stream<KeyValue<K, V>> prefixWithKey(String prefix) {
KeyValueIterator<K, V> all = this.store.prefixScan(prefix, new StringSerializer());

return toStream(all, kvKeyValue -> new KeyValue<>(kvKeyValue.key, kvKeyValue.value));
}

@SuppressWarnings("UnstableApiUsage")
private <R> Stream<R> toStream(KeyValueIterator<K, V> all, Function<KeyValue<K, V>, R> function) {
return Streams
.stream(new Iterator<V>() {
private V next;
.stream(new Iterator<R>() {
private R next;

@Override
public boolean hasNext() {
boolean seek = true;
while (seek) {
try {
next = all.next().value;

return true;
} catch (SerializationException e) {
if (log.isTraceEnabled()) {
log.trace("Exception on store {}", name, e);
}
} catch (NoSuchElementException e) {
seek = false;
}
}

all.close();
return false;
boolean seek = true;
while (seek) {
try {
KeyValue<K, V> rawNext = all.next();

next = function.apply(rawNext);

return true;
} catch (SerializationException e) {
if (log.isTraceEnabled()) {
log.trace("Exception on store {}", name, e);
}
} catch (NoSuchElementException e) {
seek = false;
}
}

all.close();
return false;
}

@Override
public V next() {
public R next() {
return this.next;
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void init(final ProcessorContext context) {
@Override
public Iterable<KeyValue<String, ExecutorFlowTrigger>> transform(String key, Executor value) {
List<Flow> allFlows = flowService
.keepLastVersion(flowStore.toStream().map(ValueAndTimestamp::value))
.keepLastVersion(flowStore.all().map(ValueAndTimestamp::value))
.collect(Collectors.toList());

return flowService.flowWithFlowTrigger(allFlows.stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void test() throws JsonProcessingException {
Optional<Flow> validOne = safeKeyValueStore.get("validOne2");
assertThat(validOne.isEmpty(), is(false));

List<Flow> list = safeKeyValueStore.toStream().filter(flow -> !flow.isDeleted()).collect(Collectors.toList());
List<Flow> list = safeKeyValueStore.all().filter(flow -> !flow.isDeleted()).collect(Collectors.toList());
assertThat(list.size(), is(3));
}
}

0 comments on commit 2920d54

Please sign in to comment.