Skip to content

Commit

Permalink
feat(jdbc): optimize index and queue poll interval
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent ed7cab8 commit 62fabd6
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 68 deletions.
5 changes: 5 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ kestra:
table: "executordelayed"
cls: io.kestra.core.runners.ExecutionDelay

queues:
min-poll-interval: 100ms
max-poll-interval: 1000ms
poll-switch-interval: 5s

elasticsearch:
defaults:
indice-prefix: "kestra_"
Expand Down
45 changes: 20 additions & 25 deletions jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ CREATE TABLE queues (
'executor',
'worker',
'scheduler'
)
),
`updated` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX ix_type__consumers (type, consumers, offset),
INDEX ix_type__offset (type, offset),
INDEX ix_updated (updated)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;


Expand All @@ -72,10 +76,8 @@ CREATE TABLE `flows` (
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`revision` INT UNSIGNED GENERATED ALWAYS AS (value ->> '$.revision') STORED NOT NULL,
`source_code` TEXT NOT NULL,
INDEX ix_id (id),
INDEX ix_namespace (namespace),
INDEX ix_revision (revision),
INDEX ix_deleted (deleted),
INDEX ix_namespace (deleted, namespace),
INDEX ix_namespace__id__revision (deleted, namespace, id, revision),
FULLTEXT ix_fulltext (namespace, id),
FULLTEXT ix_source_code (source_code)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Expand All @@ -87,9 +89,8 @@ CREATE TABLE `templates` (
`deleted` BOOL GENERATED ALWAYS AS (value ->> '$.deleted' = 'true') STORED NOT NULL,
`id` VARCHAR(100) GENERATED ALWAYS AS (value ->> '$.id') STORED NOT NULL,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
INDEX ix_id (id),
INDEX ix_namespace (namespace),
INDEX ix_deleted (deleted),
INDEX ix_namespace (deleted, namespace),
INDEX ix_namespace__id (deleted, namespace, id),
FULLTEXT ix_fulltext (namespace, id)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

Expand All @@ -115,14 +116,12 @@ CREATE TABLE `executions` (
`state_duration` BIGINT GENERATED ALWAYS AS (value ->> '$.state.duration' * 1000) STORED NOT NULL,
`start_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.startDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,
`end_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.endDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED,
INDEX ix_id (id),
INDEX ix_namespace (namespace),
INDEX ix_flowId (flow_id),
INDEX ix_state_current (state_current),
INDEX ix_start_date (start_date),
INDEX ix_end_date (end_date),
INDEX ix_state_duration (state_duration),
INDEX ix_deleted (deleted),
INDEX ix_namespace (deleted, namespace),
INDEX ix_flowId (deleted, flow_id),
INDEX ix_state_current (deleted, state_current),
INDEX ix_start_date (deleted, start_date),
INDEX ix_end_date (deleted, end_date),
INDEX ix_state_duration (deleted, state_duration),
FULLTEXT ix_fulltext (namespace, flow_id, id)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

Expand All @@ -134,7 +133,6 @@ CREATE TABLE triggers (
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
`trigger_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.triggerId') STORED NOT NULL,
`execution_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.executionId') STORED ,
INDEX ix_namespace__flow_id__trigger_id (namespace, flow_id, trigger_id),
INDEX ix_execution_id (execution_id)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

Expand All @@ -160,14 +158,11 @@ CREATE TABLE logs (
'TRACE'
) GENERATED ALWAYS AS (value ->> '$.level') STORED NOT NULL,
`timestamp` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.timestamp' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,

INDEX ix_namespace (namespace),
INDEX ix_flowId (flow_id),
INDEX ix_task_id (task_id),
INDEX ix_execution_id (execution_id),
INDEX ix_taskrun_id (taskrun_id),
INDEX ix_trigger_id (trigger_id),
INDEX ix_timestamp (timestamp),
INDEX ix_namespace (deleted, namespace),
INDEX ix_execution_id (deleted, execution_id),
INDEX ix_execution_id__task_id (deleted, execution_id, task_id),
INDEX ix_execution_id__taskrun_id (deleted, execution_id, taskrun_id),
INDEX ix_timestamp (deleted, timestamp),
FULLTEXT ix_fulltext (namespace, flow_id, task_id, execution_id, taskrun_id, trigger_id, message, thread)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,31 @@ CREATE OR REPLACE FUNCTION PARSE_ISO8601_DURATION(text) RETURNS interval
IMMUTABLE
RETURN $1::interval;;

CREATE OR REPLACE FUNCTION UPDATE_UPDATED_DATETIME() RETURNS TRIGGER AS $$
BEGIN
NEW.updated = now();
RETURN NEW;
END;
$$ language 'plpgsql';


/* ----------------------- queues ----------------------- */
CREATE TABLE queues (
"offset" SERIAL PRIMARY KEY,
type queue_type NOT NULL,
key VARCHAR(250) NOT NULL,
value JSONB NOT NULL,
consumers queue_consumers[]
consumers queue_consumers[],
updated timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX queues_key ON queues (type, key);
CREATE INDEX queues_consumers ON queues (type, consumers);
CREATE INDEX queues_type__consumers ON queues (type, consumers, "offset");
CREATE INDEX queues_type__offset ON queues (type, "offset");
CREATE INDEX queues_updated ON queues ("updated");

CREATE TRIGGER queues_updated BEFORE UPDATE
ON queues FOR EACH ROW EXECUTE PROCEDURE
UPDATE_UPDATED_DATETIME();


/* ----------------------- flows ----------------------- */
Expand All @@ -112,10 +126,8 @@ CREATE TABLE flows (
source_code TEXT NOT NULL
);

CREATE INDEX flows_id ON flows (id);
CREATE INDEX flows_namespace ON flows (namespace);
CREATE INDEX flows_revision ON flows (revision);
CREATE INDEX flows_deleted ON flows (deleted);
CREATE INDEX flows_namespace ON flows (deleted, namespace);
CREATE INDEX flows_namespace__id__revision ON flows (deleted, namespace, id, revision);
CREATE INDEX flows_fulltext ON flows USING GIN (fulltext);
CREATE INDEX flows_source_code ON flows USING GIN (FULLTEXT_INDEX(source_code));

Expand All @@ -133,9 +145,8 @@ CREATE TABLE templates (
)) STORED
);

CREATE INDEX templates_namespace ON flows (namespace);
CREATE INDEX templates_revision ON flows (revision);
CREATE INDEX templates_deleted ON flows (deleted);
CREATE INDEX templates_namespace ON templates (deleted, namespace);
CREATE INDEX templates_namespace__id__revision ON templates (deleted, namespace, id);
CREATE INDEX templates_fulltext ON templates USING GIN (fulltext);


Expand All @@ -144,7 +155,6 @@ CREATE TABLE executions (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
deleted BOOL NOT NULL GENERATED ALWAYS AS (CAST(value ->> 'deleted' AS bool)) STORED,
id VARCHAR(100) NOT NULL GENERATED ALWAYS AS (value ->> 'id') STORED,
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED,
state_current state_type NOT NULL GENERATED ALWAYS AS (STATE_FROMTEXT(value #>> '{state, current}')) STORED,
Expand All @@ -158,14 +168,12 @@ CREATE TABLE executions (
) STORED
);

CREATE INDEX executions_id ON executions (id);
CREATE INDEX executions_namespace ON executions (namespace);
CREATE INDEX executions_flow_id ON executions (flow_id);
CREATE INDEX executions_state_current ON executions (state_current);
CREATE INDEX executions_start_date ON executions (start_date);
CREATE INDEX executions_end_date ON executions (end_date);
CREATE INDEX executions_state_duration ON executions (state_duration);
CREATE INDEX executions_deleted ON executions (deleted);
CREATE INDEX executions_namespace ON executions (deleted, namespace);
CREATE INDEX executions_flow_id ON executions (deleted, flow_id);
CREATE INDEX executions_state_current ON executions (deleted, state_current);
CREATE INDEX executions_start_date ON executions (deleted, start_date);
CREATE INDEX executions_end_date ON executions (deleted, end_date);
CREATE INDEX executions_state_duration ON executions (deleted, state_duration);
CREATE INDEX executions_fulltext ON executions USING GIN (fulltext);


Expand All @@ -179,7 +187,6 @@ CREATE TABLE triggers (
execution_id VARCHAR(150) GENERATED ALWAYS AS (value ->> 'executionId') STORED
);

CREATE INDEX triggers_namespace__flow_id__trigger_id ON triggers (namespace, flow_id, trigger_id);
CREATE INDEX triggers_execution_id ON triggers (execution_id);


Expand Down Expand Up @@ -209,13 +216,11 @@ CREATE TABLE logs (
) STORED
);

CREATE INDEX logs_namespace ON logs (namespace);
CREATE INDEX logs_flowId ON logs (flow_id);
CREATE INDEX logs_task_id ON logs (task_id);
CREATE INDEX logs_execution_id ON logs (execution_id);
CREATE INDEX logs_taskrun_id ON logs (taskrun_id);
CREATE INDEX logs_trigger_id ON logs (trigger_id);
CREATE INDEX logs_timestamp ON logs (timestamp);
CREATE INDEX logs_namespace ON logs (deleted, namespace);
CREATE INDEX logs_execution_id ON logs (deleted, execution_id);
CREATE INDEX logs_execution_id__task_id ON logs (deleted, execution_id, task_id);
CREATE INDEX logs_execution_id__taskrun_id ON logs (deleted, execution_id, taskrun_id);
CREATE INDEX logs_timestamp ON logs (deleted, timestamp);
CREATE INDEX logs_fulltext ON logs USING GIN (fulltext);


Expand All @@ -231,7 +236,7 @@ CREATE TABLE multipleconditions (
);

CREATE INDEX multipleconditions_namespace__flow_id__condition_id ON multipleconditions (namespace, flow_id, condition_id);
CREATE INDEX multipleconditions_namespace__start_date__end_date ON multipleconditions (start_date, end_date);
CREATE INDEX multipleconditions_start_date__end_date ON multipleconditions (start_date, end_date);


/* ----------------------- workertaskexecutions ----------------------- */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.*;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;

import java.sql.SQLTransientException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -51,7 +49,7 @@ public Optional<Execution> findById(String id) {
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter())
.and(DSL.field("id").eq(id));
.and(DSL.field(DSL.quotedName("key")).eq(id));

return this.jdbcRepository.fetchOne(from);
});
Expand Down Expand Up @@ -450,7 +448,7 @@ public Executor lock(String executionId, Function<Pair<Execution, JdbcExecutorSt
SelectForUpdateOfStep<Record1<Object>> from = context
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(DSL.field("id").eq(executionId))
.where(DSL.field(DSL.quotedName("key")).eq(executionId))
.forUpdate();

Optional<Execution> execution = this.jdbcRepository.fetchOne(from);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ public Optional<Trigger> findLast(TriggerContext trigger) {
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(
DSL.field("namespace").eq(trigger.getNamespace())
.and(DSL.field("flow_id").eq(trigger.getFlowId()))
.and(DSL.field("trigger_id").eq(trigger.getTriggerId()))
);
.where(DSL.field(DSL.quotedName("key")).eq(trigger.uid()));

return this.jdbcRepository.fetchOne(select);
});
Expand Down
Loading

0 comments on commit 62fabd6

Please sign in to comment.