Skip to content

Commit

Permalink
feat(jdbc): add support for executor delayed
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 98f529a commit 63aab8f
Show file tree
Hide file tree
Showing 17 changed files with 349 additions and 121 deletions.
3 changes: 3 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ kestra:
executorstate:
table: "executorstate"
cls: io.kestra.jdbc.runner.JdbcExecutorState
executordelayed:
table: "executordelayed"
cls: io.kestra.core.runners.ExecutionDelay

elasticsearch:
defaults:
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/kestra/core/utils/ExecutorsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,21 @@ public ExecutorService singleThreadExecutor(String name) {
);
}

public ExecutorService singleThreadScheduledExecutor(String name) {
return this.wrap(
name,
Executors.newSingleThreadScheduledExecutor(
threadFactoryBuilder.build(name + "_%d")
)
);
}

private ExecutorService wrap(String name, ExecutorService executorService) {
return ExecutorServiceMetrics.monitor(
meterRegistry,
executorService,
name
);
}

}
107 changes: 62 additions & 45 deletions core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.services.ExecutionService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.junit.jupiter.api.Test;

import java.time.Duration;
Expand All @@ -19,57 +21,72 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

class PauseTest extends AbstractMemoryRunnerTest {
public class PauseTest extends AbstractMemoryRunnerTest {
@Inject
ExecutionService executionService;

@Inject
FlowRepositoryInterface flowRepository;

@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
Suite suite;

@Test
void run() throws Exception {
Flow flow = flowRepository.findById("io.kestra.tests", "pause").orElseThrow();
Execution execution = runnerUtils.runOne("io.kestra.tests", "pause", Duration.ofSeconds(120));

assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));

Execution restarted = executionService.markAs(
execution,
execution.findTaskRunByTaskIdAndValue("pause", List.of()).getId(),
State.Type.RUNNING
);

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(120)
);

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
suite.run(runnerUtils);
}

@Test
void runDelay() throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "pause-delay");

assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(30)
);

assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(2L));
assertThat(execution.getTaskRunList(), hasSize(3));
void failed() throws Exception {
suite.runDelay(runnerUtils);
}

@Singleton
public static class Suite {
@Inject
ExecutionService executionService;

@Inject
FlowRepositoryInterface flowRepository;

@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;

public void run(RunnerUtils runnerUtils) throws Exception {
Flow flow = flowRepository.findById("io.kestra.tests", "pause").orElseThrow();
Execution execution = runnerUtils.runOne("io.kestra.tests", "pause", Duration.ofSeconds(120));

assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));

Execution restarted = executionService.markAs(
execution,
execution.findTaskRunByTaskIdAndValue("pause", List.of()).getId(),
State.Type.RUNNING
);

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(120)
);

assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
}

public void runDelay(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOne("io.kestra.tests", "pause-delay");

assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(30)
);

assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(2L));
assertThat(execution.getTaskRunList(), hasSize(3));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;

import io.kestra.core.runners.ExecutionDelay;
import io.kestra.jdbc.runner.AbstractExecutionDelayStorage;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@MysqlQueueEnabled
public class MysqlExecutionDelayStorage extends AbstractExecutionDelayStorage {
public MysqlExecutionDelayStorage(ApplicationContext applicationContext) {
super(new MysqlRepository<>(ExecutionDelay.class, applicationContext));
}
}
22 changes: 20 additions & 2 deletions jdbc-mysql/src/main/resources/migrations/mysql/V1__initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ CREATE TABLE multipleconditions (
)
)
) STORED NOT NULL,
INDEX namespace__flow_id__condition_id (namespace, flow_id, condition_id),
INDEX start_date__end_date (start_date, end_date)
INDEX ix_namespace__flow_id__condition_id (namespace, flow_id, condition_id),
INDEX ix_start_date__end_date (start_date, end_date)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;


Expand All @@ -212,3 +212,21 @@ CREATE TABLE executorstate (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;


CREATE TABLE executordelayed (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`date` DATETIME(6) GENERATED ALWAYS AS (
IF(
SUBSTRING(value ->> '$.date', LENGTH(value ->> '$.date'), LENGTH(value ->> '$.date')) = 'Z',
STR_TO_DATE(value ->> '$.date', '%Y-%m-%dT%H:%i:%s.%fZ'),
CONVERT_TZ(
STR_TO_DATE(SUBSTRING(value ->> '$.date', 1, LENGTH(value ->> '$.date') - 6), '%Y-%m-%dT%H:%i:%s.%f'),
SUBSTRING(value ->> '$.date', LENGTH(value ->> '$.date') - 5, 5),
'UTC'
)
)
) STORED NOT NULL,
INDEX ix_date (`date`)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
5 changes: 4 additions & 1 deletion jdbc-mysql/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ kestra:
cls: io.kestra.core.runners.WorkerTaskExecution
executorstate:
table: "executorstate"
cls: io.kestra.jdbc.runner.JdbcExecutorState
cls: io.kestra.jdbc.runner.JdbcExecutorState
executordelayed:
table: "executordelayed"
cls: io.kestra.core.runners.ExecutionDelay
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;

import io.kestra.core.runners.ExecutionDelay;
import io.kestra.jdbc.runner.AbstractExecutionDelayStorage;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@PostgresQueueEnabled
public class PostgresExecutionDelayStorage extends AbstractExecutionDelayStorage {
public PostgresExecutionDelayStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(ExecutionDelay.class, applicationContext));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected Map<Field<Object>, Object> produceFields(String key, T message) {

map.put(
DSL.field(DSL.quotedName("type")),
DSL.field("CAST(? AS " + this.prefix + "queue_type)", this.cls.getName())
DSL.field("CAST(? AS queue_type)", this.cls.getName())
);

return map;
Expand All @@ -43,7 +43,7 @@ protected Result<Record> receiveFetch(DSLContext ctx, @NonNull Integer offset) {
"FROM " + table.getName() + "\n" +
"WHERE 1 = 1" + "\n" +
(offset != 0 ? "AND \"offset\" > ?" + "\n" : "") +
"AND type = CAST(? AS " + this.prefix + "queue_type)" + "\n" +
"AND type = CAST(? AS queue_type)" + "\n" +
"ORDER BY \"offset\" ASC" + "\n" +
"LIMIT 10" + "\n" +
"FOR UPDATE SKIP LOCKED",
Expand All @@ -62,9 +62,9 @@ protected Result<Record> receiveFetch(DSLContext ctx, String consumerGroup) {
"FROM " + table.getName() + "\n" +
"WHERE (" +
" \"consumers\" IS NULL" + "\n" +
" OR NOT(CAST(? AS " + this.prefix + "queue_consumers) = ANY(\"consumers\"))" + "\n" +
" OR NOT(CAST(? AS queue_consumers) = ANY(\"consumers\"))" + "\n" +
")" + "\n" +
"AND type = CAST(? AS " + this.prefix + "queue_type)" + "\n" +
"AND type = CAST(? AS queue_type)" + "\n" +
"ORDER BY \"offset\" ASC" + "\n" +
"LIMIT 10" + "\n" +
"FOR UPDATE SKIP LOCKED",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,14 @@ CREATE TABLE workertaskexecutions (
CREATE TABLE executorstate (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL
);
);


/* ----------------------- executorstate ----------------------- */
CREATE TABLE executordelayed (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
date TIMESTAMPTZ NOT NULL GENERATED ALWAYS AS (PARSE_ISO8601_DATETIME(value ->> 'date')) STORED
);

CREATE INDEX executordelayed_date ON executordelayed (date);
3 changes: 3 additions & 0 deletions jdbc-postgres/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ kestra:
executorstate:
table: "executorstate"
cls: io.kestra.jdbc.runner.JdbcExecutorState
executordelayed:
table: "executordelayed"
cls: io.kestra.core.runners.ExecutionDelay
16 changes: 10 additions & 6 deletions jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,16 @@ public void persist(T entity, DSLContext dslContext, Map<Field<Object>, Object>
}

public void delete(T entity) {
dslContext.transaction(configuration ->
DSL.using(configuration)
.delete(table)
.where(DSL.field(DSL.quotedName("key")).eq(key(entity)))
.execute()
);
dslContext.transaction(configuration -> {
this.delete(DSL.using(configuration), entity);
});
}

public void delete(DSLContext dslContext, T entity) {
dslContext
.delete(table)
.where(DSL.field(DSL.quotedName("key")).eq(key(entity)))
.execute();
}

public <R extends Record> T map(R record) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.kestra.jdbc.runner;

import io.kestra.core.runners.ExecutionDelay;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.kestra.jdbc.repository.AbstractRepository;
import org.jooq.Field;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;

import java.time.ZonedDateTime;
import java.util.Map;
import java.util.function.Consumer;

public abstract class AbstractExecutionDelayStorage extends AbstractRepository {
protected AbstractJdbcRepository<ExecutionDelay> jdbcRepository;

public AbstractExecutionDelayStorage(AbstractJdbcRepository<ExecutionDelay> jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}

public void get(Consumer<ExecutionDelay> consumer) {
ZonedDateTime now = ZonedDateTime.now();

this.jdbcRepository
.getDslContext()
.transaction(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(
DSL.field("date").lessOrEqual(now.toOffsetDateTime())
);

this.jdbcRepository.fetch(select)
.forEach(executionDelay -> {
consumer.accept(executionDelay);
jdbcRepository.delete(executionDelay);
});
});
}

public void save(ExecutionDelay executionDelay) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionDelay);
this.jdbcRepository.persist(executionDelay, fields);
}
}
Loading

0 comments on commit 63aab8f

Please sign in to comment.