Skip to content

Commit

Permalink
feat(jdbc): implementation of FlowListeners
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jun 17, 2022
1 parent 94aaad9 commit 334f895
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.kestra.runner.memory;
package io.kestra.core.runners;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -20,8 +20,7 @@

@Singleton
@Slf4j
@MemoryQueueEnabled
public class MemoryFlowListeners implements FlowListenersInterface {
public class FlowListeners implements FlowListenersInterface {
private static final ObjectMapper MAPPER = JacksonMapper.ofJson();
private static final TypeReference<List<Flow>> TYPE_REFERENCE = new TypeReference<>(){};

Expand All @@ -32,7 +31,7 @@ public class MemoryFlowListeners implements FlowListenersInterface {
private final List<Consumer<List<Flow>>> consumers = new ArrayList<>();

@Inject
public MemoryFlowListeners(
public FlowListeners(
FlowRepositoryInterface flowRepository,
@Named(QueueFactoryInterface.FLOW_NAMED) QueueInterface<Flow> flowQueue
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.utils.IdUtils;
import io.kestra.runner.memory.MemoryFlowListeners;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;
Expand All @@ -18,7 +17,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@MicronautTest
@MicronautTest(transactional = false)
abstract public class FlowListenersTest {
@Inject
protected FlowRepositoryInterface flowRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.runner.memory.MemoryFlowListeners;
import io.kestra.core.runners.FlowListeners;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

Expand All @@ -26,7 +26,7 @@

class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
protected MemoryFlowListeners flowListenersService;
protected FlowListeners flowListenersService;

@Inject
protected SchedulerTriggerStateInterface triggerState;
Expand Down Expand Up @@ -58,7 +58,7 @@ private static Flow createScheduleFlow() {
@Test
void schedule() throws Exception {
// mock flow listeners
MemoryFlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(4);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.kestra.core.schedulers;

import org.junit.jupiter.api.Test;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.runner.memory.MemoryFlowListeners;
import io.kestra.core.runners.FlowListeners;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
Expand All @@ -15,16 +16,14 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected MemoryFlowListeners flowListenersService;
protected FlowListeners flowListenersService;

@Inject
protected SchedulerTriggerStateInterface triggerState;
Expand Down Expand Up @@ -58,7 +57,7 @@ private static ZonedDateTime date(int minus) {
@Test
void schedule() throws Exception {
// mock flow listeners
MemoryFlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(5);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.kestra.core.schedulers;

import io.kestra.core.models.flows.TaskDefault;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.runner.memory.MemoryFlowListeners;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.runners.FlowListeners;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
Expand All @@ -15,16 +16,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.inject.Inject;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
protected MemoryFlowListeners flowListenersService;
protected FlowListeners flowListenersService;

@Inject
protected SchedulerTriggerStateInterface triggerState;
Expand All @@ -49,7 +48,7 @@ public static Flow createThreadFlow() {
@Test
void thread() throws Exception {
// mock flow listeners
MemoryFlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(2);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.kestra.runner.mysql;

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.FlowListenersTest;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class MysqlFlowListenersTest extends FlowListenersTest {
@Inject
FlowListeners flowListenersService;

@Inject
JdbcTestUtils jdbcTestUtils;

@Test
public void all() {
this.suite(flowListenersService);
}

@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}
1 change: 1 addition & 0 deletions jdbc-postgres/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ dependencies {
testImplementation project(':core').sourceSets.test.output
testImplementation project(':jdbc').sourceSets.test.output
testImplementation project(':runner-memory')
testImplementation project(':storage-local')
testImplementation 'org.mockito:mockito-junit-jupiter:4.5.1'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.kestra.runner.postgres;

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.FlowListenersTest;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class PostgresFlowListenersTest extends FlowListenersTest {
@Inject
FlowListeners flowListenersService;

@Inject
JdbcTestUtils jdbcTestUtils;

@Test
public void all() {
this.suite(flowListenersService);
}

@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}
4 changes: 4 additions & 0 deletions jdbc-postgres/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ kestra:
type: postgres
repository:
type: postgres
storage:
type: local
local:
base-path: /tmp/unittest

jdbc:
tables:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.kestra.runner.kafka;

import io.kestra.core.runners.FlowListeners;
import io.kestra.runner.kafka.services.*;
import io.micronaut.context.annotation.Replaces;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -28,6 +30,7 @@
@Singleton
@Slf4j
@KafkaQueueEnabled
@Replaces(FlowListeners.class)
public class KafkaFlowListeners implements FlowListenersInterface {
private final KafkaAdminService kafkaAdminService;
private final KafkaStreamService kafkaStreamService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.kestra.runner.memory;

import io.kestra.core.runners.FlowListeners;
import org.junit.jupiter.api.Test;
import io.kestra.core.runners.FlowListenersTest;

import jakarta.inject.Inject;

class MemoryFlowListenersTest extends FlowListenersTest {
@Inject
MemoryFlowListeners flowListenersService;
FlowListeners flowListenersService;

@Test
public void all() {
Expand Down

0 comments on commit 334f895

Please sign in to comment.