Skip to content

Commit

Permalink
Added Attempt tag to logs (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev authored Dec 29, 2020
1 parent 4f37b7f commit ca5205b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 29 deletions.
1 change: 1 addition & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func WithActivityTask(
logger = ilog.With(logger,
tagActivityID, task.ActivityId,
tagActivityType, task.ActivityType.Name,
tagAttempt, task.Attempt,
tagWorkflowType, task.WorkflowType.Name,
tagWorkflowID, task.WorkflowExecution.WorkflowId,
tagRunID, task.WorkflowExecution.RunId,
Expand Down
1 change: 1 addition & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func newWorkflowExecutionEventHandler(
tagWorkflowType, workflowInfo.WorkflowType.Name,
tagWorkflowID, workflowInfo.WorkflowExecution.ID,
tagRunID, workflowInfo.WorkflowExecution.RunID,
tagAttempt, workflowInfo.Attempt,
),
&context.isReplay,
&context.enableLoggingInReplay)
Expand Down
45 changes: 26 additions & 19 deletions internal/internal_logging_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,30 @@
package internal

const (
tagActivityID = "ActivityID"
tagActivityType = "ActivityType"
tagNamespace = "Namespace"
tagEventID = "EventID"
tagEventType = "EventType"
tagRunID = "RunID"
tagTaskQueue = "TaskQueue"
tagTimerID = "TimerID"
tagWorkflowID = "WorkflowID"
tagWorkflowType = "WorkflowType"
tagWorkerID = "WorkerID"
tagWorkerType = "WorkerType"
tagSideEffectID = "SideEffectID"
tagChildWorkflowID = "ChildWorkflowID"
tagLocalActivityType = "LocalActivityType"
tagQueryType = "QueryType"
tagResult = "Result"
tagError = "Error"
tagStackTrace = "StackTrace"
tagActivityID = "ActivityID"
tagActivityType = "ActivityType"
tagNamespace = "Namespace"
tagEventID = "EventID"
tagEventType = "EventType"
tagRunID = "RunID"
tagTaskQueue = "TaskQueue"
tagTimerID = "TimerID"
tagWorkflowID = "WorkflowID"
tagWorkflowType = "WorkflowType"
tagWorkerID = "WorkerID"
tagWorkerType = "WorkerType"
tagSideEffectID = "SideEffectID"
tagChildWorkflowID = "ChildWorkflowID"
tagLocalActivityType = "LocalActivityType"
tagQueryType = "QueryType"
tagResult = "Result"
tagError = "Error"
tagStackTrace = "StackTrace"
tagAttempt = "Attempt"
tagTaskFirstEventID = "TaskFirstEventID"
tagTaskStartedEventID = "TaskStartedEventID"
tagPreviousStartedEventID = "PreviousStartedEventID"
tagCachedPreviousStartedEventID = "CachedPreviousStartedEventID"
tagPanicError = "PanicError"
tagPanicStack = "PanicStack"
)
25 changes: 17 additions & 8 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,8 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
tagWorkflowType, task.WorkflowType.GetName(),
tagWorkflowID, workflowID,
tagRunID, runID,
"PreviousStartedEventId", task.GetPreviousStartedEventId())
tagAttempt, task.Attempt,
tagPreviousStartedEventID, task.GetPreviousStartedEventId())
})

workflowContext, err := wth.getOrCreateWorkflowContext(task, workflowTask.historyIterator)
Expand Down Expand Up @@ -948,13 +949,15 @@ func (w *workflowExecutionContextImpl) applyWorkflowPanicPolicy(workflowTask *wo
tagWorkflowType, task.WorkflowType.GetName(),
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
tagAttempt, task.Attempt,
tagError, workflowError,
tagStackTrace, panicErr.StackTrace())
} else {
w.wth.logger.Error("Workflow panic",
tagWorkflowType, task.WorkflowType.GetName(),
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
tagAttempt, task.Attempt,
tagError, workflowError)
}

Expand Down Expand Up @@ -1115,10 +1118,11 @@ func (w *workflowExecutionContextImpl) ResetIfStale(task *workflowservice.PollWo
w.wth.logger.Debug("Cached state staled, new task has unexpected events",
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
"CachedPreviousStartedEventID", w.previousStartedEventID,
"TaskFirstEventID", task.History.Events[0].GetEventId(),
"TaskStartedEventID", task.GetStartedEventId(),
"TaskPreviousStartedEventID", task.GetPreviousStartedEventId())
tagAttempt, task.Attempt,
tagCachedPreviousStartedEventID, w.previousStartedEventID,
tagTaskFirstEventID, task.History.Events[0].GetEventId(),
tagTaskStartedEventID, task.GetStartedEventId(),
tagPreviousStartedEventID, task.GetPreviousStartedEventId())

w.clearState()
return w.resetStateIfDestroyed(task, historyIterator)
Expand Down Expand Up @@ -1735,7 +1739,9 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
ath.logger.Debug("Processing new activity task",
tagWorkflowID, t.WorkflowExecution.GetWorkflowId(),
tagRunID, t.WorkflowExecution.GetRunId(),
tagActivityType, t.ActivityType.GetName())
tagActivityType, t.ActivityType.GetName(),
tagAttempt, t.Attempt,
)
})

rootCtx := ath.userContext
Expand Down Expand Up @@ -1775,8 +1781,9 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
tagWorkflowID, t.WorkflowExecution.GetWorkflowId(),
tagRunID, t.WorkflowExecution.GetRunId(),
tagActivityType, activityType,
"PanicError", fmt.Sprintf("%v", p),
"PanicStack", st)
tagAttempt, t.Attempt,
tagPanicError, fmt.Sprintf("%v", p),
tagPanicStack, st)
activityMetricsScope.Counter(metrics.ActivityTaskErrorCounter).Inc(1)
panicErr := newPanicError(p, st)
result = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr,
Expand Down Expand Up @@ -1806,6 +1813,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
tagWorkflowID, t.WorkflowExecution.GetWorkflowId(),
tagRunID, t.WorkflowExecution.GetRunId(),
tagActivityType, activityType,
tagAttempt, t.Attempt,
tagResult, output,
tagError, err,
)
Expand All @@ -1816,6 +1824,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
tagWorkflowID, t.WorkflowExecution.GetWorkflowId(),
tagRunID, t.WorkflowExecution.GetRunId(),
tagActivityType, activityType,
tagAttempt, t.Attempt,
tagError, err,
)
}
Expand Down
6 changes: 4 additions & 2 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(
tagWorkflowType, task.WorkflowType.GetName(),
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
tagAttempt, task.Attempt,
tagError, taskErr)
// convert err to WorkflowTaskFailed
completedRequest = errorToFailWorkflowTask(task.TaskToken, taskErr, wtp.identity, wtp.dataConverter, wtp.namespace)
Expand Down Expand Up @@ -514,8 +515,9 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID,
tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID,
tagActivityType, activityType,
"PanicError", fmt.Sprintf("%v", p),
"PanicStack", st)
tagAttempt, task.attempt,
tagPanicError, fmt.Sprintf("%v", p),
tagPanicStack, st)
activityMetricsScope.Counter(metrics.LocalActivityErrorCounter).Inc(1)
panicErr := newPanicError(p, st)
result = &localActivityResult{
Expand Down

0 comments on commit ca5205b

Please sign in to comment.