Skip to content

Commit

Permalink
Client APIs for workflow update (#989)
Browse files Browse the repository at this point in the history
Update sdk-go for latest interaction API

- Adds UpdateWorkflowExecution to the client API and relevant
  interceptors
- Adopts expectation that interaction invocations will be attached to a
  PollWorkflowTaskQueueResponse (had previously been attached to the WFT
  Started event).
- Comment formatting for Go 1.19+
- Adopts a handle-based approach to Update client API
- Replace deprecated uses of io/ioutil
  • Loading branch information
Matt McShane authored Jan 4, 2023
1 parent 06e474c commit 50f633a
Show file tree
Hide file tree
Showing 18 changed files with 772 additions and 302 deletions.
43 changes: 37 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type (

// ScheduleDescription describes the current Schedule details from ScheduleHandle.Describe.
// NOTE: Experimental
ScheduleDescription = internal.ScheduleDescription
ScheduleDescription = internal.ScheduleDescription

// Schedule describes a created schedule.
// NOTE: Experimental
Expand Down Expand Up @@ -187,6 +187,16 @@ type (
// NOTE: Experimental
ScheduleBackfillOptions = internal.ScheduleBackfillOptions

// UpdateWorkflowWithOptionsRequest encapsulates the parameters for
// sending an update to a workflow execution.
// NOTE: Experimental
UpdateWorkflowWithOptionsRequest = internal.UpdateWorkflowWithOptionsRequest

// WorkflowUpdateHandle represents a running or completed workflow
// execution update and gives the holder access to the outcome of the same.
// NOTE: Experimental
WorkflowUpdateHandle = internal.WorkflowUpdateHandle

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down Expand Up @@ -477,6 +487,25 @@ type (
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)

// UpdateWorkflow issues an update request to the specified
// workflow execution and returns the result synchronously. Calling this
// function is equivalent to calling UpdateWorkflowOptions with
// the same arguments and indicating that the RPC call should wait for
// completion of the update process.
// NOTE: Experimental
UpdateWorkflow(ctx context.Context, workflowID string, workflowRunID string, updateName string, args ...interface{}) (WorkflowUpdateHandle, error)

// UpdateWorkflowWithOptions issues an update request to the
// specified workflow execution and returns a handle to the update that
// is running in in parallel with the calling thread. Errors returned
// from the server will be exposed through the return value of
// WorkflowUpdateHandle.Get(). Errors that occur before the
// update is requested (e.g. if the required workflow ID field is
// missing from the UpdateWorkflowWithOptionsRequest) are returned
// directly from this function call.
// NOTE: Experimental
UpdateWorkflowWithOptions(ctx context.Context, request *UpdateWorkflowWithOptionsRequest) (WorkflowUpdateHandle, error)

// WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases
// that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the
// service are not configured with internal semantics such as automatic retries.
Expand Down Expand Up @@ -611,8 +640,9 @@ var _ internal.NamespaceClient = NamespaceClient(nil)
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution.
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
// which can be decoded by using:
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
//
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
func NewValue(data *commonpb.Payloads) converter.EncodedValue {
return internal.NewValue(data)
}
Expand All @@ -621,9 +651,10 @@ func NewValue(data *commonpb.Payloads) converter.EncodedValue {
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling Client.DescribeWorkflowExecution.
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
// which can be decoded by using:
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
//
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
func NewValues(data *commonpb.Payloads) converter.EncodedValues {
return internal.NewValues(data)
}
Expand Down
4 changes: 2 additions & 2 deletions contrib/tools/workflowcheck/workflow/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"fmt"
"go/ast"
"go/types"
"io/ioutil"
"log"
"os"
"regexp"
"strings"

Expand Down Expand Up @@ -202,7 +202,7 @@ func (configFileFlag) String() string { return "<built-in>" }

func (c configFileFlag) Set(flag string) error {
// Load the file into YAML
b, err := ioutil.ReadFile(flag)
b, err := os.ReadFile(flag)
if err != nil {
return fmt.Errorf("failed reading config: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions converter/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"

Expand Down Expand Up @@ -117,7 +116,7 @@ func (*zlibCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, err
return payloads, err
}
// Read all and unmarshal
b, err := ioutil.ReadAll(r)
b, err := io.ReadAll(r)
if closeErr := r.Close(); closeErr != nil && err == nil {
err = closeErr
}
Expand Down
4 changes: 4 additions & 0 deletions interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ type ClientQueryWorkflowInput = internal.ClientQueryWorkflowInput
// ScheduleClientInterceptor.CreateSchedule.
type ScheduleClientCreateInput = internal.ScheduleClientCreateInput

// ClientUpdateWorkflowInput is input for
// ClientOutoundInterceptor.UpdateWorkflow.
type ClientUpdateWorkflowInput = internal.ClientUpdateWorkflowInput

// Header provides Temporal header information from the context for reading or
// writing during specific interceptor calls.
//
Expand Down
31 changes: 26 additions & 5 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,25 @@ type (
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)

// UpdateWorkflow issues an update request to the specified
// workflow execution and returns the result synchronously. Calling this
// function is equivalent to calling UpdateWorkflowWithOptions with
// the same arguments and indicating that the RPC call should wait for
// completion of the update process.
// NOTE: Experimental
UpdateWorkflow(ctx context.Context, workflowID string, workflowRunID string, updateName string, args ...interface{}) (WorkflowUpdateHandle, error)

// UpdateWorkflowWithOptions issues an update request to the
// specified workflow execution and returns a handle to the update that
// is running in in parallel with the calling thread. Errors returned
// from the server will be exposed through the return value of
// WorkflowExecutionUpdateHandle.Get(). Errors that occur before the
// update is requested (e.g. if the required workflow ID field is
// missing from the UpdateWorkflowWithOptionsRequest) are returned
// directly from this function call.
// NOTE: Experimental
UpdateWorkflowWithOptions(ctx context.Context, request *UpdateWorkflowWithOptionsRequest) (WorkflowUpdateHandle, error)

// WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases
// that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the
// service are not configured with internal semantics such as automatic retries.
Expand Down Expand Up @@ -829,8 +848,9 @@ func newNamespaceServiceClient(workflowServiceClient workflowservice.WorkflowSer
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution.
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
// which can be decoded by using:
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
//
// var result string // This need to be same type as the one passed to RecordHeartbeat
// NewValue(data).Get(&result)
func NewValue(data *commonpb.Payloads) converter.EncodedValue {
return newEncodedValue(data, nil)
}
Expand All @@ -839,9 +859,10 @@ func NewValue(data *commonpb.Payloads) converter.EncodedValue {
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling Client.DescribeWorkflowExecution.
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
// which can be decoded by using:
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
//
// var result1 string
// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat
// NewValues(data).Get(&result1, &result2)
func NewValues(data *commonpb.Payloads) converter.EncodedValues {
return newEncodedValues(data, nil)
}
9 changes: 4 additions & 5 deletions internal/cmd/tools/copyright/licensegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"bufio"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -69,7 +68,7 @@ var (
// command line utility that adds license header
// to the source files. Usage as follows:
//
// ./cmd/tools/copyright/licensegen.go
// ./cmd/tools/copyright/licensegen.go
func main() {

var cfg config
Expand All @@ -92,7 +91,7 @@ func newAddLicenseHeaderTask(cfg *config) *addLicenseHeaderTask {
}

func (task *addLicenseHeaderTask) run() error {
data, err := ioutil.ReadFile(task.config.rootDir + "/" + licenseFileName)
data, err := os.ReadFile(task.config.rootDir + "/" + licenseFileName)
if err != nil {
return fmt.Errorf("error reading license file, errr=%v", err.Error())
}
Expand Down Expand Up @@ -157,12 +156,12 @@ func (task *addLicenseHeaderTask) handleFile(path string, fileInfo os.FileInfo,

// Used as part of the cli to write licence headers on files, does not use user supplied input so marked as nosec
// #nosec
data, err := ioutil.ReadFile(path)
data, err := os.ReadFile(path)
if err != nil {
return err
}

return ioutil.WriteFile(path, []byte(task.license+string(data)), defaultFilePerms)
return os.WriteFile(path, []byte(task.license+string(data)), defaultFilePerms)
}

func isFileAutogenerated(_ string) bool {
Expand Down
27 changes: 26 additions & 1 deletion internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,10 @@ type WorkflowOutboundInterceptor interface {
// SetQueryHandler intercepts workflow.SetQueryHandler.
SetQueryHandler(ctx Context, queryType string, handler interface{}) error

SetUpdateHandler(ctx Context, updateName string, handler interface{}, opts UpdateOptions) error
// SetUpdateHandler intercepts workflow.SetUpdateHandler.
//
// NOTE: Experimental
SetUpdateHandler(ctx Context, updateName string, handler interface{}, opts UpdateHandlerOptions) error

// IsReplaying intercepts workflow.IsReplaying.
IsReplaying(ctx Context) bool
Expand Down Expand Up @@ -299,9 +302,31 @@ type ClientOutboundInterceptor interface {
// interceptor.Header will return a non-nil map for this context.
QueryWorkflow(context.Context, *ClientQueryWorkflowInput) (converter.EncodedValue, error)

// UpdateWorkflow intercepts client.Client.UpdateWorkflow
// interceptor.Header will return a non-nil map for this context.
//
// NOTE: Experimental
UpdateWorkflow(context.Context, *ClientUpdateWorkflowInput) (WorkflowUpdateHandle, error)

mustEmbedClientOutboundInterceptorBase()
}

// ClientUpdateWorkflowInput is the input to
// ClientOutboundInterceptor.UpdateWorkflow
//
// NOTE: Experimental
type ClientUpdateWorkflowInput struct {
UpdateID string
WorkflowID string
UpdateName string
Args []interface{}
RunID string
FirstExecutionRunID string

// this isn't upstream in API yet
// WaitFor enumspb.WorkflowExecutionUpdateWaitEvent
}

// ScheduleClientCreateInput is the input to
// ClientOutboundInterceptor.CreateSchedule.
type ScheduleClientCreateInput struct {
Expand Down
9 changes: 8 additions & 1 deletion internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (w *WorkflowOutboundInterceptorBase) SetQueryHandler(ctx Context, queryType
}

// SetUpdateHandler implements WorkflowOutboundInterceptor.SetUpdateHandler.
func (w *WorkflowOutboundInterceptorBase) SetUpdateHandler(ctx Context, updateName string, handler interface{}, opts UpdateOptions) error {
func (w *WorkflowOutboundInterceptorBase) SetUpdateHandler(ctx Context, updateName string, handler interface{}, opts UpdateHandlerOptions) error {
return w.Next.SetUpdateHandler(ctx, updateName, handler, opts)
}

Expand Down Expand Up @@ -389,6 +389,13 @@ type ClientOutboundInterceptorBase struct {

var _ ClientOutboundInterceptor = &ClientOutboundInterceptorBase{}

func (c *ClientOutboundInterceptorBase) UpdateWorkflow(
ctx context.Context,
in *ClientUpdateWorkflowInput,
) (WorkflowUpdateHandle, error) {
return c.Next.UpdateWorkflow(ctx, in)
}

// ExecuteWorkflow implements ClientOutboundInterceptor.ExecuteWorkflow.
func (c *ClientOutboundInterceptorBase) ExecuteWorkflow(
ctx context.Context,
Expand Down
1 change: 0 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,6 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessInteraction(
commands: weh.commandsHelper,
},
)
weh.workflowDefinition.OnWorkflowTaskStarted(weh.deadlockDetectionTimeout)
return nil
}

Expand Down
5 changes: 0 additions & 5 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,9 +1183,6 @@ func skipDeterministicCheckForCommand(d *commandpb.Command) bool {
if markerName == versionMarkerName || markerName == mutableSideEffectMarkerName {
return true
}
case enumspb.COMMAND_TYPE_ACCEPT_WORKFLOW_UPDATE,
enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
return true
}
return false
}
Expand All @@ -1197,8 +1194,6 @@ func skipDeterministicCheckForEvent(e *historypb.HistoryEvent) bool {
if markerName == versionMarkerName || markerName == mutableSideEffectMarkerName {
return true
}
// case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
// return true
}
return false
}
Expand Down
19 changes: 19 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
interactionpb "go.temporal.io/api/interaction/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/api/workflowservicemock/v1"

Expand Down Expand Up @@ -1210,6 +1211,23 @@ func (aw *WorkflowReplayer) ReplayWorkflowExecution(ctx context.Context, service
return aw.replayWorkflowHistory(logger, service, namespace, &history)
}

// inferInvocations extracts the set of *interactionpb.Invocation objects that
// should be attached to a workflow task (i.e. the
// PollWorkflowTaskQueueResponse.Interactions) if that task were to carry the
// provided slice of history events.
func inferInvocations(events []*historypb.HistoryEvent) []*interactionpb.Invocation {
var invocations []*interactionpb.Invocation
for _, e := range events {
if attrs := e.GetWorkflowUpdateAcceptedEventAttributes(); attrs != nil {
invocations = append(invocations, &interactionpb.Invocation{
Meta: attrs.Meta,
Input: attrs.Input,
})
}
}
return invocations
}

func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service workflowservice.WorkflowServiceClient, namespace string, history *historypb.History) error {
taskQueue := "ReplayTaskQueue"
events := history.Events
Expand Down Expand Up @@ -1245,6 +1263,7 @@ func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service wor
WorkflowExecution: execution,
History: history,
PreviousStartedEventId: math.MaxInt64,
Interactions: inferInvocations(history.Events),
}

iterator := &historyIteratorImpl{
Expand Down
Loading

0 comments on commit 50f633a

Please sign in to comment.