From 1d4de46cf663c1dc9fa77337c9372977e5d6d0ad Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Wed, 9 Feb 2022 21:10:13 +0100 Subject: [PATCH] feat(kafka-runner): used an in memory storage for flow & template to avoid performance issue on deserialization from rockdb --- .../io/kestra/core/models/flows/Flow.java | 7 ++ .../core/runners/FlowExecutorInterface.java | 16 +++- .../core/runners/MemoryFlowExecutor.java | 11 ++- .../java/io/kestra/core/tasks/flows/Flow.java | 3 +- .../io/kestra/core/tasks/flows/Template.java | 19 ++--- .../runner/kafka/KafkaFlowExecutor.java | 70 ++++++++++++----- .../kestra/runner/kafka/KafkaScheduler.java | 11 +-- .../runner/kafka/KafkaTemplateExecutor.java | 43 +++++++--- .../kafka/executors/ExecutorFlowTrigger.java | 7 +- .../runner/kafka/executors/ExecutorMain.java | 27 +------ .../runner/kafka/executors/ExecutorStore.java | 69 ++++++++++++++++ .../executors/ExecutorTriggerCleaner.java | 2 - .../services/KafkaStreamSourceService.java | 35 +++------ .../DeduplicationPurgeTransformer.java | 3 +- .../streams/DeduplicationTransformer.java | 3 +- .../ExecutorFromExecutionTransformer.java | 3 +- .../streams/ExecutorJoinerTransformer.java | 3 +- .../ExecutorKilledJoinerTransformer.java | 3 +- .../kafka/streams/FlowJoinerTransformer.java | 37 ++++----- .../FlowTriggerWithExecutionTransformer.java | 50 ++++++------ .../streams/FlowWithTriggerTransformer.java | 24 ++---- .../streams/GlobalInMemoryStateProcessor.java | 53 +++++++++++++ .../streams/GlobalStateLockProcessor.java | 17 ++-- .../kafka/streams/GlobalStateProcessor.java | 18 +++-- .../kafka/streams/StateStoreTransformer.java | 3 +- .../streams/WorkerInstanceTransformer.java | 8 +- .../WorkerTaskExecutionTransformer.java | 18 ++--- .../runner/kafka/AbstractKafkaRunnerTest.java | 11 ++- .../runner/kafka/KafkaExecutorTest.java | 78 ++++++++++++------- .../runner/kafka/KafkaFlowListenersTest.java | 2 + .../kestra/runner/kafka/KafkaRunnerTest.java | 6 +- .../runner/kafka/KafkaSchedulerTest.java | 2 + .../kafka/services/SafeKeyValueStoreTest.java | 16 +++- .../kestra/runner/memory/MemoryExecutor.java | 2 +- 34 files changed, 431 insertions(+), 249 deletions(-) create mode 100644 runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorStore.java create mode 100644 runner-kafka/src/main/java/io/kestra/runner/kafka/streams/GlobalInMemoryStateProcessor.java diff --git a/core/src/main/java/io/kestra/core/models/flows/Flow.java b/core/src/main/java/io/kestra/core/models/flows/Flow.java index 70be01e38f9..7f60b48522d 100644 --- a/core/src/main/java/io/kestra/core/models/flows/Flow.java +++ b/core/src/main/java/io/kestra/core/models/flows/Flow.java @@ -118,6 +118,13 @@ public static String uid(String namespace, String id, Optional revision )); } + public static String uidWithoutRevision(String namespace, String id) { + return String.join("_", Arrays.asList( + namespace, + id + )); + } + public static String uidWithoutRevision(Execution execution) { return String.join("_", Arrays.asList( execution.getNamespace(), diff --git a/core/src/main/java/io/kestra/core/runners/FlowExecutorInterface.java b/core/src/main/java/io/kestra/core/runners/FlowExecutorInterface.java index 00fed44e47d..64e8a347500 100644 --- a/core/src/main/java/io/kestra/core/runners/FlowExecutorInterface.java +++ b/core/src/main/java/io/kestra/core/runners/FlowExecutorInterface.java @@ -1,9 +1,23 @@ package io.kestra.core.runners; +import io.kestra.core.models.executions.Execution; import io.kestra.core.models.flows.Flow; +import java.util.Collection; import java.util.Optional; public interface FlowExecutorInterface { - Flow findById(String namespace, String id, Optional revision, String fromNamespace, String fromId); + Collection allLastVersion(); + + Optional findById(String namespace, String id, Optional revision, String fromNamespace, String fromId); + + default Optional findByExecution(Execution execution) { + return this.findById( + execution.getNamespace(), + execution.getFlowId(), + Optional.of(execution.getFlowRevision()), + null, + null + ); + } } diff --git a/core/src/main/java/io/kestra/core/runners/MemoryFlowExecutor.java b/core/src/main/java/io/kestra/core/runners/MemoryFlowExecutor.java index 595c88c8076..83cf48b9447 100644 --- a/core/src/main/java/io/kestra/core/runners/MemoryFlowExecutor.java +++ b/core/src/main/java/io/kestra/core/runners/MemoryFlowExecutor.java @@ -3,6 +3,7 @@ import io.kestra.core.models.flows.Flow; import io.kestra.core.repositories.FlowRepositoryInterface; +import java.util.Collection; import java.util.Optional; public class MemoryFlowExecutor implements FlowExecutorInterface { @@ -13,8 +14,12 @@ public MemoryFlowExecutor(FlowRepositoryInterface flowRepositoryInterface) { } @Override - public Flow findById(String namespace, String id, Optional revision, String fromNamespace, String flowId) { - return flowRepositoryInterface.findById(namespace, id, revision) - .orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "' with revision + '" + revision + "'")); + public Collection allLastVersion() { + return flowRepositoryInterface.findAll(); + } + + @Override + public Optional findById(String namespace, String id, Optional revision, String fromNamespace, String fromId) { + return flowRepositoryInterface.findById(namespace, id, revision); } } diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Flow.java b/core/src/main/java/io/kestra/core/tasks/flows/Flow.java index feb1a1067e4..a516385f17a 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Flow.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Flow.java @@ -122,7 +122,8 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl this.revision != null ? Optional.of(this.revision) : Optional.empty(), flowVars.get("namespace"), flowVars.get("id") - ); + ) + .orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "' with revision + '" + revision + "'")); return runnerUtils .newExecution( diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Template.java b/core/src/main/java/io/kestra/core/tasks/flows/Template.java index 2267f18a900..ca8844ed97e 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Template.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Template.java @@ -27,10 +27,7 @@ import lombok.experimental.SuperBuilder; import lombok.extern.slf4j.Slf4j; -import java.util.AbstractMap; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -190,12 +187,8 @@ public Template.Output outputs(RunContext runContext, Execution execution, TaskR protected io.kestra.core.models.templates.Template findTemplate(ApplicationContext applicationContext) throws IllegalVariableEvaluationException { TemplateExecutorInterface templateExecutor = applicationContext.getBean(TemplateExecutorInterface.class); - io.kestra.core.models.templates.Template template = templateExecutor.findById(this.namespace, this.templateId); - if (template == null) { - throw new IllegalVariableEvaluationException("Can't find flow template '" + this.namespace + "." + this.templateId + "'"); - } - - return template; + return templateExecutor.findById(this.namespace, this.templateId) + .orElseThrow(() -> new IllegalVariableEvaluationException("Can't find flow template '" + this.namespace + "." + this.templateId + "'")); } @SuperBuilder(toBuilder = true) @@ -280,15 +273,15 @@ void onStartup(final StartupEvent event) { } public interface TemplateExecutorInterface { - io.kestra.core.models.templates.Template findById(String namespace, String templateId); + Optional findById(String namespace, String templateId); } public static class MemoryTemplateExecutor implements io.kestra.core.tasks.flows.Template.TemplateExecutorInterface { @Inject private TemplateRepositoryInterface templateRepository; - public io.kestra.core.models.templates.Template findById(String namespace, String templateId) { - return this.templateRepository.findById(namespace, templateId).orElse(null); + public Optional findById(String namespace, String templateId) { + return this.templateRepository.findById(namespace, templateId); } } diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowExecutor.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowExecutor.java index 1c6d59bc573..99349df2a47 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowExecutor.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowExecutor.java @@ -3,34 +3,68 @@ import io.kestra.core.models.flows.Flow; import io.kestra.core.runners.FlowExecutorInterface; import io.kestra.core.services.FlowService; -import io.kestra.runner.kafka.services.SafeKeyValueStore; -import io.micronaut.context.ApplicationContext; +import io.kestra.core.utils.Await; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import java.util.Optional; +import java.time.Duration; +import java.util.*; +import java.util.stream.Collectors; @Slf4j +@Singleton public class KafkaFlowExecutor implements FlowExecutorInterface { - private final FlowService flowService; - private final SafeKeyValueStore store; + @Inject + private FlowService flowService; + private Map flows; + private Map flowsLast; - public KafkaFlowExecutor(ReadOnlyKeyValueStore store, String name, ApplicationContext applicationContext) { - this.store = new SafeKeyValueStore<>(store, name); - this.flowService = applicationContext.getBean(FlowService.class); + public synchronized void setFlows(List flows) { + this.flows = flows + .stream() + .map(flow -> new AbstractMap.SimpleEntry<>( + flow.uid(), + flow + )) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + this.flowsLast = flowService.keepLastVersion(flows) + .stream() + .map(flow -> new AbstractMap.SimpleEntry<>( + flow.uidWithoutRevision(), + flow + )) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @SneakyThrows + private void await() { + if (flows == null) { + Await.until(() -> this.flows != null, Duration.ofMillis(100), Duration.ofMinutes(5)); + } + } + + @Override + public Collection allLastVersion() { + this.await(); + + return this.flowsLast.values(); } @Override - public Flow findById(String namespace, String id, Optional revision, String fromNamespace, String fromId) { + public Optional findById(String namespace, String id, Optional revision, String fromNamespace, String fromId) { + this.await(); + if (revision.isPresent()) { - return this.store.get(Flow.uid(namespace, id, revision)) - .orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "." + id + "'")); - } else { - return flowService.keepLastVersion( - this.store.all(), - namespace, - id - ); + String uid = Flow.uid(namespace, id, revision); + + return Optional.ofNullable(this.flows.get(uid)); } + + String uid = Flow.uidWithoutRevision(namespace, id); + + return Optional.ofNullable(this.flowsLast.get(uid)); } } diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java index 8478729bfb0..8f898891a77 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java @@ -11,17 +11,20 @@ import io.kestra.core.schedulers.AbstractScheduler; import io.kestra.core.schedulers.DefaultScheduler; import io.kestra.core.schedulers.SchedulerExecutionWithTrigger; -import io.kestra.core.services.ConditionService; import io.kestra.core.services.FlowListenersInterface; import io.kestra.core.utils.IdUtils; import io.kestra.runner.kafka.configs.TopicsConfig; import io.kestra.runner.kafka.serializers.JsonSerde; -import io.kestra.runner.kafka.services.*; +import io.kestra.runner.kafka.services.KafkaAdminService; +import io.kestra.runner.kafka.services.KafkaProducerService; +import io.kestra.runner.kafka.services.KafkaStreamService; +import io.kestra.runner.kafka.services.KafkaStreamsBuilder; import io.kestra.runner.kafka.streams.GlobalStateLockProcessor; import io.kestra.runner.kafka.streams.GlobalStateProcessor; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Replaces; import io.micronaut.inject.qualifiers.Qualifiers; +import jakarta.inject.Singleton; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -30,15 +33,13 @@ import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.Stores; import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import jakarta.inject.Singleton; @KafkaQueueEnabled @Singleton diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaTemplateExecutor.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaTemplateExecutor.java index 1ed8261290c..2e792bfd699 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaTemplateExecutor.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaTemplateExecutor.java @@ -1,24 +1,49 @@ package io.kestra.runner.kafka; -import io.kestra.runner.kafka.services.SafeKeyValueStore; +import io.kestra.core.models.templates.Template; +import io.kestra.core.utils.Await; import io.micronaut.context.annotation.Replaces; import io.micronaut.context.annotation.Requires; +import jakarta.inject.Singleton; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import io.kestra.core.models.templates.Template; + +import java.time.Duration; +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; @Slf4j @KafkaQueueEnabled @Replaces(io.kestra.core.tasks.flows.Template.MemoryTemplateExecutor.class) -@Requires(property = "kestra.server-type", value = "EXECUTOR") +@Requires(property = "kestra.server-type", pattern = "(EXECUTOR|STANDALONE)") +@Singleton public class KafkaTemplateExecutor implements io.kestra.core.tasks.flows.Template.TemplateExecutorInterface { - private final SafeKeyValueStore store; + private Map templates; + - public KafkaTemplateExecutor(ReadOnlyKeyValueStore store, String name) { - this.store = new SafeKeyValueStore<>(store, name); + public synchronized void setTemplates(List