diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 3a149e69c..f2c8bafea 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1112,6 +1112,13 @@ type WorkflowReplayerOptions struct { EnableLoggingInReplay bool } +// ReplayWorkflowHistoryOptions are options for replaying a workflow. +type ReplayWorkflowHistoryOptions struct { + // OriginalExecution - Overide the workflow execution details used for replay. + // Optional + OriginalExecution WorkflowExecution +} + // NewWorkflowReplayer creates an instance of the WorkflowReplayer. func NewWorkflowReplayer(options WorkflowReplayerOptions) (*WorkflowReplayer, error) { registry := newRegistryWithOptions(registryOptions{disableAliasing: options.DisableRegistrationAliasing}) @@ -1134,10 +1141,10 @@ func (aw *WorkflowReplayer) RegisterWorkflowWithOptions(w interface{}, options R aw.registry.RegisterWorkflowWithOptions(w, options) } -// ReplayWorkflowHistory executes a single workflow task for the given history. +// ReplayWorkflowHistoryWithOptions executes a single workflow task for the given history. // Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. // The logger is an optional parameter. Defaults to the noop logger. -func (aw *WorkflowReplayer) ReplayWorkflowHistory(logger log.Logger, history *historypb.History) error { +func (aw *WorkflowReplayer) ReplayWorkflowHistoryWithOptions(logger log.Logger, history *historypb.History, options ReplayWorkflowHistoryOptions) error { if logger == nil { logger = ilog.NewDefaultLogger() } @@ -1145,7 +1152,14 @@ func (aw *WorkflowReplayer) ReplayWorkflowHistory(logger log.Logger, history *hi controller := gomock.NewController(ilog.NewTestReporter(logger)) service := workflowservicemock.NewMockWorkflowServiceClient(controller) - return aw.replayWorkflowHistory(logger, service, ReplayNamespace, history) + return aw.replayWorkflowHistory(logger, service, ReplayNamespace, options.OriginalExecution, history) +} + +// ReplayWorkflowHistory executes a single workflow task for the given history. +// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. +// The logger is an optional parameter. Defaults to the noop logger. +func (aw *WorkflowReplayer) ReplayWorkflowHistory(logger log.Logger, history *historypb.History) error { + return aw.ReplayWorkflowHistoryWithOptions(logger, history, ReplayWorkflowHistoryOptions{}) } // ReplayWorkflowHistoryFromJSONFile executes a single workflow task for the given json history file. @@ -1172,7 +1186,7 @@ func (aw *WorkflowReplayer) ReplayPartialWorkflowHistoryFromJSONFile(logger log. controller := gomock.NewController(ilog.NewTestReporter(logger)) service := workflowservicemock.NewMockWorkflowServiceClient(controller) - return aw.replayWorkflowHistory(logger, service, ReplayNamespace, history) + return aw.replayWorkflowHistory(logger, service, ReplayNamespace, WorkflowExecution{}, history) } // ReplayWorkflowExecution replays workflow execution loading it from Temporal service. @@ -1211,7 +1225,7 @@ func (aw *WorkflowReplayer) ReplayWorkflowExecution(ctx context.Context, service } request.NextPageToken = resp.NextPageToken } - return aw.replayWorkflowHistory(logger, service, namespace, &history) + return aw.replayWorkflowHistory(logger, service, namespace, execution, &history) } // inferInvocations extracts the set of *interactionpb.Invocation objects that @@ -1231,7 +1245,7 @@ func inferInvocations(events []*historypb.HistoryEvent) []*interactionpb.Invocat return invocations } -func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service workflowservice.WorkflowServiceClient, namespace string, history *historypb.History) error { +func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service workflowservice.WorkflowServiceClient, namespace string, originalExecution WorkflowExecution, history *historypb.History) error { taskQueue := "ReplayTaskQueue" events := history.Events if events == nil { @@ -1255,7 +1269,12 @@ func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service wor RunId: uuid.NewRandom().String(), WorkflowId: "ReplayId", } - if first.GetWorkflowExecutionStartedEventAttributes().GetOriginalExecutionRunId() != "" { + if originalExecution.ID != "" { + execution.WorkflowId = originalExecution.ID + } + if originalExecution.RunID != "" { + execution.RunId = originalExecution.RunID + } else if first.GetWorkflowExecutionStartedEventAttributes().GetOriginalExecutionRunId() != "" { execution.RunId = first.GetWorkflowExecutionStartedEventAttributes().GetOriginalExecutionRunId() } diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index d51174a7d..5e44e0763 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1379,6 +1379,24 @@ func testReplayWorkflowSideEffect(ctx Context) error { return nil } +func testReplayRunID(ctx Context) error { + info := GetWorkflowInfo(ctx) + if info.WorkflowExecution.ID == "TestID" && info.WorkflowExecution.RunID == "TestRunID" { + ao := ActivityOptions{ + ScheduleToStartTimeout: time.Second, + StartToCloseTimeout: time.Second, + } + ctx = WithActivityOptions(ctx, ao) + var A1Result string + if err := ExecuteActivity(ctx, "A1", "first").Get(ctx, &A1Result); err != nil { + return err + } + } + + getLogger().Info("workflow completed.") + return nil +} + func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_SideEffect() { taskQueue := "taskQueue1" testEvents := []*historypb.HistoryEvent{ @@ -1465,6 +1483,54 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_SideEffect() { require.NoError(s.T(), err) } +func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_ReplayRunID() { + taskQueue := "taskQueue1" + testEvents := []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &commonpb.WorkflowType{Name: "testReplayRunID"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()), + }), + createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}), + createTestEventWorkflowTaskStarted(3), + createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}), + createTestEventActivityTaskScheduled(5, &historypb.ActivityTaskScheduledEventAttributes{ + ActivityId: "5", + ActivityType: &commonpb.ActivityType{Name: "A1"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + }), + createTestEventActivityTaskStarted(6, &historypb.ActivityTaskStartedEventAttributes{ + ScheduledEventId: 5, + }), + createTestEventActivityTaskCompleted(7, &historypb.ActivityTaskCompletedEventAttributes{ + ScheduledEventId: 5, + StartedEventId: 6, + }), + createTestEventWorkflowTaskScheduled(8, &historypb.WorkflowTaskScheduledEventAttributes{}), + createTestEventWorkflowTaskStarted(9), + createTestEventWorkflowTaskCompleted(10, &historypb.WorkflowTaskCompletedEventAttributes{ + ScheduledEventId: 8, + StartedEventId: 9, + }), + createTestEventWorkflowExecutionCompleted(11, &historypb.WorkflowExecutionCompletedEventAttributes{ + WorkflowTaskCompletedEventId: 10, + }), + } + + history := &historypb.History{Events: testEvents} + logger := getLogger() + replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{}) + require.NoError(s.T(), err) + replayer.RegisterWorkflow(testReplayRunID) + err = replayer.ReplayWorkflowHistoryWithOptions(logger, history, ReplayWorkflowHistoryOptions{ + OriginalExecution: WorkflowExecution{ + ID: "TestID", + RunID: "TestRunID", + }, + }) + require.NoError(s.T(), err) +} + func (s *internalWorkerTestSuite) TestReplayWorkflowHistoryFromFileParent() { logger := getLogger() replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{}) diff --git a/worker/worker.go b/worker/worker.go index 73becabf6..ae0076fb6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -171,6 +171,14 @@ type ( // History can be loaded from a reader with client.HistoryFromJSON. ReplayWorkflowHistory(logger log.Logger, history *historypb.History) error + // ReplayWorkflowHistoryWithOptions executes a single workflow task for the given json history file. + // Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. + // The logger is an optional parameter. Defaults to the noop logger. Options allow aditional customization when + // replaying this history. + // + // History can be loaded from a reader with client.HistoryFromJSON. + ReplayWorkflowHistoryWithOptions(logger log.Logger, history *historypb.History, options ReplayWorkflowHistoryOptions) error + // ReplayWorkflowHistoryFromJSONFile executes a single workflow task for the json history file downloaded from the cli. // To download the history file: temporal workflow showid -of // See https://github.com/temporalio/temporal/blob/master/tools/cli/README.md for full documentation @@ -188,7 +196,8 @@ type ( // ReplayWorkflowExecution loads a workflow execution history from the Temporal service and executes a single workflow task for it. // Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. - // The logger is the only optional parameter. Defaults to the noop logger. + // The logger is the only optional parameter. Defaults to the noop logger. The Run ID and Workflow ID used during replay are derived + // from execution. ReplayWorkflowExecution(ctx context.Context, service workflowservice.WorkflowServiceClient, logger log.Logger, namespace string, execution workflow.Execution) error } @@ -204,6 +213,9 @@ type ( // WorkflowReplayerOptions are options used for // NewWorkflowReplayerWithOptions. WorkflowReplayerOptions = internal.WorkflowReplayerOptions + + // ReplayWorkflowHistoryOptions are options for replaying a workflow. + ReplayWorkflowHistoryOptions = internal.ReplayWorkflowHistoryOptions ) const ( @@ -219,11 +231,12 @@ const ( ) // New creates an instance of worker for managing workflow and activity executions. -// client - the client for use by the worker -// taskQueue - is the task queue name you use to identify your client worker, also -// identifies group of workflow and activity implementations that are -// hosted by a single worker process -// options - configure any worker specific options like logger, metrics, identity +// +// client - the client for use by the worker +// taskQueue - is the task queue name you use to identify your client worker, also +// identifies group of workflow and activity implementations that are +// hosted by a single worker process +// options - configure any worker specific options like logger, metrics, identity func New( client client.Client, taskQueue string,