diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index 29d47cf356..fe0e3c2052 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -237,6 +237,11 @@ kestra: table: "executordelayed" cls: io.kestra.core.runners.ExecutionDelay + queues: + min-poll-interval: 100ms + max-poll-interval: 1000ms + poll-switch-interval: 5s + elasticsearch: defaults: indice-prefix: "kestra_" diff --git a/jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql b/jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql index 4d57c32a6e..cedcf302d2 100644 --- a/jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql +++ b/jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql @@ -60,7 +60,11 @@ CREATE TABLE queues ( 'executor', 'worker', 'scheduler' - ) + ), + `updated` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX ix_type__consumers (type, consumers, offset), + INDEX ix_type__offset (type, offset), + INDEX ix_updated (updated) ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; @@ -72,10 +76,8 @@ CREATE TABLE `flows` ( `namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL, `revision` INT UNSIGNED GENERATED ALWAYS AS (value ->> '$.revision') STORED NOT NULL, `source_code` TEXT NOT NULL, - INDEX ix_id (id), - INDEX ix_namespace (namespace), - INDEX ix_revision (revision), - INDEX ix_deleted (deleted), + INDEX ix_namespace (deleted, namespace), + INDEX ix_namespace__id__revision (deleted, namespace, id, revision), FULLTEXT ix_fulltext (namespace, id), FULLTEXT ix_source_code (source_code) ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; @@ -87,9 +89,8 @@ CREATE TABLE `templates` ( `deleted` BOOL GENERATED ALWAYS AS (value ->> '$.deleted' = 'true') STORED NOT NULL, `id` VARCHAR(100) GENERATED ALWAYS AS (value ->> '$.id') STORED NOT NULL, `namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL, - INDEX ix_id (id), - INDEX ix_namespace (namespace), - INDEX ix_deleted (deleted), + INDEX ix_namespace (deleted, namespace), + INDEX ix_namespace__id (deleted, namespace, id), FULLTEXT ix_fulltext (namespace, id) ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; @@ -115,14 +116,12 @@ CREATE TABLE `executions` ( `state_duration` BIGINT GENERATED ALWAYS AS (value ->> '$.state.duration' * 1000) STORED NOT NULL, `start_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.startDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL, `end_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.endDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED, - INDEX ix_id (id), - INDEX ix_namespace (namespace), - INDEX ix_flowId (flow_id), - INDEX ix_state_current (state_current), - INDEX ix_start_date (start_date), - INDEX ix_end_date (end_date), - INDEX ix_state_duration (state_duration), - INDEX ix_deleted (deleted), + INDEX ix_namespace (deleted, namespace), + INDEX ix_flowId (deleted, flow_id), + INDEX ix_state_current (deleted, state_current), + INDEX ix_start_date (deleted, start_date), + INDEX ix_end_date (deleted, end_date), + INDEX ix_state_duration (deleted, state_duration), FULLTEXT ix_fulltext (namespace, flow_id, id) ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; @@ -134,7 +133,6 @@ CREATE TABLE triggers ( `flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL, `trigger_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.triggerId') STORED NOT NULL, `execution_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.executionId') STORED , - INDEX ix_namespace__flow_id__trigger_id (namespace, flow_id, trigger_id), INDEX ix_execution_id (execution_id) ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; @@ -160,14 +158,11 @@ CREATE TABLE logs ( 'TRACE' ) GENERATED ALWAYS AS (value ->> '$.level') STORED NOT NULL, `timestamp` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.timestamp' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL, - - INDEX ix_namespace (namespace), - INDEX ix_flowId (flow_id), - INDEX ix_task_id (task_id), - INDEX ix_execution_id (execution_id), - INDEX ix_taskrun_id (taskrun_id), - INDEX ix_trigger_id (trigger_id), - INDEX ix_timestamp (timestamp), + INDEX ix_namespace (deleted, namespace), + INDEX ix_execution_id (deleted, execution_id), + INDEX ix_execution_id__task_id (deleted, execution_id, task_id), + INDEX ix_execution_id__taskrun_id (deleted, execution_id, taskrun_id), + INDEX ix_timestamp (deleted, timestamp), FULLTEXT ix_fulltext (namespace, flow_id, task_id, execution_id, taskrun_id, trigger_id, message, thread) ) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; diff --git a/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql b/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql index 4acad4c8a6..9cf9db9464 100644 --- a/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql +++ b/jdbc-postgres/src/main/resources/migrations/postgres/V1__initial.sql @@ -84,17 +84,31 @@ CREATE OR REPLACE FUNCTION PARSE_ISO8601_DURATION(text) RETURNS interval IMMUTABLE RETURN $1::interval;; +CREATE OR REPLACE FUNCTION UPDATE_UPDATED_DATETIME() RETURNS TRIGGER AS $$ +BEGIN + NEW.updated = now(); + RETURN NEW; +END; +$$ language 'plpgsql'; + + /* ----------------------- queues ----------------------- */ CREATE TABLE queues ( "offset" SERIAL PRIMARY KEY, type queue_type NOT NULL, key VARCHAR(250) NOT NULL, value JSONB NOT NULL, - consumers queue_consumers[] + consumers queue_consumers[], + updated timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP ); -CREATE INDEX queues_key ON queues (type, key); -CREATE INDEX queues_consumers ON queues (type, consumers); +CREATE INDEX queues_type__consumers ON queues (type, consumers, "offset"); +CREATE INDEX queues_type__offset ON queues (type, "offset"); +CREATE INDEX queues_updated ON queues ("updated"); + +CREATE TRIGGER queues_updated BEFORE UPDATE + ON queues FOR EACH ROW EXECUTE PROCEDURE + UPDATE_UPDATED_DATETIME(); /* ----------------------- flows ----------------------- */ @@ -112,10 +126,8 @@ CREATE TABLE flows ( source_code TEXT NOT NULL ); -CREATE INDEX flows_id ON flows (id); -CREATE INDEX flows_namespace ON flows (namespace); -CREATE INDEX flows_revision ON flows (revision); -CREATE INDEX flows_deleted ON flows (deleted); +CREATE INDEX flows_namespace ON flows (deleted, namespace); +CREATE INDEX flows_namespace__id__revision ON flows (deleted, namespace, id, revision); CREATE INDEX flows_fulltext ON flows USING GIN (fulltext); CREATE INDEX flows_source_code ON flows USING GIN (FULLTEXT_INDEX(source_code)); @@ -133,9 +145,8 @@ CREATE TABLE templates ( )) STORED ); -CREATE INDEX templates_namespace ON flows (namespace); -CREATE INDEX templates_revision ON flows (revision); -CREATE INDEX templates_deleted ON flows (deleted); +CREATE INDEX templates_namespace ON templates (deleted, namespace); +CREATE INDEX templates_namespace__id__revision ON templates (deleted, namespace, id); CREATE INDEX templates_fulltext ON templates USING GIN (fulltext); @@ -144,7 +155,6 @@ CREATE TABLE executions ( key VARCHAR(250) NOT NULL PRIMARY KEY, value JSONB NOT NULL, deleted BOOL NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'deleted' AS bool)) STORED, - id VARCHAR(100) NOT NULL GENERATED ALWAYS AS (value ->> 'id') STORED, namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED, flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED, state_current state_type NOT NULL GENERATED ALWAYS AS (STATE_FROMTEXT(value #>> '{state, current}')) STORED, @@ -158,14 +168,12 @@ CREATE TABLE executions ( ) STORED ); -CREATE INDEX executions_id ON executions (id); -CREATE INDEX executions_namespace ON executions (namespace); -CREATE INDEX executions_flow_id ON executions (flow_id); -CREATE INDEX executions_state_current ON executions (state_current); -CREATE INDEX executions_start_date ON executions (start_date); -CREATE INDEX executions_end_date ON executions (end_date); -CREATE INDEX executions_state_duration ON executions (state_duration); -CREATE INDEX executions_deleted ON executions (deleted); +CREATE INDEX executions_namespace ON executions (deleted, namespace); +CREATE INDEX executions_flow_id ON executions (deleted, flow_id); +CREATE INDEX executions_state_current ON executions (deleted, state_current); +CREATE INDEX executions_start_date ON executions (deleted, start_date); +CREATE INDEX executions_end_date ON executions (deleted, end_date); +CREATE INDEX executions_state_duration ON executions (deleted, state_duration); CREATE INDEX executions_fulltext ON executions USING GIN (fulltext); @@ -179,7 +187,6 @@ CREATE TABLE triggers ( execution_id VARCHAR(150) GENERATED ALWAYS AS (value ->> 'executionId') STORED ); -CREATE INDEX triggers_namespace__flow_id__trigger_id ON triggers (namespace, flow_id, trigger_id); CREATE INDEX triggers_execution_id ON triggers (execution_id); @@ -209,13 +216,11 @@ CREATE TABLE logs ( ) STORED ); -CREATE INDEX logs_namespace ON logs (namespace); -CREATE INDEX logs_flowId ON logs (flow_id); -CREATE INDEX logs_task_id ON logs (task_id); -CREATE INDEX logs_execution_id ON logs (execution_id); -CREATE INDEX logs_taskrun_id ON logs (taskrun_id); -CREATE INDEX logs_trigger_id ON logs (trigger_id); -CREATE INDEX logs_timestamp ON logs (timestamp); +CREATE INDEX logs_namespace ON logs (deleted, namespace); +CREATE INDEX logs_execution_id ON logs (deleted, execution_id); +CREATE INDEX logs_execution_id__task_id ON logs (deleted, execution_id, task_id); +CREATE INDEX logs_execution_id__taskrun_id ON logs (deleted, execution_id, taskrun_id); +CREATE INDEX logs_timestamp ON logs (deleted, timestamp); CREATE INDEX logs_fulltext ON logs USING GIN (fulltext); @@ -231,7 +236,7 @@ CREATE TABLE multipleconditions ( ); CREATE INDEX multipleconditions_namespace__flow_id__condition_id ON multipleconditions (namespace, flow_id, condition_id); -CREATE INDEX multipleconditions_namespace__start_date__end_date ON multipleconditions (start_date, end_date); +CREATE INDEX multipleconditions_start_date__end_date ON multipleconditions (start_date, end_date); /* ----------------------- workertaskexecutions ----------------------- */ diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractExecutionRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractExecutionRepository.java index e47ce81615..01573fc2a1 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractExecutionRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractExecutionRepository.java @@ -17,10 +17,8 @@ import jakarta.inject.Singleton; import org.apache.commons.lang3.tuple.Pair; import org.jooq.*; -import org.jooq.exception.DataAccessException; import org.jooq.impl.DSL; -import java.sql.SQLTransientException; import java.time.Duration; import java.time.LocalDate; import java.time.ZonedDateTime; @@ -51,7 +49,7 @@ public Optional findById(String id) { .select(DSL.field("value")) .from(this.jdbcRepository.getTable()) .where(this.defaultFilter()) - .and(DSL.field("id").eq(id)); + .and(DSL.field(DSL.quotedName("key")).eq(id)); return this.jdbcRepository.fetchOne(from); }); @@ -450,7 +448,7 @@ public Executor lock(String executionId, Function> from = context .select(DSL.field("value")) .from(this.jdbcRepository.getTable()) - .where(DSL.field("id").eq(executionId)) + .where(DSL.field(DSL.quotedName("key")).eq(executionId)) .forUpdate(); Optional execution = this.jdbcRepository.fetchOne(from); diff --git a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractTriggerRepository.java b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractTriggerRepository.java index 083323e357..8a42f1f06c 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractTriggerRepository.java +++ b/jdbc/src/main/java/io/kestra/jdbc/repository/AbstractTriggerRepository.java @@ -31,11 +31,7 @@ public Optional findLast(TriggerContext trigger) { .using(configuration) .select(DSL.field("value")) .from(this.jdbcRepository.getTable()) - .where( - DSL.field("namespace").eq(trigger.getNamespace()) - .and(DSL.field("flow_id").eq(trigger.getFlowId())) - .and(DSL.field("trigger_id").eq(trigger.getTriggerId())) - ); + .where(DSL.field(DSL.quotedName("key")).eq(trigger.uid())); return this.jdbcRepository.fetchOne(select); }); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java index 7e15fced3b..5b6dcd1b70 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java @@ -13,20 +13,27 @@ import io.kestra.jdbc.DSLContextWrapper; import io.kestra.jdbc.JdbcConfiguration; import io.micronaut.context.ApplicationContext; +import io.micronaut.context.annotation.ConfigurationProperties; import io.micronaut.transaction.exceptions.CannotCreateTransactionException; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.jooq.*; import org.jooq.impl.DSL; import java.io.IOException; +import java.time.Duration; +import java.time.ZonedDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Supplier; import javax.sql.DataSource; @Slf4j @@ -43,10 +50,13 @@ public abstract class JdbcQueue implements QueueInterface { protected final DataSource dataSource; + protected final Configuration configuration; + protected final Table table; protected final JdbcQueueIndexer jdbcQueueIndexer; + protected Boolean isShutdown = false; public JdbcQueue(Class cls, ApplicationContext applicationContext) { @@ -59,6 +69,7 @@ public JdbcQueue(Class cls, ApplicationContext applicationContext) { this.cls = cls; this.dslContextWrapper = applicationContext.getBean(DSLContextWrapper.class); this.dataSource = applicationContext.getBean(DataSource.class); + this.configuration = applicationContext.getBean(Configuration.class); JdbcConfiguration jdbcConfiguration = applicationContext.getBean(JdbcConfiguration.class); @@ -141,7 +152,7 @@ public Runnable receive(Consumer consumer) { }); return this.poll(() -> { - dslContextWrapper.transaction(configuration -> { + return dslContextWrapper.transactionResult(configuration -> { DSLContext ctx = DSL.using(configuration); Result fetch = this.receiveFetch(ctx, maxOffset.get()); @@ -153,6 +164,8 @@ public Runnable receive(Consumer consumer) { maxOffset.set(offsets.get(offsets.size() - 1)); } + + return fetch.size(); }); }); } @@ -162,7 +175,7 @@ public Runnable receive(Class consumerGroup, Consumer consumer) { String consumerGroupName = consumerGroupName(consumerGroup); return this.poll(() -> { - dslContextWrapper.transaction(configuration -> { + return dslContextWrapper.transactionResult(configuration -> { DSLContext ctx = DSL.using(configuration); Result fetch = this.receiveFetch(ctx, consumerGroupName); @@ -177,18 +190,29 @@ public Runnable receive(Class consumerGroup, Consumer consumer) { ); } + return fetch.size(); }); }); } @SuppressWarnings("BusyWait") - private Runnable poll(Runnable runnable) { + private Runnable poll(Supplier runnable) { AtomicBoolean running = new AtomicBoolean(true); + AtomicLong sleep = new AtomicLong(configuration.getMaxPollInterval().toMillis()); + AtomicReference lastPoll = new AtomicReference<>(ZonedDateTime.now()); poolExecutor.execute(() -> { while (running.get() && !this.isShutdown) { try { - runnable.run(); + Integer count = runnable.get(); + if (count > 0) { + lastPoll.set(ZonedDateTime.now()); + } + + sleep.set(lastPoll.get().plus(configuration.getPollSwitchInterval()).compareTo(ZonedDateTime.now()) < 0 ? + configuration.getMaxPollInterval().toMillis() : + configuration.getMinPollInterval().toMillis() + ); } catch (CannotCreateTransactionException e) { if (log.isDebugEnabled()) { log.debug("Can't poll on receive", e); @@ -196,14 +220,13 @@ private Runnable poll(Runnable runnable) { } try { - Thread.sleep(500); + Thread.sleep(sleep.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); - return () -> { running.set(false); }; @@ -231,4 +254,12 @@ public void close() throws IOException { this.isShutdown = true; poolExecutor.shutdown(); } + + @ConfigurationProperties("kestra.jdbc.queues") + @Getter + public static class Configuration { + Duration minPollInterval; + Duration maxPollInterval; + Duration pollSwitchInterval; + } }