Skip to content

Commit

Permalink
Fix swallowed exception - Improve output exception message (#185)
Browse files Browse the repository at this point in the history
* fix issue 184 - improve output exception message

update changelog

fix same issue for handleTaskCompleted, handleSubOrchestrationCompleted

improve the example and e2e tests

* add deserialize erro test - add POJO input sample

* quick test

* improve test method name

* improve samples
  • Loading branch information
kaibocai authored Nov 29, 2023
1 parent ad8c959 commit b3e2d3a
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## placeholder
* Fix infinite loop when use continueasnew after wait external event ([#183](https://github.com/microsoft/durabletask-java/pull/183))
* Fix the issue "Deserialize Exception got swallowed when use anyOf with external event." ([#185](https://github.com/microsoft/durabletask-java/pull/185))

## v1.5.0
* Fix exception type issue when using `RetriableTask` in fan in/out pattern ([#174](https://github.com/microsoft/durabletask-java/pull/174))
* Add implementation to generate name-based deterministic UUID ([#176](https://github.com/microsoft/durabletask-java/pull/176))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import com.microsoft.azure.functions.internal.spi.middleware.Middleware;
import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareChain;
import com.microsoft.azure.functions.internal.spi.middleware.MiddlewareContext;
import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.DataConverter;
import com.microsoft.durabletask.OrchestrationRunner;
import com.microsoft.durabletask.TaskFailedException;
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;

Expand Down Expand Up @@ -48,6 +51,23 @@ public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exce
if (cause instanceof ContinueAsNewInterruption) {
throw (ContinueAsNewInterruption) cause;
}
// Below types of exception are raised by the client sdk, they data should be correctly pass back to
// durable function host. We need to cast them to the correct type so later when build the FailureDetails
// the correct exception data can be saved and pass back.
if (cause instanceof TaskFailedException) {
throw (TaskFailedException) cause;
}

if (cause instanceof CompositeTaskFailedException) {
throw (CompositeTaskFailedException) cause;
}

if (cause instanceof DataConverter.DataConverterException) {
throw (DataConverter.DataConverterException) cause;
}
// e will be InvocationTargetException as using reflection, so we wrap it into a RuntimeException, so it
// won't change the current OrchestratorFunction API. We cannot throw the cause which is a Throwable, it
// requires update on OrchestratorFunction API.
throw new RuntimeException("Unexpected failure in the task execution", e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.logging.Logger;
import java.util.stream.Collectors;

final class TaskOrchestrationExecutor {

Expand Down Expand Up @@ -511,10 +510,13 @@ private void handleTaskCompleted(HistoryEvent e) {
rawResult != null ? rawResult : "(null)"));

}

Object result = this.dataConverter.deserialize(rawResult, record.getDataType());
CompletableTask task = record.getTask();
task.complete(result);
try {
Object result = this.dataConverter.deserialize(rawResult, record.getDataType());
task.complete(result);
} catch (Exception ex) {
task.completeExceptionally(ex);
}
}

private void handleTaskFailed(HistoryEvent e) {
Expand Down Expand Up @@ -558,11 +560,15 @@ private void handleEventRaised(HistoryEvent e) {
this.outstandingEvents.remove(eventName);
}
String rawResult = eventRaised.getInput().getValue();
Object result = this.dataConverter.deserialize(
rawResult,
matchingTaskRecord.getDataType());
CompletableTask task = matchingTaskRecord.getTask();
task.complete(result);
try {
Object result = this.dataConverter.deserialize(
rawResult,
matchingTaskRecord.getDataType());
task.complete(result);
} catch (Exception ex) {
task.completeExceptionally(ex);
}
}

private void handleEventWhileSuspended (HistoryEvent historyEvent){
Expand Down Expand Up @@ -694,10 +700,13 @@ private void handleSubOrchestrationCompleted(HistoryEvent e) {
rawResult != null ? rawResult : "(null)"));

}

Object result = this.dataConverter.deserialize(rawResult, record.getDataType());
CompletableTask task = record.getTask();
task.complete(result);
try {
Object result = this.dataConverter.deserialize(rawResult, record.getDataType());
task.complete(result);
} catch (Exception ex) {
task.completeExceptionally(ex);
}
}

private void handleSubOrchestrationFailed(HistoryEvent e){
Expand Down Expand Up @@ -1331,6 +1340,10 @@ protected void handleException(Throwable e) {
throw (CompositeTaskFailedException)e;
}

if (e instanceof DataConverter.DataConverterException) {
throw (DataConverter.DataConverterException)e;
}

throw new RuntimeException("Unexpected failure in the task execution", e);
}

Expand Down
103 changes: 103 additions & 0 deletions endtoendtests/src/main/java/com/functions/DeserializeErrorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.functions;

import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpMethod;
import com.microsoft.azure.functions.HttpRequestMessage;
import com.microsoft.azure.functions.HttpResponseMessage;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskOrchestrationContext;
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;

import java.util.Optional;

public class DeserializeErrorTest {
@FunctionName("DeserializeErrorHttp")
public HttpResponseMessage deserializeErrorHttp(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");

DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("DeserializeErrorOrchestrator");
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

@FunctionName("DeserializeErrorOrchestrator")
public String deserializeErrorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// cause deserialize error
Person result = ctx.callActivity("Capitalize", "Austin", Person.class).await();
return result.getName();
}

@FunctionName("SubCompletedErrorHttp")
public HttpResponseMessage subCompletedErrorHttp(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");

DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("CompletedErrorOrchestrator");
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

@FunctionName("CompletedErrorOrchestrator")
public String completedErrorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// cause deserialize issue
Person result = ctx.callSubOrchestrator("CompletedErrorSubOrchestrator", "Austin", Person.class).await();
return result.getName();
}

@FunctionName("CompletedErrorSubOrchestrator")
public String completedErrorSubOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
return "test";
}

@FunctionName("ExternalEventHttp")
public HttpResponseMessage externalEventHttp(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");

DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("ExternalEventActivity");
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

@FunctionName("ExternalEventActivity")
public void externalEventActivity(@DurableOrchestrationTrigger(name = "runtimeState") TaskOrchestrationContext ctx)
{
System.out.println("Waiting external event...");
Task<String> event = ctx.waitForExternalEvent("event", String.class);
Task<?> result = ctx.anyOf(event).await();
Object input = result.await();
System.out.println(input);
}

static class Person {
String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}
}
47 changes: 47 additions & 0 deletions endtoendtests/src/test/java/com/functions/EndToEndTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,53 @@ public void suspendResume() throws InterruptedException {
assertTrue(completed);
}

@Test
public void externalEventDeserializeFail() throws InterruptedException {
String startOrchestrationPath = "api/ExternalEventHttp";
Response response = post(startOrchestrationPath);
JsonPath jsonPath = response.jsonPath();
String sendEventPostUri = jsonPath.get("sendEventPostUri");
sendEventPostUri = sendEventPostUri.replace("{eventName}", "event");

String requestBody = "{\"value\":\"Test\"}";
RestAssured
.given()
.contentType(ContentType.JSON) // Set the request content type
.body(requestBody) // Set the request body
.post(sendEventPostUri)
.then()
.statusCode(202);
// assert orchestration status
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
boolean completed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10));
assertTrue(completed);

// assert exception message
Response resp = get(statusQueryGetUri);
String errorMessage = resp.jsonPath().get("output");
assertTrue(errorMessage.contains("Failed to deserialize the JSON text to java.lang.String"));
}

@ParameterizedTest
@ValueSource(strings = {
"DeserializeErrorHttp",
"SubCompletedErrorHttp"
})
public void DeserializeFail(String functionName) throws InterruptedException {
String startOrchestrationPath = "api/" + functionName;
Response response = post(startOrchestrationPath);
JsonPath jsonPath = response.jsonPath();
// assert orchestration status
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
boolean completed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10));
assertTrue(completed);

// assert exception message
Response resp = get(statusQueryGetUri);
String errorMessage = resp.jsonPath().get("output");
assertTrue(errorMessage.contains("Failed to deserialize the JSON text"));
}

private boolean pollingCheck(String statusQueryGetUri,
String expectedState,
Set<String> continueStates,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.functions;

import com.functions.model.Person;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
import java.util.*;
Expand Down Expand Up @@ -55,4 +56,29 @@ public String capitalize(
context.getLogger().info("Capitalizing: " + name);
return name.toUpperCase();
}

// Orchestration with POJO input
@FunctionName("StartOrchestrationPOJO")
public HttpResponseMessage startOrchestrationPOJO(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");

Person person = new Person();
person.setName("testname");

DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("Person", person);
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

@FunctionName("Person")
public Person personOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
Person person = ctx.getInput(Person.class);
person.setName(ctx.callActivity("Capitalize", person.getName(), String.class).await());
return person;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.functions;

import com.functions.model.Person;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpMethod;
import com.microsoft.azure.functions.HttpRequestMessage;
import com.microsoft.azure.functions.HttpResponseMessage;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.TaskOrchestrationContext;
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;

import java.util.Optional;

public class SubOrchestrator {

@FunctionName("SubOrchestratorHttp")
public HttpResponseMessage subOrchestratorHttp(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");

DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("RootOrchestrator");
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

@FunctionName("RootOrchestrator")
public String rootOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
return ctx.callSubOrchestrator("SubOrchestrator", "Austin", String.class).await();
}

@FunctionName("SubOrchestrator")
public String subOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
return ctx.callSubOrchestrator("Capitalize", input, String.class).await();
}
}
Loading

0 comments on commit b3e2d3a

Please sign in to comment.