Skip to content

Commit

Permalink
Filter out Temporal errors from SyncWorkflowImpl (#19293)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored Nov 10, 2022
1 parent cc93c46 commit 6b92151
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
public class TemporalSdkInterceptor implements TraceInterceptor {

/**
* Trace resource name used to scope the filtering performed by this interceptor.
* Connection Manager trace resource name used to scope the filtering performed by this interceptor.
*/
static final String CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME = "ConnectionManagerWorkflowImpl.run";

/**
* Sync Workflow trace resource name used to scope the filtering performed by this interceptor.
*/
static final String SYNC_WORKFLOW_IMPL_RESOURCE_NAME = "SyncWorkflowImpl.run";

/**
* Error message tag key name that contains the Temporal exit error message.
*/
Expand Down Expand Up @@ -79,7 +84,8 @@ boolean isExitTrace(final MutableSpan trace) {
return trace.isError() &&
EXIT_ERROR_MESSAGE.equalsIgnoreCase(trace.getTags().getOrDefault(ERROR_MESSAGE_TAG_KEY, "").toString()) &&
(safeEquals(trace.getOperationName(), WORKFLOW_TRACE_OPERATION_NAME)
|| safeEquals(trace.getResourceName(), CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME));
|| safeEquals(trace.getResourceName(), CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME)
|| safeEquals(trace.getResourceName(), SYNC_WORKFLOW_IMPL_RESOURCE_NAME));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.ERROR_MESSAGE_TAG_KEY;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.EXIT_ERROR_MESSAGE;
import static io.airbyte.workers.tracing.TemporalSdkInterceptor.SYNC_WORKFLOW_IMPL_RESOURCE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -39,10 +40,15 @@ void testOnTraceComplete() {
temporalExitMsgOperationNameError.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
temporalExitMsgOperationNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);

final var temporalExitMsgResourceNameError = new DummySpan();
temporalExitMsgResourceNameError.setError(true);
temporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
temporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);
final var connectionManagerTemporalExitMsgResourceNameError = new DummySpan();
connectionManagerTemporalExitMsgResourceNameError.setError(true);
connectionManagerTemporalExitMsgResourceNameError.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
connectionManagerTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);

final var syncWorkflowTemporalExitMsgResourceNameError = new DummySpan();
syncWorkflowTemporalExitMsgResourceNameError.setError(true);
syncWorkflowTemporalExitMsgResourceNameError.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME);
syncWorkflowTemporalExitMsgResourceNameError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);

final var temporalExitMsgOtherOperationError = new DummySpan();
temporalExitMsgOtherOperationError.setError(true);
Expand All @@ -55,7 +61,8 @@ void testOnTraceComplete() {
temporalExitMsgOtherResourceError.setTag(ERROR_MESSAGE_TAG_KEY, EXIT_ERROR_MESSAGE);

final var spans = List.of(
simple, noError, otherError, temporalExitMsgOperationNameError, temporalExitMsgResourceNameError, temporalExitMsgOtherOperationError,
simple, noError, otherError, temporalExitMsgOperationNameError, connectionManagerTemporalExitMsgResourceNameError,
syncWorkflowTemporalExitMsgResourceNameError, temporalExitMsgOtherOperationError,
temporalExitMsgOtherResourceError);

final var interceptor = new TemporalSdkInterceptor();
Expand All @@ -66,7 +73,8 @@ void testOnTraceComplete() {
assertFalse(noError.isError());
assertTrue(otherError.isError());
assertFalse(temporalExitMsgOperationNameError.isError());
assertFalse(temporalExitMsgResourceNameError.isError());
assertFalse(connectionManagerTemporalExitMsgResourceNameError.isError());
assertFalse(syncWorkflowTemporalExitMsgResourceNameError.isError());
assertTrue(temporalExitMsgOtherOperationError.isError());
assertTrue(temporalExitMsgOtherResourceError.isError());
}
Expand All @@ -91,10 +99,15 @@ void testIsExitTrace() {
temporalTraceWithErrorAndOperationName.setOperationName(WORKFLOW_TRACE_OPERATION_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndOperationName));

final var temporalTraceWithErrorAndResourceName = new DummySpan();
temporalTraceWithErrorAndResourceName.setError(true);
temporalTraceWithErrorAndResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndResourceName));
final var temporalTraceWithErrorAndConnectionManagerResourceName = new DummySpan();
temporalTraceWithErrorAndConnectionManagerResourceName.setError(true);
temporalTraceWithErrorAndConnectionManagerResourceName.setResourceName(CONNECTION_MANAGER_WORKFLOW_IMPL_RESOURCE_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndConnectionManagerResourceName));

final var temporalTraceWithErrorAndSyncWorkflowResourceName = new DummySpan();
temporalTraceWithErrorAndSyncWorkflowResourceName.setError(true);
temporalTraceWithErrorAndSyncWorkflowResourceName.setResourceName(SYNC_WORKFLOW_IMPL_RESOURCE_NAME);
assertEquals(false, interceptor.isExitTrace(temporalTraceWithErrorAndSyncWorkflowResourceName));

final var temporalTraceWithExitErrorAndOperationName = new DummySpan();
temporalTraceWithExitErrorAndOperationName.setError(true);
Expand Down

0 comments on commit 6b92151

Please sign in to comment.