Skip to content

Commit

Permalink
bugfix: consistently plumb failureConverter (#956)
Browse files Browse the repository at this point in the history
  • Loading branch information
asonawalla authored Nov 15, 2022
1 parent 34f71ac commit 68010f8
Showing 1 changed file with 59 additions and 55 deletions.
114 changes: 59 additions & 55 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,11 @@ type (
// reaches the end state, such as workflow finished successfully or timeout.
// The user can use this to start using a functor like below and get the workflow execution result, as EncodedValue
// Either by
// ExecuteWorkflow(options, "workflowTypeName", arg1, arg2, arg3)
// or
// ExecuteWorkflow(options, workflowExecuteFn, arg1, arg2, arg3)
//
// ExecuteWorkflow(options, "workflowTypeName", arg1, arg2, arg3)
// or
// ExecuteWorkflow(options, workflowExecuteFn, arg1, arg2, arg3)
//
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
// NOTE: the context.Context should have a fairly large timeout, since workflow execution may take a while to be finished
Expand Down Expand Up @@ -265,12 +267,13 @@ func (wc *WorkflowClient) GetWorkflow(ctx context.Context, workflowID string, ru
}

return &workflowRunImpl{
workflowID: workflowID,
firstRunID: runID,
currentRunID: &runIDCell,
iterFn: iterFn,
dataConverter: wc.dataConverter,
registry: wc.registry,
workflowID: workflowID,
firstRunID: runID,
currentRunID: &runIDCell,
iterFn: iterFn,
dataConverter: wc.dataConverter,
failureConverter: wc.failureConverter,
registry: wc.registry,
}
}

Expand Down Expand Up @@ -532,10 +535,10 @@ func (wc *WorkflowClient) RecordActivityHeartbeatByID(ctx context.Context,

// ListClosedWorkflow gets closed workflow executions based on request filters
// The errors it can throw:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NamespaceNotFound
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NamespaceNotFound
func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workflowservice.ListClosedWorkflowExecutionsRequest) (*workflowservice.ListClosedWorkflowExecutionsResponse, error) {
if err := wc.ensureInitialized(); err != nil {
return nil, err
Expand All @@ -555,10 +558,10 @@ func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workf

// ListOpenWorkflow gets open workflow executions based on request filters
// The errors it can throw:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NamespaceNotFound
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NamespaceNotFound
func (wc *WorkflowClient) ListOpenWorkflow(ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest) (*workflowservice.ListOpenWorkflowExecutionsResponse, error) {
if err := wc.ensureInitialized(); err != nil {
return nil, err
Expand Down Expand Up @@ -677,10 +680,10 @@ func (wc *WorkflowClient) GetSearchAttributes(ctx context.Context) (*workflowser

// DescribeWorkflowExecution returns information about the specified workflow execution.
// The errors it can return:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NotFound
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NotFound
func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*workflowservice.DescribeWorkflowExecutionResponse, error) {
if err := wc.ensureInitialized(); err != nil {
return nil, err
Expand Down Expand Up @@ -710,11 +713,11 @@ func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflo
// - queryType is the type of the query.
// - args... are the optional query parameters.
// The errors it can return:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NotFound
// - serviceerror.QueryFailed
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NotFound
// - serviceerror.QueryFailed
func (wc *WorkflowClient) QueryWorkflow(ctx context.Context, workflowID string, runID string, queryType string, args ...interface{}) (converter.EncodedValue, error) {
if err := wc.ensureInitialized(); err != nil {
return nil, err
Expand Down Expand Up @@ -772,11 +775,11 @@ type QueryWorkflowWithOptionsResponse struct {
// QueryWorkflowWithOptions queries a given workflow execution and returns the query result synchronously.
// See QueryWorkflowWithOptionsRequest and QueryWorkflowWithOptionsResult for more information.
// The errors it can return:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NotFound
// - serviceerror.QueryFailed
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NotFound
// - serviceerror.QueryFailed
func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request *QueryWorkflowWithOptionsRequest) (*QueryWorkflowWithOptionsResponse, error) {
if err := wc.ensureInitialized(); err != nil {
return nil, err
Expand Down Expand Up @@ -827,10 +830,10 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request
// - taskqueue name of taskqueue
// - taskqueueType type of taskqueue, can be workflow or activity
// The errors it can return:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NotFound
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NotFound
func (wc *WorkflowClient) DescribeTaskQueue(ctx context.Context, taskQueue string, taskQueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error) {
if err := wc.ensureInitialized(); err != nil {
return nil, err
Expand Down Expand Up @@ -955,7 +958,7 @@ func (wc *WorkflowClient) ensureInitialized() error {

// ScheduleClient implements Client.ScheduleClient.
func (wc *WorkflowClient) ScheduleClient() ScheduleClient {
return &scheduleClient {
return &scheduleClient{
workflowClient: wc,
}
}
Expand Down Expand Up @@ -986,10 +989,10 @@ func (wc *WorkflowClient) Close() {

// Register a namespace with temporal server
// The errors it can throw:
// - NamespaceAlreadyExistsError
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - NamespaceAlreadyExistsError
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
func (nc *namespaceClient) Register(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) error {
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()
Expand All @@ -1003,10 +1006,10 @@ func (nc *namespaceClient) Register(ctx context.Context, request *workflowservic
// NamespaceConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics.
// ReplicationConfiguration - replication config like clusters and active cluster name
// The errors it can throw:
// - serviceerror.NamespaceNotFound
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NamespaceNotFound
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
func (nc *namespaceClient) Describe(ctx context.Context, namespace string) (*workflowservice.DescribeNamespaceResponse, error) {
request := &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
Expand All @@ -1023,10 +1026,10 @@ func (nc *namespaceClient) Describe(ctx context.Context, namespace string) (*wor

// Update a namespace.
// The errors it can throw:
// - serviceerror.NamespaceNotFound
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
// - serviceerror.NamespaceNotFound
// - serviceerror.InvalidArgument
// - serviceerror.Internal
// - serviceerror.Unavailable
func (nc *namespaceClient) Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error {
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()
Expand Down Expand Up @@ -1447,13 +1450,14 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow(

curRunIDCell := util.PopulatedOnceCell(response.GetRunId())
return &workflowRunImpl{
workflowType: in.WorkflowType,
workflowID: in.Options.ID,
firstRunID: response.GetRunId(),
currentRunID: &curRunIDCell,
iterFn: iterFn,
dataConverter: w.client.dataConverter,
registry: w.client.registry,
workflowType: in.WorkflowType,
workflowID: in.Options.ID,
firstRunID: response.GetRunId(),
currentRunID: &curRunIDCell,
iterFn: iterFn,
dataConverter: w.client.dataConverter,
failureConverter: w.client.failureConverter,
registry: w.client.registry,
}, nil
}

Expand Down

0 comments on commit 68010f8

Please sign in to comment.