diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 8c9a04d..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "cSpell.words": ["stretchr"] -} diff --git a/aggregatedpool/handler.go b/aggregatedpool/handler.go index 7129bb1..b79de75 100644 --- a/aggregatedpool/handler.go +++ b/aggregatedpool/handler.go @@ -146,7 +146,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error { case *internal.ExecuteLocalActivity: wp.log.Debug("local activity request", zap.Uint64("ID", msg.ID)) - params := command.LocalActivityParams(wp.env, NewLocalActivityFn(msg.Header, wp.codec, wp.pool, wp.log).execute, msg.Payloads, msg.Header) + params := command.LocalActivityParams(wp.env, wp.la, msg.Payloads, msg.Header) activityID := wp.env.ExecuteLocalActivity(params, wp.createLocalActivityCallback(msg.ID)) wp.canceller.Register(msg.ID, func() error { wp.log.Debug("registering local activity canceller", zap.String("activityID", activityID.String())) diff --git a/aggregatedpool/local_activity.go b/aggregatedpool/local_activity.go index 7453f9c..c922b1a 100644 --- a/aggregatedpool/local_activity.go +++ b/aggregatedpool/local_activity.go @@ -18,23 +18,21 @@ import ( ) type LocalActivityFn struct { - header *commonpb.Header - codec common.Codec - pool common.Pool - log *zap.Logger - seqID uint64 + codec common.Codec + pool common.Pool + log *zap.Logger + seqID uint64 } -func NewLocalActivityFn(header *commonpb.Header, codec common.Codec, pool common.Pool, log *zap.Logger) *LocalActivityFn { +func NewLocalActivityFn(codec common.Codec, pool common.Pool, log *zap.Logger) *LocalActivityFn { return &LocalActivityFn{ - header: header, - codec: codec, - pool: pool, - log: log, + codec: codec, + pool: pool, + log: log, } } -func (la *LocalActivityFn) execute(ctx context.Context, args *commonpb.Payloads) (*commonpb.Payloads, error) { +func (la *LocalActivityFn) ExecuteLA(ctx context.Context, hdr *commonpb.Header, args *commonpb.Payloads) (*commonpb.Payloads, error) { const op = errors.Op("activity_pool_execute_activity") var info = tActivity.GetInfo(ctx) @@ -53,7 +51,7 @@ func (la *LocalActivityFn) execute(ctx context.Context, args *commonpb.Payloads) Info: info, }, Payloads: args, - Header: la.header, + Header: hdr, } la.log.Debug("executing local activity fn", zap.Uint64("ID", msg.ID), zap.String("task-queue", info.TaskQueue), zap.String("la ID", info.ActivityID)) diff --git a/aggregatedpool/workflow.go b/aggregatedpool/workflow.go index 92f5d40..2e79c7f 100644 --- a/aggregatedpool/workflow.go +++ b/aggregatedpool/workflow.go @@ -1,6 +1,7 @@ package aggregatedpool import ( + "context" "fmt" "sync" "sync/atomic" @@ -32,6 +33,7 @@ import ( */ type Callback func() error +type LaFn func(ctx context.Context, hdr *commonpb.Header, args *commonpb.Payloads) (*commonpb.Payloads, error) // seqID is global sequence ID var seqID uint64 //nolint:gochecknoglobals @@ -45,6 +47,9 @@ type Workflow struct { pool common.Pool rrID string + // LocalActivityFn + la LaFn + env bindings.WorkflowEnvironment header *commonpb.Header mq *queue.MessageQueue @@ -67,10 +72,12 @@ type Workflow struct { pldPool *sync.Pool } -func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger) *Workflow { +// NewWorkflowDefinition ... WorkflowDefinition Constructor +func NewWorkflowDefinition(codec common.Codec, la LaFn, pool common.Pool, log *zap.Logger) *Workflow { return &Workflow{ rrID: uuid.NewString(), log: log, + la: la, codec: codec, pool: pool, pldPool: &sync.Pool{ @@ -83,9 +90,12 @@ func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger // NewWorkflowDefinition ... Workflow should match the WorkflowDefinitionFactory interface (sdk-go/internal/internal_worker.go:463, RegisterWorkflowWithOptions func) // DO NOT USE THIS FUNCTION DIRECTLY!!!! +// This function called after the constructor above, it is safe to assign fields like that func (wp *Workflow) NewWorkflowDefinition() bindings.WorkflowDefinition { return &Workflow{ rrID: uuid.NewString(), + // LocalActivity + la: wp.la, // updates logic updateCompleteCb: make(map[string]func(res *internal.Message)), updateValidateCb: make(map[string]func(res *internal.Message)), @@ -143,7 +153,7 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H ) } -// OnWorkflowTaskStarted is called for each non timed out startWorkflowTask event. +// OnWorkflowTaskStarted is called for each non-timed out startWorkflowTask event. // Executed after all history events since the previous commands are applied to WorkflowDefinition // Application level code must be executed from this function only. // Execute call as well as callbacks called from WorkflowEnvironment functions can only schedule callbacks diff --git a/internal.go b/internal.go index c3965bb..89caf35 100644 --- a/internal.go +++ b/internal.go @@ -28,7 +28,10 @@ func (p *Plugin) initPool() error { dc := data_converter.NewDataConverter(converter.GetDefaultDataConverter()) codec := proto.NewCodec(p.log, dc) + // LA + A definitions actDef := aggregatedpool.NewActivityDefinition(codec, ap, p.log) + laDef := aggregatedpool.NewLocalActivityFn(codec, ap, p.log) + // ------------------ // ---------- WORKFLOW POOL ------------- wp, err := p.server.NewPool( @@ -48,7 +51,7 @@ func (p *Plugin) initPool() error { return err } - wfDef := aggregatedpool.NewWorkflowDefinition(codec, wp, p.log) + wfDef := aggregatedpool.NewWorkflowDefinition(codec, laDef.ExecuteLA, wp, p.log) // get worker information wi, err := WorkerInfo(codec, wp, p.rrVersion) diff --git a/internal/protocol.go b/internal/protocol.go index 12a8a8c..50da6d1 100644 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -347,11 +347,12 @@ func (cmd ExecuteLocalActivity) LocalActivityParams(env bindings.WorkflowEnviron truTemOptions.RetryPolicy = rp } + // TODO: should be careful here: header + inputArgs header are pointers and might be changed independently which will cause race params := bindings.ExecuteLocalActivityParams{ ExecuteLocalActivityOptions: truTemOptions, ActivityFn: fn, ActivityType: cmd.Name, - InputArgs: []any{payloads}, + InputArgs: []any{header, payloads}, WorkflowInfo: env.WorkflowInfo(), ScheduledTime: time.Now(), Header: header,