Skip to content

Commit

Permalink
Add option to overide replayer execution info
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Jan 5, 2023
1 parent 6f4d14f commit 0dadd27
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 13 deletions.
33 changes: 26 additions & 7 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,13 @@ type WorkflowReplayerOptions struct {
EnableLoggingInReplay bool
}

// ReplayWorkflowHistoryOptions are options for replaying a workflow.
type ReplayWorkflowHistoryOptions struct {
// OriginalExecution - Overide the workflow execution details used for replay.
// Optional
OriginalExecution WorkflowExecution
}

// NewWorkflowReplayer creates an instance of the WorkflowReplayer.
func NewWorkflowReplayer(options WorkflowReplayerOptions) (*WorkflowReplayer, error) {
registry := newRegistryWithOptions(registryOptions{disableAliasing: options.DisableRegistrationAliasing})
Expand All @@ -1134,18 +1141,25 @@ func (aw *WorkflowReplayer) RegisterWorkflowWithOptions(w interface{}, options R
aw.registry.RegisterWorkflowWithOptions(w, options)
}

// ReplayWorkflowHistory executes a single workflow task for the given history.
// ReplayWorkflowHistoryWithOptions executes a single workflow task for the given history.
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
// The logger is an optional parameter. Defaults to the noop logger.
func (aw *WorkflowReplayer) ReplayWorkflowHistory(logger log.Logger, history *historypb.History) error {
func (aw *WorkflowReplayer) ReplayWorkflowHistoryWithOptions(logger log.Logger, history *historypb.History, options ReplayWorkflowHistoryOptions) error {
if logger == nil {
logger = ilog.NewDefaultLogger()
}

controller := gomock.NewController(ilog.NewTestReporter(logger))
service := workflowservicemock.NewMockWorkflowServiceClient(controller)

return aw.replayWorkflowHistory(logger, service, ReplayNamespace, history)
return aw.replayWorkflowHistory(logger, service, ReplayNamespace, options.OriginalExecution, history)
}

// ReplayWorkflowHistory executes a single workflow task for the given history.
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
// The logger is an optional parameter. Defaults to the noop logger.
func (aw *WorkflowReplayer) ReplayWorkflowHistory(logger log.Logger, history *historypb.History) error {
return aw.ReplayWorkflowHistoryWithOptions(logger, history, ReplayWorkflowHistoryOptions{})
}

// ReplayWorkflowHistoryFromJSONFile executes a single workflow task for the given json history file.
Expand All @@ -1172,7 +1186,7 @@ func (aw *WorkflowReplayer) ReplayPartialWorkflowHistoryFromJSONFile(logger log.
controller := gomock.NewController(ilog.NewTestReporter(logger))
service := workflowservicemock.NewMockWorkflowServiceClient(controller)

return aw.replayWorkflowHistory(logger, service, ReplayNamespace, history)
return aw.replayWorkflowHistory(logger, service, ReplayNamespace, WorkflowExecution{}, history)
}

// ReplayWorkflowExecution replays workflow execution loading it from Temporal service.
Expand Down Expand Up @@ -1211,7 +1225,7 @@ func (aw *WorkflowReplayer) ReplayWorkflowExecution(ctx context.Context, service
}
request.NextPageToken = resp.NextPageToken
}
return aw.replayWorkflowHistory(logger, service, namespace, &history)
return aw.replayWorkflowHistory(logger, service, namespace, execution, &history)
}

// inferInvocations extracts the set of *interactionpb.Invocation objects that
Expand All @@ -1231,7 +1245,7 @@ func inferInvocations(events []*historypb.HistoryEvent) []*interactionpb.Invocat
return invocations
}

func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service workflowservice.WorkflowServiceClient, namespace string, history *historypb.History) error {
func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service workflowservice.WorkflowServiceClient, namespace string, originalExecution WorkflowExecution, history *historypb.History) error {
taskQueue := "ReplayTaskQueue"
events := history.Events
if events == nil {
Expand All @@ -1255,7 +1269,12 @@ func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service wor
RunId: uuid.NewRandom().String(),
WorkflowId: "ReplayId",
}
if first.GetWorkflowExecutionStartedEventAttributes().GetOriginalExecutionRunId() != "" {
if originalExecution.ID != "" {
execution.WorkflowId = originalExecution.ID
}
if originalExecution.RunID != "" {
execution.RunId = originalExecution.RunID
} else if first.GetWorkflowExecutionStartedEventAttributes().GetOriginalExecutionRunId() != "" {
execution.RunId = first.GetWorkflowExecutionStartedEventAttributes().GetOriginalExecutionRunId()
}

Expand Down
66 changes: 66 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,24 @@ func testReplayWorkflowSideEffect(ctx Context) error {
return nil
}

func testReplayRunID(ctx Context) error {
info := GetWorkflowInfo(ctx)
if info.WorkflowExecution.ID == "TestID" && info.WorkflowExecution.RunID == "TestRunID" {
ao := ActivityOptions{
ScheduleToStartTimeout: time.Second,
StartToCloseTimeout: time.Second,
}
ctx = WithActivityOptions(ctx, ao)
var A1Result string
if err := ExecuteActivity(ctx, "A1", "first").Get(ctx, &A1Result); err != nil {
return err
}
}

getLogger().Info("workflow completed.")
return nil
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_SideEffect() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
Expand Down Expand Up @@ -1465,6 +1483,54 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_SideEffect() {
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_ReplayRunID() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{
WorkflowType: &commonpb.WorkflowType{Name: "testReplayRunID"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()),
}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}),
createTestEventActivityTaskScheduled(5, &historypb.ActivityTaskScheduledEventAttributes{
ActivityId: "5",
ActivityType: &commonpb.ActivityType{Name: "A1"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
}),
createTestEventActivityTaskStarted(6, &historypb.ActivityTaskStartedEventAttributes{
ScheduledEventId: 5,
}),
createTestEventActivityTaskCompleted(7, &historypb.ActivityTaskCompletedEventAttributes{
ScheduledEventId: 5,
StartedEventId: 6,
}),
createTestEventWorkflowTaskScheduled(8, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(9),
createTestEventWorkflowTaskCompleted(10, &historypb.WorkflowTaskCompletedEventAttributes{
ScheduledEventId: 8,
StartedEventId: 9,
}),
createTestEventWorkflowExecutionCompleted(11, &historypb.WorkflowExecutionCompletedEventAttributes{
WorkflowTaskCompletedEventId: 10,
}),
}

history := &historypb.History{Events: testEvents}
logger := getLogger()
replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{})
require.NoError(s.T(), err)
replayer.RegisterWorkflow(testReplayRunID)
err = replayer.ReplayWorkflowHistoryWithOptions(logger, history, ReplayWorkflowHistoryOptions{
OriginalExecution: WorkflowExecution{
ID: "TestID",
RunID: "TestRunID",
},
})
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistoryFromFileParent() {
logger := getLogger()
replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{})
Expand Down
25 changes: 19 additions & 6 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ type (
// History can be loaded from a reader with client.HistoryFromJSON.
ReplayWorkflowHistory(logger log.Logger, history *historypb.History) error

// ReplayWorkflowHistoryWithOptions executes a single workflow task for the given json history file.
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
// The logger is an optional parameter. Defaults to the noop logger. Options allow aditional customization when
// replaying this history.
//
// History can be loaded from a reader with client.HistoryFromJSON.
ReplayWorkflowHistoryWithOptions(logger log.Logger, history *historypb.History, options ReplayWorkflowHistoryOptions) error

// ReplayWorkflowHistoryFromJSONFile executes a single workflow task for the json history file downloaded from the cli.
// To download the history file: temporal workflow showid <workflow_id> -of <output_filename>
// See https://github.com/temporalio/temporal/blob/master/tools/cli/README.md for full documentation
Expand All @@ -188,7 +196,8 @@ type (

// ReplayWorkflowExecution loads a workflow execution history from the Temporal service and executes a single workflow task for it.
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
// The logger is the only optional parameter. Defaults to the noop logger.
// The logger is the only optional parameter. Defaults to the noop logger. The Run ID and Workflow ID used during replay are derived
// from execution.
ReplayWorkflowExecution(ctx context.Context, service workflowservice.WorkflowServiceClient, logger log.Logger, namespace string, execution workflow.Execution) error
}

Expand All @@ -204,6 +213,9 @@ type (
// WorkflowReplayerOptions are options used for
// NewWorkflowReplayerWithOptions.
WorkflowReplayerOptions = internal.WorkflowReplayerOptions

// ReplayWorkflowHistoryOptions are options for replaying a workflow.
ReplayWorkflowHistoryOptions = internal.ReplayWorkflowHistoryOptions
)

const (
Expand All @@ -219,11 +231,12 @@ const (
)

// New creates an instance of worker for managing workflow and activity executions.
// client - the client for use by the worker
// taskQueue - is the task queue name you use to identify your client worker, also
// identifies group of workflow and activity implementations that are
// hosted by a single worker process
// options - configure any worker specific options like logger, metrics, identity
//
// client - the client for use by the worker
// taskQueue - is the task queue name you use to identify your client worker, also
// identifies group of workflow and activity implementations that are
// hosted by a single worker process
// options - configure any worker specific options like logger, metrics, identity
func New(
client client.Client,
taskQueue string,
Expand Down

0 comments on commit 0dadd27

Please sign in to comment.