Skip to content

Commit

Permalink
Propagate cancellation to the client if server is unavailable (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitarb authored Jun 7, 2021
1 parent 5606103 commit 95f8a07
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 21 deletions.
33 changes: 14 additions & 19 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1670,7 +1670,7 @@ func (i *temporalInvoker) internalHeartBeat(ctx context.Context, details *common
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

err := recordActivityHeartbeat(ctx, i.service, i.metricsScope, i.identity, i.taskToken, details)
err := recordActivityHeartbeat(ctx, i.service, i.identity, i.taskToken, details)

switch err.(type) {
case *CanceledError:
Expand All @@ -1683,10 +1683,19 @@ func (i *temporalInvoker) internalHeartBeat(ctx context.Context, details *common
// later when we have setter on cancel handler.
i.cancelHandler()
isActivityCanceled = true
case nil:
// No error, do nothing.
default:
// Transient errors are getting retried for the duration of the heartbeat timeout.
// The fact that error has been returned means that activity should now be timed out, hence we should
// propagate cancellation to the handler.
if isServiceTransientError(err) {
i.cancelHandler()
isActivityCanceled = true
}
}

// We don't want to bubble temporary errors to the user.
// This error won't be return to user check RecordActivityHeartbeat().
// This error won't be returned to user check RecordActivityHeartbeat().
return isActivityCanceled, err
}

Expand Down Expand Up @@ -1854,14 +1863,7 @@ func createNewCommand(commandType enumspb.CommandType) *commandpb.Command {
}
}

func recordActivityHeartbeat(
ctx context.Context,
service workflowservice.WorkflowServiceClient,
metricsScope tally.Scope,
identity string,
taskToken []byte,
details *commonpb.Payloads,
) error {
func recordActivityHeartbeat(ctx context.Context, service workflowservice.WorkflowServiceClient, identity string, taskToken []byte, details *commonpb.Payloads) error {

namespace := getNamespaceFromActivityCtx(ctx)
request := &workflowservice.RecordActivityTaskHeartbeatRequest{
Expand Down Expand Up @@ -1889,14 +1891,7 @@ func recordActivityHeartbeat(
return heartbeatErr
}

func recordActivityHeartbeatByID(
ctx context.Context,
service workflowservice.WorkflowServiceClient,
metricsScope tally.Scope,
identity string,
namespace, workflowID, runID, activityID string,
details *commonpb.Payloads,
) error {
func recordActivityHeartbeatByID(ctx context.Context, service workflowservice.WorkflowServiceClient, identity, namespace, workflowID, runID, activityID string, details *commonpb.Payloads) error {
request := &workflowservice.RecordActivityTaskHeartbeatByIdRequest{
Namespace: namespace,
WorkflowId: workflowID,
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func (wc *WorkflowClient) RecordActivityHeartbeat(ctx context.Context, taskToken
if err != nil {
return err
}
return recordActivityHeartbeat(ctx, wc.workflowService, wc.metricsScope, wc.identity, taskToken, data)
return recordActivityHeartbeat(ctx, wc.workflowService, wc.identity, taskToken, data)
}

// RecordActivityHeartbeatByID records heartbeat for an activity.
Expand All @@ -670,7 +670,7 @@ func (wc *WorkflowClient) RecordActivityHeartbeatByID(ctx context.Context,
if err != nil {
return err
}
return recordActivityHeartbeatByID(ctx, wc.workflowService, wc.metricsScope, wc.identity, namespace, workflowID, runID, activityID, data)
return recordActivityHeartbeatByID(ctx, wc.workflowService, wc.identity, namespace, workflowID, runID, activityID, data)
}

// ListClosedWorkflow gets closed workflow executions based on request filters
Expand Down

0 comments on commit 95f8a07

Please sign in to comment.