Skip to content

Commit

Permalink
chore(tests): fix falling test
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Feb 11, 2022
1 parent f879df4 commit 77e9367
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ protected FlowExecutorInterface flowExecutorInterface() {
}

public Executor process(Executor executor) {
// previous failed (flow join can fail), just forward
if (executor.getException() != null) {
return executor;
}

try {
executor = this.handleRestart(executor);
executor = this.handleEnd(executor);
Expand Down
21 changes: 16 additions & 5 deletions core/src/main/java/io/kestra/core/utils/Await.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,31 @@ public static void until(BooleanSupplier condition, Duration sleep, Duration tim
}
}

public static <T> T until(Supplier<T> supplier, Duration sleep) {
AtomicReference<T> result = new AtomicReference<>();

Await.until(() -> {
private static <T> BooleanSupplier untilSupplier(Supplier<T> supplier, AtomicReference<T> result) {
return () -> {
T t = supplier.get();
if (t != null) {
result.set(t);
return true;
} else {
return false;
}
}, sleep);
};
}

public static <T> T until(Supplier<T> supplier, Duration sleep, Duration timeout) throws TimeoutException {
AtomicReference<T> result = new AtomicReference<>();

Await.until(untilSupplier(supplier, result), sleep, timeout);

return result.get();
}

public static <T> T until(Supplier<T> supplier, Duration sleep) {
AtomicReference<T> result = new AtomicReference<>();

Await.until(untilSupplier(supplier, result), sleep);

return result.get();
}
}
11 changes: 6 additions & 5 deletions core/src/test/java/io/kestra/core/tasks/flows/TemplateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.ListenersTest;
import io.kestra.core.tasks.debugs.Echo;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -43,7 +44,10 @@ public class TemplateTest extends AbstractMemoryRunnerTest {
.namespace("io.kestra.tests")
.tasks(Collections.singletonList(Echo.builder().id("test").type(Echo.class.getName()).format("{{ parent.outputs.args['my-forward'] }}").build())).build();

public static void withTemplate(RunnerUtils runnerUtils, TemplateRepositoryInterface templateRepository, QueueInterface<LogEntry> logQueue) throws TimeoutException {
public static void withTemplate(RunnerUtils runnerUtils, TemplateRepositoryInterface templateRepository, LocalFlowRepositoryLoader repositoryLoader, QueueInterface<LogEntry> logQueue) throws TimeoutException, IOException, URISyntaxException {
templateRepository.create(TEMPLATE_1);
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/with-template.yaml")));

List<LogEntry> logs = new ArrayList<>();
logQueue.receive(logs::add);

Expand All @@ -66,9 +70,6 @@ public static void withTemplate(RunnerUtils runnerUtils, TemplateRepositoryInter

@Test
void withTemplate() throws TimeoutException, IOException, URISyntaxException {
templateRepository.create(TEMPLATE_1);
repositoryLoader.load(Objects.requireNonNull(ListenersTest.class.getClassLoader().getResource("flows/tests/with-template.yaml")));

TemplateTest.withTemplate(runnerUtils, templateRepository, logQueue);
TemplateTest.withTemplate(runnerUtils, templateRepository, repositoryLoader, logQueue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import org.apache.kafka.streams.processor.ProcessorContext;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.TimeoutException;

@Slf4j
public class FlowJoinerTransformer implements ValueTransformerWithKey<String, Executor, Executor> {
Expand All @@ -36,24 +35,23 @@ public void init(final ProcessorContext context) {

@Override
public Executor transform(String key, Executor executor) {
Instant start = Instant.now();
Flow flow;

// pooling of new flow can be delayed on ExecutorStore, we maybe need to wait that the flow is updated
Flow flow = Await.until(
() -> {
Optional<Flow> flowState = flowExecutorInterface.findByExecution(executor.getExecution());

if (flowState.isEmpty() && start.plusSeconds(60).isBefore(Instant.now())) {
log.warn("Unable to find flow with namespace: '" + executor.getExecution().getNamespace() +
", id: " + executor.getExecution().getFlowId() + "', " +
"revision '" + executor.getExecution().getFlowRevision() + "' " +
"since '" + start + "'");
}

return flowState.orElse(null);
},
Duration.ofMillis(100)
);
try {
// pooling of new flow can be delayed on ExecutorStore, we maybe need to wait that the flow is updated
flow = Await.until(
() -> flowExecutorInterface.findByExecution(executor.getExecution()).orElse(null),
Duration.ofMillis(100),
Duration.ofMinutes(5)
);
} catch (TimeoutException e) {
return executor.withException(
new Exception("Unable to find flow with namespace: '" + executor.getExecution().getNamespace() + "'" +
", id: '" + executor.getExecution().getFlowId() + "', " +
"revision '" + executor.getExecution().getFlowRevision() + "'"),
"joinFlow"
);
}

if (!withDefaults) {
return executor.withFlow(flow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ void eachWithNull() throws Exception {

@Test
void withTemplate() throws Exception {
TemplateTest.withTemplate(runnerUtils, templateRepository, logsQueue);
TemplateTest.withTemplate(runnerUtils, templateRepository, repositoryLoader, logsQueue);
}

@Test
Expand Down

0 comments on commit 77e9367

Please sign in to comment.