Skip to content

Commit

Permalink
fix(kafka-runner): don't crash on KafkaFlowListeners for invalid state
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Nov 3, 2021
1 parent 3cc7ef4 commit f0b0ad5
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 19 deletions.
2 changes: 2 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class Flow implements DeletedInterface {
private static final ObjectMapper jsonMapper = JacksonMapper.ofJson().copy()
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package io.kestra.core.models.flows;

import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.*;
import lombok.experimental.SuperBuilder;

@SuperBuilder
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class FlowSource extends Flow {
String source;
String exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.util.*;
Expand All @@ -25,6 +22,8 @@
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class Template implements DeletedInterface {
@NotNull
@NotBlank
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package io.kestra.core.models.templates;

import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;

Expand All @@ -12,6 +10,8 @@
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class TemplateSource extends Template {
String source;
String exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Topology topology() {
KStream<String, Flow> stream = builder
.stream(
kafkaAdminService.getTopicName(Flow.class),
Consumed.with(Serdes.String(), JsonSerde.of(Flow.class))
Consumed.with(Serdes.String(), JsonSerde.of(Flow.class, false))
);

KStream<String, Flow> result = KafkaStreamSourceService.logIfEnabled(
Expand All @@ -98,12 +98,13 @@ public Topology topology() {
),
"flow-in"
)
.filter((key, value) -> value != null, Named.as("notNull"))
.selectKey((key, value) -> value.uidWithoutRevision(), Named.as("rekey"))
.groupBy(
(String key, Flow value) -> value.uidWithoutRevision(),
Grouped.<String, Flow>as("grouped")
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde.of(Flow.class))
.withValueSerde(JsonSerde.of(Flow.class, false))
)
.aggregate(
AllFlowRevision::new,
Expand All @@ -114,7 +115,7 @@ public Topology topology() {
},
Materialized.<String, AllFlowRevision, KeyValueStore<Bytes, byte[]>>as("list")
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde.of(AllFlowRevision.class))
.withValueSerde(JsonSerde.of(AllFlowRevision.class, false))
)
.mapValues(
(readOnlyKey, value) -> {
Expand Down Expand Up @@ -171,11 +172,12 @@ public Topology topology() {
builder
.table(
kafkaAdminService.getTopicName(KafkaStreamSourceService.TOPIC_FLOWLAST),
Consumed.with(Serdes.String(), JsonSerde.of(Flow.class)),
Consumed.with(Serdes.String(), JsonSerde.of(Flow.class, false)),
Materialized.<String, Flow, KeyValueStore<Bytes, byte[]>>as("flow")
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde.of(Flow.class))
.withValueSerde(JsonSerde.of(Flow.class, false))
)
.filter((key, value) -> value != null)
.toStream()
.peek((key, value) -> {
send(flows());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@ public class JsonDeserializer<T> implements Deserializer<T> {
private static final ObjectMapper mapper = JacksonMapper.ofJson(false);

private Class<T> cls;
private boolean strict;


public JsonDeserializer(Class<T> cls) {
super();

this.cls = cls;
this.strict = true;
}

public JsonDeserializer(Class<T> cls, boolean strict) {
super();

this.cls = cls;
this.strict = strict;
}

@Override
Expand All @@ -32,7 +42,11 @@ public T deserialize(String topic, byte[] bytes) {
try {
return mapper.readValue(bytes, this.cls);
} catch (IOException e) {
throw new SerializationException(e);
if (strict) {
throw new SerializationException(e);
} else {
return null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ public class JsonSerde<T> implements Serde<T> {
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;

private JsonSerde(Class<T> cls) {
this.deserializer = new JsonDeserializer<>(cls);
private JsonSerde(Class<T> cls, boolean strict) {
this.deserializer = new JsonDeserializer<>(cls, strict);
this.serializer = new JsonSerializer<>();
}

public static <T> JsonSerde<T> of(Class<T> cls) {
return new JsonSerde<>(cls);
return new JsonSerde<>(cls, true);
}

public static <T> JsonSerde<T> of(Class<T> cls, boolean strict) {
return new JsonSerde<>(cls, strict);
}


@Override
public void configure(Map<String, ?> settings, boolean isKey) {
this.serializer.configure(settings, isKey);
Expand Down

0 comments on commit f0b0ad5

Please sign in to comment.