Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): stop realtime trigger and generate a FAILED execution if … #4288

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.triggers.AbstractTrigger;

import java.util.Collections;
import java.util.Map;
import jakarta.validation.constraints.NotNull;

Expand All @@ -25,7 +26,7 @@ public static ExecutionTrigger of(AbstractTrigger abstractTrigger, Output output
return ExecutionTrigger.builder()
.id(abstractTrigger.getId())
.type(abstractTrigger.getType())
.variables(output.toMap())
.variables(output != null ? output.toMap() : Collections.emptyMap())
.build();
}

Expand Down
45 changes: 40 additions & 5 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.RealtimeTriggerInterface;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.models.triggers.*;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
Expand Down Expand Up @@ -378,6 +377,41 @@ private void handleTriggerError(WorkerTrigger workerTrigger, Throwable e) {
);
}

private void handleRealtimeTriggerError(WorkerTrigger workerTrigger, Throwable e) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
.increment();

// We create a FAILED execution, so the user is aware that the realtime trigger failed to be created
var execution = TriggerService
.generateRealtimeExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), null)
.withState(FAILED);

// We create an ERROR log attached to the execution
Logger logger = workerTrigger.getConditionContext().getRunContext().logger();
logService.logExecution(
execution,
logger,
Level.ERROR,
"[date: {}] Realtime trigger failed to be created in the worker with error: {}",
workerTrigger.getTriggerContext().getDate(),
e != null ? e.getMessage() : "unknown",
e
);
if (logger.isTraceEnabled() && e != null) {
logger.trace(Throwables.getStackTraceAsString(e));
}

this.workerTriggerResultQueue.emit(
WorkerTriggerResult.builder()
.success(false)
.execution(Optional.of(execution))
.triggerContext(workerTrigger.getTriggerContext())
.trigger(workerTrigger.getTrigger())
.build()
);
}

private void handleTrigger(WorkerTrigger workerTrigger) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
Expand Down Expand Up @@ -431,8 +465,9 @@ private void handleTrigger(WorkerTrigger workerTrigger) {
);
io.kestra.core.models.flows.State.Type state = runThread(workerThread, runContext.logger());

// here the realtime trigger fail before the publisher being call so we create a fail execution
if (workerThread.getException() != null || !state.equals(SUCCESS)) {
this.handleTriggerError(workerTrigger, workerThread.getException());
this.handleRealtimeTriggerError(workerTrigger, workerThread.getException());
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.function.Consumer;

import static io.kestra.core.models.flows.State.Type.FAILED;
import static io.kestra.core.models.flows.State.Type.SUCCESS;

public class WorkerTriggerRealtimeThread extends AbstractWorkerTriggerThread {
Expand All @@ -29,10 +30,22 @@ public WorkerTriggerRealtimeThread(

@Override
public void doRun() throws Exception {
Publisher<Execution> evaluate = streamingTrigger.evaluate(
workerTrigger.getConditionContext().withRunContext(runContext),
workerTrigger.getTriggerContext()
);
Publisher<Execution> evaluate;

try {
evaluate = streamingTrigger.evaluate(
workerTrigger.getConditionContext().withRunContext(runContext),
workerTrigger.getTriggerContext()
);
} catch (Exception e) {
// If the Publisher cannot be created, we create a failed execution
taskState = FAILED;
exception = e;
return;
}

// Here the publisher can be created, so the task is in success.
// Errors can still occur, but they should be recovered automatically.
taskState = SUCCESS;
Flux.from(evaluate)
.onBackpressureBuffer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,15 @@ public void run() {
}

WorkerTriggerResult workerTriggerResult = either.getLeft();
if (workerTriggerResult.getSuccess() && workerTriggerResult.getExecution().isPresent()) {
if (workerTriggerResult.getTrigger() instanceof RealtimeTriggerInterface) {
this.emitExecution(workerTriggerResult.getExecution().get(), workerTriggerResult.getTriggerContext());
} else {
SchedulerExecutionWithTrigger triggerExecution = new SchedulerExecutionWithTrigger(
workerTriggerResult.getExecution().get(),
workerTriggerResult.getTriggerContext()
);
ZonedDateTime nextExecutionDate = this.nextEvaluationDate(workerTriggerResult.getTrigger());
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate);
}
if (workerTriggerResult.getTrigger() instanceof RealtimeTriggerInterface && workerTriggerResult.getExecution().isPresent()) {
this.emitExecution(workerTriggerResult.getExecution().get(), workerTriggerResult.getTriggerContext());
} else if (workerTriggerResult.getSuccess() && workerTriggerResult.getExecution().isPresent()) {
SchedulerExecutionWithTrigger triggerExecution = new SchedulerExecutionWithTrigger(
workerTriggerResult.getExecution().get(),
workerTriggerResult.getTriggerContext()
);
ZonedDateTime nextExecutionDate = this.nextEvaluationDate(workerTriggerResult.getTrigger());
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate);
} else {
ZonedDateTime nextExecutionDate = this.nextEvaluationDate(workerTriggerResult.getTrigger());
this.triggerState.update(Trigger.of(workerTriggerResult.getTriggerContext(), nextExecutionDate));
Expand Down
Loading