diff --git a/contrib/resourcetuner/resourcetuner.go b/contrib/resourcetuner/resourcetuner.go index a52a53e28..302af9cb7 100644 --- a/contrib/resourcetuner/resourcetuner.go +++ b/contrib/resourcetuner/resourcetuner.go @@ -77,11 +77,14 @@ func NewResourceBasedTuner(opts ResourceBasedTunerOptions) (worker.WorkerTuner, } nexusSS := &ResourceBasedSlotSupplier{controller: controller, options: defaultWorkflowResourceBasedSlotSupplierOptions()} + sessSS := &ResourceBasedSlotSupplier{controller: controller, + options: defaultActivityResourceBasedSlotSupplierOptions()} compositeTuner, err := worker.NewCompositeTuner(worker.CompositeTunerOptions{ - WorkflowSlotSupplier: wfSS, - ActivitySlotSupplier: actSS, - LocalActivitySlotSupplier: laSS, - NexusSlotSupplier: nexusSS, + WorkflowSlotSupplier: wfSS, + ActivitySlotSupplier: actSS, + LocalActivitySlotSupplier: laSS, + NexusSlotSupplier: nexusSS, + SessionActivitySlotSupplier: sessSS, }) if err != nil { return nil, err diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 61f324f91..288e2f665 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -399,7 +399,7 @@ func (ww *workflowWorker) Stop() { ww.worker.Stop() } -func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, overrides *workerOverrides, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker { +func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker { if params.Identity == "" { params.Identity = getWorkerIdentity(params.TaskQueue) } @@ -412,15 +412,14 @@ func newSessionWorker(service workflowservice.WorkflowServiceClient, params work creationTaskqueue := getCreationTaskqueue(params.TaskQueue) params.UserContext = context.WithValue(params.UserContext, sessionEnvironmentContextKey, sessionEnvironment) params.TaskQueue = sessionEnvironment.GetResourceSpecificTaskqueue() - activityWorker := newActivityWorker(service, params, overrides, env, nil) + activityWorker := newActivityWorker(service, params, + &workerOverrides{slotSupplier: params.Tuner.GetSessionActivitySlotSupplier()}, env, nil) params.MaxConcurrentActivityTaskQueuePollers = 1 params.TaskQueue = creationTaskqueue - if overrides == nil { - overrides = &workerOverrides{} - } // Although we have session token bucket to limit session size across creation // and recreation, we also limit it here for creation only + overrides := &workerOverrides{} overrides.slotSupplier, _ = NewFixedSizeSlotSupplier(maxConcurrentSessionExecutionSize) creationWorker := newActivityWorker(service, params, overrides, env, sessionEnvironment.GetTokenBucket()) @@ -1758,7 +1757,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke var sessionWorker *sessionWorker if options.EnableSessionWorker && !options.LocalActivityWorkerOnly { - sessionWorker = newSessionWorker(client.workflowService, workerParams, nil, registry, options.MaxConcurrentSessionExecutionSize) + sessionWorker = newSessionWorker(client.workflowService, workerParams, registry, options.MaxConcurrentSessionExecutionSize) registry.RegisterActivityWithOptions(sessionCreationActivity, RegisterActivityOptions{ Name: sessionCreationActivityName, }) diff --git a/internal/tuning.go b/internal/tuning.go index faf490475..3a36efe62 100644 --- a/internal/tuning.go +++ b/internal/tuning.go @@ -46,6 +46,8 @@ type WorkerTuner interface { GetLocalActivitySlotSupplier() SlotSupplier // GetNexusSlotSupplier returns the SlotSupplier used for nexus tasks. GetNexusSlotSupplier() SlotSupplier + // GetSessionActivitySlotSupplier returns the SlotSupplier used for activities within sessions. + GetSessionActivitySlotSupplier() SlotSupplier } // SlotPermit is a permit to use a slot. @@ -150,10 +152,11 @@ type SlotSupplier interface { // // WARNING: Custom implementations of SlotSupplier are currently experimental. type CompositeTuner struct { - workflowSlotSupplier SlotSupplier - activitySlotSupplier SlotSupplier - localActivitySlotSupplier SlotSupplier - nexusSlotSupplier SlotSupplier + workflowSlotSupplier SlotSupplier + activitySlotSupplier SlotSupplier + localActivitySlotSupplier SlotSupplier + nexusSlotSupplier SlotSupplier + sessionActivitySlotSupplier SlotSupplier } func (c *CompositeTuner) GetWorkflowTaskSlotSupplier() SlotSupplier { @@ -168,6 +171,9 @@ func (c *CompositeTuner) GetLocalActivitySlotSupplier() SlotSupplier { func (c *CompositeTuner) GetNexusSlotSupplier() SlotSupplier { return c.nexusSlotSupplier } +func (c *CompositeTuner) GetSessionActivitySlotSupplier() SlotSupplier { + return c.sessionActivitySlotSupplier +} // CompositeTunerOptions are the options used by NewCompositeTuner. type CompositeTunerOptions struct { @@ -179,6 +185,8 @@ type CompositeTunerOptions struct { LocalActivitySlotSupplier SlotSupplier // NexusSlotSupplier is the SlotSupplier used for nexus tasks. NexusSlotSupplier SlotSupplier + // SessionActivitySlotSupplier is the SlotSupplier used for activities within sessions. + SessionActivitySlotSupplier SlotSupplier } // NewCompositeTuner creates a WorkerTuner that uses a combination of slot suppliers. @@ -186,10 +194,11 @@ type CompositeTunerOptions struct { // WARNING: Custom implementations of SlotSupplier are currently experimental. func NewCompositeTuner(options CompositeTunerOptions) (WorkerTuner, error) { return &CompositeTuner{ - workflowSlotSupplier: options.WorkflowSlotSupplier, - activitySlotSupplier: options.ActivitySlotSupplier, - localActivitySlotSupplier: options.LocalActivitySlotSupplier, - nexusSlotSupplier: options.NexusSlotSupplier, + workflowSlotSupplier: options.WorkflowSlotSupplier, + activitySlotSupplier: options.ActivitySlotSupplier, + localActivitySlotSupplier: options.LocalActivitySlotSupplier, + nexusSlotSupplier: options.NexusSlotSupplier, + sessionActivitySlotSupplier: options.SessionActivitySlotSupplier, }, nil } @@ -235,11 +244,16 @@ func NewFixedSizeTuner(options FixedSizeTunerOptions) (WorkerTuner, error) { if err != nil { return nil, err } + sessSS, err := NewFixedSizeSlotSupplier(options.NumActivitySlots) + if err != nil { + return nil, err + } return &CompositeTuner{ - workflowSlotSupplier: wfSS, - activitySlotSupplier: actSS, - localActivitySlotSupplier: laSS, - nexusSlotSupplier: nexusSS, + workflowSlotSupplier: wfSS, + activitySlotSupplier: actSS, + localActivitySlotSupplier: laSS, + nexusSlotSupplier: nexusSS, + sessionActivitySlotSupplier: sessSS, }, nil } diff --git a/test/integration_test.go b/test/integration_test.go index 4a3fcad9b..bf6b48df2 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -274,6 +274,11 @@ func (ts *IntegrationTestSuite) SetupTest() { ts.NoError(err) options.Tuner = tuner } + if strings.Contains(ts.T().Name(), "SlotSuppliersWithSession") { + options.MaxConcurrentActivityExecutionSize = 1 + // Apparently this is on by default in these tests anyway, but to be explicit + options.EnableSessionWorker = true + } ts.worker = worker.New(ts.client, ts.taskQueueName, options) ts.workerStopped = false @@ -3269,6 +3274,27 @@ func (ts *IntegrationTestSuite) TestResourceBasedSlotSupplierManyActs() { ts.assertMetricGaugeEventually(metrics.WorkerTaskSlotsUsed, wfWorkertags, 0) } +func (ts *IntegrationTestSuite) TestSlotSuppliersWithSessionAndOneConcurrentMax() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + // Activities time out without the fix, since obtaining a slot takes too long + wfRuns := make([]client.WorkflowRun, 0) + for i := 0; i < 3; i++ { + opts := ts.startWorkflowOptions("slot-suppliers-with-session" + strconv.Itoa(i)) + opts.WorkflowExecutionTimeout = 1 * time.Minute + run, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.Echo, "hi") + ts.NoError(err) + ts.NotNil(run) + ts.NoError(err) + wfRuns = append(wfRuns, run) + } + + for _, run := range wfRuns { + ts.NoError(run.Get(ctx, nil)) + } +} + func (ts *IntegrationTestSuite) TestTooFewParams() { var res ParamsValue // Only give first param