Skip to content

Commit

Permalink
feat(core): wait for an execution (#788)
Browse files Browse the repository at this point in the history
close #785
  • Loading branch information
fryck authored Oct 14, 2022
1 parent c787f66 commit 53023ee
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.micronaut.validation.Validated;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
Expand Down Expand Up @@ -244,7 +245,7 @@ public Execution get(

@Delete(uri = "executions/{executionId}", produces = MediaType.TEXT_JSON)
@ExecuteOn(TaskExecutors.IO)
@Operation(tags = {"Flows"}, summary = "Delete an execution")
@Operation(tags = {"Executions"}, summary = "Delete an execution")
@ApiResponses(
@ApiResponse(responseCode = "204", description = "On success")
)
Expand Down Expand Up @@ -357,8 +358,9 @@ private Execution webhook(
public Execution trigger(
@Parameter(description = "The flow namespace") String namespace,
@Parameter(description = "The flow id") String id,
@Nullable Map<String, String> inputs,
@Nullable Publisher<StreamingFileUpload> files
@Parameter(description = "The inputs") @Nullable Map<String, String> inputs,
@Parameter(description = "The inputs of type file") @Nullable Publisher<StreamingFileUpload> files,
@Parameter(description = "If the server will wait the end of the execution") @QueryValue(value = "wait", defaultValue = "false") Boolean wait
) {
Optional<Flow> find = flowRepository.findById(namespace, id);
if (find.isEmpty()) {
Expand All @@ -377,7 +379,33 @@ public Execution trigger(
executionQueue.emit(current);
eventPublisher.publishEvent(new CrudEvent<>(current, CrudEventType.CREATE));

return current;
if (!wait) {
return current;
}

AtomicReference<Runnable> cancel = new AtomicReference<>();

return Single
.<Execution>create(emitter -> {
Runnable receive = this.executionQueue.receive(item -> {
Flow flow = flowRepository.findByExecution(current);

if (item.getId().equals(current.getId())) {

if (this.isStopFollow(flow, item)) {
emitter.onSuccess(item);
}
}
});

cancel.set(receive);
})
.doFinally(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
})
.blockingGet();
}

protected <T> HttpResponse<T> validateFile(String executionId, URI path, String redirect) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ void getNotFound() {
assertThat(e.getStatus(), is(HttpStatus.NOT_FOUND));
}

private Execution triggerExecution(String namespace, String flowId, MultipartBody requestBody) {
private Execution triggerExecution(String namespace, String flowId, MultipartBody requestBody, Boolean wait) {
return client.toBlocking().retrieve(
HttpRequest
.POST("/api/v1/executions/trigger/" + namespace + "/" + flowId, requestBody)
.POST("/api/v1/executions/trigger/" + namespace + "/" + flowId + (wait ? "?wait=true" : ""), requestBody)
.contentType(MediaType.MULTIPART_FORM_DATA_TYPE),
Execution.class
);
Expand All @@ -106,15 +106,15 @@ private MultipartBody createInputsFlowBody() {
.build();
}

private Execution triggerInputsFlowExecution() {
private Execution triggerInputsFlowExecution(Boolean wait) {
MultipartBody requestBody = createInputsFlowBody();

return triggerExecution(TESTS_FLOW_NS, "inputs", requestBody);
return triggerExecution(TESTS_FLOW_NS, "inputs", requestBody, wait);
}

@Test
void trigger() {
Execution result = triggerInputsFlowExecution();
Execution result = triggerInputsFlowExecution(false);

assertThat(result.getState().getCurrent(), is(State.Type.CREATED));
assertThat(result.getFlowId(), is("inputs"));
Expand All @@ -123,9 +123,17 @@ void trigger() {
assertThat(result.getInputs().get("file").toString(), startsWith("kestra:///io/kestra/tests/inputs/executions/"));
}

@Test
void triggerAndWait() {
Execution result = triggerInputsFlowExecution(true);

assertThat(result.getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(result.getTaskRunList().size(), is(5));
}

@Test
void get() {
Execution result = triggerInputsFlowExecution();
Execution result = triggerInputsFlowExecution(false);

// Get the triggered execution by execution id
Execution foundExecution = client.retrieve(
Expand All @@ -150,7 +158,7 @@ void findByFlowId() {

assertThat(executionsBefore.getTotal(), is(0L));

triggerExecution(namespace, flowId, MultipartBody.builder().addPart("string", "myString").build());
triggerExecution(namespace, flowId, MultipartBody.builder().addPart("string", "myString").build(), false);

PagedResults<Execution> executionsAfter = client.toBlocking().retrieve(
HttpRequest.GET("/api/v1/executions?namespace=" + namespace + "&flowId=" + flowId),
Expand All @@ -162,7 +170,7 @@ void findByFlowId() {

@Test
void triggerAndFollow() {
Execution result = triggerInputsFlowExecution();
Execution result = triggerInputsFlowExecution(false);

RxSseClient sseClient = embeddedServer.getApplicationContext().createBean(RxSseClient.class, embeddedServer.getURL());

Expand Down

0 comments on commit 53023ee

Please sign in to comment.