Skip to content

Commit

Permalink
fix(kafka-runner): remove a plugins can lead to stop of executor
Browse files Browse the repository at this point in the history
related to #608
  • Loading branch information
tchiotludo committed Sep 21, 2022
1 parent 8372709 commit 2e0e6ce
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ public Optional<Flow> findById(String namespace, String id, Optional<Integer> re
return flowRepositoryInterface.findById(namespace, id, revision);
}
}

@Override
public Boolean isReady() {
return true;
}
}
4 changes: 4 additions & 0 deletions core/src/main/java/io/kestra/core/runners/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public Executor(WorkerTaskResult workerTaskResult) {
this.joined = workerTaskResult;
}

public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getExecution().isDeleted());
}

public Executor withFlow(Flow flow) {
this.flow = flow;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected FlowExecutorInterface flowExecutorInterface() {

public Executor process(Executor executor) {
// previous failed (flow join can fail), just forward
if (executor.getException() != null || executor.getExecution().isDeleted()) {
if (!executor.canBeProcessed()) {
return executor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public interface FlowExecutorInterface {

Optional<Flow> findById(String namespace, String id, Optional<Integer> revision);

Boolean isReady();

default Optional<Flow> findByIdFromFlowTask(String namespace, String id, Optional<Integer> revision, String fromNamespace, String fromId) {
return this.findById(
namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public boolean isTerminatedWithListeners(Flow flow, Execution execution) {
}

public List<ResolvedTask> findValidListeners(Flow flow, Execution execution) {
if (flow.getListeners() == null) {
if (flow == null || flow.getListeners() == null) {
return new ArrayList<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public synchronized void setStore(SafeKeyValueStore<String, Flow> store) {
@SneakyThrows
private void await() {
if (flowsLast == null || store == null) {
Await.until(() -> this.flowsLast != null && store != null, Duration.ofMillis(100), Duration.ofMinutes(5));
Await.until(() -> this.isReady() == true, Duration.ofMillis(100), Duration.ofMinutes(5));
}
}

Expand Down Expand Up @@ -67,4 +67,9 @@ public Optional<Flow> findById(String namespace, String id, Optional<Integer> re

return this.store.get(Flow.uid(namespace, id, revision));
}

@Override
public Boolean isReady() {
return this.flowsLast != null && store != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

@Singleton
@Slf4j
Expand Down Expand Up @@ -74,16 +73,29 @@ public Executor joinFlow(Executor executor, Boolean withDefaults) {

try {
// pooling of new flow can be delayed on ExecutorStore, we maybe need to wait that the flow is updated
flow = Await.until(
() -> flowExecutorInterface.findByExecution(executor.getExecution()).orElse(null),
Duration.ofMillis(100),
Duration.ofMinutes(5)
);
if (!flowExecutorInterface.isReady()) {
flow = Await.until(
() -> flowExecutorInterface.findByExecution(executor.getExecution()).orElse(null),
Duration.ofMillis(100),
Duration.ofMinutes(5)
);
} else {
flow = flowExecutorInterface.findByExecution(executor.getExecution())
.orElseThrow(() -> new TimeoutException("Unable to find flow with flow executor ready"));
}
} catch (TimeoutException e) {
// execution is failed, can't find flow, avoid recursive exception, skipped it.
if (executor.getExecution().getState().isFailed()) {
return executor;
}

return executor.withException(
new Exception("Unable to find flow with namespace: '" + executor.getExecution().getNamespace() + "'" +
", id: '" + executor.getExecution().getFlowId() + "', " +
"revision '" + executor.getExecution().getFlowRevision() + "'"),
new Exception(
"Unable to find flow with namespace: '" + executor.getExecution().getNamespace() + "'" +
", id: '" + executor.getExecution().getFlowId() + "', " +
"revision '" + executor.getExecution().getFlowRevision() + "'",
e
),
"joinFlow"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public void init(final ProcessorContext context) {

@Override
public Executor transform(final String key, final Executor value) {
// previous failed (flow join can fail), just forward
if (!value.canBeProcessed()) {
return value;
}

Executor executor = executorService.process(value);

if (executor.getNexts().size() == 0) {
Expand Down

0 comments on commit 2e0e6ce

Please sign in to comment.