Skip to content

Commit

Permalink
feat(jdbc): introduce h2 in memory runner
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 6980899 commit 06337f5
Show file tree
Hide file tree
Showing 63 changed files with 1,326 additions and 164 deletions.
1 change: 1 addition & 0 deletions cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation project(":indexer-kafka-elasticsearch")

implementation project(":jdbc")
implementation project(":jdbc-h2")
implementation project(":jdbc-mysql")
implementation project(":jdbc-postgres")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ public class MultipleConditionWindow {

String conditionId;

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm[:ss][.SSS]XXX")
ZonedDateTime start;


@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm[:ss][.SSS]XXX")
ZonedDateTime end;

Map<String, Boolean> results;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ protected void dailyGroupByFlowStatistics() throws InterruptedException {
DailyExecutionStatistics full = result.get("io.kestra.unittest").get(FLOW).get(10);
DailyExecutionStatistics second = result.get("io.kestra.unittest").get("second").get(10);

assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(full.getDuration().getAvg().toMillis(), greaterThan(0L));
assertThat(full.getExecutionCounts().size(), is(9));
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(7L));
assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L));

assertThat(second.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(second.getDuration().getAvg().toMillis(), greaterThan(0L));
assertThat(second.getExecutionCounts().size(), is(9));
assertThat(second.getExecutionCounts().get(State.Type.SUCCESS), is(13L));
assertThat(second.getExecutionCounts().get(State.Type.CREATED), is(0L));
Expand All @@ -193,7 +193,7 @@ protected void dailyGroupByFlowStatistics() throws InterruptedException {
assertThat(result.size(), is(1));
assertThat(result.get("io.kestra.unittest").size(), is(1));
full = result.get("io.kestra.unittest").get("*").get(10);
assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(full.getDuration().getAvg().toMillis(), greaterThan(0L));
assertThat(full.getExecutionCounts().size(), is(9));
assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L));
Expand Down Expand Up @@ -224,7 +224,7 @@ protected void dailyStatistics() throws InterruptedException {

assertThat(result.size(), is(11));
assertThat(result.get(10).getExecutionCounts().size(), is(9));
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(result.get(10).getDuration().getAvg().toMillis(), greaterThan(0L));

assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L));
assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L));
Expand All @@ -251,7 +251,7 @@ protected void taskRunsDailyStatistics() {

assertThat(result.size(), is(11));
assertThat(result.get(10).getExecutionCounts().size(), is(9));
assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L));
assertThat(result.get(10).getDuration().getAvg().toMillis(), greaterThan(0L));

assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L * 2));
assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L * 2));
Expand Down
15 changes: 15 additions & 0 deletions jdbc-h2/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
publishSonatypePublicationPublicationToSonatypeRepository.enabled = false

dependencies {
implementation project(":core")
implementation project(":jdbc")

implementation("io.micronaut.sql:micronaut-jooq")
runtimeOnly("com.h2database:h2:2.1.212")

testImplementation project(':core').sourceSets.test.output
testImplementation project(':jdbc').sourceSets.test.output
testImplementation project(':runner-memory')
testImplementation project(':storage-local')
testImplementation 'org.mockito:mockito-junit-jupiter:4.5.1'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.kestra.repository.h2;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.jdbc.repository.AbstractExecutionRepository;
import io.kestra.jdbc.runner.AbstractExecutorStateStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.List;

@Singleton
@H2RepositoryEnabled
public class H2ExecutionRepository extends AbstractExecutionRepository implements ExecutionRepositoryInterface {
@Inject
public H2ExecutionRepository(ApplicationContext applicationContext, AbstractExecutorStateStorage executorStateStorage) {
super(new H2Repository<>(Execution.class, applicationContext), executorStateStorage);
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.kestra.repository.h2;

import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractFlowRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.List;

@Singleton
@H2RepositoryEnabled
public class H2FlowRepository extends AbstractFlowRepository {
@Inject
public H2FlowRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(Flow.class, applicationContext), applicationContext);
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query);
}

@Override
protected Condition findSourceCodeCondition(String query) {
return this.jdbcRepository.fullTextCondition(List.of("source_code"), query);
}
}
25 changes: 25 additions & 0 deletions jdbc-h2/src/main/java/io/kestra/repository/h2/H2LogRepository.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.kestra.repository.h2;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.jdbc.repository.AbstractLogRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.List;

@Singleton
@H2RepositoryEnabled
public class H2LogRepository extends AbstractLogRepository {
@Inject
public H2LogRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(LogEntry.class, applicationContext));
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query);
}
}

81 changes: 81 additions & 0 deletions jdbc-h2/src/main/java/io/kestra/repository/h2/H2Repository.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.kestra.repository.h2;

import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.jdbc.AbstractJdbcRepository;
import io.kestra.jdbc.repository.AbstractRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
import lombok.SneakyThrows;
import org.jooq.*;
import org.jooq.impl.DSL;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

public class H2Repository<T> extends AbstractJdbcRepository<T> {
public H2Repository(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
}

@SneakyThrows
public void persist(T entity, DSLContext context, @Nullable Map<Field<Object>, Object> fields) {
Map<Field<Object>, Object> finalFields = fields == null ? this.persistFields(entity) : fields;

context
.insertInto(table)
.set(AbstractRepository.field("key"), key(entity))
.set(finalFields)
.onConflict(AbstractRepository.field("key"))
.doUpdate()
.set(finalFields)
.execute();
}

public Condition fullTextCondition(List<String> fields, String query) {
if (query == null || query.equals("*")) {
return DSL.trueCondition();
}

if (fields.size() > 1) {
throw new IllegalStateException("Too many fields for h2 '" + fields + "'");
}

Field<Object> field = AbstractRepository.field(fields.get(0));

List<LikeEscapeStep> match = Arrays
.stream(query.split("\\p{P}|\\p{S}|\\p{Z}"))
.map(s -> field.likeIgnoreCase("%" + s.toUpperCase(Locale.ROOT) + "%"))
.collect(Collectors.toList());

if (match.size() == 0) {
return DSL.falseCondition();
}

return DSL.and(match);
}

@SuppressWarnings("unchecked")
public <R extends Record, E> ArrayListTotal<E> fetchPage(DSLContext context, SelectConditionStep<R> select, Pageable pageable, RecordMapper<R, E> mapper) {
Result<Record> results = this.limit(
context.select(DSL.asterisk(), DSL.count().over().as("total_count"))
.from(this
.sort(select, pageable)
.asTable("page")
)
.where(DSL.trueCondition()),
pageable
)
.fetch();

Integer totalCount = results.size() > 0 ? results.get(0).get("total_count", Integer.class) : 0;

List<E> map = results
.map((Record record) -> mapper.map((R) record));

return new ArrayListTotal<>(map, totalCount);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.kestra.repository.h2;

import io.micronaut.context.annotation.DefaultImplementation;
import io.micronaut.context.annotation.Requires;

import java.lang.annotation.*;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.repository.type", value = "h2")
@DefaultImplementation
public @interface H2RepositoryEnabled {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.kestra.repository.h2;

import io.kestra.core.models.templates.Template;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.kestra.jdbc.repository.AbstractTemplateRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jooq.Condition;

import java.util.List;

@Singleton
@H2RepositoryEnabled
public class H2TemplateRepository extends AbstractTemplateRepository implements TemplateRepositoryInterface {
@Inject
public H2TemplateRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(Template.class, applicationContext), applicationContext);
}

@Override
protected Condition findCondition(String query) {
return this.jdbcRepository.fullTextCondition(List.of("fulltext"), query);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.kestra.repository.h2;

import io.kestra.core.models.triggers.Trigger;
import io.kestra.jdbc.repository.AbstractTriggerRepository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
@H2RepositoryEnabled
public class H2TriggerRepository extends AbstractTriggerRepository {
@Inject
public H2TriggerRepository(ApplicationContext applicationContext) {
super(new H2Repository<>(Trigger.class, applicationContext));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.h2;

import io.kestra.jdbc.runner.AbstractExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcExecutorState;
import io.kestra.repository.h2.H2Repository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@H2QueueEnabled
public class H2ExecutorStateStorage extends AbstractExecutorStateStorage {
public H2ExecutorStateStorage(ApplicationContext applicationContext) {
super(new H2Repository<>(JdbcExecutorState.class, applicationContext));
}
}
63 changes: 63 additions & 0 deletions jdbc-h2/src/main/java/io/kestra/runner/h2/H2Functions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.kestra.runner.h2;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import io.kestra.core.serializers.JacksonMapper;
import lombok.SneakyThrows;
import net.thisptr.jackson.jq.BuiltinFunctionLoader;
import net.thisptr.jackson.jq.JsonQuery;
import net.thisptr.jackson.jq.Scope;
import net.thisptr.jackson.jq.Versions;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class H2Functions {
private static final Scope rootScope;
private static final Scope scope;

static {
rootScope = Scope.newEmptyScope();
BuiltinFunctionLoader.getInstance().loadFunctions(Versions.JQ_1_6, rootScope);
scope = Scope.newEmptyScope();
}

public static Boolean jqBoolean(String value, String expression) {
return H2Functions.jq(value, expression, JsonNode::asBoolean);
}

public static String jqString(String value, String expression) {
return H2Functions.jq(value, expression, JsonNode::asText);
}

public static Long jqLong(String value, String expression) {
return H2Functions.jq(value, expression, JsonNode::asLong);
}

public static Integer jqInteger(String value, String expression) {
return H2Functions.jq(value, expression, JsonNode::asInt);
}

public static Double jqDouble(String value, String expression) {
return H2Functions.jq(value, expression, JsonNode::asDouble);
}

@SneakyThrows
private static <T> T jq(String value, String expression, Function<JsonNode, T> function) {
JsonQuery q = JsonQuery.compile(expression, Versions.JQ_1_6);

final List<JsonNode> out = new ArrayList<>();
JsonNode in = JacksonMapper.ofJson().readTree(value);

q.apply(scope, in, out::add);

JsonNode node = out.get(0);

if (node instanceof NullNode) {
return null;
} else {
return function.apply(node);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.h2;

import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.jdbc.runner.AbstractJdbcMultipleConditionStorage;
import io.kestra.repository.h2.H2Repository;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Singleton;

@Singleton
@H2QueueEnabled
public class H2MultipleConditionStorage extends AbstractJdbcMultipleConditionStorage {
public H2MultipleConditionStorage(ApplicationContext applicationContext) {
super(new H2Repository<>(MultipleConditionWindow.class, applicationContext));
}
}
Loading

0 comments on commit 06337f5

Please sign in to comment.