From 30ff6acb6e72eb0441ade39cf6dec97c58608980 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Fri, 4 Feb 2022 14:56:51 +0100 Subject: [PATCH] feat(kafka-runner): scheduler could recreate deleted trigger during race condition (#457) --- .../sys/FlowListenersRestoreCommand.java | 12 +- .../sys/FlowListenersRestoreCommandTest.java | 4 +- .../core/schedulers/AbstractScheduler.java | 22 ++- .../io/kestra/runner/kafka/KafkaExecutor.java | 2 + .../runner/kafka/KafkaFlowListeners.java | 98 +----------- .../kestra/runner/kafka/KafkaScheduler.java | 44 ------ .../kafka/executors/ExecutorFlowLast.java | 148 ++++++++++++++++++ .../executors/ExecutorTriggerCleaner.java | 72 +++++++++ .../runner/kafka/KafkaFlowListenersTest.java | 13 +- .../runner/kafka/KafkaSchedulerTest.java | 5 + 10 files changed, 267 insertions(+), 153 deletions(-) create mode 100644 runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorFlowLast.java create mode 100644 runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorTriggerCleaner.java diff --git a/cli/src/main/java/io/kestra/cli/commands/sys/FlowListenersRestoreCommand.java b/cli/src/main/java/io/kestra/cli/commands/sys/FlowListenersRestoreCommand.java index ae05ccf726..870a4f45ce 100644 --- a/cli/src/main/java/io/kestra/cli/commands/sys/FlowListenersRestoreCommand.java +++ b/cli/src/main/java/io/kestra/cli/commands/sys/FlowListenersRestoreCommand.java @@ -24,7 +24,7 @@ public class FlowListenersRestoreCommand extends AbstractCommand { private ApplicationContext applicationContext; @CommandLine.Option(names = {"--timeout"}, description = "Timeout before quit, considering we complete the restore") - private Duration timeout = Duration.ofSeconds(15); + private Duration timeout = Duration.ofSeconds(60); public FlowListenersRestoreCommand() { super(false); @@ -39,12 +39,16 @@ public Integer call() throws Exception { AtomicReference lastTime = new AtomicReference<>(ZonedDateTime.now()); flowListeners.listen(flows -> { - stdOut("Received {0} flows", flows.size()); + long count = flows.stream().filter(flow -> !flow.isDeleted()).count(); - lastTime.set(ZonedDateTime.now()); + stdOut("Received {0} active flows", count); + + if (count > 0) { + lastTime.set(ZonedDateTime.now()); + } }); - // we can't know when it's over to wait no more flow received + // we can't know when it's over, wait no more flow received Await.until(() -> lastTime.get().compareTo(ZonedDateTime.now().minus(this.timeout)) < 0); return 0; diff --git a/cli/src/test/java/io/kestra/cli/commands/sys/FlowListenersRestoreCommandTest.java b/cli/src/test/java/io/kestra/cli/commands/sys/FlowListenersRestoreCommandTest.java index 5806ab532c..660251766b 100644 --- a/cli/src/test/java/io/kestra/cli/commands/sys/FlowListenersRestoreCommandTest.java +++ b/cli/src/test/java/io/kestra/cli/commands/sys/FlowListenersRestoreCommandTest.java @@ -46,8 +46,8 @@ void run() throws InterruptedException { thread.join(); - assertThat(out.toString(), containsString("Received 1 flows")); - assertThat(out.toString(), containsString("Received 5 flows")); + assertThat(out.toString(), containsString("Received 1 active flows")); + assertThat(out.toString(), containsString("Received 5 active flows")); } } } \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java index e3d833cee0..7debd75305 100644 --- a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java +++ b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java @@ -61,6 +61,7 @@ public abstract class AbstractScheduler implements Runnable, AutoCloseable { private final Map lastEvaluate = new ConcurrentHashMap<>(); private final Map evaluateRunning = new ConcurrentHashMap<>(); private final Map evaluateRunningCount = new ConcurrentHashMap<>(); + private final Map triggerStateSaved = new ConcurrentHashMap<>(); @Getter private List schedulable = new ArrayList<>(); @@ -392,10 +393,6 @@ private Trigger getLastTrigger(FlowWithPollingTrigger f, ZonedDateTime now) { return triggerState .findLast(f.getTriggerContext()) .orElseGet(() -> { - // we don't find, so never started execution, create a trigger context with previous date in the past. - // this allows some edge case when the evaluation loop of schedulers will change second - // between start and end - ZonedDateTime nextDate = f.getPollingTrigger().nextEvaluationDate(Optional.empty()); Trigger build = Trigger.builder() @@ -407,7 +404,22 @@ private Trigger getLastTrigger(FlowWithPollingTrigger f, ZonedDateTime now) { .updatedDate(Instant.now()) .build(); - triggerState.save(build); + // we don't find, so never started execution, create a trigger context with previous date in the past. + // this allows some edge case when the evaluation loop of schedulers will change second + // between start and end + // but since we could have some changed on the flow in meantime, we wait 1 min before saving them. + if (triggerStateSaved.containsKey(build.uid())) { + Trigger cachedTrigger = triggerStateSaved.get(build.uid()); + + if (cachedTrigger.getUpdatedDate() != null && Instant.now().isAfter(cachedTrigger.getUpdatedDate().plusSeconds(60))) { + triggerState.save(build); + triggerStateSaved.remove(build.uid()); + } + + return cachedTrigger; + } else { + triggerStateSaved.put(build.uid(), build); + } return build; }); diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java index e4549a81a7..f368d7b812 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java @@ -53,6 +53,7 @@ public void run() { kafkaAdminService.createIfNotExist(WorkerTaskResult.class); kafkaAdminService.createIfNotExist(Execution.class); kafkaAdminService.createIfNotExist(Flow.class); + kafkaAdminService.createIfNotExist(KafkaStreamSourceService.TOPIC_FLOWLAST); kafkaAdminService.createIfNotExist(Executor.class); kafkaAdminService.createIfNotExist(KafkaStreamSourceService.TOPIC_EXECUTOR_WORKERINSTANCE); kafkaAdminService.createIfNotExist(ExecutionKilled.class); @@ -66,6 +67,7 @@ public void run() { this.streams = this.kafkaExecutors .stream() + .parallel() .map(executor -> { Properties properties = new Properties(); // build diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java index b9edc5cfb4..901e0c0af9 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaFlowListeners.java @@ -33,8 +33,6 @@ @KafkaQueueEnabled public class KafkaFlowListeners implements FlowListenersInterface { private final KafkaAdminService kafkaAdminService; - private final FlowService flowService; - private SafeKeyValueStore store; private final List>> consumers = new ArrayList<>(); private final KafkaStreamService.Stream stream; @@ -43,18 +41,12 @@ public class KafkaFlowListeners implements FlowListenersInterface { @Inject public KafkaFlowListeners( KafkaAdminService kafkaAdminService, - KafkaStreamService kafkaStreamService, - FlowService flowService + KafkaStreamService kafkaStreamService ) { this.kafkaAdminService = kafkaAdminService; - this.flowService = flowService; - kafkaAdminService.createIfNotExist(Flow.class); kafkaAdminService.createIfNotExist(KafkaStreamSourceService.TOPIC_FLOWLAST); - KafkaStreamService.Stream buillLastVersion = kafkaStreamService.of(FlowListenerBuild.class, FlowListenerBuild.class, new FlowListenerBuild().topology(), log); - buillLastVersion.start(); - stream = kafkaStreamService.of(FlowListener.class, FlowListener.class, new FlowListener().topology(), log); stream.start((newState, oldState) -> { if (newState == KafkaStreams.State.RUNNING) { @@ -79,94 +71,6 @@ public KafkaFlowListeners( }); } - public class FlowListenerBuild { - public Topology topology() { - StreamsBuilder builder = new KafkaStreamsBuilder(); - - KStream stream = builder - .stream( - kafkaAdminService.getTopicName(Flow.class), - Consumed.with(Serdes.String(), JsonSerde.of(Flow.class, false)) - ); - - KStream result = KafkaStreamSourceService.logIfEnabled( - log, - stream, - (key, value) -> log.trace( - "Flow in '{}.{}' with revision {}", - value.getNamespace(), - value.getId(), - value.getRevision() - ), - "flow-in" - ) - .filter((key, value) -> value != null, Named.as("notNull")) - .selectKey((key, value) -> value.uidWithoutRevision(), Named.as("rekey")) - .groupBy( - (String key, Flow value) -> value.uidWithoutRevision(), - Grouped.as("grouped") - .withKeySerde(Serdes.String()) - .withValueSerde(JsonSerde.of(Flow.class, false)) - ) - .aggregate( - AllFlowRevision::new, - (key, value, aggregate) -> { - aggregate.revisions.add(value); - - return aggregate; - }, - Materialized.>as("list") - .withKeySerde(Serdes.String()) - .withValueSerde(JsonSerde.of(AllFlowRevision.class, false)) - ) - .mapValues( - (readOnlyKey, value) -> { - List flows = new ArrayList<>(flowService - .keepLastVersion(value.revisions)); - - if (flows.size() > 1) { - throw new IllegalArgumentException("Too many flows (" + flows.size() + ")"); - } - - return flows.size() == 0 ? null : flows.get(0); - }, - Named.as("last") - ) - .toStream(); - - KafkaStreamSourceService.logIfEnabled( - log, - result, - (key, value) -> log.trace( - "Flow out '{}.{}' with revision {}", - value.getNamespace(), - value.getId(), - value.getRevision() - ), - "Flow-out" - ) - .to( - kafkaAdminService.getTopicName(KafkaStreamSourceService.TOPIC_FLOWLAST), - Produced.with(Serdes.String(), JsonSerde.of(Flow.class)) - ); - - Topology topology = builder.build(); - - if (log.isTraceEnabled()) { - log.trace(topology.describe().toString()); - } - - return topology; - } - - } - - @NoArgsConstructor - @Getter - public static class AllFlowRevision { - private final List revisions = new ArrayList<>(); - } - public class FlowListener { public Topology topology() { StreamsBuilder builder = new KafkaStreamsBuilder(); diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java index 8d6e134417..8478729bfb 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java @@ -51,8 +51,6 @@ public class KafkaScheduler extends AbstractScheduler { private final KafkaAdminService kafkaAdminService; private final KafkaStreamService kafkaStreamService; private final QueueInterface triggerQueue; - private final ConditionService conditionService; - private final KafkaStreamSourceService kafkaStreamSourceService; private final QueueService queueService; private final KafkaProducer kafkaProducer; private final TopicsConfig topicsConfigTrigger; @@ -76,8 +74,6 @@ public KafkaScheduler( this.triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED)); this.kafkaAdminService = applicationContext.getBean(KafkaAdminService.class); this.kafkaStreamService = applicationContext.getBean(KafkaStreamService.class); - this.conditionService = applicationContext.getBean(ConditionService.class); - this.kafkaStreamSourceService = applicationContext.getBean(KafkaStreamSourceService.class); this.queueService = applicationContext.getBean(QueueService.class); this.kafkaProducer = applicationContext.getBean(KafkaProducerService.class).of( KafkaScheduler.class, @@ -90,43 +86,6 @@ public KafkaScheduler( this.kafkaProducer.initTransactions(); } - public class SchedulerCleaner { - private StreamsBuilder topology() { - StreamsBuilder builder = new KafkaStreamsBuilder(); - - KStream executorKStream = kafkaStreamSourceService.executorKStream(builder); - - kafkaStreamSourceService.flowGlobalKTable(builder); - kafkaStreamSourceService.templateGlobalKTable(builder); - KStream executionWithFlowKStream = kafkaStreamSourceService.executorWithFlow(executorKStream, false); - - GlobalKTable triggerGlobalKTable = kafkaStreamSourceService.triggerGlobalKTable(builder); - - executionWithFlowKStream - .filter( - (key, value) -> value.getExecution().getTrigger() != null, - Named.as("cleanTrigger-hasTrigger-filter") - ) - .filter( - (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()), - Named.as("cleanTrigger-terminated-filter") - ) - .join( - triggerGlobalKTable, - (key, executionWithFlow) -> Trigger.uid(executionWithFlow.getExecution()), - (execution, trigger) -> trigger.resetExecution(), - Named.as("cleanTrigger-join") - ) - .selectKey((key, value) -> queueService.key(value)) - .to( - kafkaAdminService.getTopicName(Trigger.class), - Produced.with(Serdes.String(), JsonSerde.of(Trigger.class)) - ); - - return builder; - } - } - public class SchedulerState { public StreamsBuilder topology() { StreamsBuilder builder = new KafkaStreamsBuilder(); @@ -221,9 +180,6 @@ public void initStream() { this.executionState = new KafkaSchedulerExecutionState( stateStream.store(StoreQueryParameters.fromNameAndType(STATESTORE_EXECUTOR, QueryableStoreTypes.keyValueStore())) ); - - this.cleanTriggerStream = this.init(SchedulerCleaner.class, new SchedulerCleaner().topology()); - this.cleanTriggerStream.start(); } @Override diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorFlowLast.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorFlowLast.java new file mode 100644 index 0000000000..352013becb --- /dev/null +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorFlowLast.java @@ -0,0 +1,148 @@ +package io.kestra.runner.kafka.executors; + +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.triggers.AbstractTrigger; +import io.kestra.core.models.triggers.Trigger; +import io.kestra.core.queues.QueueService; +import io.kestra.core.services.FlowService; +import io.kestra.core.utils.ListUtils; +import io.kestra.runner.kafka.KafkaQueueEnabled; +import io.kestra.runner.kafka.serializers.JsonSerde; +import io.kestra.runner.kafka.services.KafkaAdminService; +import io.kestra.runner.kafka.services.KafkaStreamSourceService; +import io.kestra.runner.kafka.services.KafkaStreamsBuilder; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.jackson.Jacksonized; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +@KafkaQueueEnabled +@Singleton +@Slf4j +public class ExecutorFlowLast implements KafkaExecutorInterface { + @Inject + private KafkaAdminService kafkaAdminService; + + @Inject + private QueueService queueService; + + public StreamsBuilder topology() { + StreamsBuilder builder = new KafkaStreamsBuilder(); + + // last global KTable + GlobalKTable flowGlobalKTable = builder + .globalTable( + kafkaAdminService.getTopicName(KafkaStreamSourceService.TOPIC_FLOWLAST), + Consumed.with(Serdes.String(), JsonSerde.of(Flow.class)).withName("GlobalKTable.FlowLast"), + Materialized.>as("last") + .withKeySerde(Serdes.String()) + .withValueSerde(JsonSerde.of(Flow.class)) + ); + + // stream + KStream stream = builder + .stream( + kafkaAdminService.getTopicName(Flow.class), + Consumed.with(Serdes.String(), JsonSerde.of(Flow.class, false)).withName("Stream.Flow") + ); + + // logs + stream = KafkaStreamSourceService.logIfEnabled( + log, + stream, + (key, value) -> log.trace( + "Flow in '{}.{}' with revision {}", + value.getNamespace(), + value.getId(), + value.getRevision() + ), + "Main" + ); + + // join with previous if more recent revision + KStream streamWithPrevious = stream + .filter((key, value) -> value != null, Named.as("Main.notNull")) + .selectKey((key, value) -> value.uidWithoutRevision(), Named.as("Main.selectKey")) + .leftJoin( + flowGlobalKTable, + (key, value) -> key, + (readOnlyKey, current, previous) -> { + if (previous == null) { + return new ExecutorFlowLast.FlowWithPrevious(current, null); + } else if (current.getRevision() < previous.getRevision()) { + return null; + } else { + return new ExecutorFlowLast.FlowWithPrevious(current, previous); + } + }, + Named.as("Main.join") + ) + .filter((key, value) -> value != null, Named.as("Main.joinNotNull")); + + // remove triggers + streamWithPrevious + .flatMap( + (key, value) -> { + List deletedTriggers = new ArrayList<>(); + + if (value.getFlow().isDeleted()) { + deletedTriggers = ListUtils.emptyOnNull(value.getFlow().getTriggers()); + } else if (value.getPrevious() != null) { + deletedTriggers = FlowService.findRemovedTrigger( + value.getFlow(), + value.getPrevious() + ); + } + + return deletedTriggers + .stream() + .map(t -> new KeyValue<>( + queueService.key(Trigger.of(value.getFlow(), t)), + (Trigger) null + )) + .collect(Collectors.toList()); + }, + Named.as("DeleteTrigger.flatMap") + ) + .to( + kafkaAdminService.getTopicName(Trigger.class), + Produced.with(Serdes.String(), JsonSerde.of(Trigger.class)).withName("To.Trigger") + ); + + // send to last and don't drop deleted flow in order to keep last version + streamWithPrevious + .map( + (key, value) -> new KeyValue<>( + value.getFlow().uidWithoutRevision(), + value.getFlow() + ), + Named.as("Main.Map") + ) + .to( + kafkaAdminService.getTopicName(KafkaStreamSourceService.TOPIC_FLOWLAST), + Produced.with(Serdes.String(), JsonSerde.of(Flow.class)).withName("To.FlowLast") + ); + + return builder; + } + + @Getter + @Jacksonized + @AllArgsConstructor + public static class FlowWithPrevious { + private Flow flow; + private Flow previous; + } +} diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorTriggerCleaner.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorTriggerCleaner.java new file mode 100644 index 0000000000..8e02ee0467 --- /dev/null +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorTriggerCleaner.java @@ -0,0 +1,72 @@ +package io.kestra.runner.kafka.executors; + +import io.kestra.core.models.triggers.Trigger; +import io.kestra.core.queues.QueueService; +import io.kestra.core.runners.Executor; +import io.kestra.core.services.ConditionService; +import io.kestra.runner.kafka.KafkaQueueEnabled; +import io.kestra.runner.kafka.serializers.JsonSerde; +import io.kestra.runner.kafka.services.KafkaAdminService; +import io.kestra.runner.kafka.services.KafkaStreamSourceService; +import io.kestra.runner.kafka.services.KafkaStreamsBuilder; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.GlobalKTable; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.Produced; + +@KafkaQueueEnabled +@Singleton +@Slf4j +public class ExecutorTriggerCleaner implements KafkaExecutorInterface { + @Inject + private KafkaAdminService kafkaAdminService; + + @Inject + private KafkaStreamSourceService kafkaStreamSourceService; + + @Inject + private QueueService queueService; + + @Inject + private ConditionService conditionService; + + public StreamsBuilder topology() { + StreamsBuilder builder = new KafkaStreamsBuilder(); + + KStream executorKStream = kafkaStreamSourceService.executorKStream(builder); + + kafkaStreamSourceService.flowGlobalKTable(builder); + kafkaStreamSourceService.templateGlobalKTable(builder); + KStream executionWithFlowKStream = kafkaStreamSourceService.executorWithFlow(executorKStream, false); + + GlobalKTable triggerGlobalKTable = kafkaStreamSourceService.triggerGlobalKTable(builder); + + executionWithFlowKStream + .filter( + (key, value) -> value.getExecution().getTrigger() != null, + Named.as("cleanTrigger-hasTrigger-filter") + ) + .filter( + (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()), + Named.as("cleanTrigger-terminated-filter") + ) + .join( + triggerGlobalKTable, + (key, executionWithFlow) -> Trigger.uid(executionWithFlow.getExecution()), + (execution, trigger) -> trigger.resetExecution(), + Named.as("cleanTrigger-join") + ) + .selectKey((key, value) -> queueService.key(value)) + .to( + kafkaAdminService.getTopicName(Trigger.class), + Produced.with(Serdes.String(), JsonSerde.of(Trigger.class)) + ); + + return builder; + } +} diff --git a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaFlowListenersTest.java b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaFlowListenersTest.java index b7035ecf04..e417c03377 100644 --- a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaFlowListenersTest.java +++ b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaFlowListenersTest.java @@ -3,20 +3,22 @@ import com.fasterxml.jackson.core.JsonProcessingException; import io.kestra.core.models.flows.Flow; import io.kestra.core.runners.FlowListenersTest; +import io.kestra.core.runners.StandAloneRunner; import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.utils.IdUtils; import io.kestra.runner.kafka.configs.TopicsConfig; import io.kestra.runner.kafka.services.KafkaProducerService; import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -31,6 +33,15 @@ class KafkaFlowListenersTest extends FlowListenersTest { @Inject KafkaProducerService kafkaProducerService; + @Inject + private StandAloneRunner runner; + + @BeforeEach + private void init() { + runner.setSchedulerEnabled(false); + runner.run(); + } + @Test public void all() { this.suite(flowListenersService); diff --git a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaSchedulerTest.java b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaSchedulerTest.java index 82037c5950..56f865ea7d 100644 --- a/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaSchedulerTest.java +++ b/runner-kafka/src/test/java/io/kestra/runner/kafka/KafkaSchedulerTest.java @@ -33,6 +33,9 @@ class KafkaSchedulerTest extends AbstractSchedulerTest { @Inject protected KafkaAdminService kafkaAdminService; + @Inject + protected KafkaExecutor kafkaExecutor; + @Inject protected FlowRepositoryInterface flowRepositoryInterface; @@ -58,6 +61,8 @@ void thread() throws Exception { flowRepositoryInterface.create(flow); + kafkaExecutor.run(); + // scheduler try (AbstractScheduler scheduler = new KafkaScheduler( applicationContext,