Skip to content

Commit

Permalink
[#527]: fix: use LA pool for the local activities execution
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jun 10, 2024
2 parents 64752ca + 80d80b2 commit 595f4fc
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 20 deletions.
3 changes: 0 additions & 3 deletions .vscode/settings.json

This file was deleted.

2 changes: 1 addition & 1 deletion aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {

case *internal.ExecuteLocalActivity:
wp.log.Debug("local activity request", zap.Uint64("ID", msg.ID))
params := command.LocalActivityParams(wp.env, NewLocalActivityFn(msg.Header, wp.codec, wp.pool, wp.log).execute, msg.Payloads, msg.Header)
params := command.LocalActivityParams(wp.env, wp.la, msg.Payloads, msg.Header)
activityID := wp.env.ExecuteLocalActivity(params, wp.createLocalActivityCallback(msg.ID))
wp.canceller.Register(msg.ID, func() error {
wp.log.Debug("registering local activity canceller", zap.String("activityID", activityID.String()))
Expand Down
22 changes: 10 additions & 12 deletions aggregatedpool/local_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@ import (
)

type LocalActivityFn struct {
header *commonpb.Header
codec common.Codec
pool common.Pool
log *zap.Logger
seqID uint64
codec common.Codec
pool common.Pool
log *zap.Logger
seqID uint64
}

func NewLocalActivityFn(header *commonpb.Header, codec common.Codec, pool common.Pool, log *zap.Logger) *LocalActivityFn {
func NewLocalActivityFn(codec common.Codec, pool common.Pool, log *zap.Logger) *LocalActivityFn {
return &LocalActivityFn{
header: header,
codec: codec,
pool: pool,
log: log,
codec: codec,
pool: pool,
log: log,
}
}

func (la *LocalActivityFn) execute(ctx context.Context, args *commonpb.Payloads) (*commonpb.Payloads, error) {
func (la *LocalActivityFn) ExecuteLA(ctx context.Context, hdr *commonpb.Header, args *commonpb.Payloads) (*commonpb.Payloads, error) {
const op = errors.Op("activity_pool_execute_activity")

var info = tActivity.GetInfo(ctx)
Expand All @@ -53,7 +51,7 @@ func (la *LocalActivityFn) execute(ctx context.Context, args *commonpb.Payloads)
Info: info,
},
Payloads: args,
Header: la.header,
Header: hdr,
}

la.log.Debug("executing local activity fn", zap.Uint64("ID", msg.ID), zap.String("task-queue", info.TaskQueue), zap.String("la ID", info.ActivityID))
Expand Down
14 changes: 12 additions & 2 deletions aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package aggregatedpool

import (
"context"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -32,6 +33,7 @@ import (
*/

type Callback func() error
type LaFn func(ctx context.Context, hdr *commonpb.Header, args *commonpb.Payloads) (*commonpb.Payloads, error)

// seqID is global sequence ID
var seqID uint64 //nolint:gochecknoglobals
Expand All @@ -45,6 +47,9 @@ type Workflow struct {
pool common.Pool
rrID string

// LocalActivityFn
la LaFn

env bindings.WorkflowEnvironment
header *commonpb.Header
mq *queue.MessageQueue
Expand All @@ -67,10 +72,12 @@ type Workflow struct {
pldPool *sync.Pool
}

func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger) *Workflow {
// NewWorkflowDefinition ... WorkflowDefinition Constructor
func NewWorkflowDefinition(codec common.Codec, la LaFn, pool common.Pool, log *zap.Logger) *Workflow {
return &Workflow{
rrID: uuid.NewString(),
log: log,
la: la,
codec: codec,
pool: pool,
pldPool: &sync.Pool{
Expand All @@ -83,9 +90,12 @@ func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger

// NewWorkflowDefinition ... Workflow should match the WorkflowDefinitionFactory interface (sdk-go/internal/internal_worker.go:463, RegisterWorkflowWithOptions func)
// DO NOT USE THIS FUNCTION DIRECTLY!!!!
// This function called after the constructor above, it is safe to assign fields like that
func (wp *Workflow) NewWorkflowDefinition() bindings.WorkflowDefinition {
return &Workflow{
rrID: uuid.NewString(),
// LocalActivity
la: wp.la,
// updates logic
updateCompleteCb: make(map[string]func(res *internal.Message)),
updateValidateCb: make(map[string]func(res *internal.Message)),
Expand Down Expand Up @@ -143,7 +153,7 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H
)
}

// OnWorkflowTaskStarted is called for each non timed out startWorkflowTask event.
// OnWorkflowTaskStarted is called for each non-timed out startWorkflowTask event.
// Executed after all history events since the previous commands are applied to WorkflowDefinition
// Application level code must be executed from this function only.
// Execute call as well as callbacks called from WorkflowEnvironment functions can only schedule callbacks
Expand Down
5 changes: 4 additions & 1 deletion internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ func (p *Plugin) initPool() error {
dc := data_converter.NewDataConverter(converter.GetDefaultDataConverter())
codec := proto.NewCodec(p.log, dc)

// LA + A definitions
actDef := aggregatedpool.NewActivityDefinition(codec, ap, p.log)
laDef := aggregatedpool.NewLocalActivityFn(codec, ap, p.log)
// ------------------

// ---------- WORKFLOW POOL -------------
wp, err := p.server.NewPool(
Expand All @@ -48,7 +51,7 @@ func (p *Plugin) initPool() error {
return err
}

wfDef := aggregatedpool.NewWorkflowDefinition(codec, wp, p.log)
wfDef := aggregatedpool.NewWorkflowDefinition(codec, laDef.ExecuteLA, wp, p.log)

// get worker information
wi, err := WorkerInfo(codec, wp, p.rrVersion)
Expand Down
3 changes: 2 additions & 1 deletion internal/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,12 @@ func (cmd ExecuteLocalActivity) LocalActivityParams(env bindings.WorkflowEnviron
truTemOptions.RetryPolicy = rp
}

// TODO: should be careful here: header + inputArgs header are pointers and might be changed independently which will cause race
params := bindings.ExecuteLocalActivityParams{
ExecuteLocalActivityOptions: truTemOptions,
ActivityFn: fn,
ActivityType: cmd.Name,
InputArgs: []any{payloads},
InputArgs: []any{header, payloads},
WorkflowInfo: env.WorkflowInfo(),
ScheduledTime: time.Now(),
Header: header,
Expand Down

0 comments on commit 595f4fc

Please sign in to comment.