Skip to content

Commit

Permalink
feat(kafka-runner): used an in memory storage for flow & template to …
Browse files Browse the repository at this point in the history
…avoid performance issue on deserialization from rockdb
  • Loading branch information
tchiotludo committed Feb 9, 2022
1 parent 2920d54 commit 1d4de46
Show file tree
Hide file tree
Showing 34 changed files with 431 additions and 249 deletions.
7 changes: 7 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ public static String uid(String namespace, String id, Optional<Integer> revision
));
}

public static String uidWithoutRevision(String namespace, String id) {
return String.join("_", Arrays.asList(
namespace,
id
));
}

public static String uidWithoutRevision(Execution execution) {
return String.join("_", Arrays.asList(
execution.getNamespace(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
package io.kestra.core.runners;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;

import java.util.Collection;
import java.util.Optional;

public interface FlowExecutorInterface {
Flow findById(String namespace, String id, Optional<Integer> revision, String fromNamespace, String fromId);
Collection<Flow> allLastVersion();

Optional<Flow> findById(String namespace, String id, Optional<Integer> revision, String fromNamespace, String fromId);

default Optional<Flow> findByExecution(Execution execution) {
return this.findById(
execution.getNamespace(),
execution.getFlowId(),
Optional.of(execution.getFlowRevision()),
null,
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;

import java.util.Collection;
import java.util.Optional;

public class MemoryFlowExecutor implements FlowExecutorInterface {
Expand All @@ -13,8 +14,12 @@ public MemoryFlowExecutor(FlowRepositoryInterface flowRepositoryInterface) {
}

@Override
public Flow findById(String namespace, String id, Optional<Integer> revision, String fromNamespace, String flowId) {
return flowRepositoryInterface.findById(namespace, id, revision)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "' with revision + '" + revision + "'"));
public Collection<Flow> allLastVersion() {
return flowRepositoryInterface.findAll();
}

@Override
public Optional<Flow> findById(String namespace, String id, Optional<Integer> revision, String fromNamespace, String fromId) {
return flowRepositoryInterface.findById(namespace, id, revision);
}
}
3 changes: 2 additions & 1 deletion core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
this.revision != null ? Optional.of(this.revision) : Optional.empty(),
flowVars.get("namespace"),
flowVars.get("id")
);
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "' with revision + '" + revision + "'"));

return runnerUtils
.newExecution(
Expand Down
19 changes: 6 additions & 13 deletions core/src/main/java/io/kestra/core/tasks/flows/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -190,12 +187,8 @@ public Template.Output outputs(RunContext runContext, Execution execution, TaskR
protected io.kestra.core.models.templates.Template findTemplate(ApplicationContext applicationContext) throws IllegalVariableEvaluationException {
TemplateExecutorInterface templateExecutor = applicationContext.getBean(TemplateExecutorInterface.class);

io.kestra.core.models.templates.Template template = templateExecutor.findById(this.namespace, this.templateId);
if (template == null) {
throw new IllegalVariableEvaluationException("Can't find flow template '" + this.namespace + "." + this.templateId + "'");
}

return template;
return templateExecutor.findById(this.namespace, this.templateId)
.orElseThrow(() -> new IllegalVariableEvaluationException("Can't find flow template '" + this.namespace + "." + this.templateId + "'"));
}

@SuperBuilder(toBuilder = true)
Expand Down Expand Up @@ -280,15 +273,15 @@ void onStartup(final StartupEvent event) {
}

public interface TemplateExecutorInterface {
io.kestra.core.models.templates.Template findById(String namespace, String templateId);
Optional<io.kestra.core.models.templates.Template> findById(String namespace, String templateId);
}

public static class MemoryTemplateExecutor implements io.kestra.core.tasks.flows.Template.TemplateExecutorInterface {
@Inject
private TemplateRepositoryInterface templateRepository;

public io.kestra.core.models.templates.Template findById(String namespace, String templateId) {
return this.templateRepository.findById(namespace, templateId).orElse(null);
public Optional<io.kestra.core.models.templates.Template> findById(String namespace, String templateId) {
return this.templateRepository.findById(namespace, templateId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,68 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.services.FlowService;
import io.kestra.runner.kafka.services.SafeKeyValueStore;
import io.micronaut.context.ApplicationContext;
import io.kestra.core.utils.Await;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Optional;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;

@Slf4j
@Singleton
public class KafkaFlowExecutor implements FlowExecutorInterface {
private final FlowService flowService;
private final SafeKeyValueStore<String, Flow> store;
@Inject
private FlowService flowService;
private Map<String, Flow> flows;
private Map<String, Flow> flowsLast;

public KafkaFlowExecutor(ReadOnlyKeyValueStore<String, Flow> store, String name, ApplicationContext applicationContext) {
this.store = new SafeKeyValueStore<>(store, name);
this.flowService = applicationContext.getBean(FlowService.class);
public synchronized void setFlows(List<Flow> flows) {
this.flows = flows
.stream()
.map(flow -> new AbstractMap.SimpleEntry<>(
flow.uid(),
flow
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

this.flowsLast = flowService.keepLastVersion(flows)
.stream()
.map(flow -> new AbstractMap.SimpleEntry<>(
flow.uidWithoutRevision(),
flow
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@SneakyThrows
private void await() {
if (flows == null) {
Await.until(() -> this.flows != null, Duration.ofMillis(100), Duration.ofMinutes(5));
}
}

@Override
public Collection<Flow> allLastVersion() {
this.await();

return this.flowsLast.values();
}

@Override
public Flow findById(String namespace, String id, Optional<Integer> revision, String fromNamespace, String fromId) {
public Optional<Flow> findById(String namespace, String id, Optional<Integer> revision, String fromNamespace, String fromId) {
this.await();

if (revision.isPresent()) {
return this.store.get(Flow.uid(namespace, id, revision))
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "'"));
} else {
return flowService.keepLastVersion(
this.store.all(),
namespace,
id
);
String uid = Flow.uid(namespace, id, revision);

return Optional.ofNullable(this.flows.get(uid));
}

String uid = Flow.uidWithoutRevision(namespace, id);

return Optional.ofNullable(this.flowsLast.get(uid));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.DefaultScheduler;
import io.kestra.core.schedulers.SchedulerExecutionWithTrigger;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.runner.kafka.configs.TopicsConfig;
import io.kestra.runner.kafka.serializers.JsonSerde;
import io.kestra.runner.kafka.services.*;
import io.kestra.runner.kafka.services.KafkaAdminService;
import io.kestra.runner.kafka.services.KafkaProducerService;
import io.kestra.runner.kafka.services.KafkaStreamService;
import io.kestra.runner.kafka.services.KafkaStreamsBuilder;
import io.kestra.runner.kafka.streams.GlobalStateLockProcessor;
import io.kestra.runner.kafka.streams.GlobalStateProcessor;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -30,15 +33,13 @@
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import jakarta.inject.Singleton;

@KafkaQueueEnabled
@Singleton
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,49 @@
package io.kestra.runner.kafka;

import io.kestra.runner.kafka.services.SafeKeyValueStore;
import io.kestra.core.models.templates.Template;
import io.kestra.core.utils.Await;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import io.kestra.core.models.templates.Template;

import java.time.Duration;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@Slf4j
@KafkaQueueEnabled
@Replaces(io.kestra.core.tasks.flows.Template.MemoryTemplateExecutor.class)
@Requires(property = "kestra.server-type", value = "EXECUTOR")
@Requires(property = "kestra.server-type", pattern = "(EXECUTOR|STANDALONE)")
@Singleton
public class KafkaTemplateExecutor implements io.kestra.core.tasks.flows.Template.TemplateExecutorInterface {
private final SafeKeyValueStore<String, Template> store;
private Map<String, Template> templates;


public KafkaTemplateExecutor(ReadOnlyKeyValueStore<String, Template> store, String name) {
this.store = new SafeKeyValueStore<>(store, name);
public synchronized void setTemplates(List<Template> templates) {
this.templates = templates
.stream()
.map(template -> new AbstractMap.SimpleEntry<>(
template.uid(),
template
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Template findById(String namespace, String templateId) {
return this.store.get(Template.uid(namespace, templateId)).orElse(null);
@SneakyThrows
private void await() {
if (templates == null) {
Await.until(() -> this.templates != null, Duration.ofMillis(100), Duration.ofMinutes(5));
}
}

public Optional<Template> findById(String namespace, String templateId) {
this.await();

return Optional.ofNullable(this.templates.get(Template.uid(namespace, templateId)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.services.FlowService;
import io.kestra.runner.kafka.KafkaFlowExecutor;
import io.kestra.runner.kafka.KafkaQueueEnabled;
import io.kestra.runner.kafka.serializers.JsonSerde;
import io.kestra.runner.kafka.services.KafkaAdminService;
import io.kestra.runner.kafka.services.KafkaStreamSourceService;
import io.kestra.runner.kafka.services.KafkaStreamsBuilder;
import io.kestra.runner.kafka.streams.FlowTriggerWithExecutionTransformer;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -34,13 +34,11 @@ public class ExecutorFlowTrigger implements KafkaExecutorInterface {
private FlowService flowService;

@Inject
private KafkaStreamSourceService kafkaStreamSourceService;
private KafkaFlowExecutor kafkaFlowExecutor;

public StreamsBuilder topology() {
StreamsBuilder builder = new KafkaStreamsBuilder();

kafkaStreamSourceService.flowGlobalKTable(builder);

// trigger
builder.addStateStore(
Stores.keyValueStoreBuilder(
Expand All @@ -62,6 +60,7 @@ public StreamsBuilder topology() {
.transformValues(
() -> new FlowTriggerWithExecutionTransformer(
TRIGGER_MULTIPLE_STATE_STORE_NAME,
kafkaFlowExecutor,
flowService
),
Named.as("ExecutorFlowTrigger.transformToExecutionList"),
Expand Down
Loading

0 comments on commit 1d4de46

Please sign in to comment.