Skip to content

Commit

Permalink
feat(jdbc): implementation of WorkerTaskExecutionStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 521ddde commit d308d51
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ kestra:
multipleconditions:
table: "multipleconditions"
cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow
workertaskexecutions:
table: "workertaskexecutions"
cls: io.kestra.core.runners.WorkerTaskExecution

elasticsearch:
defaults:
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.core.runners.WorkerTask;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.runners.WorkerTaskRunning;
import io.kestra.core.runners.*;

import jakarta.inject.Singleton;

Expand Down Expand Up @@ -39,6 +36,8 @@ public String key(Object object) {
return ((Trigger) object).uid();
} else if (object.getClass() == MultipleConditionWindow.class) {
return ((MultipleConditionWindow) object).uid();
} else if (object.getClass() == WorkerTaskExecution.class) {
return ((WorkerTaskExecution) object).getExecution().getId();
} else {
throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;

import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.jdbc.runner.AbstractWorkerTaskExecutionStorage;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@MysqlQueueEnabled
public class MysqlWorkerTaskExecutionStorage extends AbstractWorkerTaskExecutionStorage {
public MysqlWorkerTaskExecutionStorage(ApplicationContext applicationContext) {
super(new MysqlRepository<>(WorkerTaskExecution.class, applicationContext));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,9 @@ CREATE TABLE multipleconditions (
INDEX namespace__flow_id__condition_id (namespace, flow_id, condition_id),
INDEX start_date__end_date (start_date, end_date)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;


CREATE TABLE workertaskexecutions (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.kestra.runner.mysql;

import io.kestra.core.models.triggers.multipleflows.AbstractMultipleConditionStorageTest;
import io.kestra.jdbc.runner.AbstractWorkerTaskExecutionStorage;
import io.kestra.jdbc.runner.AbstractWorkerTaskExecutionTest;

class MysqlWorkerTaskExecutionStorageTest extends AbstractWorkerTaskExecutionTest {

}
3 changes: 3 additions & 0 deletions jdbc-mysql/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ kestra:
multipleconditions:
table: "multipleconditions"
cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow
workertaskexecutions:
table: "workertaskexecutions"
cls: io.kestra.core.runners.WorkerTaskExecution
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

@Singleton
@PostgresQueueEnabled
public class PostgresMultipleConditionStorage extends AbstractJdbcMultipleConditionStorage {
public class PostgresMultipleConditionStorage extends AbstractJdbcMultipleConditionStorage {
public PostgresMultipleConditionStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(MultipleConditionWindow.class, applicationContext));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;

import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.jdbc.runner.AbstractWorkerTaskExecutionStorage;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@PostgresQueueEnabled
public class PostgresWorkerTaskExecutionStorage extends AbstractWorkerTaskExecutionStorage {
public PostgresWorkerTaskExecutionStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(WorkerTaskExecution.class, applicationContext));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,11 @@ CREATE TABLE multipleconditions (
);

CREATE INDEX multipleconditions_namespace__flow_id__condition_id ON multipleconditions (namespace, flow_id, condition_id);
CREATE INDEX multipleconditions_namespace__start_date__end_date ON multipleconditions (start_date, end_date);
CREATE INDEX multipleconditions_namespace__start_date__end_date ON multipleconditions (start_date, end_date);


/* ----------------------- workertaskexecutions ----------------------- */
CREATE TABLE workertaskexecutions (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.runner.postgres;

import io.kestra.jdbc.runner.AbstractWorkerTaskExecutionTest;

class PostgresWorkerTaskExecutionStorageTest extends AbstractWorkerTaskExecutionTest {

}
3 changes: 3 additions & 0 deletions jdbc-postgres/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ kestra:
multipleconditions:
table: "multipleconditions"
cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow
workertaskexecutions:
table: "workertaskexecutions"
cls: io.kestra.core.runners.WorkerTaskExecution
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.kestra.jdbc.runner;

import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.kestra.jdbc.repository.AbstractRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record1;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public abstract class AbstractWorkerTaskExecutionStorage extends AbstractRepository {
protected AbstractJdbcRepository<WorkerTaskExecution> jdbcRepository;

public AbstractWorkerTaskExecutionStorage(AbstractJdbcRepository<WorkerTaskExecution> jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}

public Optional<WorkerTaskExecution> get(String executionId) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(
DSL.field(DSL.quotedName("key")).eq(executionId)
);

return this.jdbcRepository.fetchOne(select);
});
}

public void save(List<WorkerTaskExecution> workerTaskExecutions) {
this.jdbcRepository
.getDslContext()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);

workerTaskExecutions.forEach(workerTaskExecution -> {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(workerTaskExecution);
this.jdbcRepository.persist(workerTaskExecution, context, fields);
});
});
}

public void delete(WorkerTaskExecution workerTaskExecution) {
this.jdbcRepository.delete(workerTaskExecution);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.kestra.jdbc.runner;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.tasks.flows.Flow;
import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcTestUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@MicronautTest(transactional = false)
public abstract class AbstractWorkerTaskExecutionTest {
@Inject
AbstractWorkerTaskExecutionStorage workerTaskExecutionStorage;

@Inject
JdbcTestUtils jdbcTestUtils;

@Test
void suite() throws Exception {

WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder()
.execution(Execution.builder().id(IdUtils.create()).build())
.task(Flow.builder().type(Flow.class.getName()).id(IdUtils.create()).build())
.taskRun(TaskRun.builder().id(IdUtils.create()).build())
.build();

workerTaskExecutionStorage.save(List.of(workerTaskExecution));


Optional<WorkerTaskExecution> find = workerTaskExecutionStorage.get(workerTaskExecution.getExecution().getId());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getExecution().getId(), is(workerTaskExecution.getExecution().getId()));


workerTaskExecutionStorage.delete(workerTaskExecution);

find = workerTaskExecutionStorage.get(workerTaskExecution.getExecution().getId());
assertThat(find.isPresent(), is(false));
}

@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

0 comments on commit d308d51

Please sign in to comment.