feat(jdbc): adapt scheduler for jdbc
tchiotludo committed Jun 6, 2022
1 parent 2e90c56 commit c3465ff
Showing 13 changed files with 171 additions and 21 deletions.
@@ -1,5 +1,6 @@
package io.kestra.core.repositories;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;

@@ -9,6 +10,8 @@
public interface TriggerRepositoryInterface {
Optional<Trigger> findLast(TriggerContext trigger);

Optional<Trigger> findByExecution(Execution execution);

List<Trigger> findAll();

Trigger save(Trigger trigger);
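A minimal sketch of how a consumer of the new findByExecution method is expected to use it, assuming a repository bean and a terminated execution are already at hand; the class and method names below are hypothetical and not part of this commit:

```java
package io.kestra.docs.examples; // hypothetical package, for illustration only

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.TriggerRepositoryInterface;

import java.util.Optional;

class TriggerResetSketch {
    // Look up the trigger that started the given execution and, if found,
    // persist it again with its execution reference cleared so it can fire anew.
    static void resetTriggerFor(TriggerRepositoryInterface repository, Execution execution) {
        Optional<Trigger> trigger = repository.findByExecution(execution);
        trigger.ifPresent(t -> repository.save(t.resetExecution()));
    }
}
```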
@@ -43,7 +43,7 @@

@Slf4j
@Singleton
public abstract class AbstractScheduler implements Runnable, AutoCloseable {
public abstract class AbstractScheduler implements Scheduler {
protected final ApplicationContext applicationContext;
private final QueueInterface<Execution> executionQueue;
protected final FlowListenersInterface flowListeners;
@@ -109,13 +109,15 @@ public void run() {

try {
handle.get();
} catch (CancellationException ignored) {

} catch (ExecutionException | InterruptedException e) {
log.error("Executor fatal exception", e);
applicationContext.close();
Runtime.getRuntime().exit(1);
}
},
"executor-listener"
"scheduler-listener"
);

thread.start();
8 changes: 8 additions & 0 deletions core/src/main/java/io/kestra/core/schedulers/Scheduler.java
@@ -0,0 +1,8 @@
package io.kestra.core.schedulers;

import jakarta.inject.Singleton;

@Singleton
public interface Scheduler extends Runnable, AutoCloseable {

}
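The new interface adds no methods of its own, so callers depend only on Runnable.run() to start scheduling and AutoCloseable.close() to stop it. A hypothetical standalone runner, illustrative only and not part of this commit, would therefore look roughly like this:

```java
package io.kestra.docs.examples; // hypothetical package, for illustration only

import io.kestra.core.schedulers.Scheduler;
import io.micronaut.context.ApplicationContext;

class SchedulerRunnerSketch {
    // Whichever Scheduler implementation the context provides (DefaultScheduler,
    // or JdbcScheduler when the JDBC runner is enabled) is started the same way.
    public static void main(String[] args) throws Exception {
        try (ApplicationContext context = ApplicationContext.run();
             Scheduler scheduler = context.getBean(Scheduler.class)) {
            scheduler.run();
        }
    }
}
```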
@@ -10,9 +10,7 @@

import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@@ -21,7 +19,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

class SchedulerScheduleTest extends AbstractSchedulerTest {
public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected FlowListeners flowListenersService;

@@ -54,12 +52,22 @@ private static ZonedDateTime date(int minus) {
.truncatedTo(ChronoUnit.HOURS);
}

protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
executionStateSpy,
triggerState
);
}

@Test
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(5);
Set<String> date = new HashSet<>();

Flow flow = createScheduleFlow();

@@ -69,21 +77,16 @@ void schedule() throws Exception {

// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(executionRepositorySpy)
.when(executionStateSpy)
.findById(any());

// scheduler
try (AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
executionRepositorySpy,
triggerState
)) {

try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
// wait for execution
executionQueue.receive(SchedulerScheduleTest.class, execution -> {
executionQueue.receive(execution -> {
assertThat(execution.getInputs().get("testInputs"), is("test-inputs"));

date.add((String) execution.getTrigger().getVariables().get("date"));
queueCount.countDown();
if (execution.getState().getCurrent() == State.Type.CREATED) {
executionQueue.emit(execution.withState(State.Type.SUCCESS));
@@ -95,6 +98,7 @@ void schedule() throws Exception {
queueCount.await(1, TimeUnit.MINUTES);

assertThat(queueCount.getCount(), is(0L));
assertThat(date.size(), is(3));
}
}
}
@@ -58,7 +58,8 @@ CREATE TABLE queues (
`consumers` SET(
'indexer',
'executor',
'worker'
'worker',
'scheduler'
)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

@@ -132,7 +133,9 @@ CREATE TABLE triggers (
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
`trigger_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.triggerId') STORED NOT NULL,
INDEX ix_executions_id (namespace, flow_id, trigger_id)
`execution_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.executionId') STORED ,
INDEX ix_namespace__flow_id__trigger_id (namespace, flow_id, trigger_id),
INDEX ix_execution_id (execution_id)
) ENGINE INNODB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
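
The generated execution_id column and its ix_execution_id index exist to support looking a trigger up by the execution it started, which the JDBC trigger repository later in this commit does through jOOQ. A rough, self-contained sketch of that lookup follows; table and field names match this schema, while the surrounding class is hypothetical:

```java
package io.kestra.docs.examples; // hypothetical package, for illustration only

import org.jooq.DSLContext;
import org.jooq.Record1;
import org.jooq.impl.DSL;

class TriggerByExecutionQuerySketch {
    // SELECT value FROM triggers WHERE execution_id = ? -- the query shape the
    // new generated column and ix_execution_id index are meant to serve.
    static Record1<Object> fetchValue(DSLContext dsl, String executionId) {
        return dsl
            .select(DSL.field("value"))
            .from(DSL.table("triggers"))
            .where(DSL.field("execution_id").eq(executionId))
            .fetchOne();
    }
}
```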


@@ -0,0 +1,17 @@
package io.kestra.schedulers.mysql;

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
);
}
}
@@ -22,7 +22,8 @@ CREATE TYPE log_level AS ENUM (
CREATE TYPE queue_consumers AS ENUM (
'indexer',
'executor',
'worker'
'worker',
'scheduler'
);

CREATE TYPE queue_type AS ENUM (
@@ -174,10 +175,12 @@ CREATE TABLE triggers (
value JSONB NOT NULL,
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED,
trigger_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'triggerId') STORED
trigger_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'triggerId') STORED,
execution_id VARCHAR(150) GENERATED ALWAYS AS (value ->> 'executionId') STORED
);

CREATE INDEX triggers_namespace__flow_id__trigger_id ON triggers (namespace, flow_id, trigger_id);
CREATE INDEX triggers_execution_id ON triggers (execution_id);


/* ----------------------- logs ----------------------- */
@@ -0,0 +1,17 @@
package io.kestra.schedulers.postgres;

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy
);
}
}
@@ -1,5 +1,6 @@
package io.kestra.jdbc.repository;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.repositories.TriggerRepositoryInterface;
@@ -40,6 +41,23 @@ public Optional<Trigger> findLast(TriggerContext trigger) {
});
}

@Override
public Optional<Trigger> findByExecution(Execution execution) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
.using(configuration)
.select(DSL.field("value"))
.from(this.jdbcRepository.getTable())
.where(
DSL.field("execution_id").eq(execution.getId())
);

return this.jdbcRepository.fetchOne(select);
});
}

@Override
public List<Trigger> findAll() {
return this.jdbcRepository
57 changes: 57 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java
@@ -0,0 +1,57 @@
package io.kestra.jdbc.runner;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.schedulers.*;
import io.kestra.core.services.FlowListenersInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@JdbcRunnerEnabled
@Singleton
@Slf4j
@Replaces(DefaultScheduler.class)
public class JdbcScheduler extends AbstractScheduler {
private final QueueInterface<Execution> executionQueue;
private final TriggerRepositoryInterface triggerRepository;

@SuppressWarnings("unchecked")
@Inject
public JdbcScheduler(
ApplicationContext applicationContext,
FlowListenersInterface flowListeners
) {
super(applicationContext, flowListeners);

executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
triggerRepository = applicationContext.getBean(TriggerRepositoryInterface.class);
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
executionState = applicationContext.getBean(SchedulerExecutionState.class);

this.isReady = true;
}

@Override
public void run() {
flowListeners.run();

// reset scheduler trigger at end
executionQueue.receive(
Scheduler.class,
execution -> {
if (execution.getState().getCurrent().isTerninated() && execution.getTrigger() != null) {
triggerRepository
.findByExecution(execution)
.ifPresent(trigger -> triggerRepository.save(trigger.resetExecution()));
}
});

super.run();
}
}
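
Because the class is guarded by @JdbcRunnerEnabled and annotated with @Replaces(DefaultScheduler.class), requesting the Scheduler bean should resolve to this JDBC implementation whenever a JDBC backend is configured. A hypothetical check illustrating that resolution; the class and method names are illustrative and not part of this commit:

```java
package io.kestra.docs.examples; // hypothetical package, for illustration only

import io.kestra.core.schedulers.Scheduler;
import io.kestra.jdbc.runner.JdbcScheduler;
import io.micronaut.context.ApplicationContext;

class JdbcSchedulerResolutionSketch {
    // With a JDBC repository configured, @Replaces(DefaultScheduler.class)
    // makes the Scheduler bean resolve to JdbcScheduler.
    static boolean resolvesToJdbc(ApplicationContext context) {
        return context.getBean(Scheduler.class) instanceof JdbcScheduler;
    }
}
```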
@@ -24,7 +24,8 @@ public Optional<Trigger> findLast(TriggerContext context) {

@Override
public Trigger save(Trigger trigger) {
// noop save with trigger queue
triggerRepository.save(trigger);

return trigger;
}
}
@@ -1,6 +1,7 @@
package io.kestra.repository.elasticsearch;

import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -34,6 +35,17 @@ public Optional<Trigger> findLast(TriggerContext trigger) {
return this.rawGetRequest(INDEX_NAME, trigger.uid());
}

@Override
public Optional<Trigger> findByExecution(Execution execution) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.query(QueryBuilders.termQuery("executionId", execution.getId()))
.size(1);

List<Trigger> query = this.query(INDEX_NAME, sourceBuilder);

return query.size() > 0 ? Optional.of(query.get(0)) : Optional.empty();
}

@Override
public List<Trigger> findAll() {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
@@ -20,6 +20,11 @@ public Optional<Trigger> findLast(TriggerContext trigger) {
throw new UnsupportedOperationException();
}

@Override
public Optional<Trigger> findByExecution(Execution execution) {
throw new UnsupportedOperationException();
}

@Override
public List<Trigger> findAll() {
return this.triggers;
