Skip to content

Commit

Permalink
feat(jdbc): implementation of MultipleConditionStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 334f895 commit 521ddde
Show file tree
Hide file tree
Showing 18 changed files with 341 additions and 50 deletions.
3 changes: 3 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ kestra:
logs:
table: "logs"
cls: io.kestra.core.models.executions.LogEntry
multipleconditions:
table: "multipleconditions"
cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow

elasticsearch:
defaults:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -32,8 +33,8 @@ default MultipleConditionWindow getOrCreate(Flow flow, MultipleCondition multipl
.plusMinutes(multipleCondition.getWindow().toMinutes() * (now.getMinute() / multipleCondition.getWindow().toMinutes()));
}

ZonedDateTime start = now.plus(multipleCondition.getWindowAdvance());
ZonedDateTime end = start.plus(multipleCondition.getWindow()).minus(Duration.ofNanos(1));
ZonedDateTime start = now.plus(multipleCondition.getWindowAdvance()).truncatedTo(ChronoUnit.MILLIS);
ZonedDateTime end = start.plus(multipleCondition.getWindow()).minus(Duration.ofMillis(1)).truncatedTo(ChronoUnit.MILLIS);

return this.get(flow, multipleCondition.getId())
.filter(m -> m.isValid(ZonedDateTime.now()))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.models.triggers.multipleflows;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Builder;
import lombok.Value;
Expand All @@ -14,10 +15,17 @@
@Builder
public class MultipleConditionWindow {
String namespace;

String flowId;

String conditionId;

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
ZonedDateTime start;

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
ZonedDateTime end;

Map<String, Boolean> results;

@JsonIgnore
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
Expand Down Expand Up @@ -36,6 +37,8 @@ public String key(Object object) {
return ((ExecutionKilled) object).getExecutionId();
} else if (object.getClass() == Trigger.class) {
return ((Trigger) object).uid();
} else if (object.getClass() == MultipleConditionWindow.class) {
return ((MultipleConditionWindow) object).uid();
} else {
throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.models.triggers.multipleflows;

import com.google.common.collect.ImmutableMap;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.conditions.types.ExecutionFlowCondition;
Expand All @@ -19,16 +20,13 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

public class MultipleConditionStorageTest {
@MicronautTest(transactional = false)
public abstract class AbstractMultipleConditionStorageTest {
private static final String NAMESPACE = "io.kestra.unit";

protected MultipleConditionStorageInterface multipleConditionStorage() {
return new MemoryMultipleConditionStorage();
}
abstract protected MultipleConditionStorageInterface multipleConditionStorage();

protected void save(MultipleConditionStorageInterface multipleConditionStorage, Flow flow, List<MultipleConditionWindow> multipleConditionWindows) {
((MemoryMultipleConditionStorage) multipleConditionStorage).save(multipleConditionWindows);
}
abstract protected void save(MultipleConditionStorageInterface multipleConditionStorage, Flow flow, List<MultipleConditionWindow> multipleConditionWindows);

@Test
void daily() {
Expand All @@ -43,7 +41,7 @@ void daily() {
assertThat(window.getStart().toLocalTime(), is(LocalTime.parse("00:00:00")));
assertThat(window.getStart().toLocalDate(), is(ZonedDateTime.now().toLocalDate()));

assertThat(window.getEnd().toLocalTime(), is(LocalTime.parse("23:59:59.999999999")));
assertThat(window.getEnd().toLocalTime(), is(LocalTime.parse("23:59:59.999")));
assertThat(window.getEnd().toLocalDate(), is(ZonedDateTime.now().toLocalDate()));
}

Expand All @@ -60,7 +58,7 @@ void dailyAdvance() {
assertThat(window.getStart().toLocalTime(), is(LocalTime.parse("20:00:00")));
assertThat(window.getStart().toLocalDate(), is(ZonedDateTime.now().minusDays(1).toLocalDate()));

assertThat(window.getEnd().toLocalTime(), is(LocalTime.parse("19:59:59.999999999")));
assertThat(window.getEnd().toLocalTime(), is(LocalTime.parse("19:59:59.999")));
assertThat(window.getEnd().toLocalDate(), is(ZonedDateTime.now().toLocalDate()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;

import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.runner.AbstractJdbcMultipleConditionStorage;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@MysqlQueueEnabled
public class MysqlMultipleConditionStorage extends AbstractJdbcMultipleConditionStorage {
public MysqlMultipleConditionStorage(ApplicationContext applicationContext) {
super(new MysqlRepository<>(MultipleConditionWindow.class, applicationContext));
}
}
109 changes: 81 additions & 28 deletions jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,37 @@ CREATE FUNCTION PARSE_ISO8601_DURATION(duration VARCHAR(20))
CONTAINS SQL
DETERMINISTIC
BEGIN
RETURN
CASE
WHEN duration LIKE 'P%DT%H%M%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'P%dDT%HH%iM%s.%fS.%f'))
WHEN duration LIKE 'P%DT%H%M%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'P%dDT%HH%iM%sS.%f'))
WHEN duration LIKE 'PT%H%M%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%HH%iM%s.%fS.%f'))
WHEN duration LIKE 'PT%H%M%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%HH%iM%sS.%f'))
WHEN duration LIKE 'PT%M%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%iM%s.%fS.%f'))
WHEN duration LIKE 'PT%M%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%iM%sS.%f'))
WHEN duration LIKE 'PT%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%s.%fS.%f'))
WHEN duration LIKE 'PT%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%sS.%f'))
END;
RETURN
CASE
WHEN duration LIKE 'P%DT%H%M%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'P%dDT%HH%iM%s.%fS.%f'))
WHEN duration LIKE 'P%DT%H%M%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'P%dDT%HH%iM%sS.%f'))
WHEN duration LIKE 'PT%H%M%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%HH%iM%s.%fS.%f'))
WHEN duration LIKE 'PT%H%M%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%HH%iM%sS.%f'))
WHEN duration LIKE 'PT%M%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%iM%s.%fS.%f'))
WHEN duration LIKE 'PT%M%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%iM%sS.%f'))
WHEN duration LIKE 'PT%.%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%s.%fS.%f'))
WHEN duration LIKE 'PT%S' THEN TO_SECONDS(STR_TO_DATE(duration, 'PT%sS.%f'))
END;

END //
DELIMITER ;

DELIMITER //
CREATE FUNCTION PARSE_ISO8601_DATETIME(date VARCHAR(50))
RETURNS datetime
LANGUAGE SQL
CONTAINS SQL
DETERMINISTIC
BEGIN
RETURN IF(
SUBSTRING(date, LENGTH(date), LENGTH(date)) = 'Z',
STR_TO_DATE(date, '%Y-%m-%dT%H:%i:%s.%fZ'),
CONVERT_TZ(
STR_TO_DATE(SUBSTRING(date, 1, LENGTH(date) - 6), '%Y-%m-%dT%H:%i:%s.%f'),
SUBSTRING(date, LENGTH(date) - 5, 5),
'UTC'
)
);
END //
DELIMITER ;

Expand Down Expand Up @@ -92,14 +112,14 @@ CREATE TABLE `executions` (
'KILLED'
) GENERATED ALWAYS AS (value ->> '$.state.current') STORED NOT NULL,
`state_duration` BIGINT GENERATED ALWAYS AS (value ->> '$.state.duration' * 1000) STORED NOT NULL,
`start_date` TIMESTAMP GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.startDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,
INDEX ix_executions_id (id),
INDEX ix_executions_namespace (namespace),
INDEX ix_executions_flowId (flow_id),
INDEX ix_executions_state_current (state_current),
INDEX ix_executions_start_date (start_date),
INDEX ix_executions_state_duration (state_duration),
INDEX ix_executions_deleted (deleted),
`start_date` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.state.startDate' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,
INDEX ix_id (id),
INDEX ix_namespace (namespace),
INDEX ix_flowId (flow_id),
INDEX ix_state_current (state_current),
INDEX ix_start_date (start_date),
INDEX ix_state_duration (state_duration),
INDEX ix_deleted (deleted),
FULLTEXT ix_fulltext (namespace, flow_id, id)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

Expand Down Expand Up @@ -134,14 +154,47 @@ CREATE TABLE logs (
'DEBUG',
'TRACE'
) GENERATED ALWAYS AS (value ->> '$.level') STORED NOT NULL,
`timestamp` TIMESTAMP GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.timestamp' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,

INDEX logs_namespace (namespace),
INDEX logs_flowId (flow_id),
INDEX logs_task_id (task_id),
INDEX logs_execution_id (execution_id),
INDEX logs_taskrun_id (taskrun_id),
INDEX logs_trigger_id (trigger_id),
INDEX logs_timestamp (timestamp),
`timestamp` DATETIME(6) GENERATED ALWAYS AS (STR_TO_DATE(value ->> '$.timestamp' , '%Y-%m-%dT%H:%i:%s.%fZ')) STORED NOT NULL,

INDEX ix_namespace (namespace),
INDEX ix_flowId (flow_id),
INDEX ix_task_id (task_id),
INDEX ix_execution_id (execution_id),
INDEX ix_taskrun_id (taskrun_id),
INDEX ix_trigger_id (trigger_id),
INDEX ix_timestamp (timestamp),
FULLTEXT ix_fulltext (namespace, flow_id, task_id, execution_id, taskrun_id, trigger_id, message, thread)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;


CREATE TABLE multipleconditions (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
`condition_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.conditionId') STORED NOT NULL,
`start_date` DATETIME(6) GENERATED ALWAYS AS (
IF(
SUBSTRING(value ->> '$.start', LENGTH(value ->> '$.start'), LENGTH(value ->> '$.start')) = 'Z',
STR_TO_DATE(value ->> '$.start', '%Y-%m-%dT%H:%i:%s.%fZ'),
CONVERT_TZ(
STR_TO_DATE(SUBSTRING(value ->> '$.start', 1, LENGTH(value ->> '$.start') - 6), '%Y-%m-%dT%H:%i:%s.%f'),
SUBSTRING(value ->> '$.start', LENGTH(value ->> '$.start') - 5, 5),
'UTC'
)
)
) STORED NOT NULL,
`end_date` DATETIME(6) GENERATED ALWAYS AS (
IF(
SUBSTRING(value ->> '$.end', LENGTH(value ->> '$.end'), LENGTH(value ->> '$.end')) = 'Z',
STR_TO_DATE(value ->> '$.end', '%Y-%m-%dT%H:%i:%s.%fZ'),
CONVERT_TZ(
STR_TO_DATE(SUBSTRING(value ->> '$.end', 1, LENGTH(value ->> '$.end') - 6), '%Y-%m-%dT%H:%i:%s.%f'),
SUBSTRING(value ->> '$.end', LENGTH(value ->> '$.end') - 5, 5),
'UTC'
)
)
) STORED NOT NULL,
INDEX namespace__flow_id__condition_id (namespace, flow_id, condition_id),
INDEX start_date__end_date (start_date, end_date)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.kestra.runner.mysql;

import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionStorageTest;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.JdbcTestUtils;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

import java.util.List;

class MysqlMultipleConditionStorageTest extends AbstractMultipleConditionStorageTest {
@Inject
ApplicationContext applicationContext;

@Inject
JdbcTestUtils jdbcTestUtils;

protected MultipleConditionStorageInterface multipleConditionStorage() {
return new MysqlMultipleConditionStorage(applicationContext);
}

protected void save(MultipleConditionStorageInterface multipleConditionStorage, Flow flow, List<MultipleConditionWindow> multipleConditionWindows) {
((MysqlMultipleConditionStorage) multipleConditionStorage).save(multipleConditionWindows);
}


@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}
3 changes: 3 additions & 0 deletions jdbc-mysql/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ kestra:
logs:
table: "logs"
cls: io.kestra.core.models.executions.LogEntry
multipleconditions:
table: "multipleconditions"
cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,17 @@ public Map<Field<Object>, Object> persistFields(T entity) {
}

@SneakyThrows
public void persist(T entity, @Nullable Map<Field<Object>, Object> fields) {
public void persist(T entity, DSLContext context, @Nullable Map<Field<Object>, Object> fields) {
Map<Field<Object>, Object> finalFields = fields == null ? this.persistFields(entity) : fields;

dslContext.transaction(configuration -> DSL
.using(configuration)
context
.insertInto(table)
.set(DSL.field(DSL.quotedName("key")), queueService.key(entity))
.set(finalFields)
.onConflict(DSL.field(DSL.quotedName("key")))
.doUpdate()
.set(finalFields)
.execute()
);
.execute();
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;

import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.runner.AbstractJdbcMultipleConditionStorage;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@PostgresQueueEnabled
public class PostgresMultipleConditionStorage extends AbstractJdbcMultipleConditionStorage {
public PostgresMultipleConditionStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(MultipleConditionWindow.class, applicationContext));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,19 @@ CREATE INDEX logs_execution_id ON logs (execution_id);
CREATE INDEX logs_taskrun_id ON logs (taskrun_id);
CREATE INDEX logs_trigger_id ON logs (trigger_id);
CREATE INDEX logs_timestamp ON logs (timestamp);
CREATE INDEX logs_fulltext ON logs USING GIN (fulltext);
CREATE INDEX logs_fulltext ON logs USING GIN (fulltext);


/* ----------------------- multipleconditions ----------------------- */
CREATE TABLE multipleconditions (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED,
condition_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'conditionId') STORED,
start_date TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'start')) STORED,
end_date TIMESTAMP NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'end')) STORED
);

CREATE INDEX multipleconditions_namespace__flow_id__condition_id ON multipleconditions (namespace, flow_id, condition_id);
CREATE INDEX multipleconditions_namespace__start_date__end_date ON multipleconditions (start_date, end_date);
Loading

0 comments on commit 521ddde

Please sign in to comment.