Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve deserialization error handling #2622

Merged
merged 5 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions core/src/main/java/io/kestra/core/models/executions/TaskRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ public TaskRun withState(State.Type state) {
);
}

public TaskRun fail() {
var attempt = TaskRunAttempt.builder().state(new State(State.Type.FAILED)).build();
List<TaskRunAttempt> newAttempts = this.attempts == null ? new ArrayList<>(1) : this.attempts;
newAttempts.add(attempt);

return new TaskRun(
this.tenantId,
this.id,
this.executionId,
this.namespace,
this.flowId,
this.taskId,
this.parentTaskRunId,
this.value,
newAttempts,
this.outputs,
this.state.withState(State.Type.FAILED),
this.items
);
}

public TaskRun forChildExecution(Map<String, String> remapTaskRunId, String executionId, State state) {
return TaskRun.builder()
.tenantId(this.getTenantId())
Expand Down
22 changes: 17 additions & 5 deletions core/src/main/java/io/kestra/core/runners/FlowListeners.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package io.kestra.core.runners;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.serializers.JacksonMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowListenersInterface;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -24,6 +25,8 @@
@Singleton
@Slf4j
public class FlowListeners implements FlowListenersInterface {
private static final ObjectMapper MAPPER = JacksonMapper.ofJson();

private Boolean isStarted = false;
private final QueueInterface<Flow> flowQueue;
private final List<Flow> flows;
Expand All @@ -47,12 +50,21 @@ public void run() {
this.isStarted = true;

this.flowQueue.receive(either -> {
Flow flow;
if (either.isRight()) {
log.error("Unable to deserialize a flow: {}", either.getRight().getMessage());
return;
try {
var jsonNode = MAPPER.readTree(either.getRight().getRecord());
flow = FlowWithException.from(jsonNode, either.getRight()).orElseThrow(IOException::new);
} catch (IOException e) {
// if we cannot create a FlowWithException, ignore the message
log.error("Unexpected exception when trying to handle a deserialization error", e);
return;
}
}
else {
flow = either.getLeft();
}

Flow flow = either.getLeft();
Optional<Flow> previous = this.previous(flow);

if (flow.isDeleted()) {
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.hash.Hashing;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.TimeoutExceededException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
Expand All @@ -18,6 +19,7 @@
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
Expand Down Expand Up @@ -130,6 +132,7 @@ public void run() {
executors.execute(() -> {
if (either.isRight()) {
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
handleDeserializationError(either.getRight());
return;
}

Expand All @@ -145,6 +148,29 @@ else if (workerTask instanceof WorkerTrigger trigger) {
);
}

private void handleDeserializationError(DeserializationException deserializationException) {
if (deserializationException.getRecord() != null) {
try {
var json = MAPPER.readTree(deserializationException.getRecord());
var type = json.get("type") != null ? json.get("type").asText() : null;
if ("task".equals(type)) {
// try to deserialize the taskRun to fail it
var taskRun = MAPPER.treeToValue(json.get("taskRun"), TaskRun.class);
this.workerTaskResultQueue.emit(new WorkerTaskResult(taskRun.fail()));
} else if ("trigger".equals(type)) {
// try to deserialize the triggerContext to fail it
var triggerContext = MAPPER.treeToValue(json.get("triggerContext"), TriggerContext.class);
var workerTriggerResult = WorkerTriggerResult.builder().triggerContext(triggerContext).success(false).execution(Optional.empty()).build();
this.workerTriggerResultQueue.emit(workerTriggerResult);
}
}
catch (IOException e) {
// ignore the message if we cannot do anything about it
log.error("Unexpected exception when trying to handle a deserialization error", e);
}
}
}

private void handleTask(WorkerTask workerTask) {
if (workerTask.getTask() instanceof RunnableTask) {
this.run(workerTask, true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package io.kestra.core.runners;

import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.utils.Await;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@Singleton
public class DeserializationIssuesCaseTest {
private static final String INVALID_WORKER_TASK_KEY = "5PGRX6ve2cztrRSIbfGphO";
private static final String INVALID_WORKER_TASK_VALUE = """
{
"task": {
"id": "invalid",
"type": "io.kestra.notfound.Invalid"
},
"type": "task",
"taskRun": {
"id": "5PGRX6ve2cztrRSIbfGphO",
"state": {
"current": "CREATED",
"duration": 0.058459656,
"histories": [
{
"date": "2023-11-28T10:16:22.324536603Z",
"state": "CREATED"
}
],
"startDate": "2023-11-28T10:16:22.324536603Z"
},
"flowId": "hello-world",
"taskId": "hello",
"namespace": "company.team",
"executionId": "7IBX10Tg3ZzZuNUnLhoXcT"
},
"runContext": {
"variables": {
"envs": {
"plugins_path": "/home/loic/dev/kestra-plugins"
},
"flow": {
"id": "hello-world",
"revision": 1,
"namespace": "company.team"
},
"task": {
"id": "hello",
"type": "io.kestra.core.tasks.log.Log"
},
"taskrun": {
"id": "5PGRX6ve2cztrRSIbfGphO",
"startDate": "2023-11-28T10:16:22.324536603Z",
"attemptsCount": 0
},
"execution": {
"id": "7IBX10Tg3ZzZuNUnLhoXcT",
"startDate": "2023-11-28T10:16:21.648Z",
"originalId": "7IBX10Tg3ZzZuNUnLhoXcT"
}
},
"storageOutputPrefix": "///company/team/hello-world/executions/7IBX10Tg3ZzZuNUnLhoXcT/tasks/hello/5PGRX6ve2cztrRSIbfGphO"
}
}""";

private static final String INVALID_WORKER_TRIGGER_KEY = "dev_http-trigger_http";
private static final String INVALID_WORKER_TRIGGER_VALUE = """
{
"type": "trigger",
"trigger": {
"id": "invalid",
"type": "io.kestra.notfound.Invalid"
},
"triggerContext": {
"date": "2023-11-24T15:48:57.632881597Z",
"flowId": "http-trigger",
"namespace": "dev",
"triggerId": "http",
"flowRevision": 3
},
"conditionContext": {
"flow": {
"id": "http-trigger",
"tasks": [
{
"id": "hello",
"type": "io.kestra.core.tasks.log.Log",
"message": "Kestra team wishes you a great day! 👋"
}
],
"deleted": false,
"disabled": false,
"revision": 3,
"triggers": [
{
"id": "invalid",
"type": "io.kestra.notfound.Invalid"
}
],
"namespace": "dev"
},
"runContext": {
"variables": {
"envs": {
"plugins_path": "/home/loic/dev/kestra-plugins"
},
"flow": {
"id": "http-trigger",
"revision": 3,
"namespace": "dev"
},
"trigger": {
"id": "invalid",
"type": "io.kestra.notfound.Invalid"
}
}
}
}
}
""";

private static final String INVALID_FLOW_KEY = "company.team_hello-world_2";
private static final String INVALID_FLOW_VALUE = """
{
"id": "hello-world",
"tasks": [
{
"id": "invalid",
"type": "io.kestra.notfound.Invalid"
}
],
"deleted": false,
"disabled": false,
"revision": 2,
"namespace": "company.team"
}
""";

@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
protected QueueInterface<WorkerTaskResult> workerTaskResultQueue;

@Inject
@Named(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED)
protected QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;

@Inject
private FlowListenersInterface flowListeners;

public record QueueMessage(Class<?> type, String key, String value) {}


public void workerTaskDeserializationIssue(Consumer<QueueMessage> sendToQueue) throws TimeoutException {
AtomicReference<WorkerTaskResult> workerTaskResult = new AtomicReference<>();
workerTaskResultQueue.receive(either -> {
if (either != null) {
workerTaskResult.set(either.getLeft());
}
});

sendToQueue.accept(new QueueMessage(WorkerJob.class, INVALID_WORKER_TASK_KEY, INVALID_WORKER_TASK_VALUE));

Await.until(
() -> workerTaskResult.get() != null && workerTaskResult.get().getTaskRun().getState().isTerminated(),
Duration.ofMillis(100),
Duration.ofMinutes(1)
);
assertThat(workerTaskResult.get().getTaskRun().getState().getHistories().size(), is(2));
loicmathieu marked this conversation as resolved.
Show resolved Hide resolved
assertThat(workerTaskResult.get().getTaskRun().getState().getHistories().get(0).getState(), is(State.Type.CREATED));
assertThat(workerTaskResult.get().getTaskRun().getState().getCurrent(), is(State.Type.FAILED));
}

public void workerTriggerDeserializationIssue(Consumer<QueueMessage> sendToQueue) throws TimeoutException {
AtomicReference<WorkerTriggerResult> workerTriggerResult = new AtomicReference<>();
workerTriggerResultQueue.receive(either -> {
if (either != null) {
workerTriggerResult.set(either.getLeft());
}
});

sendToQueue.accept(new QueueMessage(WorkerJob.class, INVALID_WORKER_TRIGGER_KEY, INVALID_WORKER_TRIGGER_VALUE));

Await.until(
() -> workerTriggerResult.get() != null,
Duration.ofMillis(100),
Duration.ofMinutes(1)
);
assertThat(workerTriggerResult.get().getSuccess(), is(Boolean.FALSE));
}

public void flowDeserializationIssue(Consumer<QueueMessage> sendToQueue) throws TimeoutException {
AtomicReference<List<Flow>> flows = new AtomicReference<>();
flowListeners.listen(newFlows -> flows.set(newFlows));

sendToQueue.accept(new QueueMessage(Flow.class, INVALID_FLOW_KEY, INVALID_FLOW_VALUE));

Await.until(
() -> flows.get() != null && flows.get().stream().anyMatch(newFlow -> newFlow.uid().equals("company.team_hello-world_2") && (newFlow.getTasks() == null || newFlow.getTasks().isEmpty())),
Duration.ofMillis(100),
Duration.ofMinutes(1)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.kestra.runner.h2;

import io.kestra.jdbc.runner.AbstractJdbcDeserializationIssuesTest;

class H2JdbcDeserializationIssuesTest extends AbstractJdbcDeserializationIssuesTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.runner.AbstractJdbcDeserializationIssuesTest;

class MySqlJdbcDeserializationIssuesTest extends AbstractJdbcDeserializationIssuesTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.kestra.runner.postgres;

import io.kestra.core.runners.DeserializationIssuesCaseTest;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.kestra.jdbc.runner.AbstractJdbcDeserializationIssuesTest;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.Map;

class PostgresJdbcDeserializationIssuesTest extends AbstractJdbcDeserializationIssuesTest {
protected Map<Field<Object>, Object> fields(DeserializationIssuesCaseTest.QueueMessage queueMessage) {
Map<Field<Object>, Object> fields = super.fields(queueMessage);
fields.put(AbstractJdbcRepository.field("type"), DSL.field("CAST(? AS queue_type)", queueMessage.type().getName()));
return fields;
}
}
Loading