Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/client-versioning-api-methods' i…
Browse files Browse the repository at this point in the history
…nto versioning-api-refactor

# Conflicts:
#	go.mod
#	go.sum
#	test/go.mod
#	test/go.sum
#	test/integration_test.go
  • Loading branch information
Sushisource committed Sep 27, 2022
2 parents adce1bf + a02b764 commit bbbbfd4
Show file tree
Hide file tree
Showing 17 changed files with 902 additions and 565 deletions.
35 changes: 29 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ type (
// CheckHealthResponse is a response for Client.CheckHealth.
CheckHealthResponse = internal.CheckHealthResponse

// UpdateWorkerBuildIDOrderingOptions is the input to Client.UpdateWorkerBuildIDOrdering.
UpdateWorkerBuildIDOrderingOptions = internal.UpdateWorkerBuildIDOrderingOptions

// GetWorkerBuildIDOrderingOptions is the input to Client.GetWorkerBuildIDOrdering.
GetWorkerBuildIDOrderingOptions = internal.GetWorkerBuildIDOrderingOptions

// WorkerBuildIDVersionGraph is the response for Client.GetWorkerBuildIdOrdering
WorkerBuildIDVersionGraph = internal.WorkerBuildIDVersionGraph

// WorkerVersionIDNode represents a node in the WorkerBuildIDVersionGraph
WorkerVersionIDNode = internal.WorkerVersionIDNode

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down Expand Up @@ -372,11 +384,20 @@ type (
// - serviceerror.NotFound
DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error)

// ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskFinishEventId(exclusive).
// ResetWorkflowExecution resets an existing workflow execution to WorkflowTaskFinishEventId(exclusive).
// And it will immediately terminating the current execution instance.
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// UpdateWorkerBuildIDOrdering **IN DEVELOPMENT**
// Allows you to update the worker-build-id based version graph for a particular task queue. This is used in
// conjunction with workers who specify their build id and thus opt into the feature. For more, see: <doc link>
UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error

// GetWorkerBuildIDOrdering **IN DEVELOPMENT**
// Returns the worker-build-id based version graph for a particular task queue.
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
Expand Down Expand Up @@ -511,8 +532,9 @@ var _ internal.NamespaceClient = NamespaceClient(nil)
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution.
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
// which can be decoded by using:
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
//
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
func NewValue(data *commonpb.Payloads) converter.EncodedValue {
return internal.NewValue(data)
}
Expand All @@ -521,9 +543,10 @@ func NewValue(data *commonpb.Payloads) converter.EncodedValue {
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling Client.DescribeWorkflowExecution.
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
// which can be decoded by using:
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
//
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
func NewValues(data *commonpb.Payloads) converter.EncodedValues {
return internal.NewValues(data)
}
Expand Down
20 changes: 15 additions & 5 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,14 @@ type (
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// UpdateWorkerBuildIDOrdering allows you to update the worker-build-id based version graph for a particular
// task queue. This is used in conjunction with workers who specify their build id and thus opt into the
// feature. For more, see: <doc link>
UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error

// GetWorkerBuildIDOrdering returns the worker-build-id based version graph for a particular task queue.
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
Expand Down Expand Up @@ -816,8 +824,9 @@ func newNamespaceServiceClient(workflowServiceClient workflowservice.WorkflowSer
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution.
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
// which can be decoded by using:
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
//
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
func NewValue(data *commonpb.Payloads) converter.EncodedValue {
return newEncodedValue(data, nil)
}
Expand All @@ -826,9 +835,10 @@ func NewValue(data *commonpb.Payloads) converter.EncodedValue {
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling Client.DescribeWorkflowExecution.
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
// which can be decoded by using:
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
//
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
func NewValues(data *commonpb.Payloads) converter.EncodedValues {
return newEncodedValues(data, nil)
}
41 changes: 15 additions & 26 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type (
ppMgr pressurePointMgr
logger log.Logger
identity string
workerBuildID string
enableLoggingInReplay bool
registry *registry
laTunnel *localActivityTunnel
Expand Down Expand Up @@ -401,6 +402,7 @@ func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePoin
ppMgr: ppMgr,
metricsHandler: params.MetricsHandler,
identity: params.Identity,
workerBuildID: params.WorkerBuildID,
enableLoggingInReplay: params.EnableLoggingInReplay,
registry: registry,
workflowPanicPolicy: params.WorkflowPanicPolicy,
Expand Down Expand Up @@ -827,7 +829,7 @@ ProcessEvents:
break ProcessEvents
}
if binaryChecksum == "" {
w.workflowInfo.BinaryChecksum = getBinaryChecksum()
w.workflowInfo.BinaryChecksum = w.wth.getBuildID()
} else {
w.workflowInfo.BinaryChecksum = binaryChecksum
}
Expand Down Expand Up @@ -1557,40 +1559,20 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
}
}

return &workflowservice.RespondWorkflowTaskCompletedRequest{
builtRequest := &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: task.TaskToken,
Commands: commands,
Identity: wth.identity,
ReturnNewWorkflowTask: true,
ForceCreateNewWorkflowTask: forceNewWorkflowTask,
BinaryChecksum: getBinaryChecksum(),
BinaryChecksum: wth.getBuildID(),
QueryResults: queryResults,
Namespace: wth.namespace,
}
}

func errorToFailWorkflowTask(taskToken []byte, err error, identity string, dataConverter converter.DataConverter,
namespace string) *workflowservice.RespondWorkflowTaskFailedRequest {

cause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
// If it was a panic due to a bad state machine or if it was a history
// mismatch error, mark as non-deterministic
if panicErr, _ := err.(*workflowPanicError); panicErr != nil {
if _, badStateMachine := panicErr.value.(stateMachineIllegalStatePanic); badStateMachine {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
}
} else if _, mismatch := err.(historyMismatchError); mismatch {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
}

return &workflowservice.RespondWorkflowTaskFailedRequest{
TaskToken: taskToken,
Cause: cause,
Failure: ConvertErrorToFailure(err, dataConverter),
Identity: identity,
BinaryChecksum: getBinaryChecksum(),
Namespace: namespace,
if wth.workerBuildID != "" {
builtRequest.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: wth.workerBuildID}
}
return builtRequest
}

func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.HistoryEvent, isInReplay bool) error {
Expand All @@ -1609,6 +1591,13 @@ func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.Hi
return nil
}

func (wth *workflowTaskHandlerImpl) getBuildID() string {
if wth.workerBuildID != "" {
return wth.workerBuildID
}
return getBinaryChecksum()
}

func newActivityTaskHandler(
service workflowservice.WorkflowServiceClient,
params workerExecutionParameters,
Expand Down
19 changes: 19 additions & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,25 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_BinaryChecksum() {
t.Equal(getBinaryChecksum(), checksums[2])
}

func (t *TaskHandlersTestSuite) TestRespondsToWFTWithWorkerBinaryID() {
taskQueue := "tq1"
workerBuildID := "yaaaay"
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
createTestEventWorkflowTaskStarted(3),
}
task := createWorkflowTask(testEvents, 0, "HelloWorld_Workflow")
params := t.getTestWorkerExecutionParams()
params.WorkerBuildID = workerBuildID
taskHandler := newWorkflowTaskHandler(params, nil, t.registry)
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest)
t.NoError(err)
t.NotNil(response)
t.Equal(workerBuildID, response.WorkerVersioningId.GetWorkerBuildId())
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
// Schedule an activity and see if we complete workflow.
taskQueue := "tq1"
Expand Down
65 changes: 56 additions & 9 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type (
basePoller struct {
metricsHandler metrics.Handler // base metric handler used for rpc calls
stopC <-chan struct{}
// Is set if this worker has opted in to build-id based versioning
workerBuildID string
}

// workflowTaskPoller implements polling/processing a workflow task
Expand Down Expand Up @@ -224,7 +226,11 @@ func newWorkflowTaskPoller(
params workerExecutionParameters,
) *workflowTaskPoller {
return &workflowTaskPoller{
basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel},
basePoller: basePoller{
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.WorkerBuildID,
},
service: service,
namespace: params.Namespace,
taskQueueName: params.TaskQueue,
Expand Down Expand Up @@ -340,7 +346,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(
tagAttempt, task.Attempt,
tagError, taskErr)
// convert err to WorkflowTaskFailed
completedRequest = errorToFailWorkflowTask(task.TaskToken, taskErr, wtp.identity, wtp.dataConverter, wtp.namespace)
completedRequest = wtp.errorToFailWorkflowTask(task.TaskToken, taskErr)
}

metricsHandler.Timer(metrics.WorkflowTaskExecutionLatency).Record(time.Since(startTime))
Expand Down Expand Up @@ -401,6 +407,35 @@ func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}
return
}

func (wtp *workflowTaskPoller) errorToFailWorkflowTask(taskToken []byte, err error) *workflowservice.RespondWorkflowTaskFailedRequest {
cause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
// If it was a panic due to a bad state machine or if it was a history
// mismatch error, mark as non-deterministic
if panicErr, _ := err.(*workflowPanicError); panicErr != nil {
if _, badStateMachine := panicErr.value.(stateMachineIllegalStatePanic); badStateMachine {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
}
} else if _, mismatch := err.(historyMismatchError); mismatch {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
}

return &workflowservice.RespondWorkflowTaskFailedRequest{
TaskToken: taskToken,
Cause: cause,
Failure: ConvertErrorToFailure(err, wtp.dataConverter),
Identity: wtp.identity,
BinaryChecksum: wtp.getBuildID(),
Namespace: wtp.namespace,
}
}

func (bp *basePoller) getBuildID() string {
if bp.workerBuildID == "" {
return getBinaryChecksum()
}
return bp.workerBuildID
}

func newLocalActivityPoller(
params workerExecutionParameters,
laTunnel *localActivityTunnel,
Expand Down Expand Up @@ -593,10 +628,11 @@ func (wtp *workflowTaskPoller) updateBacklog(taskQueueKind enumspb.TaskQueueKind

// getNextPollRequest returns appropriate next poll request based on poller configuration.
// Simple rules:
// 1) if sticky execution is disabled, always poll for regular task queue
// 2) otherwise:
// 2.1) if sticky task queue has backlog, always prefer to process sticky task first
// 2.2) poll from the task queue that has less pending requests (prefer sticky when they are the same).
// 1. if sticky execution is disabled, always poll for regular task queue
// 2. otherwise:
// 2.1) if sticky task queue has backlog, always prefer to process sticky task first
// 2.2) poll from the task queue that has less pending requests (prefer sticky when they are the same).
//
// TODO: make this more smart to auto adjust based on poll latency
func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.PollWorkflowTaskQueueRequest) {
taskQueueName := wtp.taskQueueName
Expand All @@ -617,12 +653,16 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po
Name: taskQueueName,
Kind: taskQueueKind,
}
return &workflowservice.PollWorkflowTaskQueueRequest{
builtRequest := &workflowservice.PollWorkflowTaskQueueRequest{
Namespace: wtp.namespace,
TaskQueue: taskQueue,
Identity: wtp.identity,
BinaryChecksum: getBinaryChecksum(),
BinaryChecksum: wtp.getBuildID(),
}
if wtp.workerBuildID != "" {
builtRequest.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: wtp.workerBuildID}
}
return builtRequest
}

// Poll for a single workflow task from the service
Expand Down Expand Up @@ -769,7 +809,11 @@ func newGetHistoryPageFunc(

func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowservice.WorkflowServiceClient, params workerExecutionParameters) *activityTaskPoller {
return &activityTaskPoller{
basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel},
basePoller: basePoller{
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.WorkerBuildID,
},
taskHandler: taskHandler,
service: service,
namespace: params.Namespace,
Expand All @@ -791,6 +835,9 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) {
Identity: atp.identity,
TaskQueueMetadata: &taskqueuepb.TaskQueueMetadata{MaxTasksPerSecond: &types.DoubleValue{Value: atp.activitiesPerSecond}},
}
if atp.workerBuildID != "" {
request.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: atp.workerBuildID}
}

response, err := atp.service.PollActivityTaskQueue(ctx, request)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ type (
// a default option.
Identity string

// The worker's build ID used for versioning, if one was set.
WorkerBuildID string

MetricsHandler metrics.Handler

Logger log.Logger
Expand Down Expand Up @@ -1374,6 +1377,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
ConcurrentWorkflowTaskExecutionSize: options.MaxConcurrentWorkflowTaskExecutionSize,
MaxConcurrentWorkflowTaskQueuePollers: options.MaxConcurrentWorkflowTaskPollers,
Identity: client.identity,
WorkerBuildID: options.BuildIDForVersioning,
MetricsHandler: client.metricsHandler.WithTags(metrics.TaskQueueTags(taskQueue)),
Logger: client.logger,
EnableLoggingInReplay: options.EnableLoggingInReplay,
Expand Down
Loading

0 comments on commit bbbbfd4

Please sign in to comment.