Skip to content

Commit

Permalink
Tag workflow_task_execution_failed with error type (#1295)
Browse files Browse the repository at this point in the history
Tag the workflow_task_execution_failed metric with a failure reason. Change some panics
to illegal-state panics so that they are reported as non-determinism errors.
  • Loading branch information
Quinn-With-Two-Ns authored Nov 30, 2023
1 parent 89c8dba commit a309e59
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 16 deletions.
19 changes: 10 additions & 9 deletions internal/common/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,16 @@ const (

// Metric tag keys
const (
NamespaceTagName = "namespace"
ClientTagName = "client_name"
PollerTypeTagName = "poller_type"
WorkerTypeTagName = "worker_type"
WorkflowTypeNameTagName = "workflow_type"
ActivityTypeNameTagName = "activity_type"
TaskQueueTagName = "task_queue"
OperationTagName = "operation"
CauseTagName = "cause"
NamespaceTagName = "namespace"
ClientTagName = "client_name"
PollerTypeTagName = "poller_type"
WorkerTypeTagName = "worker_type"
WorkflowTypeNameTagName = "workflow_type"
ActivityTypeNameTagName = "activity_type"
TaskQueueTagName = "task_queue"
OperationTagName = "operation"
CauseTagName = "cause"
WorkflowTaskFailureReason = "failure_reason"
)

// Metric tag values
Expand Down
7 changes: 7 additions & 0 deletions internal/common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,10 @@ func PollerTags(pollerType string) map[string]string {
PollerTypeTagName: pollerType,
}
}

// WorkflowTaskFailedTags builds the metric tags attached to a workflow task
// failure. The returned map is freshly allocated on every call and holds a
// single entry that maps the WorkflowTaskFailureReason tag key to reason.
func WorkflowTaskFailedTags(reason string) map[string]string {
	tags := make(map[string]string, 1)
	tags[WorkflowTaskFailureReason] = reason
	return tags
}
8 changes: 4 additions & 4 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,12 +776,12 @@ func (wc *workflowEnvironmentImpl) RequestCancelTimer(timerID TimerID) {

func validateVersion(changeID string, version, minSupported, maxSupported Version) {
if version < minSupported {
panic(fmt.Sprintf("Workflow code removed support of version %v. "+
panicIllegalState(fmt.Sprintf("Workflow code removed support of version %v. "+
"for \"%v\" changeID. The oldest supported version is %v",
version, changeID, minSupported))
}
if version > maxSupported {
panic(fmt.Sprintf("Workflow code is too old to support version %v "+
panicIllegalState(fmt.Sprintf("Workflow code is too old to support version %v "+
"for \"%v\" changeID. The maximum supported version is %v",
version, changeID, maxSupported))
}
Expand Down Expand Up @@ -857,7 +857,7 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, erro
for k := range wc.sideEffectResult {
keys = append(keys, k)
}
panic(fmt.Sprintf("No cached result found for side effectID=%v. KnownSideEffects=%v",
panicIllegalState(fmt.Sprintf("No cached result found for side effectID=%v. KnownSideEffects=%v",
sideEffectID, keys))
}

Expand Down Expand Up @@ -937,7 +937,7 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa

if wc.isReplay {
// This should not happen
panic(fmt.Sprintf("Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id))
panicIllegalState(fmt.Sprintf("Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id))
}

return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(f()))
Expand Down
9 changes: 7 additions & 2 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,19 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(
) (response *workflowservice.RespondWorkflowTaskCompletedResponse, err error) {
metricsHandler := wtp.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName()))
if taskErr != nil {
metricsHandler.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
wtp.logger.Warn("Failed to process workflow task.",
tagWorkflowType, task.WorkflowType.GetName(),
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
tagAttempt, task.Attempt,
tagError, taskErr)
completedRequest = wtp.errorToFailWorkflowTask(task.TaskToken, taskErr)
failWorkflowTask := wtp.errorToFailWorkflowTask(task.TaskToken, taskErr)
failureReason := "WorkflowError"
if failWorkflowTask.Cause == enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR {
failureReason = "NonDeterminismError"
}
metricsHandler.WithTags(metrics.WorkflowTaskFailedTags(failureReason)).Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
completedRequest = failWorkflowTask
}

metricsHandler.Timer(metrics.WorkflowTaskExecutionLatency).Record(time.Since(startTime))
Expand Down
19 changes: 18 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ import (
"testing"
"time"

"go.opentelemetry.io/otel/baggage"

"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally/v4"
"go.opentelemetry.io/otel/baggage"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -2625,6 +2626,20 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// fetchMetrics scans the counters captured by the suite's metrics handler and
// reports the value of the workflow-task-failure counter tagged as a
// non-determinism error. If several counters match, the last one visited
// wins; if none match, the zero value is returned.
fetchMetrics := func() (localMetric int64) {
	for _, c := range ts.metricsHandler.Counters() {
		if c.Name != "temporal_workflow_task_execution_failed" {
			continue
		}
		if c.Tags["failure_reason"] != "NonDeterminismError" {
			continue
		}
		localMetric = c.Value()
	}
	return localMetric
}

// Confirm no metrics to start
taskFailedMetric := fetchMetrics()
ts.Zero(taskFailedMetric)

// Start workflow
forcedNonDeterminismCounter = 0
run, err := ts.client.ExecuteWorkflow(
Expand Down Expand Up @@ -2673,6 +2688,8 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b
// Check the task has the expected cause
ts.NoError(histErr)
ts.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR, taskFailed.Cause)
taskFailedMetric = fetchMetrics()
ts.True(taskFailedMetric > 1)
}

func (ts *IntegrationTestSuite) TestDeterminismUpsertSearchAttributesConditional() {
Expand Down

0 comments on commit a309e59

Please sign in to comment.