Skip to content

Commit

Permalink
fix(kafka-runner): add a Await on FlowJoinerTransformer
Browse files Browse the repository at this point in the history
if the flow is already fetch, this crashed the executor since before we are sending null values
  • Loading branch information
tchiotludo committed Feb 10, 2022
1 parent 4fb6507 commit f879df4
Showing 1 changed file with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.services.TaskDefaultService;
import io.kestra.core.tasks.flows.Template;
import io.kestra.core.utils.Await;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

@Slf4j
Expand All @@ -33,13 +36,24 @@ public void init(final ProcessorContext context) {

@Override
public Executor transform(String key, Executor executor) {
Optional<Flow> flowState = flowExecutorInterface.findByExecution(executor.getExecution());
Instant start = Instant.now();

if (flowState.isEmpty()) {
return null;
}
// pooling of new flow can be delayed on ExecutorStore, we maybe need to wait that the flow is updated
Flow flow = Await.until(
() -> {
Optional<Flow> flowState = flowExecutorInterface.findByExecution(executor.getExecution());

if (flowState.isEmpty() && start.plusSeconds(60).isBefore(Instant.now())) {
log.warn("Unable to find flow with namespace: '" + executor.getExecution().getNamespace() +
", id: " + executor.getExecution().getFlowId() + "', " +
"revision '" + executor.getExecution().getFlowRevision() + "' " +
"since '" + start + "'");
}

Flow flow = flowState.get();
return flowState.orElse(null);
},
Duration.ofMillis(100)
);

if (!withDefaults) {
return executor.withFlow(flow);
Expand Down

0 comments on commit f879df4

Please sign in to comment.