From 95d2b2d6ad259a149c0154385a67209309d2a1c0 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Fri, 28 Jan 2022 10:27:49 +0100 Subject: [PATCH] feat(kafka-runner): create multiple stream thread to separate load (#452) --- .../cli/commands/servers/ExecutorCommand.java | 8 +- .../commands/servers/StandAloneCommand.java | 9 +- cli/src/main/resources/application.yml | 2 +- .../core/models/executions/Execution.java | 4 +- .../core/runners/ExecutorInterface.java | 7 + ...ractExecutor.java => ExecutorService.java} | 45 +- .../core/runners/MemoryFlowExecutor.java | 2 +- .../kestra/core/runners/StandAloneRunner.java | 6 +- .../scripts/runners/DockerScriptRunner.java | 4 +- .../core/runners/FlowTriggerCaseTest.java | 11 +- .../io/kestra/runner/kafka/KafkaExecutor.java | 887 +----------------- .../kafka/executors/ExecutorFlowTrigger.java | 98 ++ .../runner/kafka/executors/ExecutorMain.java | 700 ++++++++++++++ .../executors/ExecutorWorkerRunning.java | 140 +++ .../executors/KafkaExecutorInterface.java | 15 + .../kafka/services/KafkaStreamService.java | 4 + .../streams/ExecutorNextTransformer.java | 15 +- .../streams/WorkerInstanceTransformer.java | 7 +- .../runner/kafka/KafkaExecutorTest.java | 137 ++- .../kestra/runner/kafka/KafkaRunnerTest.java | 2 +- .../src/test/resources/application.yml | 2 +- .../kestra/runner/memory/MemoryExecutor.java | 95 +- 22 files changed, 1227 insertions(+), 973 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/runners/ExecutorInterface.java rename core/src/main/java/io/kestra/core/runners/{AbstractExecutor.java => ExecutorService.java} (95%) create mode 100644 runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorFlowTrigger.java create mode 100644 runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorMain.java create mode 100644 runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorWorkerRunning.java create mode 100644 runner-kafka/src/main/java/io/kestra/runner/kafka/executors/KafkaExecutorInterface.java diff --git a/cli/src/main/java/io/kestra/cli/commands/servers/ExecutorCommand.java b/cli/src/main/java/io/kestra/cli/commands/servers/ExecutorCommand.java index ff0e9e98b5..2ef6f62b01 100644 --- a/cli/src/main/java/io/kestra/cli/commands/servers/ExecutorCommand.java +++ b/cli/src/main/java/io/kestra/cli/commands/servers/ExecutorCommand.java @@ -1,11 +1,11 @@ package io.kestra.cli.commands.servers; import com.google.common.collect.ImmutableMap; +import io.kestra.core.runners.ExecutorInterface; import io.micronaut.context.ApplicationContext; import lombok.extern.slf4j.Slf4j; import io.kestra.cli.AbstractCommand; import io.kestra.core.models.ServerType; -import io.kestra.core.runners.AbstractExecutor; import io.kestra.core.utils.Await; import picocli.CommandLine; @@ -36,12 +36,12 @@ public static Map<String, Object> propertiesOverrides() { public Integer call() throws Exception { super.call(); - AbstractExecutor abstractExecutor = applicationContext.getBean(AbstractExecutor.class); - abstractExecutor.run(); + ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class); + executorService.run(); log.info("Executor started"); - this.shutdownHook(abstractExecutor::close); + this.shutdownHook(executorService::close); Await.until(() -> !this.applicationContext.isRunning());
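The `executor` server command above now programs against the new `ExecutorInterface` abstraction instead of the concrete `AbstractExecutor`, so the same CLI code runs whichever executor implementation the active runner module provides. A minimal sketch of that lookup pattern; the bean and interface names are the ones from this patch, but the surrounding `main` wrapper is illustrative only:

    import io.kestra.core.runners.ExecutorInterface;
    import io.micronaut.context.ApplicationContext;

    import java.io.IOException;

    public class ExecutorServerSketch {
        public static void main(String[] args) {
            ApplicationContext applicationContext = ApplicationContext.run();

            // resolves whichever ExecutorInterface bean is active (e.g. KafkaExecutor)
            ExecutorInterface executor = applicationContext.getBean(ExecutorInterface.class);
            executor.run();

            // mirrors this.shutdownHook(executorService::close) above
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    executor.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }));
        }
    }

diff --git a/cli/src/main/java/io/kestra/cli/commands/servers/StandAloneCommand.java b/cli/src/main/java/io/kestra/cli/commands/servers/StandAloneCommand.java index dbe267bd4c..ba199309ca 100644 --- 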
a/cli/src/main/java/io/kestra/cli/commands/servers/StandAloneCommand.java +++ b/cli/src/main/java/io/kestra/cli/commands/servers/StandAloneCommand.java @@ -1,22 +1,19 @@ package io.kestra.cli.commands.servers; import com.google.common.collect.ImmutableMap; -import io.micronaut.context.ApplicationContext; -import lombok.extern.slf4j.Slf4j; import io.kestra.cli.AbstractCommand; import io.kestra.core.models.ServerType; import io.kestra.core.repositories.LocalFlowRepositoryLoader; -import io.kestra.core.runners.AbstractExecutor; import io.kestra.core.runners.StandAloneRunner; import io.kestra.core.utils.Await; -import io.kestra.runner.kafka.KafkaExecutor; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import lombok.extern.slf4j.Slf4j; import picocli.CommandLine; import java.io.File; import java.io.IOException; import java.util.Map; -import java.util.Optional; -import jakarta.inject.Inject; @CommandLine.Command( name = "standalone", diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index 9628acc4ef..5cb48a5964 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -111,7 +111,7 @@ kestra: segment.bytes: "10485760" executor: - name: "${kestra.kafka.defaults.topic-prefix}executor-executor-changelog" + name: "${kestra.kafka.defaults.topic-prefix}executor_main-executor-changelog" cls: io.kestra.core.runners.Executor properties: cleanup.policy: "delete,compact" diff --git a/core/src/main/java/io/kestra/core/models/executions/Execution.java b/core/src/main/java/io/kestra/core/models/executions/Execution.java index bc82853a24..a2d76f35df 100644 --- a/core/src/main/java/io/kestra/core/models/executions/Execution.java +++ b/core/src/main/java/io/kestra/core/models/executions/Execution.java @@ -434,13 +434,13 @@ public boolean hasTaskRunJoinable(TaskRun taskRun) { } /** - * Convert an exception on {@link io.kestra.core.runners.AbstractExecutor} and add log to the current + * Convert an exception on the Executor and add a log to the current * {@code RUNNING} taskRun, on the lastAttempts. * If no Attempt is found, we create one (must be the nominal case). * The executor will catch the {@code FAILED} taskRun emitted and will fail the execution. * In the worst case, we FAIL the execution (only from {@link io.kestra.core.models.triggers.types.Flow}).
* - * @param e the exception throw from {@link io.kestra.core.runners.AbstractExecutor} + * @param e the exception thrown from the Executor * @return a new execution with the taskRun failed if possible, or the execution failed otherwise */ public FailedExecutionWithLog failedExecutionFromExecutor(Exception e) { diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorInterface.java b/core/src/main/java/io/kestra/core/runners/ExecutorInterface.java new file mode 100644 index 0000000000..97090f6931 --- /dev/null +++ b/core/src/main/java/io/kestra/core/runners/ExecutorInterface.java @@ -0,0 +1,7 @@ +package io.kestra.core.runners; + +import java.io.Closeable; + +public interface ExecutorInterface extends Closeable, Runnable { + +} diff --git a/core/src/main/java/io/kestra/core/runners/AbstractExecutor.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java similarity index 95% rename from core/src/main/java/io/kestra/core/runners/AbstractExecutor.java rename to core/src/main/java/io/kestra/core/runners/ExecutorService.java index c7356b1d2b..eddce20809 100644 --- a/core/src/main/java/io/kestra/core/runners/AbstractExecutor.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -12,34 +12,43 @@ import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.services.ConditionService; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; -import java.io.Closeable; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import static io.kestra.core.utils.Rethrow.throwFunction; -import static io.kestra.core.utils.Rethrow.throwPredicate; -public abstract class AbstractExecutor implements Runnable, Closeable { - protected static final Logger log = org.slf4j.LoggerFactory.getLogger(AbstractExecutor.class); +@Singleton +@Slf4j +public class ExecutorService { + @Inject + protected ApplicationContext applicationContext; + @Inject protected RunContextFactory runContextFactory; + + @Inject protected MetricRegistry metricRegistry; + + @Inject protected ConditionService conditionService; + protected FlowExecutorInterface flowExecutorInterface; - public AbstractExecutor( - RunContextFactory runContextFactory, - MetricRegistry metricRegistry, - ConditionService conditionService - ) { - this.runContextFactory = runContextFactory; - this.metricRegistry = metricRegistry; - this.conditionService = conditionService; + protected FlowExecutorInterface flowExecutorInterface() { + // bean is injected late, so we need to wait + if (this.flowExecutorInterface == null) { + this.flowExecutorInterface = applicationContext.getBean(FlowExecutorInterface.class); + } + + return this.flowExecutorInterface; } public Executor process(Executor executor) { @@ -250,7 +259,7 @@ private List<TaskRun> saveFlowableOutput( ImmutableMap.of() ); } catch (Exception e) { - log.warn("Unable to save output on taskRun '{}'", taskRun, e); + executor.getFlow().logger().warn("Unable to save output on taskRun '{}'", taskRun, e); } return taskRun; @@ -510,7 +519,7 @@ private Executor handleFlowTask(final Executor executor) { ); try { - Execution execution = flowTask.createExecution(runContext, flowExecutorInterface); + Execution execution = flowTask.createExecution(runContext, flowExecutorInterface()); WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder() .task(flowTask)
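The hunks that follow drop `static` from the `log()` helpers: with several Kafka Streams applications now running side by side, each topology passes its own SLF4J logger (created per executor class in `KafkaExecutor.run()` later in this patch) instead of all sharing `AbstractExecutor`'s single static logger. A fragment showing the resulting call pattern; `executorService` and `workerTask` stand in for values available at the call site:

    // one logger per stream topology, named after the executor class
    Logger logger = LoggerFactory.getLogger(ExecutorMain.class);

    // ExecutorService is now an injected singleton rather than a base class,
    // so each topology calls its helpers with the topology's own logger:
    executorService.log(logger, true, workerTask);  // "<< IN " direction

@@ -552,7 +561,7 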
@@ private Executor handleFlowTask(final Executor executor) { return resultExecutor; } - protected static void log(Logger log, Boolean in, WorkerTask value) { + public void log(Logger log, Boolean in, WorkerTask value) { log.debug( "{} {} : {}", in ? "<< IN " : ">> OUT", @@ -561,7 +570,7 @@ protected static void log(Logger log, Boolean in, WorkerTask value) { ); } - protected static void log(Logger log, Boolean in, WorkerTaskResult value) { + public void log(Logger log, Boolean in, WorkerTaskResult value) { log.debug( "{} {} : {}", in ? "<< IN " : ">> OUT", @@ -570,7 +579,7 @@ protected static void log(Logger log, Boolean in, WorkerTaskResult value) { ); } - protected static void log(Logger log, Boolean in, Execution value) { + public void log(Logger log, Boolean in, Execution value) { log.debug( "{} {} [key='{}']\n{}", in ? "<< IN " : ">> OUT", @@ -580,7 +589,7 @@ protected static void log(Logger log, Boolean in, Execution value) { ); } - protected static void log(Logger log, Boolean in, Executor value) { + public void log(Logger log, Boolean in, Executor value) { log.debug( "{} {} [key='{}', from='{}', offset='{}']\n{}", in ? "<< IN " : ">> OUT", diff --git a/core/src/main/java/io/kestra/core/runners/MemoryFlowExecutor.java b/core/src/main/java/io/kestra/core/runners/MemoryFlowExecutor.java index 38b5d2066b..595c88c807 100644 --- a/core/src/main/java/io/kestra/core/runners/MemoryFlowExecutor.java +++ b/core/src/main/java/io/kestra/core/runners/MemoryFlowExecutor.java @@ -6,7 +6,7 @@ import java.util.Optional; public class MemoryFlowExecutor implements FlowExecutorInterface { - private FlowRepositoryInterface flowRepositoryInterface; + private final FlowRepositoryInterface flowRepositoryInterface; public MemoryFlowExecutor(FlowRepositoryInterface flowRepositoryInterface) { this.flowRepositoryInterface = flowRepositoryInterface; diff --git a/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java b/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java index 575aee5359..88484bd724 100644 --- a/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java +++ b/core/src/main/java/io/kestra/core/runners/StandAloneRunner.java @@ -11,13 +11,13 @@ import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.ExecutorService; + import jakarta.inject.Inject; import jakarta.inject.Named; @Slf4j public class StandAloneRunner implements RunnerInterface, Closeable { - @Setter private ExecutorService poolExecutor; + @Setter private java.util.concurrent.ExecutorService poolExecutor; @Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors()); @Setter protected boolean schedulerEnabled = true; @@ -47,7 +47,7 @@ public void run() { poolExecutor = executorsUtils.cachedThreadPool("standalone-runner"); - poolExecutor.execute(applicationContext.getBean(AbstractExecutor.class)); + poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class)); Worker worker = new Worker(applicationContext, workerThread); applicationContext.registerSingleton(worker); diff --git a/core/src/main/java/io/kestra/core/tasks/scripts/runners/DockerScriptRunner.java b/core/src/main/java/io/kestra/core/tasks/scripts/runners/DockerScriptRunner.java index 5a384265a0..865ede5318 100644 --- a/core/src/main/java/io/kestra/core/tasks/scripts/runners/DockerScriptRunner.java +++ b/core/src/main/java/io/kestra/core/tasks/scripts/runners/DockerScriptRunner.java @@ -23,6 +23,7 @@ import io.micronaut.context.ApplicationContext; import lombok.SneakyThrows; import 
org.apache.commons.lang3.StringUtils; +import org.apache.hc.core5.http.ConnectionClosedException; import org.slf4j.Logger; import java.io.IOException; @@ -186,7 +187,8 @@ public RunResult run( .maxAttempt(5) .build() ).run( - InternalServerErrorException.class, + (bool, throwable) -> throwable instanceof InternalServerErrorException || + throwable.getCause() instanceof ConnectionClosedException, () -> { pull .withTag(!imageParse.tag.equals("") ? imageParse.tag : "latest") diff --git a/core/src/test/java/io/kestra/core/runners/FlowTriggerCaseTest.java b/core/src/test/java/io/kestra/core/runners/FlowTriggerCaseTest.java index 87fdd7bd6a..bb2c3a7107 100644 --- a/core/src/test/java/io/kestra/core/runners/FlowTriggerCaseTest.java +++ b/core/src/test/java/io/kestra/core/runners/FlowTriggerCaseTest.java @@ -5,17 +5,17 @@ import io.kestra.core.models.flows.State; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import jakarta.inject.Singleton; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import jakarta.inject.Inject; -import jakarta.inject.Named; -import jakarta.inject.Singleton; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.is; @Singleton public class FlowTriggerCaseTest { @@ -49,7 +49,8 @@ public void trigger() throws InterruptedException, TimeoutException { logEntryQueue.receive(logEntry -> { if (logEntry.getMessage().contains("Failed to trigger flow") && - logEntry.getTriggerId().equals("listen-flow-invalid") + logEntry.getTriggerId() != null && + logEntry.getTriggerId().equals("listen-flow-invalid") ) { countDownLatch.countDown(); } diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java index 22f2c0781b..64902a86f5 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java @@ -1,855 +1,51 @@ package io.kestra.runner.kafka; -import io.kestra.core.metrics.MetricRegistry; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.ExecutionKilled; import io.kestra.core.models.executions.LogEntry; -import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; -import io.kestra.core.models.flows.State; import io.kestra.core.models.templates.Template; import io.kestra.core.models.triggers.Trigger; -import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow; -import io.kestra.core.queues.QueueService; import io.kestra.core.runners.*; -import io.kestra.core.services.ConditionService; -import io.kestra.core.services.FlowService; -import io.kestra.runner.kafka.serializers.JsonSerde; +import io.kestra.runner.kafka.executors.KafkaExecutorInterface; import io.kestra.runner.kafka.services.KafkaAdminService; import io.kestra.runner.kafka.services.KafkaStreamService; import io.kestra.runner.kafka.services.KafkaStreamSourceService; -import io.kestra.runner.kafka.services.KafkaStreamsBuilder; -import io.kestra.runner.kafka.streams.*; +import io.kestra.runner.kafka.streams.ExecutorFlowTrigger; import io.micronaut.context.ApplicationContext; -import lombok.AllArgsConstructor; -import lombok.Getter; -import 
lombok.NoArgsConstructor; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.Stores; -import org.slf4j.event.Level; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.time.Duration; -import java.util.HashMap; -import java.util.Map; +import java.util.List; import java.util.Properties; import java.util.stream.Collectors; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; @KafkaQueueEnabled @Singleton -public class KafkaExecutor extends AbstractExecutor implements Closeable { - private static final String EXECUTOR_STATE_STORE_NAME = "executor"; - private static final String WORKERTASK_DEDUPLICATION_STATE_STORE_NAME = "workertask_deduplication"; - private static final String TRIGGER_DEDUPLICATION_STATE_STORE_NAME = "trigger_deduplication"; - public static final String TRIGGER_MULTIPLE_STATE_STORE_NAME = "trigger_multiplecondition"; - private static final String NEXTS_DEDUPLICATION_STATE_STORE_NAME = "next_deduplication"; - public static final String WORKER_RUNNING_STATE_STORE_NAME = "worker_running"; - public static final String WORKERINSTANCE_STATE_STORE_NAME = "worker_instance"; - public static final String TOPIC_EXECUTOR_WORKERINSTANCE = "executorworkerinstance"; +@Slf4j +public class KafkaExecutor implements ExecutorInterface { + private List streams; + @Inject protected ApplicationContext applicationContext; - protected KafkaStreamService kafkaStreamService; - protected KafkaAdminService kafkaAdminService; - protected FlowService flowService; - protected KafkaStreamSourceService kafkaStreamSourceService; - protected QueueService queueService; - - protected KafkaStreamService.Stream stream; @Inject - public KafkaExecutor( - ApplicationContext applicationContext, - RunContextFactory runContextFactory, - KafkaStreamService kafkaStreamService, - KafkaAdminService kafkaAdminService, - MetricRegistry metricRegistry, - FlowService flowService, - ConditionService conditionService, - KafkaStreamSourceService kafkaStreamSourceService, - QueueService queueService - ) { - super(runContextFactory, metricRegistry, conditionService); - - this.applicationContext = applicationContext; - this.kafkaStreamService = kafkaStreamService; - this.kafkaAdminService = kafkaAdminService; - this.flowService = flowService; - this.kafkaStreamSourceService = kafkaStreamSourceService; - this.queueService = queueService; - } - - public StreamsBuilder topology() { - StreamsBuilder builder = new KafkaStreamsBuilder(); - - // executor - builder.addStateStore(Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(EXECUTOR_STATE_STORE_NAME), - Serdes.String(), - JsonSerde.of(Executor.class) - )); - - // WorkerTask deduplication - builder.addStateStore(Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(WORKERTASK_DEDUPLICATION_STATE_STORE_NAME), - Serdes.String(), - Serdes.String() - )); - - // next deduplication - builder.addStateStore(Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(NEXTS_DEDUPLICATION_STATE_STORE_NAME), - Serdes.String(), - 
JsonSerde.of(ExecutorNextTransformer.Store.class) - )); - - // trigger deduplication - builder.addStateStore(Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(TRIGGER_DEDUPLICATION_STATE_STORE_NAME), - Serdes.String(), - Serdes.String() - )); - - // declare common stream - KStream workerTaskResultKStream = this.workerTaskResultKStream(builder); - KStream executorKStream = this.executorKStream(builder); - - // join with killed - KStream executionKilledKStream = this.executionKilledKStream(builder); - KStream executionWithKilled = this.joinExecutionKilled(executionKilledKStream, executorKStream); - - // join with WorkerResult - KStream executionKStream = this.joinWorkerResult(workerTaskResultKStream, executionWithKilled); - - // handle state on execution - GlobalKTable flowKTable = kafkaStreamSourceService.flowGlobalKTable(builder); - GlobalKTable templateKTable = kafkaStreamSourceService.templateGlobalKTable(builder); - KStream stream = kafkaStreamSourceService.executorWithFlow(executionKStream, true); - - stream = this.handleExecutor(stream); - - // save execution - this.toExecution(stream); - this.toWorkerTask(stream); - this.toWorkerTaskResult(stream); - - // trigger - builder.addStateStore( - Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(TRIGGER_MULTIPLE_STATE_STORE_NAME), - Serdes.String(), - JsonSerde.of(MultipleConditionWindow.class) - ) - ); - - KStream flowWithTriggerStream = this.flowWithTriggerStream(builder); - - this.toExecutorFlowTriggerTopic(stream); - this.handleFlowTrigger(flowWithTriggerStream); - - // task Flow - KTable workerTaskExecutionKTable = this.workerTaskExecutionStream(builder); - - KStream workerTaskExecutionKStream = this.deduplicateWorkerTaskExecution(stream); - this.toWorkerTaskExecution(workerTaskExecutionKStream); - this.workerTaskExecutionToExecution(workerTaskExecutionKStream); - this.handleWorkerTaskExecution(workerTaskExecutionKTable, stream); - - // purge at end - this.purgeExecutor(stream); - - // handle worker running - builder.addGlobalStore( - Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(WORKERINSTANCE_STATE_STORE_NAME), - Serdes.String(), - JsonSerde.of(WorkerInstance.class) - ), - kafkaAdminService.getTopicName(TOPIC_EXECUTOR_WORKERINSTANCE), - Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("GlobalStore.ExecutorWorkerInstace"), - () -> new GlobalStateProcessor<>(WORKERINSTANCE_STATE_STORE_NAME) - ); - - GlobalKTable workerTaskRunningKTable = this.workerTaskRunningKStream(builder); - KStream workerInstanceKStream = this.workerInstanceKStream(builder); - - this.purgeWorkerRunning(workerTaskResultKStream); - this.detectNewWorker(workerInstanceKStream, workerTaskRunningKTable); - - return builder; - } - - public KStream executorKStream(StreamsBuilder builder) { - KStream result = builder - .stream( - kafkaAdminService.getTopicName(Execution.class), - Consumed.with(Serdes.String(), JsonSerde.of(Execution.class)).withName("Executor.fromExecution") - ) - .filter((key, value) -> value != null, Named.as("Executor.filterNotNull")) - .transformValues( - () -> new ExecutorFromExecutionTransformer(EXECUTOR_STATE_STORE_NAME), - Named.as("Executor.toExecutor"), - EXECUTOR_STATE_STORE_NAME - ); - - // logs - KafkaStreamSourceService.logIfEnabled( - log, - result, - (key, value) -> log(log, true, value), - "ExecutionIn" - ); - - return result; - } - - private KStream executionKilledKStream(StreamsBuilder builder) { - return builder - .stream( - 
kafkaAdminService.getTopicName(ExecutionKilled.class), - Consumed.with(Serdes.String(), JsonSerde.of(ExecutionKilled.class)).withName("KTable.ExecutionKilled") - ); - } - - private GlobalKTable workerTaskRunningKStream(StreamsBuilder builder) { - return builder - .globalTable( - kafkaAdminService.getTopicName(WorkerTaskRunning.class), - Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskRunning.class)).withName("GlobalKTable.WorkerTaskRunning"), - Materialized.>as(WORKER_RUNNING_STATE_STORE_NAME) - .withKeySerde(Serdes.String()) - .withValueSerde(JsonSerde.of(WorkerTaskRunning.class)) - ); - } - - private KStream workerInstanceKStream(StreamsBuilder builder) { - return builder - .stream( - kafkaAdminService.getTopicName(WorkerInstance.class), - Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("KStream.WorkerInstance") - ); - } - - private KStream flowWithTriggerStream(StreamsBuilder builder) { - return builder - .stream( - kafkaAdminService.getTopicName(ExecutorFlowTrigger.class), - Consumed.with(Serdes.String(), JsonSerde.of(ExecutorFlowTrigger.class)).withName("KStream.ExecutorFlowTrigger") - ) - .filter((key, value) -> value != null, Named.as("Flowwithtriggerstream.filterNotNull")); - } - - private KStream joinExecutionKilled(KStream executionKilledKStream, KStream executorKStream) { - return executorKStream - .merge( - executionKilledKStream - .transformValues( - () -> new ExecutorKilledJoinerTransformer( - EXECUTOR_STATE_STORE_NAME - ), - Named.as("JoinExecutionKilled.transformValues"), - EXECUTOR_STATE_STORE_NAME - ) - .filter((key, value) -> value != null, Named.as("JoinExecutionKilled.filterNotNull")), - Named.as("JoinExecutionKilled.merge") - ); - } - - private KStream workerTaskResultKStream(StreamsBuilder builder) { - return builder - .stream( - kafkaAdminService.getTopicName(WorkerTaskResult.class), - Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class)).withName("KStream.WorkerTaskResult") - ) - .filter((key, value) -> value != null, Named.as("WorkerTaskResultKStream.filterNotNull")); - } - - private KStream joinWorkerResult(KStream workerTaskResultKstream, KStream executorKStream) { - return executorKStream - .merge( - workerTaskResultKstream - .selectKey((key, value) -> value.getTaskRun().getExecutionId(), Named.as("JoinWorkerResult.selectKey")) - .mapValues( - (key, value) -> new Executor(value), - Named.as("JoinWorkerResult.WorkerTaskResultMap") - ) - .repartition( - Repartitioned.as("workertaskjoined") - .withKeySerde(Serdes.String()) - .withValueSerde(JsonSerde.of(Executor.class)) - ), - Named.as("JoinWorkerResult.merge") - ) - .transformValues( - () -> new ExecutorJoinerTransformer( - EXECUTOR_STATE_STORE_NAME, - this.metricRegistry - ), - Named.as("JoinWorkerResult.transformValues"), - EXECUTOR_STATE_STORE_NAME - ) - .filter( - (key, value) -> value != null, - Named.as("JoinWorkerResult.notNullFilter") - ); - } - - private KStream handleExecutor(KStream stream) { - return stream - .transformValues( - () -> new ExecutorNextTransformer( - NEXTS_DEDUPLICATION_STATE_STORE_NAME, - this - ), - Named.as("HandleExecutor.transformValues"), - NEXTS_DEDUPLICATION_STATE_STORE_NAME - ); - } - - private void purgeExecutor(KStream stream) { - KStream terminatedWithKilled = stream - .filter( - (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()), - Named.as("PurgeExecutor.filterTerminated") - ); - - // we don't purge killed execution in order to have feedback about child running tasks - // this 
can be killed lately (after the executor kill the execution), but we want to keep - // feedback about the actual state (killed or not) - // @TODO: this can lead to infinite state store for most executor topic - KStream terminated = terminatedWithKilled.filter( - (key, value) -> value.getExecution().getState().getCurrent() != State.Type.KILLED, - Named.as("PurgeExecutor.filterKilledExecution") - ); - - // clean up executor - terminated - .mapValues( - (readOnlyKey, value) -> (Execution) null, - Named.as("PurgeExecutor.executionToNull") - ) - .to( - kafkaAdminService.getTopicName(Executor.class), - Produced.with(Serdes.String(), JsonSerde.of(Execution.class)).withName("PurgeExecutor.toExecutor") - ); - - // flatMap taskRun - KStream taskRunKStream = terminated - .filter( - (key, value) -> value.getExecution().getTaskRunList() != null, - Named.as("PurgeExecutor.notNullTaskRunList") - ) - .flatMapValues( - (readOnlyKey, value) -> value.getExecution().getTaskRunList(), - Named.as("PurgeExecutor.flatMapTaskRunList") - ); - - // clean up workerTaskResult - taskRunKStream - .map( - (readOnlyKey, value) -> new KeyValue<>( - value.getId(), - (WorkerTaskResult) null - ), - Named.as("PurgeExecutor.workerTaskResultToNull") - ) - .to( - kafkaAdminService.getTopicName(WorkerTaskResult.class), - Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class)).withName("PurgeExecutor.toWorkerTaskResult") - ); - - // clean up WorkerTask deduplication state - taskRunKStream - .transformValues( - () -> new DeduplicationPurgeTransformer<>( - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME, - (key, value) -> value.getExecutionId() + "-" + value.getId() - ), - Named.as("PurgeExecutor.purgeWorkerTaskDeduplication"), - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME - ); - - taskRunKStream - .transformValues( - () -> new DeduplicationPurgeTransformer<>( - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME, - (key, value) -> "WorkerTaskExecution-" + value.getExecutionId() + "-" + value.getId() - ), - Named.as("PurgeExecutor.purgeWorkerTaskExecutionDeduplication"), - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME - ); - - // clean up Execution Nexts deduplication state - terminated - .transformValues( - () -> new DeduplicationPurgeTransformer<>( - NEXTS_DEDUPLICATION_STATE_STORE_NAME, - (key, value) -> value.getExecution().getId() - ), - Named.as("PurgeExecutor.purgeNextsDeduplication"), - NEXTS_DEDUPLICATION_STATE_STORE_NAME - ); - - // clean up killed - terminatedWithKilled - .mapValues( - (readOnlyKey, value) -> (ExecutionKilled) null, - Named.as("PurgeExecutor.executionKilledToNull") - ) - .to( - kafkaAdminService.getTopicName(ExecutionKilled.class), - Produced.with(Serdes.String(), JsonSerde.of(ExecutionKilled.class)).withName("PurgeExecutor.toExecutionKilled") - ); - } - - private void toExecutorFlowTriggerTopic(KStream stream) { - stream - .filter( - (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()), - Named.as("HandleExecutorFlowTriggerTopic.filterTerminated") - ) - .transformValues( - () -> new DeduplicationTransformer<>( - "FlowTrigger", - TRIGGER_DEDUPLICATION_STATE_STORE_NAME, - (key, value) -> value.getExecution().getId(), - (key, value) -> value.getExecution().getId() - ), - Named.as("HandleExecutorFlowTriggerTopic.deduplication"), - TRIGGER_DEDUPLICATION_STATE_STORE_NAME - ) - .filter((key, value) -> value != null, Named.as("HandleExecutorFlowTriggerTopic.deduplicationNotNull")) - .flatTransform( - () -> new FlowWithTriggerTransformer( - flowService - ), - 
Named.as("HandleExecutorFlowTriggerTopic.flatMapToExecutorFlowTrigger") - ) - .to( - kafkaAdminService.getTopicName(ExecutorFlowTrigger.class), - Produced.with(Serdes.String(), JsonSerde.of(ExecutorFlowTrigger.class)).withName("PurgeExecutor.toExecutorFlowTrigger") - ); - } - - private void handleFlowTrigger(KStream stream) { - stream - .transformValues( - () -> new FlowTriggerWithExecutionTransformer( - TRIGGER_MULTIPLE_STATE_STORE_NAME, - flowService - ), - Named.as("HandleFlowTrigger.transformToExecutionList"), - TRIGGER_MULTIPLE_STATE_STORE_NAME - ) - .flatMap( - (key, value) -> value - .stream() - .map(execution -> new KeyValue<>(execution.getId(), execution)) - .collect(Collectors.toList()), - Named.as("HandleFlowTrigger.flapMapToExecution") - ) - .to( - kafkaAdminService.getTopicName(Execution.class), - Produced.with(Serdes.String(), JsonSerde.of(Execution.class)).withName("HandleFlowTrigger.toExecution") - ); - - stream - .mapValues( - (readOnlyKey, value) -> (ExecutorFlowTrigger)null, - Named.as("HandleFlowTrigger.executorFlowTriggerToNull") - ) - .to( - kafkaAdminService.getTopicName(ExecutorFlowTrigger.class), - Produced.with(Serdes.String(), JsonSerde.of(ExecutorFlowTrigger.class)).withName("HandleFlowTrigger.toExecutorFlowTrigger") - ); - } - - private void toWorkerTask(KStream stream) { - // deduplication worker task - KStream dedupWorkerTask = stream - .flatMapValues( - (readOnlyKey, value) -> value.getWorkerTasks(), - Named.as("HandleWorkerTask.flatMapToWorkerTask") - ) - .transformValues( - () -> new DeduplicationTransformer<>( - "WorkerTask", - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME, - (key, value) -> value.getTaskRun().getExecutionId() + "-" + value.getTaskRun().getId(), - (key, value) -> value.getTaskRun().getState().getCurrent().name() - ), - Named.as("HandleWorkerTask.deduplication"), - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME - ) - .filter((key, value) -> value != null, Named.as("HandleWorkerTask.notNullFilter")); - - // flowable > running to WorkerTaskResult - KStream resultFlowable = dedupWorkerTask - .filter((key, value) -> value.getTask().isFlowable(), Named.as("HandleWorkerTaskFlowable.filterIsFlowable")) - .mapValues( - (key, value) -> new WorkerTaskResult(value.withTaskRun(value.getTaskRun().withState(State.Type.RUNNING))), - Named.as("HandleWorkerTaskFlowable.toRunning") - ) - .map( - (key, value) -> new KeyValue<>(queueService.key(value), value), - Named.as("HandleWorkerTaskFlowable.mapWithKey") - ) - .selectKey( - (key, value) -> queueService.key(value), - Named.as("HandleWorkerTaskFlowable.selectKey") - ); - - KStream workerTaskResultKStream = KafkaStreamSourceService.logIfEnabled( - log, - resultFlowable, - (key, value) -> log(log, false, value), - "HandleWorkerTaskFlowable" - ); - - workerTaskResultKStream - .to( - kafkaAdminService.getTopicName(WorkerTaskResult.class), - Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class)).withName("HandleWorkerTaskFlowable.toWorkerTaskResult") - ); - - // not flowable > to WorkerTask - KStream resultNotFlowable = dedupWorkerTask - .filter((key, value) -> !value.getTask().isFlowable(), Named.as("HandleWorkerTaskNotFlowable.filterIsNotFlowable")) - .map((key, value) -> new KeyValue<>(queueService.key(value), value), Named.as("HandleWorkerTaskNotFlowable.mapWithKey")) - .selectKey( - (key, value) -> queueService.key(value), - Named.as("HandleWorkerTaskNotFlowable.selectKey") - ); - - KStream workerTaskKStream = KafkaStreamSourceService.logIfEnabled( - log, - resultNotFlowable, - (key, value) -> 
log(log, false, value), - "HandleWorkerTaskNotFlowable" - ); - - workerTaskKStream - .to( - kafkaAdminService.getTopicName(WorkerTask.class), - Produced.with(Serdes.String(), JsonSerde.of(WorkerTask.class)).withName("HandleWorkerTaskNotFlowable.toWorkerTask") - ); - } - - private KTable workerTaskExecutionStream(StreamsBuilder builder) { - return builder - .table( - kafkaAdminService.getTopicName(WorkerTaskExecution.class), - Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskExecution.class)).withName("WorkerTaskExecution.from"), - Materialized.>as("workertaskexecution") - .withKeySerde(Serdes.String()) - .withValueSerde(JsonSerde.of(WorkerTaskExecution.class)) - ); - } - - private KStream deduplicateWorkerTaskExecution(KStream stream) { - return stream - .flatMapValues( - (readOnlyKey, value) -> value.getWorkerTaskExecutions(), - Named.as("DeduplicateWorkerTaskExecution.flatMap") - ) - .transformValues( - () -> new DeduplicationTransformer<>( - "DeduplicateWorkerTaskExecution", - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME, - (key, value) -> "WorkerTaskExecution-" + value.getTaskRun().getExecutionId() + "-" + value.getTaskRun().getId(), - (key, value) -> value.getTaskRun().getState().getCurrent().name() - ), - Named.as("DeduplicateWorkerTaskExecution.deduplication"), - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME - ) - .filter((key, value) -> value != null, Named.as("DeduplicateWorkerTaskExecution.notNullFilter")); - } - - private void toWorkerTaskExecution(KStream stream) { - stream - .selectKey( - (key, value) -> value.getExecution().getId(), - Named.as("ToWorkerTaskExecution.selectKey") - ) - .to( - kafkaAdminService.getTopicName(WorkerTaskExecution.class), - Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskExecution.class)).withName("ToWorkerTaskExecution.toWorkerTaskExecution") - ); - } - - private void workerTaskExecutionToExecution(KStream stream) { - stream - .mapValues( - value -> { - String message = "Create new execution for flow '" + - value.getExecution().getNamespace() + "'." + value.getExecution().getFlowId() + - "' with id '" + value.getExecution().getId() + "' from task '" + value.getTask().getId() + - "' and taskrun '" + value.getTaskRun().getId() + - (value.getTaskRun().getValue() != null ? 
" (" + value.getTaskRun().getValue() + ")" : "") + "'"; - - log.info(message); - LogEntry.LogEntryBuilder logEntryBuilder = LogEntry.of(value.getTaskRun()).toBuilder() - .level(Level.INFO) - .message(message) - .timestamp(value.getTaskRun().getState().getStartDate()) - .thread(Thread.currentThread().getName()); - - return logEntryBuilder.build(); - }, - Named.as("WorkerTaskExecutionToExecution.mapToLog") - ) - .selectKey((key, value) -> (String)null, Named.as("WorkerTaskExecutionToExecution.logRemoveKey")) - .to( - kafkaAdminService.getTopicName(LogEntry.class), - Produced.with(Serdes.String(), JsonSerde.of(LogEntry.class)).withName("WorkerTaskExecutionToExecution.toLogEntry") - ); - - KStream executionKStream = stream - .mapValues( - (key, value) -> value.getExecution(), - Named.as("WorkerTaskExecutionToExecution.map") - ) - .selectKey( - (key, value) -> value.getId(), - Named.as("WorkerTaskExecutionToExecution.selectKey") - ); - - executionKStream = KafkaStreamSourceService.logIfEnabled( - log, - executionKStream, - (key, value) -> log(log, false, value), - "WorkerTaskExecutionToExecution" - ); - - executionKStream - .to( - kafkaAdminService.getTopicName(Execution.class), - Produced.with(Serdes.String(), JsonSerde.of(Execution.class)).withName("WorkerTaskExecutionToExecution.toExecution") - ); - } - - private void handleWorkerTaskExecution(KTable workerTaskExecutionKTable, KStream stream) { - KStream joinKStream = stream - .filter( - (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()), - Named.as("HandleWorkerTaskExecution.isTerminated") - ) - .transformValues( - () -> new WorkerTaskExecutionTransformer(runContextFactory, workerTaskExecutionKTable.queryableStoreName()), - Named.as("HandleWorkerTaskExecution.transform"), - workerTaskExecutionKTable.queryableStoreName() - ) - .filter((key, value) -> value != null, Named.as("HandleWorkerTaskExecution.joinNotNullFilter")); - - toWorkerTaskResultSend(joinKStream, "HandleWorkerTaskExecution"); - } - - private void toWorkerTaskResult(KStream stream) { - KStream workerTaskResultKStream = stream - .flatMapValues( - (readOnlyKey, value) -> value.getWorkerTaskResults(), - Named.as("HandleWorkerTaskResult.flapMap") - ); - - toWorkerTaskResultSend(workerTaskResultKStream, "HandleWorkerTaskResult"); - } - - private void toWorkerTaskResultSend(KStream stream, String name) { - KStream workerTaskResultKStream = stream - .transformValues( - () -> new DeduplicationTransformer<>( - name, - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME, - (key, value) -> value.getTaskRun().getExecutionId() + "-" + value.getTaskRun().getId(), - (key, value) -> value.getTaskRun().getState().getCurrent().name() - ), - Named.as(name + ".deduplication"), - WORKERTASK_DEDUPLICATION_STATE_STORE_NAME - ) - .filter((key, value) -> value != null, Named.as(name + ".notNullFilter")) - .selectKey( - (key, value) -> value.getTaskRun().getId(), - Named.as(name + ".selectKey") - ); - - KafkaStreamSourceService.logIfEnabled( - log, - workerTaskResultKStream, - (key, value) -> log(log, false, value), - name - ) - .to( - kafkaAdminService.getTopicName(WorkerTaskResult.class), - Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class)).withName(name + ".toWorkerTaskResult") - ); - } - - private void purgeWorkerRunning(KStream workerTaskResultKStream) { - workerTaskResultKStream - .filter((key, value) -> value.getTaskRun().getState().isTerninated(), Named.as("PurgeWorkerRunning.filterTerminated")) - .mapValues((readOnlyKey, value) -> 
(WorkerTaskRunning)null, Named.as("PurgeWorkerRunning.toNull")) - .to( - kafkaAdminService.getTopicName(WorkerTaskRunning.class), - Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskRunning.class)).withName("PurgeWorkerRunning.toWorkerTaskRunning") - ); - } - - private void detectNewWorker(KStream workerInstanceKStream, GlobalKTable workerTaskRunningGlobalKTable) { - workerInstanceKStream - .to( - kafkaAdminService.getTopicName(TOPIC_EXECUTOR_WORKERINSTANCE), - Produced.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("DetectNewWorker.toExecutorWorkerInstance") - ); - - KStream stream = workerInstanceKStream - .transformValues( - WorkerInstanceTransformer::new, - Named.as("DetectNewWorker.workerInstanceTransformer") - ) - .flatMapValues((readOnlyKey, value) -> value, Named.as("DetectNewWorker.flapMapList")); - - // we resend the worker task from evicted worker - KStream resultWorkerTask = stream - .flatMapValues( - (readOnlyKey, value) -> value.getWorkerTasksToSend(), - Named.as("DetectNewWorkerTask.flapMapWorkerTaskToSend") - ); - - // and remove from running since already sent - resultWorkerTask - .map((key, value) -> KeyValue.pair(value.getTaskRun().getId(), (WorkerTaskRunning)null), Named.as("DetectNewWorkerTask.workerTaskRunningToNull")) - .to( - kafkaAdminService.getTopicName(WorkerTaskRunning.class), - Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskRunning.class)).withName("DetectNewWorker.toWorkerTaskRunning") - ); - - KafkaStreamSourceService.logIfEnabled( - log, - resultWorkerTask, - (key, value) -> log.debug( - ">> OUT WorkerTask resend : {}", - value.getTaskRun().toStringState() - ), - "DetectNewWorkerTask" - ) - .to( - kafkaAdminService.getTopicName(WorkerTask.class), - Produced.with(Serdes.String(), JsonSerde.of(WorkerTask.class)).withName("DetectNewWorkerTask.toWorkerTask") - ); - - // we resend the WorkerInstance update - KStream updatedStream = KafkaStreamSourceService.logIfEnabled( - log, - stream, - (key, value) -> log.debug( - "Instance updated: {}", - value - ), - "DetectNewWorkerInstance" - ) - .map( - (key, value) -> value.getWorkerInstanceUpdated(), - Named.as("DetectNewWorkerInstance.mapInstance") - ); - - // cleanup executor workerinstance state store - updatedStream - .filter((key, value) -> value != null, Named.as("DetectNewWorkerInstance.filterNotNull")) - .to( - kafkaAdminService.getTopicName(TOPIC_EXECUTOR_WORKERINSTANCE), - Produced.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("DetectNewWorkerInstance.toExecutorWorkerInstance") - ); - - updatedStream - .to( - kafkaAdminService.getTopicName(WorkerInstance.class), - Produced.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("DetectNewWorkerInstance.toWorkerInstance") - ); - } - - private void toExecution(KStream stream) { - KStream streamFrom = stream - .filter((key, value) -> value.isExecutionUpdated(), Named.as("ToExecution.haveFrom")) - .transformValues( - ExecutorAddHeaderTransformer::new, - Named.as("ToExecution.addHeaders") - ); - - // send execution - KStream executionKStream = streamFrom - .filter((key, value) -> value.getException() == null, Named.as("ToExecutionExecution.notException")); - - toExecutionSend(executionKStream, "ToExecutionExecution"); - - // send exception - KStream> failedStream = streamFrom - .filter((key, value) -> value.getException() != null, Named.as("ToExecutionException.isException")) - .mapValues( - e -> Pair.of(e, e.getExecution().failedExecutionFromExecutor(e.getException())), - 
Named.as("ToExecutionException.mapToFailedExecutionWithLog") - ); - - failedStream - .flatMapValues(e -> e.getRight().getLogs(), Named.as("ToExecutionException.flatmapLogs")) - .selectKey((key, value) -> (String)null, Named.as("ToExecutionException.removeKey")) - .to( - kafkaAdminService.getTopicName(LogEntry.class), - Produced.with(Serdes.String(), JsonSerde.of(LogEntry.class)).withName("ToExecutionException.toLogEntry") - ); - - KStream executorFailedKStream = failedStream - .mapValues(e -> e.getLeft().withExecution(e.getRight().getExecution(), "failedExecutionFromExecutor"), Named.as("ToExecutionException.mapToExecutor")); - - toExecutionSend(executorFailedKStream, "ToExecutionException"); - } - - private void toExecutionSend(KStream stream, String from) { - stream = KafkaStreamSourceService.logIfEnabled( - log, - stream, - (key, value) -> log(log, false, value), - from - ); - - stream - .transformValues( - () -> new StateStoreTransformer<>(EXECUTOR_STATE_STORE_NAME, Executor::serialize), - Named.as(from + ".store"), - EXECUTOR_STATE_STORE_NAME - ) - .mapValues((readOnlyKey, value) -> value.getExecution(), Named.as(from + ".mapToExecution")) - .to( - kafkaAdminService.getTopicName(Execution.class), - Produced.with(Serdes.String(), JsonSerde.of(Execution.class)).withName(from + ".toExecution") - ); - } + protected KafkaStreamService kafkaStreamService; - @NoArgsConstructor - @Getter - public static class WorkerTaskResultState { - Map results = new HashMap<>(); - } + @Inject + protected KafkaAdminService kafkaAdminService; - @AllArgsConstructor - @Getter - public static class WorkerTaskRunningWithWorkerTaskRunning { - WorkerInstance workerInstance; - WorkerTaskRunning workerTaskRunning; - } + @Inject + protected List kafkaExecutors; - @NoArgsConstructor - @Getter - public static class WorkerTaskRunningState { - Map workerTaskRunnings = new HashMap<>(); - } + @Inject + protected ExecutorService executorService; @Override public void run() { @@ -868,38 +64,33 @@ public void run() { kafkaAdminService.createIfNotExist(Trigger.class); kafkaAdminService.createIfNotExist(ExecutorFlowTrigger.class); - Properties properties = new Properties(); - // build - Topology topology = this.topology().build(); - - if (log.isTraceEnabled()) { - log.trace(topology.describe().toString()); - } + this.streams = this.kafkaExecutors + .stream() + .parallel() + .map(executor -> { + Properties properties = new Properties(); + // build + Topology topology = executor.topology().build(); - stream = kafkaStreamService.of(this.getClass(), this.getClass(), topology, properties, log); - stream.start(); + Logger logger = LoggerFactory.getLogger(executor.getClass()); + KafkaStreamService.Stream stream = kafkaStreamService.of(executor.getClass(), executor.getClass(), topology, properties, logger); + stream.start(); - applicationContext.inject(stream); + executor.onCreated(applicationContext, stream); - applicationContext.registerSingleton(new KafkaTemplateExecutor( - stream.store(StoreQueryParameters.fromNameAndType("template", QueryableStoreTypes.keyValueStore())), - "template" - )); + applicationContext.inject(stream); - this.flowExecutorInterface = new KafkaFlowExecutor( - stream.store(StoreQueryParameters.fromNameAndType("flow", QueryableStoreTypes.keyValueStore())), - "flow", - applicationContext - ); - applicationContext.registerSingleton(this.flowExecutorInterface); + return stream; + }) + .collect(Collectors.toList()); } @Override public void close() throws IOException { - if (this.stream != null) { - 
this.stream.close(Duration.ofSeconds(10)); - this.stream = null; + if (streams != null) { + streams + .parallelStream() + .forEach(stream -> stream.close(Duration.ofSeconds(10))); } } - }
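Below, the formerly monolithic topology is split into separate `KafkaExecutorInterface` beans, each started above as its own Kafka Streams application. Because a Kafka Streams app names its state-store changelog topics `<application.id>-<store>-changelog`, and each stream now derives its application id from the executor class, the `application.yml` hunk earlier renames the executor changelog to `executor_main-executor-changelog`. The interface file itself is only 15 lines per the diffstat; judging from how `KafkaExecutor.run()` and the implementations use it, it presumably looks roughly like this sketch (not the verbatim source):

    package io.kestra.runner.kafka.executors;

    import io.kestra.runner.kafka.services.KafkaStreamService;
    import io.micronaut.context.ApplicationContext;
    import org.apache.kafka.streams.StreamsBuilder;

    public interface KafkaExecutorInterface {
        // each executor contributes one independent topology
        StreamsBuilder topology();

        // hook invoked once the stream is started; ExecutorMain overrides it to
        // register beans backed by the "flow" and "template" state stores, while
        // ExecutorFlowTrigger keeps the default no-op
        default void onCreated(ApplicationContext applicationContext, KafkaStreamService.Stream stream) {
        }
    }

diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorFlowTrigger.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorFlowTrigger.java new file mode 100644 index 0000000000..b9b6e38e53 --- /dev/null +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorFlowTrigger.java @@ -0,0 +1,98 @@ +package io.kestra.runner.kafka.executors; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow; +import io.kestra.core.services.FlowService; +import io.kestra.runner.kafka.KafkaQueueEnabled; +import io.kestra.runner.kafka.serializers.JsonSerde; +import io.kestra.runner.kafka.services.KafkaAdminService; +import io.kestra.runner.kafka.services.KafkaStreamSourceService; +import io.kestra.runner.kafka.services.KafkaStreamsBuilder; +import io.kestra.runner.kafka.streams.FlowTriggerWithExecutionTransformer; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.Stores; + +import java.util.stream.Collectors; + +@KafkaQueueEnabled +@Singleton +public class ExecutorFlowTrigger implements KafkaExecutorInterface { + public static final String TRIGGER_MULTIPLE_STATE_STORE_NAME = "trigger_multiplecondition"; + + @Inject + private KafkaAdminService kafkaAdminService; + + @Inject + private FlowService flowService; + + @Inject + private KafkaStreamSourceService kafkaStreamSourceService; + + public StreamsBuilder topology() { + StreamsBuilder builder = new KafkaStreamsBuilder(); + + kafkaStreamSourceService.flowGlobalKTable(builder); + + // trigger + builder.addStateStore( + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(TRIGGER_MULTIPLE_STATE_STORE_NAME), + Serdes.String(), + JsonSerde.of(MultipleConditionWindow.class) + ) + ); + + KStream stream = builder + .stream( + kafkaAdminService.getTopicName(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class), + Consumed.with(Serdes.String(), JsonSerde.of(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class)) + .withName("KStream.ExecutorFlowTrigger") + ) + .filter((key, value) -> value != null, Named.as("ExecutorFlowTrigger.filterNotNull")); + + stream + .transformValues( + () -> new FlowTriggerWithExecutionTransformer( + TRIGGER_MULTIPLE_STATE_STORE_NAME, + flowService + ), + Named.as("ExecutorFlowTrigger.transformToExecutionList"), + TRIGGER_MULTIPLE_STATE_STORE_NAME + ) + .flatMap( + (key, value) -> value + .stream() + .map(execution -> new KeyValue<>(execution.getId(), execution)) + .collect(Collectors.toList()), + Named.as("ExecutorFlowTrigger.flapMapToExecution") + ) + .to( + kafkaAdminService.getTopicName(Execution.class), + Produced + .with(Serdes.String(), JsonSerde.of(Execution.class)) + .withName("ExecutorFlowTrigger.toExecution") + ); + + stream + .mapValues( + (readOnlyKey, value) -> (io.kestra.runner.kafka.streams.ExecutorFlowTrigger)null, + Named.as("ExecutorFlowTrigger.executorFlowTriggerToNull") + ) + .to( 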
kafkaAdminService.getTopicName(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class), + Produced + .with(Serdes.String(), JsonSerde.of(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class)) + .withName("ExecutorFlowTrigger.toExecutorFlowTrigger") + ); + + return builder; + } +}
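One practical payoff of exposing `topology()` per executor is that each topology can be exercised in isolation with Kafka Streams' `TopologyTestDriver` (the reworked `KafkaExecutorTest` in the diffstat touches this area). A sketch under stated assumptions: the injected `executorFlowTrigger` bean, the `flowTriggerEvent` value, and both topic names are illustrative, not taken from the test code, and `ExecutorFlowTrigger` here is the event class from `io.kestra.runner.kafka.streams`, distinct from the executor bean of the same name:

    Properties properties = new Properties();
    properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "executor_flow_trigger-test");
    properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // never contacted

    Topology topology = executorFlowTrigger.topology().build();

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, properties)) {
        TestInputTopic<String, ExecutorFlowTrigger> input = driver.createInputTopic(
            "kestra_executorflowtrigger", // assumed topic name
            Serdes.String().serializer(),
            JsonSerde.of(ExecutorFlowTrigger.class).serializer()
        );
        TestOutputTopic<String, Execution> output = driver.createOutputTopic(
            "kestra_execution", // assumed topic name
            Serdes.String().deserializer(),
            JsonSerde.of(Execution.class).deserializer()
        );

        // a real test would also pipe flows into the topic backing the flow
        // GlobalKTable before sending the trigger event
        input.pipeInput("namespace_flow", flowTriggerEvent);

        // each flow whose multiple-condition window is now satisfied
        // comes out as a new Execution
        List<KeyValue<String, Execution>> executions = output.readKeyValuesToList();
    }

diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorMain.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorMain.java new file mode 100644 index 0000000000..a1e21a0bf6 --- /dev/null +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorMain.java @@ -0,0 +1,700 @@ +package io.kestra.runner.kafka.executors; + +import io.kestra.core.metrics.MetricRegistry; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.ExecutionKilled; +import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.flows.State; +import io.kestra.core.models.templates.Template; +import io.kestra.core.queues.QueueService; +import io.kestra.core.runners.*; +import io.kestra.core.services.ConditionService; +import io.kestra.core.services.FlowService; +import io.kestra.runner.kafka.KafkaFlowExecutor; +import io.kestra.runner.kafka.KafkaQueueEnabled; +import io.kestra.runner.kafka.KafkaTemplateExecutor; +import io.kestra.runner.kafka.serializers.JsonSerde; +import io.kestra.runner.kafka.services.KafkaAdminService; +import io.kestra.runner.kafka.services.KafkaStreamService; +import io.kestra.runner.kafka.services.KafkaStreamSourceService; +import io.kestra.runner.kafka.services.KafkaStreamsBuilder; +import io.kestra.runner.kafka.streams.*; +import io.micronaut.context.ApplicationContext; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.Stores; +import org.slf4j.event.Level; + +import java.util.HashMap; +import java.util.Map; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@KafkaQueueEnabled +@Singleton +@Slf4j +public class ExecutorMain implements KafkaExecutorInterface { + private static final String EXECUTOR_STATE_STORE_NAME = "executor"; + private static final String WORKERTASK_DEDUPLICATION_STATE_STORE_NAME = "workertask_deduplication"; + private static final String TRIGGER_DEDUPLICATION_STATE_STORE_NAME = "trigger_deduplication"; + private static final String NEXTS_DEDUPLICATION_STATE_STORE_NAME = "next_deduplication"; + + protected FlowExecutorInterface flowExecutorInterface; + + @Inject + private KafkaAdminService kafkaAdminService; + + @Inject + private FlowService flowService; + + @Inject + private KafkaStreamSourceService kafkaStreamSourceService; + + @Inject + private QueueService queueService; + + @Inject + private MetricRegistry metricRegistry; + + @Inject + private ConditionService conditionService; + + @Inject + private ExecutorService executorService; + + @Inject + private RunContextFactory runContextFactory; + + @Override + public void onCreated(ApplicationContext applicationContext, KafkaStreamService.Stream stream) { + 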
applicationContext.registerSingleton(new KafkaTemplateExecutor( + stream.store(StoreQueryParameters.fromNameAndType("template", QueryableStoreTypes.keyValueStore())), + "template" + )); + + if (flowExecutorInterface == null) { + this.flowExecutorInterface = new KafkaFlowExecutor( + stream.store(StoreQueryParameters.fromNameAndType("flow", QueryableStoreTypes.keyValueStore())), + "flow", + applicationContext + ); + } + + applicationContext.registerSingleton(this.flowExecutorInterface); + } + + public StreamsBuilder topology() { + StreamsBuilder builder = new KafkaStreamsBuilder(); + + // executor + builder.addStateStore(Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(EXECUTOR_STATE_STORE_NAME), + Serdes.String(), + JsonSerde.of(Executor.class) + )); + + // WorkerTask deduplication + builder.addStateStore(Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(WORKERTASK_DEDUPLICATION_STATE_STORE_NAME), + Serdes.String(), + Serdes.String() + )); + + // next deduplication + builder.addStateStore(Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(NEXTS_DEDUPLICATION_STATE_STORE_NAME), + Serdes.String(), + JsonSerde.of(ExecutorNextTransformer.Store.class) + )); + + // trigger deduplication + builder.addStateStore(Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(TRIGGER_DEDUPLICATION_STATE_STORE_NAME), + Serdes.String(), + Serdes.String() + )); + + // declare common stream + KStream workerTaskResultKStream = this.workerTaskResultKStream(builder); + KStream executorKStream = this.executorKStream(builder); + + // join with killed + KStream executionKilledKStream = this.executionKilledKStream(builder); + KStream executionWithKilled = this.joinExecutionKilled(executionKilledKStream, executorKStream); + + // join with WorkerResult + KStream executionKStream = this.joinWorkerResult(workerTaskResultKStream, executionWithKilled); + + // handle state on execution + GlobalKTable flowKTable = kafkaStreamSourceService.flowGlobalKTable(builder); + GlobalKTable templateKTable = kafkaStreamSourceService.templateGlobalKTable(builder); + KStream stream = kafkaStreamSourceService.executorWithFlow(executionKStream, true); + + stream = this.handleExecutor(stream); + + // save execution + this.toExecution(stream); + this.toWorkerTask(stream); + this.toWorkerTaskResult(stream); + + this.toExecutorFlowTriggerTopic(stream); + + // task Flow + KTable workerTaskExecutionKTable = this.workerTaskExecutionStream(builder); + + KStream workerTaskExecutionKStream = this.deduplicateWorkerTaskExecution(stream); + this.toWorkerTaskExecution(workerTaskExecutionKStream); + this.workerTaskExecutionToExecution(workerTaskExecutionKStream); + this.handleWorkerTaskExecution(workerTaskExecutionKTable, stream); + + // purge at end + this.purgeExecutor(stream); + + this.purgeWorkerRunning(workerTaskResultKStream); + + return builder; + } + + public KStream executorKStream(StreamsBuilder builder) { + KStream result = builder + .stream( + kafkaAdminService.getTopicName(Execution.class), + Consumed.with(Serdes.String(), JsonSerde.of(Execution.class)).withName("Executor.fromExecution") + ) + .filter((key, value) -> value != null, Named.as("Executor.filterNotNull")) + .transformValues( + () -> new ExecutorFromExecutionTransformer(EXECUTOR_STATE_STORE_NAME), + Named.as("Executor.toExecutor"), + EXECUTOR_STATE_STORE_NAME + ); + + // logs + KafkaStreamSourceService.logIfEnabled( + log, + result, + (key, value) -> executorService.log(log, true, value), + "ExecutionIn" + ); + + return result; + } + + 
private KStream<String, ExecutionKilled> executionKilledKStream(StreamsBuilder builder) {
+        return builder
+            .stream(
+                kafkaAdminService.getTopicName(ExecutionKilled.class),
+                Consumed.with(Serdes.String(), JsonSerde.of(ExecutionKilled.class)).withName("KStream.ExecutionKilled")
+            );
+    }
+
+    private KStream<String, Executor> joinExecutionKilled(KStream<String, ExecutionKilled> executionKilledKStream, KStream<String, Executor> executorKStream) {
+        return executorKStream
+            .merge(
+                executionKilledKStream
+                    .transformValues(
+                        () -> new ExecutorKilledJoinerTransformer(
+                            EXECUTOR_STATE_STORE_NAME
+                        ),
+                        Named.as("JoinExecutionKilled.transformValues"),
+                        EXECUTOR_STATE_STORE_NAME
+                    )
+                    .filter((key, value) -> value != null, Named.as("JoinExecutionKilled.filterNotNull")),
+                Named.as("JoinExecutionKilled.merge")
+            );
+    }
+
+    private KStream<String, WorkerTaskResult> workerTaskResultKStream(StreamsBuilder builder) {
+        return builder
+            .stream(
+                kafkaAdminService.getTopicName(WorkerTaskResult.class),
+                Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class)).withName("KStream.WorkerTaskResult")
+            )
+            .filter((key, value) -> value != null, Named.as("WorkerTaskResultKStream.filterNotNull"));
+    }
+
+    private KStream<String, Executor> joinWorkerResult(KStream<String, WorkerTaskResult> workerTaskResultKstream, KStream<String, Executor> executorKStream) {
+        return executorKStream
+            .merge(
+                workerTaskResultKstream
+                    .selectKey((key, value) -> value.getTaskRun().getExecutionId(), Named.as("JoinWorkerResult.selectKey"))
+                    .mapValues(
+                        (key, value) -> new Executor(value),
+                        Named.as("JoinWorkerResult.WorkerTaskResultMap")
+                    )
+                    .repartition(
+                        Repartitioned.<String, Executor>as("workertaskjoined")
+                            .withKeySerde(Serdes.String())
+                            .withValueSerde(JsonSerde.of(Executor.class))
+                    ),
+                Named.as("JoinWorkerResult.merge")
+            )
+            .transformValues(
+                () -> new ExecutorJoinerTransformer(
+                    EXECUTOR_STATE_STORE_NAME,
+                    this.metricRegistry
+                ),
+                Named.as("JoinWorkerResult.transformValues"),
+                EXECUTOR_STATE_STORE_NAME
+            )
+            .filter(
+                (key, value) -> value != null,
+                Named.as("JoinWorkerResult.notNullFilter")
+            );
+    }
+
+    private KStream<String, Executor> handleExecutor(KStream<String, Executor> stream) {
+        return stream
+            .transformValues(
+                () -> new ExecutorNextTransformer(
+                    NEXTS_DEDUPLICATION_STATE_STORE_NAME,
+                    this.executorService
+                ),
+                Named.as("HandleExecutor.transformValues"),
+                NEXTS_DEDUPLICATION_STATE_STORE_NAME
+            );
+    }
+
+    private void purgeExecutor(KStream<String, Executor> stream) {
+        KStream<String, Executor> terminatedWithKilled = stream
+            .filter(
+                (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()),
+                Named.as("PurgeExecutor.filterTerminated")
+            );
+
+        // we don't purge killed executions, in order to keep feedback about still-running child tasks;
+        // these can be killed late (after the executor has killed the execution), but we want to keep
+        // feedback about their actual state (killed or not)
+        // @TODO: this can lead to an unbounded state store on the executor topic
+        KStream<String, Executor> terminated = terminatedWithKilled.filter(
+            (key, value) -> value.getExecution().getState().getCurrent() != State.Type.KILLED,
+            Named.as("PurgeExecutor.filterKilledExecution")
+        );
+
+        // clean up executor
+        terminated
+            .mapValues(
+                (readOnlyKey, value) -> (Execution) null,
+                Named.as("PurgeExecutor.executionToNull")
+            )
+            .to(
+                kafkaAdminService.getTopicName(Executor.class),
+                Produced.with(Serdes.String(), JsonSerde.of(Execution.class)).withName("PurgeExecutor.toExecutor")
+            );
+
+        // flatMap taskRun
+        KStream<String, TaskRun> taskRunKStream = terminated
+            .filter(
+                (key, value) -> value.getExecution().getTaskRunList() != null,
+                Named.as("PurgeExecutor.notNullTaskRunList")
+            )
+            .flatMapValues(
+                (readOnlyKey, value) ->
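+                    // one record per TaskRun of the terminated execution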
value.getExecution().getTaskRunList(), + Named.as("PurgeExecutor.flatMapTaskRunList") + ); + + // clean up workerTaskResult + taskRunKStream + .map( + (readOnlyKey, value) -> new KeyValue<>( + value.getId(), + (WorkerTaskResult) null + ), + Named.as("PurgeExecutor.workerTaskResultToNull") + ) + .to( + kafkaAdminService.getTopicName(WorkerTaskResult.class), + Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class)).withName("PurgeExecutor.toWorkerTaskResult") + ); + + // clean up WorkerTask deduplication state + taskRunKStream + .transformValues( + () -> new DeduplicationPurgeTransformer<>( + WORKERTASK_DEDUPLICATION_STATE_STORE_NAME, + (key, value) -> value.getExecutionId() + "-" + value.getId() + ), + Named.as("PurgeExecutor.purgeWorkerTaskDeduplication"), + WORKERTASK_DEDUPLICATION_STATE_STORE_NAME + ); + + taskRunKStream + .transformValues( + () -> new DeduplicationPurgeTransformer<>( + WORKERTASK_DEDUPLICATION_STATE_STORE_NAME, + (key, value) -> "WorkerTaskExecution-" + value.getExecutionId() + "-" + value.getId() + ), + Named.as("PurgeExecutor.purgeWorkerTaskExecutionDeduplication"), + WORKERTASK_DEDUPLICATION_STATE_STORE_NAME + ); + + // clean up Execution Nexts deduplication state + terminated + .transformValues( + () -> new DeduplicationPurgeTransformer<>( + NEXTS_DEDUPLICATION_STATE_STORE_NAME, + (key, value) -> value.getExecution().getId() + ), + Named.as("PurgeExecutor.purgeNextsDeduplication"), + NEXTS_DEDUPLICATION_STATE_STORE_NAME + ); + + // clean up killed + terminatedWithKilled + .mapValues( + (readOnlyKey, value) -> (ExecutionKilled) null, + Named.as("PurgeExecutor.executionKilledToNull") + ) + .to( + kafkaAdminService.getTopicName(ExecutionKilled.class), + Produced.with(Serdes.String(), JsonSerde.of(ExecutionKilled.class)).withName("PurgeExecutor.toExecutionKilled") + ); + } + + private void toExecutorFlowTriggerTopic(KStream stream) { + stream + .filter( + (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()), + Named.as("HandleExecutorFlowTriggerTopic.filterTerminated") + ) + .transformValues( + () -> new DeduplicationTransformer<>( + "FlowTrigger", + TRIGGER_DEDUPLICATION_STATE_STORE_NAME, + (key, value) -> value.getExecution().getId(), + (key, value) -> value.getExecution().getId() + ), + Named.as("HandleExecutorFlowTriggerTopic.deduplication"), + TRIGGER_DEDUPLICATION_STATE_STORE_NAME + ) + .filter((key, value) -> value != null, Named.as("HandleExecutorFlowTriggerTopic.deduplicationNotNull")) + .flatTransform( + () -> new FlowWithTriggerTransformer(flowService), + Named.as("HandleExecutorFlowTriggerTopic.flatMapToExecutorFlowTrigger") + ) + .to( + kafkaAdminService.getTopicName(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class), + Produced + .with(Serdes.String(), JsonSerde.of(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class)) + .withName("PurgeExecutor.toExecutorFlowTrigger") + ); + } + + private void toWorkerTask(KStream stream) { + // deduplication worker task + KStream dedupWorkerTask = stream + .flatMapValues( + (readOnlyKey, value) -> value.getWorkerTasks(), + Named.as("HandleWorkerTask.flatMapToWorkerTask") + ) + .transformValues( + () -> new DeduplicationTransformer<>( + "WorkerTask", + WORKERTASK_DEDUPLICATION_STATE_STORE_NAME, + (key, value) -> value.getTaskRun().getExecutionId() + "-" + value.getTaskRun().getId(), + (key, value) -> value.getTaskRun().getState().getCurrent().name() + ), + Named.as("HandleWorkerTask.deduplication"), + WORKERTASK_DEDUPLICATION_STATE_STORE_NAME + ) + 
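// the DeduplicationTransformer emits null for an (executionId, taskRunId) pair already
+            // seen in the same state, so replays are dropped by the filter just below
+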
.filter((key, value) -> value != null, Named.as("HandleWorkerTask.notNullFilter"));
+
+        // flowable > running to WorkerTaskResult
+        KStream<String, WorkerTaskResult> resultFlowable = dedupWorkerTask
+            .filter((key, value) -> value.getTask().isFlowable(), Named.as("HandleWorkerTaskFlowable.filterIsFlowable"))
+            .mapValues(
+                (key, value) -> new WorkerTaskResult(value.withTaskRun(value.getTaskRun().withState(State.Type.RUNNING))),
+                Named.as("HandleWorkerTaskFlowable.toRunning")
+            )
+            .map(
+                (key, value) -> new KeyValue<>(queueService.key(value), value),
+                Named.as("HandleWorkerTaskFlowable.mapWithKey")
+            )
+            .selectKey(
+                (key, value) -> queueService.key(value),
+                Named.as("HandleWorkerTaskFlowable.selectKey")
+            );
+
+        KStream<String, WorkerTaskResult> workerTaskResultKStream = KafkaStreamSourceService.logIfEnabled(
+            log,
+            resultFlowable,
+            (key, value) -> executorService.log(log, false, value),
+            "HandleWorkerTaskFlowable"
+        );
+
+        workerTaskResultKStream
+            .to(
+                kafkaAdminService.getTopicName(WorkerTaskResult.class),
+                Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class)).withName("HandleWorkerTaskFlowable.toWorkerTaskResult")
+            );
+
+        // not flowable > to WorkerTask
+        KStream<String, WorkerTask> resultNotFlowable = dedupWorkerTask
+            .filter((key, value) -> !value.getTask().isFlowable(), Named.as("HandleWorkerTaskNotFlowable.filterIsNotFlowable"))
+            .map((key, value) -> new KeyValue<>(queueService.key(value), value), Named.as("HandleWorkerTaskNotFlowable.mapWithKey"))
+            .selectKey(
+                (key, value) -> queueService.key(value),
+                Named.as("HandleWorkerTaskNotFlowable.selectKey")
+            );
+
+        KStream<String, WorkerTask> workerTaskKStream = KafkaStreamSourceService.logIfEnabled(
+            log,
+            resultNotFlowable,
+            (key, value) -> executorService.log(log, false, value),
+            "HandleWorkerTaskNotFlowable"
+        );
+
+        workerTaskKStream
+            .to(
+                kafkaAdminService.getTopicName(WorkerTask.class),
+                Produced.with(Serdes.String(), JsonSerde.of(WorkerTask.class)).withName("HandleWorkerTaskNotFlowable.toWorkerTask")
+            );
+    }
+
+    private KTable<String, WorkerTaskExecution> workerTaskExecutionStream(StreamsBuilder builder) {
+        return builder
+            .table(
+                kafkaAdminService.getTopicName(WorkerTaskExecution.class),
+                Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskExecution.class)).withName("WorkerTaskExecution.from"),
+                Materialized.<String, WorkerTaskExecution, KeyValueStore<Bytes, byte[]>>as("workertaskexecution")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(JsonSerde.of(WorkerTaskExecution.class))
+            );
+    }
+
+    private KStream<String, WorkerTaskExecution> deduplicateWorkerTaskExecution(KStream<String, Executor> stream) {
+        return stream
+            .flatMapValues(
+                (readOnlyKey, value) -> value.getWorkerTaskExecutions(),
+                Named.as("DeduplicateWorkerTaskExecution.flatMap")
+            )
+            .transformValues(
+                () -> new DeduplicationTransformer<>(
+                    "DeduplicateWorkerTaskExecution",
+                    WORKERTASK_DEDUPLICATION_STATE_STORE_NAME,
+                    (key, value) -> "WorkerTaskExecution-" + value.getTaskRun().getExecutionId() + "-" + value.getTaskRun().getId(),
+                    (key, value) -> value.getTaskRun().getState().getCurrent().name()
+                ),
+                Named.as("DeduplicateWorkerTaskExecution.deduplication"),
+                WORKERTASK_DEDUPLICATION_STATE_STORE_NAME
+            )
+            .filter((key, value) -> value != null, Named.as("DeduplicateWorkerTaskExecution.notNullFilter"));
+    }
+
+    private void toWorkerTaskExecution(KStream<String, WorkerTaskExecution> stream) {
+        stream
+            .selectKey(
+                (key, value) -> value.getExecution().getId(),
+                Named.as("ToWorkerTaskExecution.selectKey")
+            )
+            .to(
+                kafkaAdminService.getTopicName(WorkerTaskExecution.class),
+                Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskExecution.class)).withName("ToWorkerTaskExecution.toWorkerTaskExecution")
+            );
+    }
+
+    private void workerTaskExecutionToExecution(KStream<String, WorkerTaskExecution> stream) {
+        stream
+            .mapValues(
+                value -> {
+                    String message = "Create new execution for flow '" +
+                        value.getExecution().getNamespace() + "." + value.getExecution().getFlowId() +
+                        "' with id '" + value.getExecution().getId() + "' from task '" + value.getTask().getId() +
+                        "' and taskrun '" + value.getTaskRun().getId() +
+                        (value.getTaskRun().getValue() != null ? " (" + value.getTaskRun().getValue() + ")" : "") + "'";
+
+                    log.info(message);
+
+                    LogEntry.LogEntryBuilder logEntryBuilder = LogEntry.of(value.getTaskRun()).toBuilder()
+                        .level(Level.INFO)
+                        .message(message)
+                        .timestamp(value.getTaskRun().getState().getStartDate())
+                        .thread(Thread.currentThread().getName());
+
+                    return logEntryBuilder.build();
+                },
+                Named.as("WorkerTaskExecutionToExecution.mapToLog")
+            )
+            .selectKey((key, value) -> (String) null, Named.as("WorkerTaskExecutionToExecution.logRemoveKey"))
+            .to(
+                kafkaAdminService.getTopicName(LogEntry.class),
+                Produced.with(Serdes.String(), JsonSerde.of(LogEntry.class)).withName("WorkerTaskExecutionToExecution.toLogEntry")
+            );
+
+        KStream<String, Execution> executionKStream = stream
+            .mapValues(
+                (key, value) -> value.getExecution(),
+                Named.as("WorkerTaskExecutionToExecution.map")
+            )
+            .selectKey(
+                (key, value) -> value.getId(),
+                Named.as("WorkerTaskExecutionToExecution.selectKey")
+            );
+
+        executionKStream = KafkaStreamSourceService.logIfEnabled(
+            log,
+            executionKStream,
+            (key, value) -> executorService.log(log, false, value),
+            "WorkerTaskExecutionToExecution"
+        );
+
+        executionKStream
+            .to(
+                kafkaAdminService.getTopicName(Execution.class),
+                Produced.with(Serdes.String(), JsonSerde.of(Execution.class)).withName("WorkerTaskExecutionToExecution.toExecution")
+            );
+    }
+
+    private void handleWorkerTaskExecution(KTable<String, WorkerTaskExecution> workerTaskExecutionKTable, KStream<String, Executor> stream) {
+        KStream<String, WorkerTaskResult> joinKStream = stream
+            .filter(
+                (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()),
+                Named.as("HandleWorkerTaskExecution.isTerminated")
+            )
+            .transformValues(
+                () -> new WorkerTaskExecutionTransformer(runContextFactory, workerTaskExecutionKTable.queryableStoreName()),
+                Named.as("HandleWorkerTaskExecution.transform"),
+                workerTaskExecutionKTable.queryableStoreName()
+            )
+            .filter((key, value) -> value != null, Named.as("HandleWorkerTaskExecution.joinNotNullFilter"));
+
+        toWorkerTaskResultSend(joinKStream, "HandleWorkerTaskExecution");
+    }
+
+    private void toWorkerTaskResult(KStream<String, Executor> stream) {
+        KStream<String, WorkerTaskResult> workerTaskResultKStream = stream
+            .flatMapValues(
+                (readOnlyKey, value) -> value.getWorkerTaskResults(),
+                Named.as("HandleWorkerTaskResult.flatMap")
+            );
+
+        toWorkerTaskResultSend(workerTaskResultKStream, "HandleWorkerTaskResult");
+    }
+
+    private void toWorkerTaskResultSend(KStream<String, WorkerTaskResult> stream, String name) {
+        KStream<String, WorkerTaskResult> workerTaskResultKStream = stream
+            .transformValues(
+                () -> new DeduplicationTransformer<>(
+                    name,
+                    WORKERTASK_DEDUPLICATION_STATE_STORE_NAME,
+                    (key, value) -> value.getTaskRun().getExecutionId() + "-" + value.getTaskRun().getId(),
+                    (key, value) -> value.getTaskRun().getState().getCurrent().name()
+                ),
+                Named.as(name + ".deduplication"),
+                WORKERTASK_DEDUPLICATION_STATE_STORE_NAME
+            )
+            .filter((key, value) -> value != null, Named.as(name + ".notNullFilter"))
+            .selectKey(
+                (key, value) -> value.getTaskRun().getId(),
+                Named.as(name + ".selectKey")
+            );
+
+        KafkaStreamSourceService.logIfEnabled(
+            log,
+            workerTaskResultKStream,
+            (key, value) -> executorService.log(log, false, value),
+            name
+        )
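+            // records are now keyed by the taskRun id; the write below sends them back
+            // to the same WorkerTaskResult topic that feeds the top of this topology
+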
.to( + kafkaAdminService.getTopicName(WorkerTaskResult.class), + Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskResult.class)).withName(name + ".toWorkerTaskResult") + ); + } + + private void purgeWorkerRunning(KStream workerTaskResultKStream) { + workerTaskResultKStream + .filter((key, value) -> value.getTaskRun().getState().isTerninated(), Named.as("PurgeWorkerRunning.filterTerminated")) + .mapValues((readOnlyKey, value) -> (WorkerTaskRunning)null, Named.as("PurgeWorkerRunning.toNull")) + .to( + kafkaAdminService.getTopicName(WorkerTaskRunning.class), + Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskRunning.class)).withName("PurgeWorkerRunning.toWorkerTaskRunning") + ); + } + + private void toExecution(KStream stream) { + KStream streamFrom = stream + .filter((key, value) -> value.isExecutionUpdated(), Named.as("ToExecution.haveFrom")) + .transformValues( + ExecutorAddHeaderTransformer::new, + Named.as("ToExecution.addHeaders") + ); + + // send execution + KStream executionKStream = streamFrom + .filter((key, value) -> value.getException() == null, Named.as("ToExecutionExecution.notException")); + + toExecutionSend(executionKStream, "ToExecutionExecution"); + + // send exception + KStream> failedStream = streamFrom + .filter((key, value) -> value.getException() != null, Named.as("ToExecutionException.isException")) + .mapValues( + e -> Pair.of(e, e.getExecution().failedExecutionFromExecutor(e.getException())), + Named.as("ToExecutionException.mapToFailedExecutionWithLog") + ); + + failedStream + .flatMapValues(e -> e.getRight().getLogs(), Named.as("ToExecutionException.flatmapLogs")) + .selectKey((key, value) -> (String)null, Named.as("ToExecutionException.removeKey")) + .to( + kafkaAdminService.getTopicName(LogEntry.class), + Produced.with(Serdes.String(), JsonSerde.of(LogEntry.class)).withName("ToExecutionException.toLogEntry") + ); + + KStream executorFailedKStream = failedStream + .mapValues(e -> e.getLeft().withExecution(e.getRight().getExecution(), "failedExecutionFromExecutor"), Named.as("ToExecutionException.mapToExecutor")); + + toExecutionSend(executorFailedKStream, "ToExecutionException"); + } + + private void toExecutionSend(KStream stream, String from) { + stream = KafkaStreamSourceService.logIfEnabled( + log, + stream, + (key, value) -> executorService.log(log, false, value), + from + ); + + stream + .transformValues( + () -> new StateStoreTransformer<>(EXECUTOR_STATE_STORE_NAME, Executor::serialize), + Named.as(from + ".store"), + EXECUTOR_STATE_STORE_NAME + ) + .mapValues((readOnlyKey, value) -> value.getExecution(), Named.as(from + ".mapToExecution")) + .to( + kafkaAdminService.getTopicName(Execution.class), + Produced.with(Serdes.String(), JsonSerde.of(Execution.class)).withName(from + ".toExecution") + ); + } + + @NoArgsConstructor + @Getter + public static class WorkerTaskResultState { + Map results = new HashMap<>(); + } + + @AllArgsConstructor + @Getter + public static class WorkerTaskRunningWithWorkerTaskRunning { + WorkerInstance workerInstance; + WorkerTaskRunning workerTaskRunning; + } + + @NoArgsConstructor + @Getter + public static class WorkerTaskRunningState { + Map workerTaskRunnings = new HashMap<>(); + } +} diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorWorkerRunning.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorWorkerRunning.java new file mode 100644 index 0000000000..cad4848618 --- /dev/null +++ 
b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorWorkerRunning.java
@@ -0,0 +1,140 @@
+package io.kestra.runner.kafka.executors;
+
+import io.kestra.core.runners.ExecutorService;
+import io.kestra.core.runners.WorkerInstance;
+import io.kestra.core.runners.WorkerTask;
+import io.kestra.core.runners.WorkerTaskRunning;
+import io.kestra.runner.kafka.KafkaQueueEnabled;
+import io.kestra.runner.kafka.serializers.JsonSerde;
+import io.kestra.runner.kafka.services.KafkaAdminService;
+import io.kestra.runner.kafka.services.KafkaStreamSourceService;
+import io.kestra.runner.kafka.services.KafkaStreamsBuilder;
+import io.kestra.runner.kafka.streams.GlobalStateProcessor;
+import io.kestra.runner.kafka.streams.WorkerInstanceTransformer;
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+@KafkaQueueEnabled
+@Singleton
+@Slf4j
+public class ExecutorWorkerRunning implements KafkaExecutorInterface {
+    public static final String WORKERINSTANCE_STATE_STORE_NAME = "worker_instance";
+    public static final String TOPIC_EXECUTOR_WORKERINSTANCE = "executorworkerinstance";
+    public static final String WORKER_RUNNING_STATE_STORE_NAME = "worker_running";
+
+    @Inject
+    private KafkaAdminService kafkaAdminService;
+
+    @Inject
+    private ExecutorService executorService;
+
+    public StreamsBuilder topology() {
+        StreamsBuilder builder = new KafkaStreamsBuilder();
+
+        builder.addGlobalStore(
+            Stores.keyValueStoreBuilder(
+                Stores.persistentKeyValueStore(WORKERINSTANCE_STATE_STORE_NAME),
+                Serdes.String(),
+                JsonSerde.of(WorkerInstance.class)
+            ),
+            kafkaAdminService.getTopicName(KafkaStreamSourceService.TOPIC_EXECUTOR_WORKERINSTANCE),
+            Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("GlobalStore.ExecutorWorkerInstance"),
+            () -> new GlobalStateProcessor<>(WORKERINSTANCE_STATE_STORE_NAME)
+        );
+
+        // only used as a state store
+        builder
+            .globalTable(
+                kafkaAdminService.getTopicName(WorkerTaskRunning.class),
+                Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskRunning.class)).withName("GlobalKTable.WorkerTaskRunning"),
+                Materialized.<String, WorkerTaskRunning, KeyValueStore<Bytes, byte[]>>as(WORKER_RUNNING_STATE_STORE_NAME)
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(JsonSerde.of(WorkerTaskRunning.class))
+            );
+
+        KStream<String, WorkerInstance> workerInstanceKStream = builder
+            .stream(
+                kafkaAdminService.getTopicName(WorkerInstance.class),
+                Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("KStream.WorkerInstance")
+            );
+
+        workerInstanceKStream
+            .to(
+                kafkaAdminService.getTopicName(TOPIC_EXECUTOR_WORKERINSTANCE),
+                Produced.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("DetectNewWorker.toExecutorWorkerInstance")
+            );
+
+        KStream<String, WorkerInstanceTransformer.Result> stream = workerInstanceKStream
+            .transformValues(
+                WorkerInstanceTransformer::new,
+                Named.as("DetectNewWorker.workerInstanceTransformer")
+            )
+            .flatMapValues((readOnlyKey, value) -> value, Named.as("DetectNewWorker.flatMapList"));
+
+        // we resend the worker tasks from evicted workers
+        KStream<String, WorkerTask> resultWorkerTask = stream
+            .flatMapValues(
+                (readOnlyKey, value) -> value.getWorkerTasksToSend(),
+                Named.as("DetectNewWorkerTask.flatMapWorkerTaskToSend")
+            );
+
+        // and remove from
running since already sent + resultWorkerTask + .map((key, value) -> KeyValue.pair(value.getTaskRun().getId(), (WorkerTaskRunning)null), Named.as("DetectNewWorkerTask.workerTaskRunningToNull")) + .to( + kafkaAdminService.getTopicName(WorkerTaskRunning.class), + Produced.with(Serdes.String(), JsonSerde.of(WorkerTaskRunning.class)).withName("DetectNewWorker.toWorkerTaskRunning") + ); + + KafkaStreamSourceService.logIfEnabled( + log, + resultWorkerTask, + (key, value) -> executorService.log(log, false, value), + "DetectNewWorkerTask" + ) + .to( + kafkaAdminService.getTopicName(WorkerTask.class), + Produced.with(Serdes.String(), JsonSerde.of(WorkerTask.class)).withName("DetectNewWorkerTask.toWorkerTask") + ); + + // we resend the WorkerInstance update + KStream updatedStream = KafkaStreamSourceService.logIfEnabled( + log, + stream, + (key, value) -> log.debug( + "Instance updated: {}", + value + ), + "DetectNewWorkerInstance" + ) + .map( + (key, value) -> value.getWorkerInstanceUpdated(), + Named.as("DetectNewWorkerInstance.mapInstance") + ); + + // cleanup executor workerinstance state store + updatedStream + .filter((key, value) -> value != null, Named.as("DetectNewWorkerInstance.filterNotNull")) + .to( + kafkaAdminService.getTopicName(TOPIC_EXECUTOR_WORKERINSTANCE), + Produced.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("DetectNewWorkerInstance.toExecutorWorkerInstance") + ); + + updatedStream + .to( + kafkaAdminService.getTopicName(WorkerInstance.class), + Produced.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("DetectNewWorkerInstance.toWorkerInstance") + ); + + + return builder; + } +} diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/KafkaExecutorInterface.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/KafkaExecutorInterface.java new file mode 100644 index 0000000000..bc58e88244 --- /dev/null +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/KafkaExecutorInterface.java @@ -0,0 +1,15 @@ +package io.kestra.runner.kafka.executors; + +import io.kestra.runner.kafka.KafkaQueueEnabled; +import io.kestra.runner.kafka.services.KafkaStreamService; +import io.micronaut.context.ApplicationContext; +import org.apache.kafka.streams.StreamsBuilder; + +@KafkaQueueEnabled +public interface KafkaExecutorInterface { + StreamsBuilder topology(); + + default void onCreated(ApplicationContext applicationContext, KafkaStreamService.Stream stream) { + // no op + } +} diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java index 562517f94e..28b5489421 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java @@ -123,6 +123,10 @@ private Stream(Topology topology, Properties props, MetricRegistry meterRegistry } this.logger = logger != null ? 
logger : log; + + if (this.logger.isTraceEnabled()) { + this.logger.trace(topology.describe().toString()); + } } public synchronized void start(final KafkaStreams.StateListener listener) throws IllegalStateException, StreamsException { diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorNextTransformer.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorNextTransformer.java index 81e9bf8f02..09956b6c7e 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorNextTransformer.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorNextTransformer.java @@ -1,7 +1,7 @@ package io.kestra.runner.kafka.streams; import io.kestra.core.models.executions.Execution; -import io.kestra.core.runners.AbstractExecutor; +import io.kestra.core.runners.ExecutorService; import io.kestra.core.runners.Executor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -18,23 +18,22 @@ @Slf4j public class ExecutorNextTransformer implements ValueTransformerWithKey { private final String storeName; - private final AbstractExecutor abstractExecutor; + private final ExecutorService executorService; private KeyValueStore store; - public ExecutorNextTransformer(String storeName, AbstractExecutor abstractExecutor) { + public ExecutorNextTransformer(String storeName, ExecutorService executorService) { this.storeName = storeName; - this.abstractExecutor = abstractExecutor; + this.executorService = executorService; } @Override - @SuppressWarnings("unchecked") public void init(final ProcessorContext context) { - this.store = (KeyValueStore) context.getStateStore(this.storeName); + this.store = context.getStateStore(this.storeName); } @Override public Executor transform(final String key, final Executor value) { - Executor executor = abstractExecutor.process(value); + Executor executor = executorService.process(value); if (executor.getNexts().size() == 0) { return value; @@ -57,7 +56,7 @@ public Executor transform(final String key, final Executor value) { store.addAll(groups.get(false)); this.store.put(key, store); - Execution newExecution = abstractExecutor.onNexts( + Execution newExecution = executorService.onNexts( value.getFlow(), value.getExecution(), executor.getNexts() diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/WorkerInstanceTransformer.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/WorkerInstanceTransformer.java index 4597bf5abc..c9a4baa0a0 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/WorkerInstanceTransformer.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/WorkerInstanceTransformer.java @@ -2,6 +2,7 @@ import com.google.common.collect.Streams; import io.kestra.core.runners.*; +import io.kestra.runner.kafka.executors.ExecutorWorkerRunning; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; @@ -13,7 +14,6 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import io.kestra.core.services.WorkerInstanceService; -import io.kestra.runner.kafka.KafkaExecutor; import java.util.Collections; import java.util.List; @@ -29,10 +29,9 @@ public WorkerInstanceTransformer() { } @Override - @SuppressWarnings("unchecked") public void init(final ProcessorContext context) { - this.instanceStore = (KeyValueStore) context.getStateStore(KafkaExecutor.WORKERINSTANCE_STATE_STORE_NAME); - this.runningStore = (KeyValueStore>) 
context.getStateStore(KafkaExecutor.WORKER_RUNNING_STATE_STORE_NAME); + this.instanceStore = context.getStateStore(ExecutorWorkerRunning.WORKERINSTANCE_STATE_STORE_NAME); + this.runningStore = context.getStateStore(ExecutorWorkerRunning.WORKER_RUNNING_STATE_STORE_NAME); } @Override diff --git a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaExecutorTest.java b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaExecutorTest.java index 2fc319e74d..b3e542732c 100644 --- a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaExecutorTest.java +++ b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaExecutorTest.java @@ -17,16 +17,18 @@ import io.kestra.core.utils.TestsUtils; import io.kestra.runner.kafka.configs.ClientConfig; import io.kestra.runner.kafka.configs.StreamDefaultsConfig; +import io.kestra.runner.kafka.executors.ExecutorMain; +import io.kestra.runner.kafka.executors.ExecutorFlowTrigger; +import io.kestra.runner.kafka.executors.ExecutorWorkerRunning; +import io.kestra.runner.kafka.executors.KafkaExecutorInterface; import io.kestra.runner.kafka.serializers.JsonSerde; import io.kestra.runner.kafka.services.KafkaAdminService; import io.micronaut.context.ApplicationContext; import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.test.TestRecord; import org.junit.jupiter.api.AfterEach; @@ -51,7 +53,13 @@ class KafkaExecutorTest { ApplicationContext applicationContext; @Inject - KafkaExecutor stream; + ExecutorMain executorMain; + + @Inject + ExecutorFlowTrigger executorFlowTrigger; + + @Inject + ExecutorWorkerRunning executorWorkerRunning; @Inject ClientConfig clientConfig; @@ -70,12 +78,12 @@ class KafkaExecutorTest { TopologyTestDriver testTopology; - static WorkerInstance workerInstance = workerInstance(); - @BeforeEach void init() throws IOException, URISyntaxException { TestsUtils.loads(repositoryLoader); + } + void startStream(KafkaExecutorInterface kafkaExecutorInterface) { Properties properties = new Properties(); properties.putAll(clientConfig.getProperties()); properties.putAll(streamConfig.getProperties()); @@ -85,18 +93,24 @@ void init() throws IOException, URISyntaxException { // @TODO properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); - Topology topology = stream.topology().build(); + Topology topology = kafkaExecutorInterface.topology().build(); if (log.isTraceEnabled()) { log.trace(topology.describe().toString()); } + if (testTopology != null) { + testTopology.close(); + } + testTopology = new TopologyTestDriver(topology, properties); - applicationContext.registerSingleton(new KafkaTemplateExecutor( - testTopology.getKeyValueStore("template"), - "template" - )); + if (kafkaExecutorInterface.getClass() == ExecutorMain.class) { + applicationContext.registerSingleton(new KafkaTemplateExecutor( + testTopology.getKeyValueStore("template"), + "template" + )); + } } @AfterEach @@ -108,6 +122,8 @@ void tear() { @Test void standard() { + startStream(this.executorMain); + Flow flow = flowRepository.findById("io.kestra.tests", "logs").orElseThrow(); this.flowInput().pipeInput(flow.uid(), flow); @@ -136,6 +152,8 @@ void standard() { @Test 
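+    // each test boots only the topology under test via startStream(); TopologyTestDriver
+    // processes piped records synchronously, so outputs can be asserted immediately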
void concurrent() { + startStream(this.executorMain); + Flow flow = flowRepository.findById("io.kestra.tests", "logs").orElseThrow(); this.flowInput().pipeInput(flow.uid(), flow); @@ -165,6 +183,8 @@ void concurrent() { @Test void killed() { + startStream(this.executorMain); + Flow flow = flowRepository.findById("io.kestra.tests", "logs").orElseThrow(); this.flowInput().pipeInput(flow.uid(), flow); @@ -204,6 +224,8 @@ void killed() { @Test void killedAlreadyFinished() { + startStream(this.executorMain); + Flow flow = flowRepository.findById("io.kestra.tests", "logs").orElseThrow(); this.flowInput().pipeInput(flow.uid(), flow); @@ -225,6 +247,8 @@ void killedAlreadyFinished() { @ParameterizedTest @ValueSource(booleans = {true, false}) void killedParallel(boolean killed) { + startStream(this.executorMain); + Flow flow = flowRepository.findById("io.kestra.tests", "parallel").orElseThrow(); this.flowInput().pipeInput(flow.uid(), flow); @@ -290,6 +314,8 @@ void killedParallel(boolean killed) { @Test void eachNull() { + startStream(this.executorMain); + Flow flow = flowRepository.findById("io.kestra.tests", "each-null").orElseThrow(); this.flowInput().pipeInput(flow.uid(), flow); @@ -305,6 +331,8 @@ void eachNull() { @Test void parallel() { + startStream(this.executorMain); + Flow flow = flowRepository.findById("io.kestra.tests", "parallel").orElseThrow(); this.flowInput().pipeInput(flow.uid(), flow); @@ -368,6 +396,8 @@ void parallel() { @Test void eachParallelNested() throws InternalException { + startStream(this.executorMain); + Flow flow = flowRepository.findById("io.kestra.tests", "each-parallel-nested").orElseThrow(); this.flowInput().pipeInput(flow.uid(), flow); @@ -424,6 +454,8 @@ void eachParallelNested() throws InternalException { @Test void flowTrigger() { + startStream(this.executorMain); + Flow triggerFlow = flowRepository.findById("io.kestra.tests", "trigger-flow-listener-no-inputs").orElseThrow(); this.flowInput().pipeInput(triggerFlow.uid(), triggerFlow); @@ -440,16 +472,22 @@ void flowTrigger() { firstExecution = runningAndSuccessSequential(firstFlow, firstExecution, 0); assertThat(firstExecution.getState().getCurrent(), is(State.Type.SUCCESS)); - Execution triggerExecution = executionOutput().readRecord().getValue(); - assertThat(triggerExecution.getState().getCurrent(), is(State.Type.CREATED)); + io.kestra.runner.kafka.streams.ExecutorFlowTrigger executorFlowTrigger = executorFlowTriggerOutput().readRecord().value(); + assertThat(executorFlowTrigger.getFlowHavingTrigger().getId(), is("trigger-flow-listener-no-inputs")); - triggerExecution = runningAndSuccessSequential(triggerFlow, triggerExecution, 0); + startStream(this.executorFlowTrigger); - assertThat(triggerExecution.getState().getCurrent(), is(State.Type.SUCCESS)); + executorFlowTriggerInput().pipeInput(executorFlowTrigger.getFlowHavingTrigger().uid(), executorFlowTrigger); + + Execution triggerExecution = executionOutput().readRecord().getValue(); + assertThat(triggerExecution.getState().getCurrent(), is(State.Type.CREATED)); + assertThat(triggerExecution.getFlowId(), is("trigger-flow-listener-no-inputs")); } @Test void multipleTrigger() { + startStream(this.executorMain); + Flow flowA = flowRepository.findById("io.kestra.tests", "trigger-multiplecondition-flow-a").orElseThrow(); this.flowInput().pipeInput(flowA.uid(), flowA); @@ -469,19 +507,33 @@ void multipleTrigger() { executionB = runningAndSuccessSequential(flowB, executionB, 0); assertThat(executionB.getState().getCurrent(), is(State.Type.SUCCESS)); + // get 
trigger + io.kestra.runner.kafka.streams.ExecutorFlowTrigger executorFlowTriggerA = executorFlowTriggerOutput().readRecord().value(); + assertThat(executorFlowTriggerA.getExecution().getFlowId(), is("trigger-multiplecondition-flow-a")); + io.kestra.runner.kafka.streams.ExecutorFlowTrigger executorFlowTriggerB = executorFlowTriggerOutput().readRecord().value(); + assertThat(executorFlowTriggerB.getExecution().getFlowId(), is("trigger-multiplecondition-flow-b")); + + startStream(this.executorFlowTrigger); + this.flowInput().pipeInput(flowA.uid(), flowA); + this.flowInput().pipeInput(flowB.uid(), flowB); + this.flowInput().pipeInput(triggerFlow.uid(), triggerFlow); + + executorFlowTriggerInput().pipeInput(executorFlowTriggerA.getFlowHavingTrigger().uid(), executorFlowTriggerA); + executorFlowTriggerInput().pipeInput(executorFlowTriggerB.getFlowHavingTrigger().uid(), executorFlowTriggerB); + // trigger start Execution triggerExecution = executionOutput().readRecord().getValue(); assertThat(triggerExecution.getState().getCurrent(), is(State.Type.CREATED)); - - triggerExecution = runningAndSuccessSequential(triggerFlow, triggerExecution, 0); - assertThat(triggerExecution.getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(triggerExecution.getFlowId(), is("trigger-multiplecondition-listener")); } @Test void workerRebalanced() { + startStream(this.executorMain); + + // start an execution to generate some id Flow flow = flowRepository.findById("io.kestra.tests", "logs").orElseThrow(); this.flowInput().pipeInput(flow.uid(), flow); - this.workerInstanceInput().pipeInput(workerInstance.getWorkerUuid().toString(), workerInstance); Execution execution = createExecution(flow); @@ -489,7 +541,19 @@ void workerRebalanced() { String taskRunId = execution.getTaskRunList().get(0).getId(); assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.RUNNING)); - assertThat(this.workerTaskOutput().readRecord().value().getTaskRun().getState().getCurrent(), is(State.Type.CREATED)); + + WorkerTask workerTask = this.workerTaskOutput().readRecord().value(); + assertThat(workerTask.getTaskRun().getState().getCurrent(), is(State.Type.CREATED)); + + // start worker running stream + startStream(this.executorWorkerRunning); + + WorkerInstance startInstance = workerInstance(); + this.workerInstanceInput().pipeInput(startInstance.getWorkerUuid().toString(), startInstance); + + // add a running + WorkerTaskRunning running = WorkerTaskRunning.of(workerTask, startInstance, 0); + this.workerTaskRunningInput().pipeInput(running.getTaskRun().getId(), running); // declare a new worker instance WorkerInstance newInstance = workerInstance(); @@ -508,9 +572,10 @@ void workerRebalanced() { assertThat(workerTaskRunningRecord.key(), is(taskRunId)); } - @Test void invalidStore() throws JsonProcessingException { + startStream(this.executorMain); + KeyValueStore flow = this.testTopology.getKeyValueStore("flow"); flow.put("io.kestra.unittest_invalid_1", JacksonMapper.ofJson().writeValueAsString(Map.of( "id", "invalid", @@ -525,7 +590,6 @@ void invalidStore() throws JsonProcessingException { ) ))); - Flow triggerFlow = flowRepository.findById("io.kestra.tests", "trigger-flow-listener-no-inputs").orElseThrow(); this.flowInput().pipeInput(triggerFlow.uid(), triggerFlow); @@ -538,15 +602,10 @@ void invalidStore() throws JsonProcessingException { firstExecution = runningAndSuccessSequential(firstFlow, firstExecution, 0); assertThat(firstExecution.getState().getCurrent(), is(State.Type.SUCCESS)); - Execution 
triggerExecution = executionOutput().readRecord().getValue(); - assertThat(triggerExecution.getState().getCurrent(), is(State.Type.CREATED)); - - triggerExecution = runningAndSuccessSequential(triggerFlow, triggerExecution, 0); - - assertThat(triggerExecution.getState().getCurrent(), is(State.Type.SUCCESS)); + io.kestra.runner.kafka.streams.ExecutorFlowTrigger executorFlowTrigger = executorFlowTriggerOutput().readRecord().value(); + assertThat(executorFlowTrigger.getFlowHavingTrigger().getId(), is("trigger-flow-listener-no-inputs")); } - private Execution createExecution(Flow flow) { Execution execution = Execution.builder() .id(IdUtils.create()) @@ -596,6 +655,7 @@ private Execution runningAndSuccessSequential(Flow flow, Execution execution, in // add to running queue + /* TaskRun taskRun = execution.getTaskRunList().get(index); WorkerTaskRunning workerTaskRunning = WorkerTaskRunning.of( WorkerTask.builder() @@ -608,6 +668,8 @@ private Execution runningAndSuccessSequential(Flow flow, Execution execution, in ); this.workerTaskRunningInput().pipeInput(taskRun.getId(), workerTaskRunning); + */ + if (lastState == State.Type.CREATED) { return execution; } @@ -668,6 +730,25 @@ private TestInputTopic executionInput() { ); } + private TestInputTopic executorFlowTriggerInput() { + return this.testTopology + .createInputTopic( + kafkaAdminService.getTopicName(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class), + Serdes.String().serializer(), + JsonSerde.of(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class).serializer() + ); + } + + + private TestOutputTopic executorFlowTriggerOutput() { + return this.testTopology + .createOutputTopic( + kafkaAdminService.getTopicName(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class), + Serdes.String().deserializer(), + JsonSerde.of(io.kestra.runner.kafka.streams.ExecutorFlowTrigger.class).deserializer() + ); + } + private TestInputTopic executionKilledInput() { return this.testTopology .createInputTopic( diff --git a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaRunnerTest.java b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaRunnerTest.java index 7cdca9e4aa..4ebf459ad7 100644 --- a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaRunnerTest.java +++ b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaRunnerTest.java @@ -199,7 +199,7 @@ void workerRecordTooLarge() throws TimeoutException { ); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); - assertThat(logs.stream().filter(logEntry -> logEntry.getExecutionId().equals(execution.getId())).count(), is(63L)); + assertThat(logs.stream().filter(logEntry -> logEntry.getExecutionId().equals(execution.getId())).count(), is(greaterThan(60L))); } @Test diff --git a/runner-kafka/src/test/resources/application.yml b/runner-kafka/src/test/resources/application.yml index 24dc4be502..f00625cb7f 100644 --- a/runner-kafka/src/test/resources/application.yml +++ b/runner-kafka/src/test/resources/application.yml @@ -77,7 +77,7 @@ kestra: segment.bytes: "10485760" executor: - name: "${kestra.kafka.defaults.topic-prefix}executor-executor-changelog" + name: "${kestra.kafka.defaults.topic-prefix}executor_main-executor-changelog" cls: io.kestra.core.runners.Executor properties: cleanup.policy: "delete,compact" diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java index 9643c8fce7..deaade72f8 100644 --- 
a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java @@ -21,58 +21,69 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; + +import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import jakarta.inject.Named; import jakarta.inject.Singleton; +import lombok.extern.slf4j.Slf4j; @Singleton @MemoryQueueEnabled -public class MemoryExecutor extends AbstractExecutor { - private final FlowRepositoryInterface flowRepository; - private final QueueInterface executionQueue; - private final QueueInterface workerTaskQueue; - private final QueueInterface workerTaskResultQueue; - private final QueueInterface logQueue; - private final FlowService flowService; - private final TaskDefaultService taskDefaultService; - private final Template.TemplateExecutorInterface templateExecutorInterface; +@Slf4j +public class MemoryExecutor implements ExecutorInterface { private static final MemoryMultipleConditionStorage multipleConditionStorage = new MemoryMultipleConditionStorage(); - private static final ConcurrentHashMap EXECUTIONS = new ConcurrentHashMap<>(); private static final ConcurrentHashMap WORKERTASKEXECUTIONS_WATCHER = new ConcurrentHashMap<>(); - private List allFlows; @Inject - public MemoryExecutor( - RunContextFactory runContextFactory, - FlowRepositoryInterface flowRepository, - @Named(QueueFactoryInterface.EXECUTION_NAMED) QueueInterface executionQueue, - @Named(QueueFactoryInterface.WORKERTASK_NAMED) QueueInterface workerTaskQueue, - @Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) QueueInterface workerTaskResultQueue, - @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface logQueue, - MetricRegistry metricRegistry, - FlowService flowService, - ConditionService conditionService, - TaskDefaultService taskDefaultService, - Template.TemplateExecutorInterface templateExecutorInterface - ) { - super(runContextFactory, metricRegistry, conditionService); - - this.flowRepository = flowRepository; - this.executionQueue = executionQueue; - this.workerTaskQueue = workerTaskQueue; - this.workerTaskResultQueue = workerTaskResultQueue; - this.logQueue = logQueue; - this.flowService = flowService; - this.conditionService = conditionService; - this.taskDefaultService = taskDefaultService; - this.flowExecutorInterface = new MemoryFlowExecutor(this.flowRepository); - this.templateExecutorInterface = templateExecutorInterface; - } + private ApplicationContext applicationContext; + + @Inject + private FlowRepositoryInterface flowRepository; + + @Inject + @Named(QueueFactoryInterface.EXECUTION_NAMED) + private QueueInterface executionQueue; + + @Inject + @Named(QueueFactoryInterface.WORKERTASK_NAMED) + private QueueInterface workerTaskQueue; + + @Inject + @Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) + private QueueInterface workerTaskResultQueue; + + @Inject + @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) + private QueueInterface logQueue; + + @Inject + private FlowService flowService; + + @Inject + private TaskDefaultService taskDefaultService; + + @Inject + private Template.TemplateExecutorInterface templateExecutorInterface; + + @Inject + private ExecutorService executorService; + + @Inject + private ConditionService conditionService; + + @Inject + private RunContextFactory runContextFactory; + + @Inject + private MetricRegistry metricRegistry; @Override public void run() { + applicationContext.registerSingleton(new 
MemoryFlowExecutor(this.flowRepository)); + this.allFlows = this.flowRepository.findAll(); this.executionQueue.receive(MemoryExecutor.class, this::executionQueue); this.workerTaskResultQueue.receive(MemoryExecutor.class, this::workerTaskResultQueue); @@ -107,14 +118,14 @@ private void handleExecution(ExecutionState state) { Executor executor = new Executor(execution, null).withFlow(flow); if (log.isDebugEnabled()) { - log(log, true, executor); + executorService.log(log, true, executor); } - executor = this.process(executor); + executor = executorService.process(executor); if (executor.getNexts().size() > 0 && deduplicateNexts(execution, executor.getNexts())) { executor.withExecution( - this.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()), + executorService.onNexts(executor.getFlow(), executor.getExecution(), executor.getNexts()), "onNexts" ); } @@ -224,7 +235,7 @@ private ExecutionState saveExecution(Execution execution) { private void toExecution(Executor executor) { if (log.isDebugEnabled()) { - log(log, false, executor); + executorService.log(log, false, executor); } // emit for other consumer than executor @@ -242,7 +253,7 @@ private void toExecution(Executor executor) { private void workerTaskResultQueue(WorkerTaskResult message) { synchronized (this) { if (log.isDebugEnabled()) { - log(log, true, message); + executorService.log(log, true, message); } metricRegistry