diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go index fa2164111..764695ba4 100644 --- a/internal/common/metrics/constants.go +++ b/internal/common/metrics/constants.go @@ -85,15 +85,16 @@ const ( // Metric tag keys const ( - NamespaceTagName = "namespace" - ClientTagName = "client_name" - PollerTypeTagName = "poller_type" - WorkerTypeTagName = "worker_type" - WorkflowTypeNameTagName = "workflow_type" - ActivityTypeNameTagName = "activity_type" - TaskQueueTagName = "task_queue" - OperationTagName = "operation" - CauseTagName = "cause" + NamespaceTagName = "namespace" + ClientTagName = "client_name" + PollerTypeTagName = "poller_type" + WorkerTypeTagName = "worker_type" + WorkflowTypeNameTagName = "workflow_type" + ActivityTypeNameTagName = "activity_type" + TaskQueueTagName = "task_queue" + OperationTagName = "operation" + CauseTagName = "cause" + WorkflowTaskFailureReason = "failure_reason" ) // Metric tag values diff --git a/internal/common/metrics/tags.go b/internal/common/metrics/tags.go index 8e7e0a1de..28a1657bc 100644 --- a/internal/common/metrics/tags.go +++ b/internal/common/metrics/tags.go @@ -87,3 +87,10 @@ func PollerTags(pollerType string) map[string]string { PollerTypeTagName: pollerType, } } + +// WorkflowTaskFailedTags returns a set of tags for a workflow task failure. +func WorkflowTaskFailedTags(reason string) map[string]string { + return map[string]string{ + WorkflowTaskFailureReason: reason, + } +} diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 647b3e271..7e402b35a 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -776,12 +776,12 @@ func (wc *workflowEnvironmentImpl) RequestCancelTimer(timerID TimerID) { func validateVersion(changeID string, version, minSupported, maxSupported Version) { if version < minSupported { - panic(fmt.Sprintf("Workflow code removed support of version %v. "+ + panicIllegalState(fmt.Sprintf("Workflow code removed support of version %v. "+ "for \"%v\" changeID. The oldest supported version is %v", version, changeID, minSupported)) } if version > maxSupported { - panic(fmt.Sprintf("Workflow code is too old to support version %v "+ + panicIllegalState(fmt.Sprintf("Workflow code is too old to support version %v "+ "for \"%v\" changeID. The maximum supported version is %v", version, changeID, maxSupported)) } @@ -857,7 +857,7 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, erro for k := range wc.sideEffectResult { keys = append(keys, k) } - panic(fmt.Sprintf("No cached result found for side effectID=%v. KnownSideEffects=%v", + panicIllegalState(fmt.Sprintf("No cached result found for side effectID=%v. KnownSideEffects=%v", sideEffectID, keys)) } @@ -937,7 +937,7 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa if wc.isReplay { // This should not happen - panic(fmt.Sprintf("Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id)) + panicIllegalState(fmt.Sprintf("Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id)) } return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(f())) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index d6db451f6..2cea8f117 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -403,14 +403,19 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics( ) (response *workflowservice.RespondWorkflowTaskCompletedResponse, err error) { metricsHandler := wtp.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName())) if taskErr != nil { - metricsHandler.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1) wtp.logger.Warn("Failed to process workflow task.", tagWorkflowType, task.WorkflowType.GetName(), tagWorkflowID, task.WorkflowExecution.GetWorkflowId(), tagRunID, task.WorkflowExecution.GetRunId(), tagAttempt, task.Attempt, tagError, taskErr) - completedRequest = wtp.errorToFailWorkflowTask(task.TaskToken, taskErr) + failWorkflowTask := wtp.errorToFailWorkflowTask(task.TaskToken, taskErr) + failureReason := "WorkflowError" + if failWorkflowTask.Cause == enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR { + failureReason = "NonDeterminismError" + } + metricsHandler.WithTags(metrics.WorkflowTaskFailedTags(failureReason)).Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1) + completedRequest = failWorkflowTask } metricsHandler.Timer(metrics.WorkflowTaskExecutionLatency).Record(time.Since(startTime)) diff --git a/test/integration_test.go b/test/integration_test.go index c9a212255..754a14da7 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -35,13 +35,14 @@ import ( "testing" "time" + "go.opentelemetry.io/otel/baggage" + "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally/v4" - "go.opentelemetry.io/otel/baggage" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.opentelemetry.io/otel/trace" @@ -2625,6 +2626,20 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + fetchMetrics := func() (localMetric int64) { + for _, counter := range ts.metricsHandler.Counters() { + counter := counter + if counter.Name == "temporal_workflow_task_execution_failed" && counter.Tags["failure_reason"] == "NonDeterminismError" { + localMetric = counter.Value() + } + } + return + } + + // Confirm no metrics to start + taskFailedMetric := fetchMetrics() + ts.Zero(taskFailedMetric) + // Start workflow forcedNonDeterminismCounter = 0 run, err := ts.client.ExecuteWorkflow( @@ -2673,6 +2688,8 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b // Check the task has the expected cause ts.NoError(histErr) ts.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR, taskFailed.Cause) + taskFailedMetric = fetchMetrics() + ts.True(taskFailedMetric > 1) } func (ts *IntegrationTestSuite) TestDeterminismUpsertSearchAttributesConditional() {