diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index fc9bdc477..78875553a 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -194,9 +194,11 @@ type ( // reaches the end state, such as workflow finished successfully or timeout. // The user can use this to start using a functor like below and get the workflow execution result, as EncodedValue // Either by -// ExecuteWorkflow(options, "workflowTypeName", arg1, arg2, arg3) -// or -// ExecuteWorkflow(options, workflowExecuteFn, arg1, arg2, arg3) +// +// ExecuteWorkflow(options, "workflowTypeName", arg1, arg2, arg3) +// or +// ExecuteWorkflow(options, workflowExecuteFn, arg1, arg2, arg3) +// // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. // NOTE: the context.Context should have a fairly large timeout, since workflow execution may take a while to be finished @@ -265,12 +267,13 @@ func (wc *WorkflowClient) GetWorkflow(ctx context.Context, workflowID string, ru } return &workflowRunImpl{ - workflowID: workflowID, - firstRunID: runID, - currentRunID: &runIDCell, - iterFn: iterFn, - dataConverter: wc.dataConverter, - registry: wc.registry, + workflowID: workflowID, + firstRunID: runID, + currentRunID: &runIDCell, + iterFn: iterFn, + dataConverter: wc.dataConverter, + failureConverter: wc.failureConverter, + registry: wc.registry, } } @@ -532,10 +535,10 @@ func (wc *WorkflowClient) RecordActivityHeartbeatByID(ctx context.Context, // ListClosedWorkflow gets closed workflow executions based on request filters // The errors it can throw: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NamespaceNotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NamespaceNotFound func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workflowservice.ListClosedWorkflowExecutionsRequest) (*workflowservice.ListClosedWorkflowExecutionsResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -555,10 +558,10 @@ func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workf // ListOpenWorkflow gets open workflow executions based on request filters // The errors it can throw: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NamespaceNotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NamespaceNotFound func (wc *WorkflowClient) ListOpenWorkflow(ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest) (*workflowservice.ListOpenWorkflowExecutionsResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -677,10 +680,10 @@ func (wc *WorkflowClient) GetSearchAttributes(ctx context.Context) (*workflowser // DescribeWorkflowExecution returns information about the specified workflow execution. // The errors it can return: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NotFound func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*workflowservice.DescribeWorkflowExecutionResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -710,11 +713,11 @@ func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflo // - queryType is the type of the query. // - args... are the optional query parameters. // The errors it can return: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NotFound -// - serviceerror.QueryFailed +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NotFound +// - serviceerror.QueryFailed func (wc *WorkflowClient) QueryWorkflow(ctx context.Context, workflowID string, runID string, queryType string, args ...interface{}) (converter.EncodedValue, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -772,11 +775,11 @@ type QueryWorkflowWithOptionsResponse struct { // QueryWorkflowWithOptions queries a given workflow execution and returns the query result synchronously. // See QueryWorkflowWithOptionsRequest and QueryWorkflowWithOptionsResult for more information. // The errors it can return: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NotFound -// - serviceerror.QueryFailed +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NotFound +// - serviceerror.QueryFailed func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request *QueryWorkflowWithOptionsRequest) (*QueryWorkflowWithOptionsResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -827,10 +830,10 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request // - taskqueue name of taskqueue // - taskqueueType type of taskqueue, can be workflow or activity // The errors it can return: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NotFound func (wc *WorkflowClient) DescribeTaskQueue(ctx context.Context, taskQueue string, taskQueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -955,7 +958,7 @@ func (wc *WorkflowClient) ensureInitialized() error { // ScheduleClient implements Client.ScheduleClient. func (wc *WorkflowClient) ScheduleClient() ScheduleClient { - return &scheduleClient { + return &scheduleClient{ workflowClient: wc, } } @@ -986,10 +989,10 @@ func (wc *WorkflowClient) Close() { // Register a namespace with temporal server // The errors it can throw: -// - NamespaceAlreadyExistsError -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable +// - NamespaceAlreadyExistsError +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable func (nc *namespaceClient) Register(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) error { grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) defer cancel() @@ -1003,10 +1006,10 @@ func (nc *namespaceClient) Register(ctx context.Context, request *workflowservic // NamespaceConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics. // ReplicationConfiguration - replication config like clusters and active cluster name // The errors it can throw: -// - serviceerror.NamespaceNotFound -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable +// - serviceerror.NamespaceNotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable func (nc *namespaceClient) Describe(ctx context.Context, namespace string) (*workflowservice.DescribeNamespaceResponse, error) { request := &workflowservice.DescribeNamespaceRequest{ Namespace: namespace, @@ -1023,10 +1026,10 @@ func (nc *namespaceClient) Describe(ctx context.Context, namespace string) (*wor // Update a namespace. // The errors it can throw: -// - serviceerror.NamespaceNotFound -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable +// - serviceerror.NamespaceNotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable func (nc *namespaceClient) Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error { grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) defer cancel() @@ -1447,13 +1450,14 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow( curRunIDCell := util.PopulatedOnceCell(response.GetRunId()) return &workflowRunImpl{ - workflowType: in.WorkflowType, - workflowID: in.Options.ID, - firstRunID: response.GetRunId(), - currentRunID: &curRunIDCell, - iterFn: iterFn, - dataConverter: w.client.dataConverter, - registry: w.client.registry, + workflowType: in.WorkflowType, + workflowID: in.Options.ID, + firstRunID: response.GetRunId(), + currentRunID: &curRunIDCell, + iterFn: iterFn, + dataConverter: w.client.dataConverter, + failureConverter: w.client.failureConverter, + registry: w.client.registry, }, nil }