Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add versioning API methods to client #920

Merged
merged 19 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ type (
// NOTE: Experimental
WorkflowUpdateHandle = internal.WorkflowUpdateHandle

// UpdateWorkerBuildIDCompatabilityOptions is the input to Client.UpdateWorkerBuildIDCompatability.
// NOTE: Experimental
UpdateWorkerBuildIDCompatabilityOptions = internal.UpdateWorkerBuildIDCompatabilityOptions

// GetWorkerBuildIDCompatabilityOptions is the input to Client.GetWorkerBuildIDCompatability.
// NOTE: Experimental
GetWorkerBuildIDCompatabilityOptions = internal.GetWorkerBuildIDCompatabilityOptions

// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIDCompatability.
WorkerBuildIDVersionSets = internal.WorkerBuildIDVersionSets

// Client is the client for starting and getting information about workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down Expand Up @@ -478,11 +489,22 @@ type (
// - serviceerror.NotFound
DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error)

// ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskFinishEventId(exclusive).
// ResetWorkflowExecution resets an existing workflow execution to WorkflowTaskFinishEventId(exclusive).
// And it will immediately terminate the current execution instance.
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// UpdateWorkerBuildIDCompatability
// Allows you to update the worker-build-id based version sets for a particular task queue. This is used in
// conjunction with workers who specify their build id and thus opt into the feature.
// NOTE: Experimental
UpdateWorkerBuildIDCompatability(ctx context.Context, options *UpdateWorkerBuildIDCompatabilityOptions) error

// GetWorkerBuildIDCompatability
// Returns the worker-build-id based version sets for a particular task queue.
// NOTE: Experimental
GetWorkerBuildIDCompatability(ctx context.Context, options *GetWorkerBuildIDCompatabilityOptions) (*WorkerBuildIDVersionSets, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ require (
google.golang.org/protobuf v1.28.1
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
)

replace go.temporal.io/api => /mnt/chonky/dev/temporal/api-go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chonky

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😄 (of course will go away once API change is merged)

2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,6 @@ go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.temporal.io/api v1.18.1 h1:kC3TS0iVXaAKe12L39I37b9X59x4p7E6512EfG/+RCU=
go.temporal.io/api v1.18.1/go.mod h1:VWdEGKUWRYKMkKJPo3wONuGZKay+d5L641b+JRuw0Bo=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
8 changes: 8 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,14 @@ type (
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// UpdateWorkerBuildIDCompatability allows you to update the worker-build-id based version sets for a particular
// task queue. This is used in conjunction with workers who specify their build id and thus opt into the
// feature.
UpdateWorkerBuildIDCompatability(ctx context.Context, options *UpdateWorkerBuildIDCompatabilityOptions) error

// GetWorkerBuildIDCompatability returns the worker-build-id based version sets for a particular task queue.
GetWorkerBuildIDCompatability(ctx context.Context, options *GetWorkerBuildIDCompatabilityOptions) (*WorkerBuildIDVersionSets, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
Expand Down
41 changes: 16 additions & 25 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type (
ppMgr pressurePointMgr
logger log.Logger
identity string
workerBuildID string
enableLoggingInReplay bool
registry *registry
laTunnel *localActivityTunnel
Expand Down Expand Up @@ -416,6 +417,7 @@ func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePoin
ppMgr: ppMgr,
metricsHandler: params.MetricsHandler,
identity: params.Identity,
workerBuildID: params.WorkerBuildID,
enableLoggingInReplay: params.EnableLoggingInReplay,
registry: registry,
workflowPanicPolicy: params.WorkflowPanicPolicy,
Expand Down Expand Up @@ -874,7 +876,7 @@ ProcessEvents:
break ProcessEvents
}
if binaryChecksum == "" {
w.workflowInfo.BinaryChecksum = getBinaryChecksum()
w.workflowInfo.BinaryChecksum = w.wth.getBuildID()
} else {
w.workflowInfo.BinaryChecksum = binaryChecksum
}
Expand Down Expand Up @@ -1599,45 +1601,27 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
langUsedFlags = append(langUsedFlags, uint32(flag))
}

return &workflowservice.RespondWorkflowTaskCompletedRequest{
builtRequest := &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: task.TaskToken,
Commands: commands,
Messages: messages,
Identity: wth.identity,
ReturnNewWorkflowTask: true,
ForceCreateNewWorkflowTask: forceNewWorkflowTask,
BinaryChecksum: getBinaryChecksum(),
BinaryChecksum: wth.getBuildID(),
QueryResults: queryResults,
Namespace: wth.namespace,
MeteringMetadata: &commonpb.MeteringMetadata{NonfirstLocalActivityExecutionAttempts: nonfirstLAAttempts},
SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{
LangUsedFlags: langUsedFlags,
},
}
}

func errorToFailWorkflowTask(taskToken []byte, err error, identity string, dataConverter converter.DataConverter,
failureConverter converter.FailureConverter, namespace string,
) *workflowservice.RespondWorkflowTaskFailedRequest {
cause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
// If it was a panic due to a bad state machine or if it was a history
// mismatch error, mark as non-deterministic
if panicErr, _ := err.(*workflowPanicError); panicErr != nil {
if _, badStateMachine := panicErr.value.(stateMachineIllegalStatePanic); badStateMachine {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
if wth.workerBuildID != "" {
builtRequest.WorkerVersionStamp = &commonpb.WorkerVersionStamp{
BuildId: wth.workerBuildID,
}
} else if _, mismatch := err.(historyMismatchError); mismatch {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
}

return &workflowservice.RespondWorkflowTaskFailedRequest{
TaskToken: taskToken,
Cause: cause,
Failure: failureConverter.ErrorToFailure(err),
Identity: identity,
BinaryChecksum: getBinaryChecksum(),
Namespace: namespace,
}
return builtRequest
}

func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.HistoryEvent, isInReplay bool) error {
Expand All @@ -1656,6 +1640,13 @@ func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.Hi
return nil
}

// getBuildID returns the identifier this task handler reports as its binary
// checksum: the configured workerBuildID when it is non-empty, otherwise the
// value produced by getBinaryChecksum.
func (wth *workflowTaskHandlerImpl) getBuildID() string {
	if wth.workerBuildID == "" {
		return getBinaryChecksum()
	}
	return wth.workerBuildID
}

func newActivityTaskHandler(
service workflowservice.WorkflowServiceClient,
params workerExecutionParameters,
Expand Down
19 changes: 19 additions & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,25 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_BinaryChecksum() {
t.Equal(getBinaryChecksum(), checksums[2])
}

// TestRespondsToWFTWithWorkerBinaryID verifies that a workflow task handler
// created with a WorkerBuildID stamps that build ID onto the
// RespondWorkflowTaskCompletedRequest it produces.
func (t *TaskHandlersTestSuite) TestRespondsToWFTWithWorkerBinaryID() {
	taskQueue := "tq1"
	workerBuildID := "yaaaay"
	testEvents := []*historypb.HistoryEvent{
		createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
		createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
		createTestEventWorkflowTaskStarted(3),
	}
	task := createWorkflowTask(testEvents, 0, "HelloWorld_Workflow")
	params := t.getTestWorkerExecutionParams()
	params.WorkerBuildID = workerBuildID
	taskHandler := newWorkflowTaskHandler(params, nil, t.registry)

	request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
	// Check the error before inspecting the request, and use the comma-ok form
	// so an unexpected or nil request is reported as a test failure rather than
	// panicking inside the type assertion.
	t.NoError(err)
	response, ok := request.(*workflowservice.RespondWorkflowTaskCompletedRequest)
	t.True(ok)
	t.NotNil(response)
	t.Equal(workerBuildID, response.GetWorkerVersionStamp().GetBuildId())
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
// Schedule an activity and see if we complete workflow.
taskQueue := "tq1"
Expand Down
61 changes: 55 additions & 6 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type (
basePoller struct {
metricsHandler metrics.Handler // base metric handler used for rpc calls
stopC <-chan struct{}
// Is set if this worker has opted in to build-id based versioning
workerBuildID string
}

// numPollerMetric tracks the number of active pollers and publishes a metric on it.
Expand Down Expand Up @@ -256,7 +258,11 @@ func newWorkflowTaskPoller(
params workerExecutionParameters,
) *workflowTaskPoller {
return &workflowTaskPoller{
basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel},
basePoller: basePoller{
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.WorkerBuildID,
},
service: service,
namespace: params.Namespace,
taskQueueName: params.TaskQueue,
Expand Down Expand Up @@ -377,8 +383,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(
tagRunID, task.WorkflowExecution.GetRunId(),
tagAttempt, task.Attempt,
tagError, taskErr)
// convert err to WorkflowTaskFailed
completedRequest = errorToFailWorkflowTask(task.TaskToken, taskErr, wtp.identity, wtp.dataConverter, wtp.failureConverter, wtp.namespace)
completedRequest = wtp.errorToFailWorkflowTask(task.TaskToken, taskErr)
}

metricsHandler.Timer(metrics.WorkflowTaskExecutionLatency).Record(time.Since(startTime))
Expand Down Expand Up @@ -439,6 +444,35 @@ func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}
return
}

// errorToFailWorkflowTask converts a workflow task processing error into a
// RespondWorkflowTaskFailedRequest for the given task token.
//
// The failure cause is WORKFLOW_WORKER_UNHANDLED_FAILURE unless err is a
// workflowPanicError whose value is a stateMachineIllegalStatePanic, or err is
// a historyMismatchError; in those cases it is NON_DETERMINISTIC_ERROR. The
// request's BinaryChecksum is taken from wtp.getBuildID().
func (wtp *workflowTaskPoller) errorToFailWorkflowTask(taskToken []byte, err error) *workflowservice.RespondWorkflowTaskFailedRequest {
	// Decide whether the error indicates non-determinism: either a panic
	// raised by a bad state machine, or a history mismatch.
	var nonDeterministic bool
	if panicErr, _ := err.(*workflowPanicError); panicErr != nil {
		_, nonDeterministic = panicErr.value.(stateMachineIllegalStatePanic)
	} else {
		_, nonDeterministic = err.(historyMismatchError)
	}

	failedCause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
	if nonDeterministic {
		failedCause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
	}

	failedRequest := &workflowservice.RespondWorkflowTaskFailedRequest{
		TaskToken:      taskToken,
		Cause:          failedCause,
		Failure:        wtp.failureConverter.ErrorToFailure(err),
		Identity:       wtp.identity,
		BinaryChecksum: wtp.getBuildID(),
		Namespace:      wtp.namespace,
	}
	return failedRequest
}

// getBuildID returns the identifier this poller reports as its binary
// checksum: the configured workerBuildID when one is set, otherwise the value
// produced by getBinaryChecksum.
func (bp *basePoller) getBuildID() string {
	if id := bp.workerBuildID; id != "" {
		return id
	}
	return getBinaryChecksum()
}

func newLocalActivityPoller(
params workerExecutionParameters,
laTunnel *localActivityTunnel,
Expand Down Expand Up @@ -659,12 +693,18 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po
Name: taskQueueName,
Kind: taskQueueKind,
}
return &workflowservice.PollWorkflowTaskQueueRequest{
builtRequest := &workflowservice.PollWorkflowTaskQueueRequest{
Namespace: wtp.namespace,
TaskQueue: taskQueue,
Identity: wtp.identity,
BinaryChecksum: getBinaryChecksum(),
BinaryChecksum: wtp.getBuildID(),
}
if wtp.workerBuildID != "" {
builtRequest.WorkerVersionCapabilities = &commonpb.WorkerVersionCapabilities{
BuildId: wtp.workerBuildID,
}
}
return builtRequest
}

// Poll the workflow task queue and update the num_poller metric
Expand Down Expand Up @@ -824,7 +864,11 @@ func newGetHistoryPageFunc(

func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowservice.WorkflowServiceClient, params workerExecutionParameters) *activityTaskPoller {
return &activityTaskPoller{
basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel},
basePoller: basePoller{
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.WorkerBuildID,
},
taskHandler: taskHandler,
service: service,
namespace: params.Namespace,
Expand Down Expand Up @@ -855,6 +899,11 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) {
Identity: atp.identity,
TaskQueueMetadata: &taskqueuepb.TaskQueueMetadata{MaxTasksPerSecond: &types.DoubleValue{Value: atp.activitiesPerSecond}},
}
if atp.workerBuildID != "" {
request.WorkerVersionCapabilities = &commonpb.WorkerVersionCapabilities{
BuildId: atp.workerBuildID,
}
}

response, err := atp.pollActivityTaskQueue(ctx, request)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ type (
// a default option.
Identity string

// The worker's build ID used for versioning, if one was set.
WorkerBuildID string

MetricsHandler metrics.Handler

Logger log.Logger
Expand Down Expand Up @@ -1499,6 +1502,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
ConcurrentWorkflowTaskExecutionSize: options.MaxConcurrentWorkflowTaskExecutionSize,
MaxConcurrentWorkflowTaskQueuePollers: options.MaxConcurrentWorkflowTaskPollers,
Identity: client.identity,
WorkerBuildID: options.BuildIDForVersioning,
MetricsHandler: client.metricsHandler.WithTags(metrics.TaskQueueTags(taskQueue)),
Logger: client.logger,
EnableLoggingInReplay: options.EnableLoggingInReplay,
Expand Down
45 changes: 45 additions & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,51 @@ func (wc *WorkflowClient) ResetWorkflowExecution(ctx context.Context, request *w
return resp, nil
}

// UpdateWorkerBuildIDCompatability updates the worker-build-id based version
// sets for a particular task queue. This is used in conjunction with workers
// who specify their build id and thus opt into the feature.
//
// The options are validated and converted to a request by
// options.validateAndConvertToProto, the client's namespace is set on that
// request, and it is sent through the workflow service. Any error from client
// initialization, validation, or the RPC is returned as-is.
func (wc *WorkflowClient) UpdateWorkerBuildIDCompatability(ctx context.Context, options *UpdateWorkerBuildIDCompatabilityOptions) error {
	if initErr := wc.ensureInitialized(); initErr != nil {
		return initErr
	}

	req, convErr := options.validateAndConvertToProto()
	if convErr != nil {
		return convErr
	}
	req.Namespace = wc.namespace

	rpcCtx, done := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
	defer done()

	// Only the error matters here; the response is discarded.
	_, rpcErr := wc.workflowService.UpdateWorkerBuildIdCompatability(rpcCtx, req)
	return rpcErr
}

// GetWorkerBuildIDCompatability returns the worker-build-id based version sets for a particular task queue.
//
// It returns an error without contacting the server when options is nil or
// options.MaxSets is negative. Errors from client initialization and from the
// GetWorkerBuildIdCompatability RPC are returned unchanged.
func (wc *WorkflowClient) GetWorkerBuildIDCompatability(ctx context.Context, options *GetWorkerBuildIDCompatabilityOptions) (*WorkerBuildIDVersionSets, error) {
	// Guard against a nil options pointer; the field reads below would panic.
	if options == nil {
		return nil, errors.New("options must not be nil")
	}
	// The message names the field that is actually being validated.
	if options.MaxSets < 0 {
		return nil, errors.New("MaxSets must be >= 0")
	}
	if err := wc.ensureInitialized(); err != nil {
		return nil, err
	}

	grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
	defer cancel()

	request := &workflowservice.GetWorkerBuildIdCompatabilityRequest{
		Namespace: wc.namespace,
		TaskQueue: options.TaskQueue,
		MaxSets:   int32(options.MaxSets),
	}
	resp, err := wc.workflowService.GetWorkerBuildIdCompatability(grpcCtx, request)
	if err != nil {
		return nil, err
	}
	return workerVersionSetsFromProtoResponse(resp), nil
}

func (wc *WorkflowClient) UpdateWorkflowWithOptions(
ctx context.Context,
req *UpdateWorkflowWithOptionsRequest,
Expand Down
6 changes: 6 additions & 0 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ type (
// and aliased names when not using string names when executing child
// workflow or activities.
DisableRegistrationAliasing bool

// Optional: If set, opts this worker into the worker build id based versioning
// feature. It will only operate on workflows it claims to be compatible with.
// Additionally, if this is set it will serve as the binary checksum for the worker.
// NOTE: Experimental
BuildIDForVersioning string
}
)

Expand Down
Loading