From 7e16b718be3403e38408052f75102f5fec34154c Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Sat, 21 May 2022 21:21:16 +0200 Subject: [PATCH] feat(jdbc): implementation of trigger repository --- cli/src/main/resources/application.yml | 14 +- .../TriggerRepositoryInterface.java | 3 + .../AbstractExecutionRepositoryTest.java | 284 ++++++++++++++ .../AbstractFlowRepositoryTest.java | 14 +- .../AbstractTemplateRepositoryTest.java | 14 +- .../AbstractTriggerRepositoryTest.java | 62 +++ .../core/repositories}/ExecutionFixture.java | 2 +- docker-compose-ci.yml | 2 + jdbc-mysql/build.gradle | 1 + .../mysql/MysqlExecutionRepository.java | 17 + .../repository/mysql/MysqlFlowRepository.java | 58 +-- .../repository/mysql/MysqlRepository.java | 18 +- .../mysql/MysqlTemplateRepository.java | 22 -- .../mysql/MysqlTriggerRepository.java | 15 + .../migrations/mysql/V1__initial.sql | 70 +++- .../mysql/MysqlExecutionRepositoryTest.java | 16 + .../mysql/MysqlTriggerRepositoryTest.java | 8 + jdbc-mysql/src/test/resources/application.yml | 15 +- .../org.mockito.plugins.MockMaker | 1 + jdbc-postgres/build.gradle | 1 + .../postgres/PostgresExecutionRepository.java | 17 + .../postgres/PostgresFlowRepository.java | 57 +-- .../postgres/PostgresRepository.java | 24 +- .../postgres/PostgresTemplateRepository.java | 23 -- .../postgres/PostgresTriggerRepository.java | 17 + .../migrations/postgres/V1__initial.sql | 132 +++++-- .../PostgresExecutionRepositoryTest.java | 16 + .../PostgresTriggerRepositoryTest.java | 7 + .../src/test/resources/application.yml | 15 +- .../org.mockito.plugins.MockMaker | 1 + .../kestra/jdbc/AbstractJdbcRepository.java | 25 +- .../AbstractExecutionRepository.java | 370 ++++++++++++++++++ .../repository/AbstractFlowRepository.java | 142 ++++--- .../AbstractTemplateRepository.java | 97 +++-- .../repository/AbstractTriggerRepository.java | 66 ++++ .../java/io/kestra/jdbc/runner/JdbcQueue.java | 25 +- .../AbstractJdbcExecutionRepositoryTest.java | 19 + .../AbstractJdbcTriggerRepositoryTest.java | 16 + .../ElasticsearchTriggerRepository.java | 3 +- .../ElasticSearchExecutionRepositoryTest.java | 279 +------------ .../ElasticSearchTriggerRepositoryTest.java | 71 +--- .../memory/MemoryTriggerRepository.java | 14 +- 42 files changed, 1432 insertions(+), 641 deletions(-) create mode 100644 core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java create mode 100644 core/src/test/java/io/kestra/core/repositories/AbstractTriggerRepositoryTest.java rename {repository-elasticsearch/src/test/java/io/kestra/repository/elasticsearch => core/src/test/java/io/kestra/core/repositories}/ExecutionFixture.java (98%) create mode 100644 jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlExecutionRepository.java create mode 100644 jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlTriggerRepository.java create mode 100644 jdbc-mysql/src/test/java/io/kestra/repository/mysql/MysqlExecutionRepositoryTest.java create mode 100644 jdbc-mysql/src/test/java/io/kestra/repository/mysql/MysqlTriggerRepositoryTest.java create mode 100644 jdbc-mysql/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker create mode 100644 jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresExecutionRepository.java create mode 100644 jdbc-postgres/src/main/java/io/kestra/repository/postgres/PostgresTriggerRepository.java create mode 100644 jdbc-postgres/src/test/java/io/kestra/repository/postgres/PostgresExecutionRepositoryTest.java create mode 100644 jdbc-postgres/src/test/java/io/kestra/repository/postgres/PostgresTriggerRepositoryTest.java create mode 100644 jdbc-postgres/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker create mode 100644 jdbc/src/main/java/io/kestra/jdbc/repository/AbstractExecutionRepository.java create mode 100644 jdbc/src/main/java/io/kestra/jdbc/repository/AbstractTriggerRepository.java create mode 100644 jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcExecutionRepositoryTest.java create mode 100644 jdbc/src/test/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepositoryTest.java diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index 96654e533b..17fd2867c6 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -206,25 +206,23 @@ kestra: delete.retention.ms: 86400000 jdbc: - defaults: - table-prefix: "kestra_" tables: queues: - table: "${kestra.jdbc.defaults.table-prefix}queues" + table: "queues" flows: - table: "${kestra.jdbc.defaults.table-prefix}flows" + table: "flows" cls: io.kestra.core.models.flows.Flow executions: - table: "${kestra.jdbc.defaults.table-prefix}executions" + table: "executions" cls: io.kestra.core.models.executions.Execution templates: - table: "${kestra.jdbc.defaults.table-prefix}templates" + table: "templates" cls: io.kestra.core.models.templates.Template triggers: - table: "${kestra.jdbc.defaults.table-prefix}triggers" + table: "triggers" cls: io.kestra.core.models.triggers.Trigger logs: - table: "${kestra.jdbc.defaults.table-prefix}logs" + table: "logs" cls: io.kestra.core.models.executions.LogEntry elasticsearch: diff --git a/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java b/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java index 5d65292e9d..b52ab9b9f1 100644 --- a/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java +++ b/core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java @@ -10,4 +10,7 @@ public interface TriggerRepositoryInterface { Optional findLast(TriggerContext trigger); List findAll(); + + Trigger save(Trigger trigger); } + diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java new file mode 100644 index 0000000000..4d2e6aedae --- /dev/null +++ b/core/src/test/java/io/kestra/core/repositories/AbstractExecutionRepositoryTest.java @@ -0,0 +1,284 @@ +package io.kestra.core.repositories; + +import com.devskiller.friendly_id.FriendlyId; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.executions.statistics.DailyExecutionStatistics; +import io.kestra.core.models.executions.statistics.ExecutionCount; +import io.kestra.core.models.executions.statistics.Flow; +import io.kestra.core.models.flows.State; +import io.kestra.core.models.tasks.ResolvedTask; +import io.kestra.core.tasks.debugs.Return; +import io.micronaut.data.model.Pageable; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.*; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +@MicronautTest(transactional = false) +public abstract class AbstractExecutionRepositoryTest { + public static final String NAMESPACE = "io.kestra.unittest"; + public static final String FLOW = "full"; + + @Inject + protected ExecutionRepositoryInterface executionRepository; + + public static Execution.ExecutionBuilder builder(State.Type state, String flowId) { + State finalState = randomDuration(state); + + Execution.ExecutionBuilder execution = Execution.builder() + .id(FriendlyId.createFriendlyId()) + .namespace(NAMESPACE) + .flowId(flowId == null ? FLOW : flowId) + .flowRevision(1) + .state(finalState); + + + List taskRuns = Arrays.asList( + TaskRun.of(execution.build(), ResolvedTask.of( + Return.builder().id("first").type(Return.class.getName()).format("test").build()) + ) + .withState(State.Type.SUCCESS), + spyTaskRun(TaskRun.of(execution.build(), ResolvedTask.of( + Return.builder().id("second").type(Return.class.getName()).format("test").build()) + ) + .withState(state), + state + ), + TaskRun.of(execution.build(), ResolvedTask.of( + Return.builder().id("third").type(Return.class.getName()).format("test").build())).withState(state) + ); + + if (flowId == null) { + return execution.taskRunList(List.of(taskRuns.get(0), taskRuns.get(1), taskRuns.get(2))); + } + + return execution.taskRunList(List.of(taskRuns.get(0), taskRuns.get(1))); + } + + + static TaskRun spyTaskRun(TaskRun taskRun, State.Type state) { + TaskRun spy = spy(taskRun); + + doReturn(randomDuration(state)) + .when(spy) + .getState(); + + return spy; + } + + static State randomDuration(State.Type state) { + State finalState = new State(); + + finalState = spy(finalState + .withState(state != null ? state : State.Type.SUCCESS) + ); + + Random rand = new Random(); + doReturn(Duration.ofSeconds(rand.nextInt(150))) + .when(finalState) + .getDuration(); + + return finalState; + } + + + protected void inject() { + for (int i = 0; i < 28; i++) { + executionRepository.save(builder( + i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), + i < 15 ? null : "second" + ).build()); + } + } + + @Test + protected void find() { + inject(); + + ArrayListTotal executions = executionRepository.find("*", Pageable.from(1, 10), null); + assertThat(executions.getTotal(), is(28L)); + assertThat(executions.size(), is(10)); + } + + @Test + protected void findTaskRun() { + inject(); + + ArrayListTotal executions = executionRepository.findTaskRun("*", Pageable.from(1, 10), null); + assertThat(executions.getTotal(), is(71L)); + assertThat(executions.size(), is(10)); + } + + + @Test + protected void findById() { + executionRepository.save(ExecutionFixture.EXECUTION_1); + + Optional full = executionRepository.findById(ExecutionFixture.EXECUTION_1.getId()); + assertThat(full.isPresent(), is(true)); + + full.ifPresent(current -> { + assertThat(full.get().getId(), is(ExecutionFixture.EXECUTION_1.getId())); + }); + } + + @Test + protected void mappingConflict() { + executionRepository.save(ExecutionFixture.EXECUTION_2); + executionRepository.save(ExecutionFixture.EXECUTION_1); + + ArrayListTotal page1 = executionRepository.findByFlowId(NAMESPACE, FLOW, Pageable.from(1, 10)); + + assertThat(page1.size(), is(2)); + } + + @Test + protected void dailyGroupByFlowStatistics() throws InterruptedException { + for (int i = 0; i < 28; i++) { + executionRepository.save(builder( + i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), + i < 15 ? null : "second" + ).build()); + } + + // mysql need some time ... + Thread.sleep(500); + + Map>> result = executionRepository.dailyGroupByFlowStatistics( + "*", + ZonedDateTime.now().minusDays(10), + ZonedDateTime.now(), + false + ); + + assertThat(result.size(), is(1)); + assertThat(result.get("io.kestra.unittest").size(), is(2)); + + DailyExecutionStatistics full = result.get("io.kestra.unittest").get(FLOW).get(10); + DailyExecutionStatistics second = result.get("io.kestra.unittest").get("second").get(10); + + assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L)); + assertThat(full.getExecutionCounts().size(), is(9)); + assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L)); + assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L)); + assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(7L)); + assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L)); + + assertThat(second.getDuration().getAvg().getSeconds(), greaterThan(0L)); + assertThat(second.getExecutionCounts().size(), is(9)); + assertThat(second.getExecutionCounts().get(State.Type.SUCCESS), is(13L)); + assertThat(second.getExecutionCounts().get(State.Type.CREATED), is(0L)); + + result = executionRepository.dailyGroupByFlowStatistics( + "*", + ZonedDateTime.now().minusDays(10), + ZonedDateTime.now(), + true + ); + + assertThat(result.size(), is(1)); + assertThat(result.get("io.kestra.unittest").size(), is(1)); + full = result.get("io.kestra.unittest").get("*").get(10); + assertThat(full.getDuration().getAvg().getSeconds(), greaterThan(0L)); + assertThat(full.getExecutionCounts().size(), is(9)); + assertThat(full.getExecutionCounts().get(State.Type.FAILED), is(3L)); + assertThat(full.getExecutionCounts().get(State.Type.RUNNING), is(5L)); + assertThat(full.getExecutionCounts().get(State.Type.SUCCESS), is(20L)); + assertThat(full.getExecutionCounts().get(State.Type.CREATED), is(0L)); + } + + @Test + protected void dailyStatistics() throws InterruptedException { + for (int i = 0; i < 28; i++) { + executionRepository.save(builder( + i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), + i < 15 ? null : "second" + ).build()); + } + + // mysql need some time ... + Thread.sleep(500); + + List result = executionRepository.dailyStatistics( + "*", + ZonedDateTime.now().minusDays(10), + ZonedDateTime.now(), + false + ); + + assertThat(result.size(), is(11)); + assertThat(result.get(10).getExecutionCounts().size(), is(9)); + assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L)); + + assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L)); + assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L)); + assertThat(result.get(10).getExecutionCounts().get(State.Type.SUCCESS), is(20L)); + } + + @Test + protected void taskRunsDailyStatistics() { + for (int i = 0; i < 28; i++) { + executionRepository.save(builder( + i < 5 ? State.Type.RUNNING : (i < 8 ? State.Type.FAILED : State.Type.SUCCESS), + i < 15 ? null : "second" + ).build()); + } + + List result = executionRepository.dailyStatistics( + "*", + ZonedDateTime.now().minusDays(10), + ZonedDateTime.now(), + true + ); + + assertThat(result.size(), is(11)); + assertThat(result.get(10).getExecutionCounts().size(), is(9)); + assertThat(result.get(10).getDuration().getAvg().getSeconds(), greaterThan(0L)); + + assertThat(result.get(10).getExecutionCounts().get(State.Type.FAILED), is(3L * 2)); + assertThat(result.get(10).getExecutionCounts().get(State.Type.RUNNING), is(5L * 2)); + assertThat(result.get(10).getExecutionCounts().get(State.Type.SUCCESS), is(55L)); + } + + @SuppressWarnings("OptionalGetWithoutIsPresent") + @Test + protected void executionsCount() throws InterruptedException { + for (int i = 0; i < 28; i++) { + executionRepository.save(builder( + State.Type.SUCCESS, + i < 4 ? "first" : (i < 10 ? "second" : "third") + ).build()); + } + + // mysql need some time ... + Thread.sleep(500); + + List result = executionRepository.executionCounts( + List.of( + new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "first"), + new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "second"), + new io.kestra.core.models.executions.statistics.Flow(NAMESPACE, "third"), + new Flow(NAMESPACE, "missing") + ), + "*", + ZonedDateTime.now().minusDays(10), + ZonedDateTime.now() + ); + + assertThat(result.size(), is(4)); + assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("first")).findFirst().get().getCount(), is(4L)); + assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("second")).findFirst().get().getCount(), is(6L)); + assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount(), is(18L)); + assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount(), is(0L)); + } +} diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java index 01b52ff290..30c499415f 100644 --- a/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java +++ b/core/src/test/java/io/kestra/core/repositories/AbstractFlowRepositoryTest.java @@ -2,10 +2,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableList; -import io.micronaut.context.event.ApplicationEventListener; -import io.micronaut.test.extensions.junit5.annotation.MicronautTest; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import io.kestra.core.Helpers; import io.kestra.core.events.CrudEvent; import io.kestra.core.events.CrudEventType; @@ -20,11 +16,14 @@ import io.kestra.core.tasks.scripts.Bash; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; - +import io.micronaut.context.event.ApplicationEventListener; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import jakarta.inject.Inject; import jakarta.inject.Named; import jakarta.inject.Singleton; -import javax.validation.ConstraintViolationException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; @@ -33,13 +32,14 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javax.validation.ConstraintViolationException; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertThrows; -@MicronautTest +@MicronautTest(transactional = false) public abstract class AbstractFlowRepositoryTest { @Inject protected FlowRepositoryInterface flowRepository; diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractTemplateRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractTemplateRepositoryTest.java index 597a1a3eae..31d5534fa1 100644 --- a/core/src/test/java/io/kestra/core/repositories/AbstractTemplateRepositoryTest.java +++ b/core/src/test/java/io/kestra/core/repositories/AbstractTemplateRepositoryTest.java @@ -1,14 +1,16 @@ package io.kestra.core.repositories; -import io.micronaut.context.event.ApplicationEventListener; -import io.micronaut.test.extensions.junit5.annotation.MicronautTest; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import io.kestra.core.events.CrudEvent; import io.kestra.core.events.CrudEventType; import io.kestra.core.models.templates.Template; import io.kestra.core.tasks.debugs.Return; import io.kestra.core.utils.IdUtils; +import io.micronaut.context.event.ApplicationEventListener; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.net.URISyntaxException; @@ -16,13 +18,11 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; -@MicronautTest +@MicronautTest(transactional = false) public abstract class AbstractTemplateRepositoryTest { @Inject protected TemplateRepositoryInterface templateRepository; diff --git a/core/src/test/java/io/kestra/core/repositories/AbstractTriggerRepositoryTest.java b/core/src/test/java/io/kestra/core/repositories/AbstractTriggerRepositoryTest.java new file mode 100644 index 0000000000..bd4338e30f --- /dev/null +++ b/core/src/test/java/io/kestra/core/repositories/AbstractTriggerRepositoryTest.java @@ -0,0 +1,62 @@ +package io.kestra.core.repositories; + +import io.kestra.core.models.triggers.Trigger; +import io.kestra.core.utils.IdUtils; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Optional; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@MicronautTest(transactional = false) +public abstract class AbstractTriggerRepositoryTest { + @Inject + protected TriggerRepositoryInterface triggerRepository; + + private static Trigger.TriggerBuilder trigger() { + return Trigger.builder() + .flowId(IdUtils.create()) + .namespace("io.kestra.unittest") + .flowRevision(1) + .triggerId(IdUtils.create()) + .executionId(IdUtils.create()) + .date(ZonedDateTime.now()); + } + + @Test + void all() { + Trigger.TriggerBuilder builder = trigger(); + + Optional find = triggerRepository.findLast(builder.build()); + assertThat(find.isPresent(), is(false)); + + + Trigger save = triggerRepository.save(builder.build()); + + find = triggerRepository.findLast(save); + + assertThat(find.isPresent(), is(true)); + assertThat(find.get().getExecutionId(), is(save.getExecutionId())); + + save = triggerRepository.save(builder.executionId(IdUtils.create()).build()); + + find = triggerRepository.findLast(save); + + assertThat(find.isPresent(), is(true)); + assertThat(find.get().getExecutionId(), is(save.getExecutionId())); + + + triggerRepository.save(trigger().build()); + triggerRepository.save(trigger().build()); + triggerRepository.save(trigger().build()); + + List all = triggerRepository.findAll(); + + assertThat(all.size(), is(4)); + } +} diff --git a/repository-elasticsearch/src/test/java/io/kestra/repository/elasticsearch/ExecutionFixture.java b/core/src/test/java/io/kestra/core/repositories/ExecutionFixture.java similarity index 98% rename from repository-elasticsearch/src/test/java/io/kestra/repository/elasticsearch/ExecutionFixture.java rename to core/src/test/java/io/kestra/core/repositories/ExecutionFixture.java index a5a284a5f3..d1796988c9 100644 --- a/repository-elasticsearch/src/test/java/io/kestra/repository/elasticsearch/ExecutionFixture.java +++ b/core/src/test/java/io/kestra/core/repositories/ExecutionFixture.java @@ -1,4 +1,4 @@ -package io.kestra.repository.elasticsearch; +package io.kestra.core.repositories; import com.google.common.collect.ImmutableMap; import io.kestra.core.models.executions.Execution; diff --git a/docker-compose-ci.yml b/docker-compose-ci.yml index 3fb7ae806c..53953d0597 100644 --- a/docker-compose-ci.yml +++ b/docker-compose-ci.yml @@ -39,6 +39,8 @@ services: MYSQL_USER: kestra MYSQL_PASSWORD: k3str4 MYSQL_ROOT_PASSWORD: "p4ssw0rd" + command: + - --log-bin-trust-function-creators=1 ports: - 3306:3306 diff --git a/jdbc-mysql/build.gradle b/jdbc-mysql/build.gradle index 1aee8d2ed0..b11025ba19 100644 --- a/jdbc-mysql/build.gradle +++ b/jdbc-mysql/build.gradle @@ -11,4 +11,5 @@ dependencies { testImplementation project(':core').sourceSets.test.output testImplementation project(':jdbc').sourceSets.test.output testImplementation project(':runner-memory') + testImplementation 'org.mockito:mockito-junit-jupiter:4.5.1' } diff --git a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlExecutionRepository.java b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlExecutionRepository.java new file mode 100644 index 0000000000..006a61d9a7 --- /dev/null +++ b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlExecutionRepository.java @@ -0,0 +1,17 @@ +package io.kestra.repository.mysql; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.repositories.ExecutionRepositoryInterface; +import io.kestra.jdbc.repository.AbstractExecutionRepository; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +@Singleton +@MysqlRepositoryEnabled +public class MysqlExecutionRepository extends AbstractExecutionRepository implements ExecutionRepositoryInterface { + @Inject + public MysqlExecutionRepository(ApplicationContext applicationContext) { + super(new MysqlRepository<>(Execution.class, applicationContext), applicationContext); + } +} diff --git a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlFlowRepository.java b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlFlowRepository.java index dfb8167ae1..21ecf48bde 100644 --- a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlFlowRepository.java +++ b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlFlowRepository.java @@ -8,10 +8,7 @@ import io.micronaut.data.model.Pageable; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import org.jooq.Field; -import org.jooq.Record; -import org.jooq.Record1; -import org.jooq.SelectConditionStep; +import org.jooq.*; import org.jooq.impl.DSL; import java.util.ArrayList; @@ -28,15 +25,14 @@ public MysqlFlowRepository(ApplicationContext applicationContext) { } @SuppressWarnings("unchecked") - private SelectConditionStep fullTextSelect(List> field) { + private SelectConditionStep fullTextSelect(DSLContext context, List> field) { ArrayList> fields = new ArrayList<>(Collections.singletonList(DSL.field("value"))); if (field != null) { fields.addAll(field); } - return (SelectConditionStep) this.jdbcRepository - .getDslContext() + return (SelectConditionStep) context .select(fields) .hint("SQL_CALC_FOUND_ROWS") .from(lastRevision(false)) @@ -49,30 +45,44 @@ private SelectConditionStep fullTextSelect(List find(String query, Pageable pageable) { - SelectConditionStep> select = this.fullTextSelect(Collections.emptyList()); + return this.jdbcRepository + .getDslContext() + .transactionResult(configuration -> { + DSLContext context = DSL.using(configuration); - if (query != null) { - select.and(this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query)); - } + SelectConditionStep> select = this.fullTextSelect(context, Collections.emptyList()); + + if (query != null) { + select.and(this.jdbcRepository.fullTextCondition(Arrays.asList("namespace", "id"), query)); + } + + return this.jdbcRepository.fetchPage(context, select, pageable); + }); - return this.jdbcRepository.fetchPage(select, pageable); } @Override public ArrayListTotal> findSourceCode(String query, Pageable pageable) { - SelectConditionStep select = this.fullTextSelect(Collections.singletonList(DSL.field("source_code"))); + return this.jdbcRepository + .getDslContext() + .transactionResult(configuration -> { + DSLContext context = DSL.using(configuration); - if (query != null) { - select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("source_code"), query)); - } + SelectConditionStep select = this.fullTextSelect(context, Collections.singletonList(DSL.field("source_code"))); - return this.jdbcRepository.fetchPage( - select, - pageable, - record -> new SearchResult<>( - this.jdbcRepository.map(record), - this.jdbcRepository.fragments(query, record.getValue("source_code", String.class)) - ) - ); + if (query != null) { + select.and(this.jdbcRepository.fullTextCondition(Collections.singletonList("source_code"), query)); + } + + return this.jdbcRepository.fetchPage( + context, + select, + pageable, + record -> new SearchResult<>( + this.jdbcRepository.map(record), + this.jdbcRepository.fragments(query, record.getValue("source_code", String.class)) + ) + ); + }); } } diff --git a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlRepository.java b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlRepository.java index 0185d0bb3a..45da9cabdf 100644 --- a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlRepository.java +++ b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlRepository.java @@ -5,22 +5,23 @@ import io.kestra.jdbc.AbstractJdbcRepository; import io.micronaut.context.ApplicationContext; import io.micronaut.data.model.Pageable; -import org.jooq.Condition; -import org.jooq.Record; -import org.jooq.RecordMapper; -import org.jooq.SelectConditionStep; +import org.jooq.*; import org.jooq.impl.DSL; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -public class MysqlRepository extends AbstractJdbcRepository { +public class MysqlRepository extends AbstractJdbcRepository { public MysqlRepository(Class cls, ApplicationContext applicationContext) { super(cls, applicationContext); } public Condition fullTextCondition(List fields, String query) { + if (query == null || query.equals("*")) { + return DSL.trueCondition(); + } + String match = Arrays .stream(query.split("\\p{IsPunct}")) .filter(s -> s.length() >= 3) @@ -34,11 +35,14 @@ public Condition fullTextCondition(List fields, String query) { return DSL.condition("MATCH (" + String.join(", ", fields) + ") AGAINST (? IN BOOLEAN MODE)", match); } - public ArrayListTotal fetchPage(SelectConditionStep select, Pageable pageable, RecordMapper mapper) { + public ArrayListTotal fetchPage(DSLContext context, SelectConditionStep select, Pageable pageable, RecordMapper mapper) { List map = this.pageable(select, pageable) .fetch() .map(mapper); - return new ArrayListTotal<>(map, dslContext.fetchOne("SELECT FOUND_ROWS()").into(Integer.class)); + return dslContext.transactionResult(configuration -> new ArrayListTotal<>( + map, + DSL.using(configuration).fetchOne("SELECT FOUND_ROWS()").into(Integer.class) + )); } } diff --git a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlTemplateRepository.java b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlTemplateRepository.java index 29d471a7bd..4878105504 100644 --- a/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlTemplateRepository.java +++ b/jdbc-mysql/src/main/java/io/kestra/repository/mysql/MysqlTemplateRepository.java @@ -1,18 +1,11 @@ package io.kestra.repository.mysql; import io.kestra.core.models.templates.Template; -import io.kestra.core.repositories.ArrayListTotal; import io.kestra.core.repositories.TemplateRepositoryInterface; import io.kestra.jdbc.repository.AbstractTemplateRepository; import io.micronaut.context.ApplicationContext; -import io.micronaut.data.model.Pageable; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import org.jooq.Record1; -import org.jooq.SelectConditionStep; -import org.jooq.impl.DSL; - -import java.util.Arrays; @Singleton @MysqlRepositoryEnabled @@ -21,19 +14,4 @@ public class MysqlTemplateRepository extends AbstractTemplateRepository implemen public MysqlTemplateRepository(ApplicationContext applicationContext) { super(new MysqlRepository<>(Template.class, applicationContext), applicationContext); } - - public ArrayListTotal