diff --git a/CHANGELOG.md b/CHANGELOG.md index 9259ed3..f7c68d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## placeholder +* Fix infinite loop when use continueasnew after wait external event ([#183](https://github.com/microsoft/durabletask-java/pull/183)) +* Fix the issue "Deserialize Exception got swallowed when use anyOf with external event." ([#185](https://github.com/microsoft/durabletask-java/pull/185)) + ## v1.5.0 * Fix exception type issue when using `RetriableTask` in fan in/out pattern ([#174](https://github.com/microsoft/durabletask-java/pull/174)) * Add implementation to generate name-based deterministic UUID ([#176](https://github.com/microsoft/durabletask-java/pull/176)) diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java index 0c26e2f..02be55f 100644 --- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java @@ -9,7 +9,10 @@ import com.microsoft.azure.functions.internal.spi.middleware.Middleware; import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareChain; import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareContext; +import com.microsoft.durabletask.CompositeTaskFailedException; +import com.microsoft.durabletask.DataConverter; import com.microsoft.durabletask.OrchestrationRunner; +import com.microsoft.durabletask.TaskFailedException; import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; import com.microsoft.durabletask.interruption.OrchestratorBlockedException; @@ -48,6 +51,23 @@ public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exce if (cause instanceof ContinueAsNewInterruption) { throw (ContinueAsNewInterruption) cause; } + // Below types of exception are raised by the client sdk, they data should be correctly pass back to + // durable function host. We need to cast them to the correct type so later when build the FailureDetails + // the correct exception data can be saved and pass back. + if (cause instanceof TaskFailedException) { + throw (TaskFailedException) cause; + } + + if (cause instanceof CompositeTaskFailedException) { + throw (CompositeTaskFailedException) cause; + } + + if (cause instanceof DataConverter.DataConverterException) { + throw (DataConverter.DataConverterException) cause; + } + // e will be InvocationTargetException as using reflection, so we wrap it into a RuntimeException, so it + // won't change the current OrchestratorFunction API. We cannot throw the cause which is a Throwable, it + // requires update on OrchestratorFunction API. throw new RuntimeException("Unexpected failure in the task execution", e); } }); diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index b79ed00..4b55bd4 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -22,7 +22,6 @@ import java.util.function.Function; import java.util.function.IntFunction; import java.util.logging.Logger; -import java.util.stream.Collectors; final class TaskOrchestrationExecutor { @@ -511,10 +510,13 @@ private void handleTaskCompleted(HistoryEvent e) { rawResult != null ? rawResult : "(null)")); } - - Object result = this.dataConverter.deserialize(rawResult, record.getDataType()); CompletableTask task = record.getTask(); - task.complete(result); + try { + Object result = this.dataConverter.deserialize(rawResult, record.getDataType()); + task.complete(result); + } catch (Exception ex) { + task.completeExceptionally(ex); + } } private void handleTaskFailed(HistoryEvent e) { @@ -558,11 +560,15 @@ private void handleEventRaised(HistoryEvent e) { this.outstandingEvents.remove(eventName); } String rawResult = eventRaised.getInput().getValue(); - Object result = this.dataConverter.deserialize( - rawResult, - matchingTaskRecord.getDataType()); CompletableTask task = matchingTaskRecord.getTask(); - task.complete(result); + try { + Object result = this.dataConverter.deserialize( + rawResult, + matchingTaskRecord.getDataType()); + task.complete(result); + } catch (Exception ex) { + task.completeExceptionally(ex); + } } private void handleEventWhileSuspended (HistoryEvent historyEvent){ @@ -694,10 +700,13 @@ private void handleSubOrchestrationCompleted(HistoryEvent e) { rawResult != null ? rawResult : "(null)")); } - - Object result = this.dataConverter.deserialize(rawResult, record.getDataType()); CompletableTask task = record.getTask(); - task.complete(result); + try { + Object result = this.dataConverter.deserialize(rawResult, record.getDataType()); + task.complete(result); + } catch (Exception ex) { + task.completeExceptionally(ex); + } } private void handleSubOrchestrationFailed(HistoryEvent e){ @@ -1331,6 +1340,10 @@ protected void handleException(Throwable e) { throw (CompositeTaskFailedException)e; } + if (e instanceof DataConverter.DataConverterException) { + throw (DataConverter.DataConverterException)e; + } + throw new RuntimeException("Unexpected failure in the task execution", e); } diff --git a/endtoendtests/src/main/java/com/functions/DeserializeErrorTest.java b/endtoendtests/src/main/java/com/functions/DeserializeErrorTest.java new file mode 100644 index 0000000..7e376d2 --- /dev/null +++ b/endtoendtests/src/main/java/com/functions/DeserializeErrorTest.java @@ -0,0 +1,103 @@ +package com.functions; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.util.Optional; + +public class DeserializeErrorTest { + @FunctionName("DeserializeErrorHttp") + public HttpResponseMessage deserializeErrorHttp( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) + HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Java HTTP trigger processed a request."); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("DeserializeErrorOrchestrator"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("DeserializeErrorOrchestrator") + public String deserializeErrorOrchestrator( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + // cause deserialize error + Person result = ctx.callActivity("Capitalize", "Austin", Person.class).await(); + return result.getName(); + } + + @FunctionName("SubCompletedErrorHttp") + public HttpResponseMessage subCompletedErrorHttp( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Java HTTP trigger processed a request."); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("CompletedErrorOrchestrator"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("CompletedErrorOrchestrator") + public String completedErrorOrchestrator( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + // cause deserialize issue + Person result = ctx.callSubOrchestrator("CompletedErrorSubOrchestrator", "Austin", Person.class).await(); + return result.getName(); + } + + @FunctionName("CompletedErrorSubOrchestrator") + public String completedErrorSubOrchestrator( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + return "test"; + } + + @FunctionName("ExternalEventHttp") + public HttpResponseMessage externalEventHttp( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Java HTTP trigger processed a request."); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("ExternalEventActivity"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("ExternalEventActivity") + public void externalEventActivity(@DurableOrchestrationTrigger(name = "runtimeState") TaskOrchestrationContext ctx) + { + System.out.println("Waiting external event..."); + Task event = ctx.waitForExternalEvent("event", String.class); + Task result = ctx.anyOf(event).await(); + Object input = result.await(); + System.out.println(input); + } + + static class Person { + String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index cad43bd..53d40dc 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -215,6 +215,53 @@ public void suspendResume() throws InterruptedException { assertTrue(completed); } + @Test + public void externalEventDeserializeFail() throws InterruptedException { + String startOrchestrationPath = "api/ExternalEventHttp"; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + String sendEventPostUri = jsonPath.get("sendEventPostUri"); + sendEventPostUri = sendEventPostUri.replace("{eventName}", "event"); + + String requestBody = "{\"value\":\"Test\"}"; + RestAssured + .given() + .contentType(ContentType.JSON) // Set the request content type + .body(requestBody) // Set the request body + .post(sendEventPostUri) + .then() + .statusCode(202); + // assert orchestration status + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + boolean completed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10)); + assertTrue(completed); + + // assert exception message + Response resp = get(statusQueryGetUri); + String errorMessage = resp.jsonPath().get("output"); + assertTrue(errorMessage.contains("Failed to deserialize the JSON text to java.lang.String")); + } + + @ParameterizedTest + @ValueSource(strings = { + "DeserializeErrorHttp", + "SubCompletedErrorHttp" + }) + public void DeserializeFail(String functionName) throws InterruptedException { + String startOrchestrationPath = "api/" + functionName; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + // assert orchestration status + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + boolean completed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10)); + assertTrue(completed); + + // assert exception message + Response resp = get(statusQueryGetUri); + String errorMessage = resp.jsonPath().get("output"); + assertTrue(errorMessage.contains("Failed to deserialize the JSON text")); + } + private boolean pollingCheck(String statusQueryGetUri, String expectedState, Set continueStates, diff --git a/samples-azure-functions/src/main/java/com/functions/AzureFunctions.java b/samples-azure-functions/src/main/java/com/functions/AzureFunctions.java index ad80437..755e50a 100644 --- a/samples-azure-functions/src/main/java/com/functions/AzureFunctions.java +++ b/samples-azure-functions/src/main/java/com/functions/AzureFunctions.java @@ -1,5 +1,6 @@ package com.functions; +import com.functions.model.Person; import com.microsoft.azure.functions.annotation.*; import com.microsoft.azure.functions.*; import java.util.*; @@ -55,4 +56,29 @@ public String capitalize( context.getLogger().info("Capitalizing: " + name); return name.toUpperCase(); } + + // Orchestration with POJO input + @FunctionName("StartOrchestrationPOJO") + public HttpResponseMessage startOrchestrationPOJO( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Java HTTP trigger processed a request."); + + Person person = new Person(); + person.setName("testname"); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("Person", person); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("Person") + public Person personOrchestrator( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + Person person = ctx.getInput(Person.class); + person.setName(ctx.callActivity("Capitalize", person.getName(), String.class).await()); + return person; + } } diff --git a/samples-azure-functions/src/main/java/com/functions/SubOrchestrator.java b/samples-azure-functions/src/main/java/com/functions/SubOrchestrator.java new file mode 100644 index 0000000..f082acc --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/SubOrchestrator.java @@ -0,0 +1,46 @@ +package com.functions; + +import com.functions.model.Person; +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.util.Optional; + +public class SubOrchestrator { + + @FunctionName("SubOrchestratorHttp") + public HttpResponseMessage subOrchestratorHttp( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Java HTTP trigger processed a request."); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("RootOrchestrator"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("RootOrchestrator") + public String rootOrchestrator( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + return ctx.callSubOrchestrator("SubOrchestrator", "Austin", String.class).await(); + } + + @FunctionName("SubOrchestrator") + public String subOrchestrator( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + String input = ctx.getInput(String.class); + return ctx.callSubOrchestrator("Capitalize", input, String.class).await(); + } +} diff --git a/samples-azure-functions/src/main/java/com/functions/WaitExternalEvent.java b/samples-azure-functions/src/main/java/com/functions/WaitExternalEvent.java new file mode 100644 index 0000000..88fba1c --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/WaitExternalEvent.java @@ -0,0 +1,48 @@ +package com.functions; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.time.Duration; +import java.util.Optional; + +public class WaitExternalEvent { + @FunctionName("ExternalEventHttp") + public HttpResponseMessage externalEventHttp( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Java HTTP trigger processed a request."); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("ExternalEventOrchestrator"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("ExternalEventOrchestrator") + public void externalEventOrchestrator(@DurableOrchestrationTrigger(name = "runtimeState") TaskOrchestrationContext ctx) + { + System.out.println("Waiting external event..."); + Task event = ctx.waitForExternalEvent("event", String.class); + Task timer = ctx.createTimer(Duration.ofSeconds(10)); + Task winner = ctx.anyOf(event, timer).await(); + if (winner == event) { + String eventResult = event.await(); + ctx.complete(eventResult); + } else { + ctx.complete("time out"); + } + } +} diff --git a/samples-azure-functions/src/main/java/com/functions/model/Person.java b/samples-azure-functions/src/main/java/com/functions/model/Person.java new file mode 100644 index 0000000..26a3d0b --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/model/Person.java @@ -0,0 +1,13 @@ +package com.functions.model; + +public class Person { + String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index 30245e8..279854f 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -34,10 +34,10 @@ public void setupHost() { @ParameterizedTest @ValueSource(strings = { - "StartOrchestration", - "StartParallelOrchestration", - "StartParallelAnyOf", - "StartParallelCatchException" + "StartOrchestration", + "StartParallelOrchestration", + "StartParallelAnyOf", + "StartParallelCatchException" }) public void generalFunctions(String functionName) throws InterruptedException { Set continueStates = new HashSet<>(); @@ -114,12 +114,12 @@ public void continueAsNewExternalEvent() throws InterruptedException { // empty request body RestAssured - .given() - .contentType(ContentType.JSON) // Set the request content type - .body("{}") - .post(sendEventPostUri) - .then() - .statusCode(202); + .given() + .contentType(ContentType.JSON) // Set the request content type + .body("{}") + .post(sendEventPostUri) + .then() + .statusCode(202); //wait 5 seconds for the continue-as-new to start new orchestration TimeUnit.SECONDS.sleep(5); @@ -198,12 +198,12 @@ public void suspendResume() throws InterruptedException { String requestBody = "{\"value\":\"Test\"}"; RestAssured - .given() - .contentType(ContentType.JSON) // Set the request content type - .body(requestBody) // Set the request body - .post(sendEventPostUri) - .then() - .statusCode(202); + .given() + .contentType(ContentType.JSON) // Set the request content type + .body(requestBody) // Set the request body + .post(sendEventPostUri) + .then() + .statusCode(202); boolean suspendAfterEventSent = pollingCheck(statusQueryGetUri, "Suspended", null, Duration.ofSeconds(5)); assertTrue(suspendAfterEventSent); @@ -215,6 +215,20 @@ public void suspendResume() throws InterruptedException { assertTrue(completed); } + @Test + public void orchestrationPOJO() throws InterruptedException { + Set continueStates = new HashSet<>(); + String startOrchestrationPath = "/api/StartOrchestrationPOJO"; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + boolean pass = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(20)); + assertTrue(pass); + Response statusResponse = get(statusQueryGetUri); + String outputName = statusResponse.jsonPath().get("output.name"); + assertEquals("TESTNAME", outputName); + } + private boolean pollingCheck(String statusQueryGetUri, String expectedState, Set continueStates,