diff --git a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java index b6e57592b32..7f290b89d66 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java +++ b/core/src/main/java/io/kestra/core/queues/QueueFactoryInterface.java @@ -26,6 +26,7 @@ public interface QueueFactoryInterface { String TRIGGER_NAMED = "triggerQueue"; String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue"; String CLUSTER_EVENT_NAMED = "clusterEventQueue"; + String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue"; QueueInterface execution(); @@ -58,4 +59,6 @@ public interface QueueFactoryInterface { WorkerTriggerResultQueueInterface workerTriggerResultQueue(); QueueInterface subflowExecutionResult(); + + QueueInterface subflowExecutionEnd(); } diff --git a/core/src/main/java/io/kestra/core/queues/QueueService.java b/core/src/main/java/io/kestra/core/queues/QueueService.java index 2a996e667eb..0349938b571 100644 --- a/core/src/main/java/io/kestra/core/queues/QueueService.java +++ b/core/src/main/java/io/kestra/core/queues/QueueService.java @@ -29,6 +29,8 @@ public String key(Object object) { return null; } else if (object.getClass() == ExecutionRunning.class) { return ((ExecutionRunning) object).getExecution().getId(); + } else if (object.getClass() == SubflowExecutionEnd.class) { + return ((SubflowExecutionEnd) object).getExecutionId(); } else { throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'"); } diff --git a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java index 923f1bb2524..db15dd1a4c4 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java @@ -146,7 +146,8 @@ public static > Optional> "namespace", currentFlow.getNamespace(), "flowId", currentFlow.getId(), "flowRevision", currentFlow.getRevision(), - "taskRunId", currentTaskRun.getId() + "taskRunId", currentTaskRun.getId(), + "taskId", currentTaskRun.getTaskId() )); if (currentTaskRun.getValue() != null) { variables.put("taskRunValue", currentTaskRun.getValue()); @@ -185,6 +186,10 @@ private static List