Skip to content

Commit

Permalink
fix(jdbc): remove a plugins can lead to stop of executor
Browse files Browse the repository at this point in the history
close #608
  • Loading branch information
tchiotludo committed Sep 21, 2022
1 parent 71554e5 commit 42d1da3
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 46 deletions.
8 changes: 7 additions & 1 deletion core/src/main/java/io/kestra/core/runners/FlowListeners.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.flows.FlowSource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.models.flows.Flow;
Expand All @@ -16,6 +17,8 @@
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand All @@ -39,7 +42,10 @@ public FlowListeners(
@Named(QueueFactoryInterface.FLOW_NAMED) QueueInterface<Flow> flowQueue
) {
this.flowQueue = flowQueue;
this.flows = flowRepository.findAll();
this.flows = flowRepository.findAll()
.stream()
.filter(flow -> !(flow instanceof FlowSource))
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.repository.postgres;

import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
Expand Down Expand Up @@ -35,7 +36,7 @@ public Condition fullTextCondition(List<String> fields, String query) {
public Map<Field<Object>, Object> persistFields(T entity) {
Map<Field<Object>, Object> fields = super.persistFields(entity);

String json = MAPPER.writeValueAsString(entity);
String json = JdbcMapper.of().writeValueAsString(entity);
fields.replace(AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)));

return fields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@

@MicronautTest
public class PostgresFlowRepositoryTest extends AbstractJdbcFlowRepositoryTest {
@Override
public void invalidFlow() {

}
}
60 changes: 22 additions & 38 deletions jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package io.kestra.jdbc;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.serializers.JacksonMapper;
Expand All @@ -15,48 +11,26 @@
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.jooq.*;
import org.jooq.impl.DSL;

import java.io.IOException;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public abstract class AbstractJdbcRepository<T> {
protected static final ObjectMapper MAPPER = JacksonMapper.ofJson();

static {
final SimpleModule module = new SimpleModule();
module.addSerializer(Instant.class, new JsonSerializer<>() {
@Override
public void serialize(Instant instant, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
jsonGenerator.writeString(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC)
.format(instant)
);
}
});

module.addSerializer(ZonedDateTime.class, new JsonSerializer<>() {
@Override
public void serialize(ZonedDateTime instant, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
jsonGenerator.writeString(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
.format(instant)
);
}
});

MAPPER.registerModule(module);
}

protected final QueueService queueService;

protected final Class<T> cls;

@Setter
protected Function<Record, T> deserializer;

@Getter
protected final JooqDSLContextWrapper dslContextWrapper;

Expand Down Expand Up @@ -91,7 +65,7 @@ protected String key(T entity) {
@SneakyThrows
public Map<Field<Object>, Object> persistFields(T entity) {
return new HashMap<>(ImmutableMap
.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), MAPPER.writeValueAsString(entity))
.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), JdbcMapper.of().writeValueAsString(entity))
);
}

Expand Down Expand Up @@ -132,10 +106,20 @@ public int delete(DSLContext dslContext, T entity) {
}

public <R extends Record> T map(R record) {
if (deserializer != null) {
return deserializer.apply(record);
} else {
return this.deserialize(record.get("value", String.class));
}
}

public T deserialize(String record) {
try {
return JacksonMapper.ofJson().readValue(record.get("value", String.class), cls);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
return JacksonMapper.ofJson().readValue(record, cls);
} catch (InvalidTypeIdException e) {
throw new DeserializationException(e);
} catch (IOException e) {
throw new DeserializationException(e);
}
}

Expand Down
49 changes: 49 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/JdbcMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.kestra.jdbc;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kestra.core.serializers.JacksonMapper;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public abstract class JdbcMapper {
private static ObjectMapper MAPPER;

public static ObjectMapper of() {
if (MAPPER == null) {
MAPPER = JacksonMapper.ofJson().copy();

final SimpleModule module = new SimpleModule();
module.addSerializer(Instant.class, new JsonSerializer<>() {
@Override
public void serialize(Instant instant, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
jsonGenerator.writeString(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC)
.format(instant)
);
}
});

module.addSerializer(ZonedDateTime.class, new JsonSerializer<>() {
@Override
public void serialize(ZonedDateTime instant, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
jsonGenerator.writeString(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
.format(instant)
);
}
});

MAPPER.registerModule(module);
}

return MAPPER;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package io.kestra.jdbc.repository;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowSource;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.queues.QueueFactoryInterface;
Expand All @@ -12,6 +16,7 @@
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import io.kestra.jdbc.JdbcMapper;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.data.model.Pageable;
Expand Down Expand Up @@ -40,6 +45,28 @@ public AbstractJdbcFlowRepository(io.kestra.jdbc.AbstractJdbcRepository<Flow> jd
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED));
this.flowQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.FLOW_NAMED));

this.jdbcRepository.setDeserializer(record -> {
String source = record.get("value", String.class);

try {
return this.jdbcRepository.deserialize(source);
} catch (DeserializationException e) {
try {
JsonNode jsonNode = JdbcMapper.of().readTree(source);
return FlowSource.builder()
.id(jsonNode.get("id").asText())
.namespace(jsonNode.get("namespace").asText())
.revision(jsonNode.get("revision").asInt())
.source(JacksonMapper.ofJson().writeValueAsString(JacksonMapper.toMap(source)))
.exception(e.getMessage())
.tasks(List.of())
.build();
} catch (JsonProcessingException ex) {
throw new DeserializationException(ex);
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

class AbstractJdbcRepositoryTest {
class JdbcMapperTest {
@Test
void instant() throws JsonProcessingException {
String serialize = AbstractJdbcRepository.MAPPER.writeValueAsString(LogEntry.builder()
String serialize = JdbcMapper.of().writeValueAsString(LogEntry.builder()
.timestamp(Instant.parse("2019-10-06T18:27:49.000Z"))
.build()
);
Expand All @@ -24,7 +24,7 @@ void instant() throws JsonProcessingException {

@Test
void zoneDateTime() throws JsonProcessingException {
String serialize = AbstractJdbcRepository.MAPPER.writeValueAsString(MultipleConditionWindow.builder()
String serialize = JdbcMapper.of().writeValueAsString(MultipleConditionWindow.builder()
.start(ZonedDateTime.parse("2013-09-08T16:19:12.000000+02:00"))
.build()
);
Expand All @@ -34,7 +34,7 @@ void zoneDateTime() throws JsonProcessingException {

@Test
void zoneDateTimeMs() throws JsonProcessingException {
String serialize = AbstractJdbcRepository.MAPPER.writeValueAsString(MultipleConditionWindow.builder()
String serialize = JdbcMapper.of().writeValueAsString(MultipleConditionWindow.builder()
.start(ZonedDateTime.parse("2013-09-08T16:19:12.001234+02:00"))
.build()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
import io.kestra.core.Helpers;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowSource;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -14,12 +19,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import jakarta.inject.Inject;

import static io.kestra.jdbc.repository.AbstractJdbcRepository.field;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repositories.AbstractFlowRepositoryTest {
@Inject
Expand All @@ -28,6 +34,9 @@ public abstract class AbstractJdbcFlowRepositoryTest extends io.kestra.core.repo
@Inject
JdbcTestUtils jdbcTestUtils;

@Inject
protected JooqDSLContextWrapper dslContextWrapper;

@Test
void find() {
List<Flow> save = flowRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc("id"))), null, null, null);
Expand Down Expand Up @@ -61,6 +70,35 @@ void findSourceCode() {
assertThat(flow.getFragments().get(0), containsString("types.MultipleCondition[/mark]"));
}

@Test
public void invalidFlow() {
dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);

context.insertInto(flowRepository.jdbcRepository.getTable())
.set(field("key"), "io.kestra.unittest_invalid")
.set(field("source_code"), "")
.set(field("value"), JacksonMapper.ofJson().writeValueAsString(Map.of(
"id", "invalid",
"namespace", "io.kestra.unittest",
"revision", 1,
"tasks", List.of(Map.of(
"id", "invalid",
"type", "io.kestra.core.tasks.debugs.Echo",
"level", "invalid"
)),
"deleted", false
)))
.execute();
});

Optional<Flow> flow = flowRepository.findById("io.kestra.unittest", "invalid");

assertThat(flow.isPresent(), is(true));
assertThat(flow.get(), instanceOf(FlowSource.class));
assertThat(((FlowSource) flow.get()).getException(), containsString("Cannot deserialize value of type `org.slf4j.event.Level`"));
}

@BeforeEach
protected void init() throws IOException, URISyntaxException {
jdbcTestUtils.drop();
Expand Down

0 comments on commit 42d1da3

Please sign in to comment.