Ensure WorkflowTaskExecutionFailureCounter is called with a tag #1658

Merged: 2 commits merged on Oct 4, 2024
internal/internal_event_handlers.go (4 changes: 2 additions & 2 deletions)

@@ -1170,7 +1170,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
 	}
 	defer func() {
 		if p := recover(); p != nil {
-			weh.metricsHandler.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
+			incrementWorkflowTaskFailureCounter(weh.metricsHandler, "NonDeterminismError")
Contributor:
I don't think the panics here are always caused just by non-determinism; what if a user's workflow code panics because of a code bug?

Contributor:
Also, I am pretty sure we can still end up emitting this metric twice; that should be addressed to close #1450.

Contributor (author):
> I don't think the panics here are always caused just by non-determinism; what if a user's workflow code panics because of a code bug?

From my testing, when a user's workflow code panics, we trigger the call in internal_task_pollers.go. The only case where I saw us recover a panic from this defer is when we panicIllegalState from the state machine. But to be safe, I can add a check on the panic message: use NonDeterminismError when it contains TMPRL1100, and default to WorkflowError for all other scenarios.
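A minimal sketch of that check, assuming the recovered panic value can be rendered as a string; classifyPanicForMetrics is a hypothetical name used only for illustration, not something this PR adds:

package sketch

import (
	"fmt"
	"strings"
)

// classifyPanicForMetrics picks the failure reason to report for a recovered
// panic value p: panics carrying the TMPRL1100 (illegal state / non-determinism)
// marker map to "NonDeterminismError", everything else to "WorkflowError".
func classifyPanicForMetrics(p interface{}) string {
	if strings.Contains(fmt.Sprintf("%v", p), "TMPRL1100") {
		return "NonDeterminismError"
	}
	return "WorkflowError"
}

// In the deferred recover above, this would be used roughly as:
//	incrementWorkflowTaskFailureCounter(weh.metricsHandler, classifyPanicForMetrics(p))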

> Also, I am pretty sure we can still end up emitting this metric twice; that should be addressed to close #1450.

Do you have any idea what scenario could cause this double emission? I've tried to hit it over the last few days and haven't been able to come up with anything.

The issue itself is about Prometheus erroring out because the metric was emitted two ways, once with a tag and once without, which is now resolved by this PR (emitting it with different tags doesn't cause Prometheus to throw an error). But I'm happy to address the double emission; I'm just not able to find a scenario where we emit it twice.
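For context on why the tag/no-tag mix trips up Prometheus, here is a standalone sketch using prometheus/client_golang; the metric and label names are illustrative, and this shows the client library's general rule rather than the SDK's exact reporter path:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// The tagged emission: the counter is declared with a failure_reason label.
	tagged := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "workflow_task_execution_failed",
	}, []string{"failure_reason"})

	// The untagged emission: same metric name, no labels.
	untagged := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "workflow_task_execution_failed",
	})

	fmt.Println(reg.Register(tagged))   // <nil>
	fmt.Println(reg.Register(untagged)) // error: same name registered with different label names
}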

Contributor:
Sure, let me try to come up with a counterexample.

Contributor (author):
I found a scenario that hits both ProcessEvent and RespondTaskCompletedWithMetrics. It looks like when both of these functions emit this metric, only the RespondTaskCompletedWithMetrics increment gets counted.

A few scenarios I ran:

  1. When RespondTaskCompletedWithMetrics is the only one emitting the metric, the count is 1.
  2. When ProcessEvent is the only one emitting the metric, the count is 0.
  3. When neither emits the metric, it doesn't show up at http://localhost:9090/metrics at all.

Is there some kind of scoping or something that's causing the earlier ProcessEvent increment to get reset? I'm using the metrics sample to emit metrics to http://localhost:9090/metrics: https://github.com/yuandrew/playground/tree/duplicate_logging
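As a quick way to see which series the counter actually lands on in these runs, here is a small scrape of the sample's endpoint mentioned above; the metric name substring is illustrative and may not match the SDK's exported name exactly:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	resp, err := http.Get("http://localhost:9090/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Print every exported series for the failure counter, so a tagged and an
	// untagged emission would show up as separate lines with different label sets.
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		if line := sc.Text(); strings.Contains(line, "workflow_task_execution_failed") {
			fmt.Println(line)
		}
	}
	if err := sc.Err(); err != nil {
		panic(err)
	}
}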

 			topLine := fmt.Sprintf("process event for %s [panic]:", weh.workflowInfo.TaskQueueName)
 			st := getStackTraceRaw(topLine, 7, 0)
 			weh.Complete(nil, newWorkflowPanicError(p, st))

@@ -1373,7 +1373,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessMessage(
 ) error {
 	defer func() {
 		if p := recover(); p != nil {
-			weh.metricsHandler.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
+			incrementWorkflowTaskFailureCounter(weh.metricsHandler, "NonDeterminismError")
 			topLine := fmt.Sprintf("process message for %s [panic]:", weh.workflowInfo.TaskQueueName)
 			st := getStackTraceRaw(topLine, 7, 0)
 			weh.Complete(nil, newWorkflowPanicError(p, st))
internal/internal_task_pollers.go (2 changes: 1 addition & 1 deletion)

@@ -471,7 +471,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(
 		if failWorkflowTask.Cause == enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR {
 			failureReason = "NonDeterminismError"
 		}
-		metricsHandler.WithTags(metrics.WorkflowTaskFailedTags(failureReason)).Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
+		incrementWorkflowTaskFailureCounter(metricsHandler, failureReason)
 		completedRequest = failWorkflowTask
 	}
internal/internal_workflow.go (4 changes: 4 additions & 0 deletions)

@@ -1960,3 +1960,7 @@ func (s *semaphoreImpl) Release(n int64) {
 		panic("Semaphore.Release() released more than held")
 	}
 }
+
+func incrementWorkflowTaskFailureCounter(metricsHandler metrics.Handler, failureReason string) {
+	metricsHandler.WithTags(metrics.WorkflowTaskFailedTags(failureReason)).Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
+}