Skip to content

Commit

Permalink
feat(jdbc): first implementation of jdbc runner
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 8b3fcbd commit 6ea7118
Show file tree
Hide file tree
Showing 29 changed files with 1,043 additions and 60 deletions.
3 changes: 3 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ kestra:
workertaskexecutions:
table: "workertaskexecutions"
cls: io.kestra.core.runners.WorkerTaskExecution
executorstate:
table: "executorstate"
cls: io.kestra.jdbc.runner.JdbcExecutorState

elasticsearch:
defaults:
Expand Down
45 changes: 45 additions & 0 deletions core/src/main/java/io/kestra/core/runners/DefaultFlowExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.kestra.core.runners;

import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import lombok.Setter;

import java.util.Collection;
import java.util.List;
import java.util.Optional;

public class DefaultFlowExecutor implements FlowExecutorInterface {
private final FlowRepositoryInterface flowRepositoryInterface;
@Setter
private List<Flow> allFlows;

public DefaultFlowExecutor(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepositoryInterface) {
this.flowRepositoryInterface = flowRepositoryInterface;
flowListeners.listen(flows -> {
this.allFlows = flows;
});
}

@Override
public Collection<Flow> allLastVersion() {
return this.allFlows;
}

@Override
public Optional<Flow> findById(String namespace, String id, Optional<Integer> revision) {
Optional<Flow> find = this.allFlows
.stream()
.filter(flow -> flow.getNamespace().equals(namespace) &&
flow.getId().equals(id) &&
(revision.isEmpty() || revision.get().equals(flow.getRevision()))
)
.findFirst();

if (find.isPresent()) {
return find;
} else {
return flowRepositoryInterface.findById(namespace, id, revision);
}
}
}
25 changes: 0 additions & 25 deletions core/src/main/java/io/kestra/core/runners/MemoryFlowExecutor.java

This file was deleted.

1 change: 1 addition & 0 deletions jdbc-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ dependencies {
testImplementation project(':core').sourceSets.test.output
testImplementation project(':jdbc').sourceSets.test.output
testImplementation project(':runner-memory')
testImplementation project(':storage-local')
testImplementation 'org.mockito:mockito-junit-jupiter:4.5.1'
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.jdbc.repository.AbstractExecutionRepository;
import io.kestra.jdbc.runner.AbstractExecutorStateStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -14,11 +15,10 @@
@MysqlRepositoryEnabled
public class MysqlExecutionRepository extends AbstractExecutionRepository implements ExecutionRepositoryInterface {
@Inject
public MysqlExecutionRepository(ApplicationContext applicationContext) {
super(new MysqlRepository<>(Execution.class, applicationContext), applicationContext);
public MysqlExecutionRepository(ApplicationContext applicationContext, AbstractExecutorStateStorage executorStateStorage) {
super(new MysqlRepository<>(Execution.class, applicationContext), executorStateStorage);
}


@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.runner.AbstractExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcExecutorState;
import io.kestra.repository.mysql.MysqlRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@MysqlQueueEnabled
public class MysqlExecutorStateStorage extends AbstractExecutorStateStorage {
public MysqlExecutorStateStorage(ApplicationContext applicationContext) {
super(new MysqlRepository<>(JdbcExecutorState.class, applicationContext));
}
}
13 changes: 8 additions & 5 deletions jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected Result<Record> receiveFetch(DSLContext ctx, String consumerGroup) {
"ORDER BY offset ASC" + "\n" +
"LIMIT 10" + "\n" +
"FOR UPDATE SKIP LOCKED",
"indexer",
consumerGroup,
this.cls.getName()
)
.fetch();
Expand All @@ -61,10 +61,13 @@ protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, List<Int
ctx
.query(
"UPDATE " + table.getName() + "\n" +
"SET consumers = CONCAT(consumers, '" + consumerGroup + "')\n" +
"WHERE offset IN (" + offsets.stream()
.map(Object::toString)
.collect(Collectors.joining(",")) + ")",
"SET consumers = CONCAT_WS(',', consumers, '" + consumerGroup + "')\n" +
"WHERE offset IN (" +
offsets
.stream()
.map(Object::toString)
.collect(Collectors.joining(",")) +
")",
consumerGroup
)
.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ CREATE TABLE logs (
`task_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.taskId') STORED,
`execution_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.executionId') STORED NOT NULL,
`taskrun_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.taskRunId') STORED,
`attempt_number` INT GENERATED ALWAYS AS (value ->> '$.attemptNumber') STORED,
`attempt_number` INT GENERATED ALWAYS AS (IF(value ->> '$.attemptNumber' = 'null', NULL, value ->> '$.attemptNumber')) STORED,
`trigger_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.triggerId') STORED,
`message` TEXT GENERATED ALWAYS AS (value ->> '$.message') STORED,
`thread` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.thread') STORED,
Expand Down Expand Up @@ -204,3 +204,9 @@ CREATE TABLE workertaskexecutions (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;


CREATE TABLE executorstate (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.runner.JdbcRunnerTest;

public class MysqlRunnerTest extends JdbcRunnerTest {

}
7 changes: 7 additions & 0 deletions jdbc-mysql/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ kestra:
type: mysql
repository:
type: mysql
storage:
type: local
local:
base-path: /tmp/unittest

jdbc:
tables:
Expand All @@ -44,3 +48,6 @@ kestra:
workertaskexecutions:
table: "workertaskexecutions"
cls: io.kestra.core.runners.WorkerTaskExecution
executorstate:
table: "executorstate"
cls: io.kestra.jdbc.runner.JdbcExecutorState
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.jdbc.repository.AbstractExecutionRepository;
import io.kestra.jdbc.runner.AbstractExecutorStateStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -14,8 +15,8 @@
@PostgresRepositoryEnabled
public class PostgresExecutionRepository extends AbstractExecutionRepository implements ExecutionRepositoryInterface {
@Inject
public PostgresExecutionRepository(ApplicationContext applicationContext) {
super(new PostgresRepository<>(Execution.class, applicationContext), applicationContext);
public PostgresExecutionRepository(ApplicationContext applicationContext, AbstractExecutorStateStorage executorStateStorage) {
super(new PostgresRepository<>(Execution.class, applicationContext), executorStateStorage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;

import io.kestra.jdbc.runner.AbstractExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcExecutorState;
import io.kestra.repository.postgres.PostgresRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@PostgresQueueEnabled
public class PostgresExecutorStateStorage extends AbstractExecutorStateStorage {
public PostgresExecutorStateStorage(ApplicationContext applicationContext) {
super(new PostgresRepository<>(JdbcExecutorState.class, applicationContext));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,10 @@ CREATE TABLE workertaskexecutions (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL
);


/* ----------------------- executorstate ----------------------- */
CREATE TABLE executorstate (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.runner.postgres;

import io.kestra.jdbc.runner.JdbcRunnerTest;

public class PostgresRunnerTest extends JdbcRunnerTest {

}
3 changes: 3 additions & 0 deletions jdbc-postgres/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ kestra:
workertaskexecutions:
table: "workertaskexecutions"
cls: io.kestra.core.runners.WorkerTaskExecution
executorstate:
table: "executorstate"
cls: io.kestra.jdbc.runner.JdbcExecutorState
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.Executor;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.kestra.jdbc.runner.AbstractExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcExecutorState;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.jooq.*;
import org.jooq.impl.DSL;

Expand All @@ -21,15 +25,18 @@
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

@Singleton
public abstract class AbstractExecutionRepository extends AbstractRepository implements ExecutionRepositoryInterface {
public abstract class AbstractExecutionRepository extends AbstractRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface<Execution> {
protected AbstractJdbcRepository<Execution> jdbcRepository;
protected AbstractExecutorStateStorage executorStateStorage;

public AbstractExecutionRepository(AbstractJdbcRepository<Execution> jdbcRepository, ApplicationContext applicationContext) {
public AbstractExecutionRepository(AbstractJdbcRepository<Execution> jdbcRepository, AbstractExecutorStateStorage executorStateStorage) {
this.jdbcRepository = jdbcRepository;
this.executorStateStorage = executorStateStorage;
}

@Override
Expand Down Expand Up @@ -369,4 +376,45 @@ public Execution save(Execution execution) {

return execution;
}

@Override
public Execution save(DSLContext dslContext, Execution execution) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(execution);
this.jdbcRepository.persist(execution, dslContext, fields);

return execution;
}

public Executor lock(String executionId, Function<Pair<Execution, JdbcExecutorState>, Pair<Executor, JdbcExecutorState>> function) {
return this.jdbcRepository
.getDslContext()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);

SelectForUpdateOfStep<Record1<Object>> from = context
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(DSL.field("id").eq(executionId))
.forUpdate();

Optional<Execution> execution = this.jdbcRepository.fetchOne(from);

// not ready for now, skip and wait for a first state
if (execution.isEmpty()) {
return null;
}

JdbcExecutorState jdbcExecutorState = executorStateStorage.get(context, execution.get());
Pair<Executor, JdbcExecutorState> pair = function.apply(Pair.of(execution.get(), jdbcExecutorState));

if (pair != null) {
this.jdbcRepository.persist(pair.getKey().getExecution(), context, null);
this.executorStateStorage.save(context, pair.getRight());

return pair.getKey();
}

return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Singleton;
import org.jooq.*;
import org.jooq.impl.DSL;
import org.slf4j.event.Level;

import java.util.Collections;
import java.util.List;
import java.util.Map;

@Singleton
public abstract class AbstractLogRepository extends AbstractRepository implements LogRepositoryInterface {
public abstract class AbstractLogRepository extends AbstractRepository implements LogRepositoryInterface, JdbcIndexerInterface<LogEntry> {
protected AbstractJdbcRepository<LogEntry> jdbcRepository;

public AbstractLogRepository(AbstractJdbcRepository<LogEntry> jdbcRepository) {
Expand Down Expand Up @@ -82,6 +82,14 @@ public LogEntry save(LogEntry log) {
return log;
}

@Override
public LogEntry save(DSLContext dslContext, LogEntry logEntry) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(logEntry);
this.jdbcRepository.persist(logEntry, dslContext, fields);

return logEntry;
}

private List<LogEntry> query(Condition condition, Level minLevel) {
return this.jdbcRepository
.getDslContext()
Expand Down
Loading

0 comments on commit 6ea7118

Please sign in to comment.