-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(jdbc): introduce h2 in memory runner
- Loading branch information
1 parent
90372ac
commit c162a36
Showing
63 changed files
with
1,326 additions
and
164 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
publishSonatypePublicationPublicationToSonatypeRepository.enabled = false | ||
|
||
dependencies { | ||
implementation project(":core") | ||
implementation project(":jdbc") | ||
|
||
implementation("io.micronaut.sql:micronaut-jooq") | ||
runtimeOnly("com.h2database:h2:2.1.212") | ||
|
||
testImplementation project(':core').sourceSets.test.output | ||
testImplementation project(':jdbc').sourceSets.test.output | ||
testImplementation project(':runner-memory') | ||
testImplementation project(':storage-local') | ||
testImplementation 'org.mockito:mockito-junit-jupiter:4.5.1' | ||
} |
26 changes: 26 additions & 0 deletions
26
jdbc-h2/src/main/java/io/kestra/repository/h2/H2ExecutionRepository.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package io.kestra.repository.h2; | ||
|
||
import io.kestra.core.models.executions.Execution; | ||
import io.kestra.core.repositories.ExecutionRepositoryInterface; | ||
import io.kestra.jdbc.repository.AbstractExecutionRepository; | ||
import io.kestra.jdbc.runner.AbstractExecutorStateStorage; | ||
import io.micronaut.context.ApplicationContext; | ||
import jakarta.inject.Inject; | ||
import jakarta.inject.Singleton; | ||
import org.jooq.Condition; | ||
|
||
import java.util.List; | ||
|
||
@Singleton | ||
@H2RepositoryEnabled | ||
public class H2ExecutionRepository extends AbstractExecutionRepository implements ExecutionRepositoryInterface { | ||
@Inject | ||
public H2ExecutionRepository(ApplicationContext applicationContext, AbstractExecutorStateStorage executorStateStorage) { | ||
super(new H2Repository<>(Execution.class, applicationContext), executorStateStorage); | ||
} | ||
|
||
@Override | ||
protected Condition findCondition(String query) { | ||
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query); | ||
} | ||
} |
29 changes: 29 additions & 0 deletions
29
jdbc-h2/src/main/java/io/kestra/repository/h2/H2FlowRepository.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package io.kestra.repository.h2; | ||
|
||
import io.kestra.core.models.flows.Flow; | ||
import io.kestra.jdbc.repository.AbstractFlowRepository; | ||
import io.micronaut.context.ApplicationContext; | ||
import jakarta.inject.Inject; | ||
import jakarta.inject.Singleton; | ||
import org.jooq.Condition; | ||
|
||
import java.util.List; | ||
|
||
@Singleton | ||
@H2RepositoryEnabled | ||
public class H2FlowRepository extends AbstractFlowRepository { | ||
@Inject | ||
public H2FlowRepository(ApplicationContext applicationContext) { | ||
super(new H2Repository<>(Flow.class, applicationContext), applicationContext); | ||
} | ||
|
||
@Override | ||
protected Condition findCondition(String query) { | ||
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query); | ||
} | ||
|
||
@Override | ||
protected Condition findSourceCodeCondition(String query) { | ||
return this.jdbcRepository.fullTextCondition(List.of("source_code"), query); | ||
} | ||
} |
25 changes: 25 additions & 0 deletions
25
jdbc-h2/src/main/java/io/kestra/repository/h2/H2LogRepository.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package io.kestra.repository.h2; | ||
|
||
import io.kestra.core.models.executions.LogEntry; | ||
import io.kestra.jdbc.repository.AbstractLogRepository; | ||
import io.micronaut.context.ApplicationContext; | ||
import jakarta.inject.Inject; | ||
import jakarta.inject.Singleton; | ||
import org.jooq.Condition; | ||
|
||
import java.util.List; | ||
|
||
@Singleton | ||
@H2RepositoryEnabled | ||
public class H2LogRepository extends AbstractLogRepository { | ||
@Inject | ||
public H2LogRepository(ApplicationContext applicationContext) { | ||
super(new H2Repository<>(LogEntry.class, applicationContext)); | ||
} | ||
|
||
@Override | ||
protected Condition findCondition(String query) { | ||
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query); | ||
} | ||
} | ||
|
81 changes: 81 additions & 0 deletions
81
jdbc-h2/src/main/java/io/kestra/repository/h2/H2Repository.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package io.kestra.repository.h2; | ||
|
||
import io.kestra.core.repositories.ArrayListTotal; | ||
import io.kestra.jdbc.AbstractJdbcRepository; | ||
import io.kestra.jdbc.repository.AbstractRepository; | ||
import io.micronaut.context.ApplicationContext; | ||
import io.micronaut.data.model.Pageable; | ||
import lombok.SneakyThrows; | ||
import org.jooq.*; | ||
import org.jooq.impl.DSL; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
import javax.annotation.Nullable; | ||
|
||
public class H2Repository<T> extends AbstractJdbcRepository<T> { | ||
public H2Repository(Class<T> cls, ApplicationContext applicationContext) { | ||
super(cls, applicationContext); | ||
} | ||
|
||
@SneakyThrows | ||
public void persist(T entity, DSLContext context, @Nullable Map<Field<Object>, Object> fields) { | ||
Map<Field<Object>, Object> finalFields = fields == null ? this.persistFields(entity) : fields; | ||
|
||
context | ||
.insertInto(table) | ||
.set(AbstractRepository.field("key"), key(entity)) | ||
.set(finalFields) | ||
.onConflict(AbstractRepository.field("key")) | ||
.doUpdate() | ||
.set(finalFields) | ||
.execute(); | ||
} | ||
|
||
public Condition fullTextCondition(List<String> fields, String query) { | ||
if (query == null || query.equals("*")) { | ||
return DSL.trueCondition(); | ||
} | ||
|
||
if (fields.size() > 1) { | ||
throw new IllegalStateException("Too many fields for h2 '" + fields + "'"); | ||
} | ||
|
||
Field<Object> field = AbstractRepository.field(fields.get(0)); | ||
|
||
List<LikeEscapeStep> match = Arrays | ||
.stream(query.split("\\p{P}|\\p{S}|\\p{Z}")) | ||
.map(s -> field.likeIgnoreCase("%" + s.toUpperCase(Locale.ROOT) + "%")) | ||
.collect(Collectors.toList()); | ||
|
||
if (match.size() == 0) { | ||
return DSL.falseCondition(); | ||
} | ||
|
||
return DSL.and(match); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public <R extends Record, E> ArrayListTotal<E> fetchPage(DSLContext context, SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper) { | ||
Result<Record> results = this.limit( | ||
context.select(DSL.asterisk(), DSL.count().over().as("total_count")) | ||
.from(this | ||
.sort(select, pageable) | ||
.asTable("page") | ||
) | ||
.where(DSL.trueCondition()), | ||
pageable | ||
) | ||
.fetch(); | ||
|
||
Integer totalCount = results.size() > 0 ? results.get(0).get("total_count", Integer.class) : 0; | ||
|
||
List<E> map = results | ||
.map((Record record) -> mapper.map((R) record)); | ||
|
||
return new ArrayListTotal<>(map, totalCount); | ||
} | ||
} |
14 changes: 14 additions & 0 deletions
14
jdbc-h2/src/main/java/io/kestra/repository/h2/H2RepositoryEnabled.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package io.kestra.repository.h2; | ||
|
||
import io.micronaut.context.annotation.DefaultImplementation; | ||
import io.micronaut.context.annotation.Requires; | ||
|
||
import java.lang.annotation.*; | ||
|
||
@Documented | ||
@Retention(RetentionPolicy.RUNTIME) | ||
@Target({ElementType.PACKAGE, ElementType.TYPE}) | ||
@Requires(property = "kestra.repository.type", value = "h2") | ||
@DefaultImplementation | ||
public @interface H2RepositoryEnabled { | ||
} |
25 changes: 25 additions & 0 deletions
25
jdbc-h2/src/main/java/io/kestra/repository/h2/H2TemplateRepository.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package io.kestra.repository.h2; | ||
|
||
import io.kestra.core.models.templates.Template; | ||
import io.kestra.core.repositories.TemplateRepositoryInterface; | ||
import io.kestra.jdbc.repository.AbstractTemplateRepository; | ||
import io.micronaut.context.ApplicationContext; | ||
import jakarta.inject.Inject; | ||
import jakarta.inject.Singleton; | ||
import org.jooq.Condition; | ||
|
||
import java.util.List; | ||
|
||
@Singleton | ||
@H2RepositoryEnabled | ||
public class H2TemplateRepository extends AbstractTemplateRepository implements TemplateRepositoryInterface { | ||
@Inject | ||
public H2TemplateRepository(ApplicationContext applicationContext) { | ||
super(new H2Repository<>(Template.class, applicationContext), applicationContext); | ||
} | ||
|
||
@Override | ||
protected Condition findCondition(String query) { | ||
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query); | ||
} | ||
} |
16 changes: 16 additions & 0 deletions
16
jdbc-h2/src/main/java/io/kestra/repository/h2/H2TriggerRepository.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package io.kestra.repository.h2; | ||
|
||
import io.kestra.core.models.triggers.Trigger; | ||
import io.kestra.jdbc.repository.AbstractTriggerRepository; | ||
import io.micronaut.context.ApplicationContext; | ||
import jakarta.inject.Inject; | ||
import jakarta.inject.Singleton; | ||
|
||
@Singleton | ||
@H2RepositoryEnabled | ||
public class H2TriggerRepository extends AbstractTriggerRepository { | ||
@Inject | ||
public H2TriggerRepository(ApplicationContext applicationContext) { | ||
super(new H2Repository<>(Trigger.class, applicationContext)); | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
jdbc-h2/src/main/java/io/kestra/runner/h2/H2ExecutorStateStorage.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package io.kestra.runner.h2; | ||
|
||
import io.kestra.jdbc.runner.AbstractExecutorStateStorage; | ||
import io.kestra.jdbc.runner.JdbcExecutorState; | ||
import io.kestra.repository.h2.H2Repository; | ||
import io.micronaut.context.ApplicationContext; | ||
import jakarta.inject.Singleton; | ||
|
||
@Singleton | ||
@H2QueueEnabled | ||
public class H2ExecutorStateStorage extends AbstractExecutorStateStorage { | ||
public H2ExecutorStateStorage(ApplicationContext applicationContext) { | ||
super(new H2Repository<>(JdbcExecutorState.class, applicationContext)); | ||
} | ||
} |
63 changes: 63 additions & 0 deletions
63
jdbc-h2/src/main/java/io/kestra/runner/h2/H2Functions.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package io.kestra.runner.h2; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.node.NullNode; | ||
import io.kestra.core.serializers.JacksonMapper; | ||
import lombok.SneakyThrows; | ||
import net.thisptr.jackson.jq.BuiltinFunctionLoader; | ||
import net.thisptr.jackson.jq.JsonQuery; | ||
import net.thisptr.jackson.jq.Scope; | ||
import net.thisptr.jackson.jq.Versions; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.function.Function; | ||
|
||
public class H2Functions { | ||
private static final Scope rootScope; | ||
private static final Scope scope; | ||
|
||
static { | ||
rootScope = Scope.newEmptyScope(); | ||
BuiltinFunctionLoader.getInstance().loadFunctions(Versions.JQ_1_6, rootScope); | ||
scope = Scope.newEmptyScope(); | ||
} | ||
|
||
public static Boolean jqBoolean(String value, String expression) { | ||
return H2Functions.jq(value, expression, JsonNode::asBoolean); | ||
} | ||
|
||
public static String jqString(String value, String expression) { | ||
return H2Functions.jq(value, expression, JsonNode::asText); | ||
} | ||
|
||
public static Long jqLong(String value, String expression) { | ||
return H2Functions.jq(value, expression, JsonNode::asLong); | ||
} | ||
|
||
public static Integer jqInteger(String value, String expression) { | ||
return H2Functions.jq(value, expression, JsonNode::asInt); | ||
} | ||
|
||
public static Double jqDouble(String value, String expression) { | ||
return H2Functions.jq(value, expression, JsonNode::asDouble); | ||
} | ||
|
||
@SneakyThrows | ||
private static <T> T jq(String value, String expression, Function<JsonNode, T> function) { | ||
JsonQuery q = JsonQuery.compile(expression, Versions.JQ_1_6); | ||
|
||
final List<JsonNode> out = new ArrayList<>(); | ||
JsonNode in = JacksonMapper.ofJson().readTree(value); | ||
|
||
q.apply(scope, in, out::add); | ||
|
||
JsonNode node = out.get(0); | ||
|
||
if (node instanceof NullNode) { | ||
return null; | ||
} else { | ||
return function.apply(node); | ||
} | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
jdbc-h2/src/main/java/io/kestra/runner/h2/H2MultipleConditionStorage.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package io.kestra.runner.h2; | ||
|
||
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow; | ||
import io.kestra.jdbc.runner.AbstractJdbcMultipleConditionStorage; | ||
import io.kestra.repository.h2.H2Repository; | ||
import io.micronaut.context.ApplicationContext; | ||
import jakarta.inject.Singleton; | ||
|
||
@Singleton | ||
@H2QueueEnabled | ||
public class H2MultipleConditionStorage extends AbstractJdbcMultipleConditionStorage { | ||
public H2MultipleConditionStorage(ApplicationContext applicationContext) { | ||
super(new H2Repository<>(MultipleConditionWindow.class, applicationContext)); | ||
} | ||
} |
Oops, something went wrong.