From 9643c0a5842b2fa54c1909161e008fe5af08eee9 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 25 Jul 2024 14:55:21 -0700 Subject: [PATCH] Docstrings / visibility / package organization / construction tests --- contrib/resourcetuner/resourcetuner.go | 147 +++++++++++++++----- contrib/resourcetuner/resourcetuner_test.go | 6 +- internal/cmd/tools/copyright/licensegen.go | 2 +- internal/tuning.go | 86 +++++++----- test/integration_test.go | 40 +----- test/worker_tuner_test.go | 108 ++++++++++++++ test/workflow_test.go | 32 +++++ worker/tuning.go | 61 ++++++++ worker/worker.go | 25 ---- 9 files changed, 372 insertions(+), 135 deletions(-) create mode 100644 test/worker_tuner_test.go create mode 100644 worker/tuning.go diff --git a/contrib/resourcetuner/resourcetuner.go b/contrib/resourcetuner/resourcetuner.go index a05d877db..f1d61f6b3 100644 --- a/contrib/resourcetuner/resourcetuner.go +++ b/contrib/resourcetuner/resourcetuner.go @@ -33,36 +33,85 @@ import ( "go.temporal.io/sdk/worker" ) +// CreateResourceBasedTuner creates a WorkerTuner that dynamically adjusts the number of slots based +// on system resources. Specify the target CPU and memory usage as a value between 0 and 1. +// +// WARNING: Resource based tuning is currently experimental. func CreateResourceBasedTuner(targetCpu, targetMem float64) (worker.WorkerTuner, error) { options := DefaultResourceControllerOptions() - options.memTargetPercent = targetMem - options.cpuTargetPercent = targetCpu - controller := newResourceController(options, &psUtilSystemInfoSupplier{}) - // TODO: configurable - wfSS := &ResourceBasedSlotSupplier{controller: controller, minSlots: 5, maxSlots: 1000} - actSS := &ResourceBasedSlotSupplier{controller: controller, minSlots: 1, maxSlots: 1000} - laSS := &ResourceBasedSlotSupplier{controller: controller, minSlots: 1, maxSlots: 1000} + options.MemTargetPercent = targetMem + options.CpuTargetPercent = targetCpu + controller := NewResourceControllerWithInfoSupplier(options, &psUtilSystemInfoSupplier{}) + wfSS := &ResourceBasedSlotSupplier{controller: controller, + options: defaultWorkflowResourceBasedSlotSupplierOptions()} + actSS := &ResourceBasedSlotSupplier{controller: controller, + options: defaultActivityResourceBasedSlotSupplierOptions()} + laSS := &ResourceBasedSlotSupplier{controller: controller, + options: defaultActivityResourceBasedSlotSupplierOptions()} compositeTuner := worker.CreateCompositeTuner(wfSS, actSS, laSS) return compositeTuner, nil } +// ResourceBasedSlotSupplierOptions configures a particular ResourceBasedSlotSupplier. +// +// WARNING: Resource based tuning is currently experimental. +type ResourceBasedSlotSupplierOptions struct { + // MinSlots is minimum number of slots that will be issued without any resource checks. + MinSlots int + // MaxSlots is the maximum number of slots that will ever be issued. + MaxSlots int + // RampThrottle is time to wait between slot issuance. This value matters (particularly for + // activities) because how many resources a task will use cannot be determined ahead of time, + // and thus the system should wait to see how much resources are used before issuing more slots. + RampThrottle time.Duration +} + +func defaultWorkflowResourceBasedSlotSupplierOptions() ResourceBasedSlotSupplierOptions { + return ResourceBasedSlotSupplierOptions{ + MinSlots: 5, + MaxSlots: 1000, + RampThrottle: 0 * time.Second, + } +} +func defaultActivityResourceBasedSlotSupplierOptions() ResourceBasedSlotSupplierOptions { + return ResourceBasedSlotSupplierOptions{ + MinSlots: 1, + MaxSlots: 1000, + RampThrottle: 50 * time.Millisecond, + } +} + +// ResourceBasedSlotSupplier is a worker.SlotSupplier that issues slots based on system resource +// usage. +// +// WARNING: Resource based tuning is currently experimental. type ResourceBasedSlotSupplier struct { - controller *resourceController - minSlots int - maxSlots int - rampThrottle time.Duration + controller *ResourceController + options ResourceBasedSlotSupplierOptions lastIssuedMu sync.Mutex lastSlotIssuedAt time.Time } +// NewResourceBasedSlotSupplier creates a ResourceBasedSlotSupplier given the provided +// ResourceController and ResourceBasedSlotSupplierOptions. All ResourceBasedSlotSupplier instances +// must use the same ResourceController. +// +// WARNING: Resource based tuning is currently experimental. +func NewResourceBasedSlotSupplier( + controller *ResourceController, + options ResourceBasedSlotSupplierOptions, +) *ResourceBasedSlotSupplier { + return &ResourceBasedSlotSupplier{controller: controller, options: options} +} + func (r *ResourceBasedSlotSupplier) ReserveSlot(ctx context.Context, reserveCtx worker.SlotReserveContext) (*worker.SlotPermit, error) { for { - if reserveCtx.NumIssuedSlots() < r.minSlots { + if reserveCtx.NumIssuedSlots() < r.options.MinSlots { return &worker.SlotPermit{}, nil } r.lastIssuedMu.Lock() - mustWaitFor := r.rampThrottle - time.Since(r.lastSlotIssuedAt) + mustWaitFor := r.options.RampThrottle - time.Since(r.lastSlotIssuedAt) r.lastIssuedMu.Unlock() if mustWaitFor > 0 { select { @@ -89,8 +138,8 @@ func (r *ResourceBasedSlotSupplier) TryReserveSlot(reserveCtx worker.SlotReserve defer r.lastIssuedMu.Unlock() numIssued := reserveCtx.NumIssuedSlots() - if numIssued < r.minSlots || (numIssued < r.maxSlots && - time.Since(r.lastSlotIssuedAt) > r.rampThrottle) { + if numIssued < r.options.MinSlots || (numIssued < r.options.MaxSlots && + time.Since(r.lastSlotIssuedAt) > r.options.RampThrottle) { decision, err := r.controller.pidDecision() if err != nil { // TODO: log @@ -110,6 +159,9 @@ func (r *ResourceBasedSlotSupplier) MaximumSlots() int { return 0 } +// SystemInfoSupplier implementations provide information about system resources. +// +// WARNING: Resource based tuning is currently experimental. type SystemInfoSupplier interface { // GetMemoryUsage returns the current system memory usage as a fraction of total memory between // 0 and 1. @@ -119,12 +171,21 @@ type SystemInfoSupplier interface { GetCpuUsage() (float64, error) } +// ResourceControllerOptions contains configurable parameters for a ResourceController. +// It is recommended to use DefaultResourceControllerOptions to create a ResourceControllerOptions +// and only modify the mem/cpu target percent fields. +// +// WARNING: Resource based tuning is currently experimental. type ResourceControllerOptions struct { - memTargetPercent float64 - cpuTargetPercent float64 + // MemTargetPercent is the target overall system memory usage as value 0 and 1 that the + // controller will attempt to maintain. + MemTargetPercent float64 + // CpuTargetPercent is the target overall system CPU usage as value 0 and 1 that the controller + // will attempt to maintain. + CpuTargetPercent float64 - memOutputThreshold float64 - cpuOutputThreshold float64 + MemOutputThreshold float64 + CpuOutputThreshold float64 MemPGain float64 MemIGain float64 @@ -134,12 +195,15 @@ type ResourceControllerOptions struct { CpuDGain float64 } +// DefaultResourceControllerOptions returns a ResourceControllerOptions with default values. +// +// WARNING: Resource based tuning is currently experimental. func DefaultResourceControllerOptions() ResourceControllerOptions { return ResourceControllerOptions{ - memTargetPercent: 0.8, - cpuTargetPercent: 0.9, - memOutputThreshold: 0.25, - cpuOutputThreshold: 0.05, + MemTargetPercent: 0.8, + CpuTargetPercent: 0.9, + MemOutputThreshold: 0.25, + CpuOutputThreshold: 0.05, MemPGain: 5, MemIGain: 0, MemDGain: 1, @@ -149,7 +213,11 @@ func DefaultResourceControllerOptions() ResourceControllerOptions { } } -type resourceController struct { +// A ResourceController is used by ResourceBasedSlotSupplier to make decisions about whether slots +// should be issued based on system resource usage. +// +// WARNING: Resource based tuning is currently experimental. +type ResourceController struct { options ResourceControllerOptions mu sync.Mutex @@ -159,11 +227,26 @@ type resourceController struct { cpuPid *pid.Controller } -func newResourceController( +// NewResourceController creates a new ResourceController with the provided options. +// WARNING: It is important that you do not create multiple ResourceController instances. Since +// the controller looks at overall system resources, multiple instances with different configs can +// only conflict with one another. +// +// WARNING: Resource based tuning is currently experimental. +func NewResourceController(options ResourceControllerOptions) *ResourceController { + return NewResourceControllerWithInfoSupplier(options, &psUtilSystemInfoSupplier{}) +} + +// NewResourceControllerWithInfoSupplier creates a new ResourceController with the provided options +// and system information supplier. Only use this if you need to override the default system info +// supplier. +// +// WARNING: Resource based tuning is currently experimental. +func NewResourceControllerWithInfoSupplier( options ResourceControllerOptions, infoSupplier SystemInfoSupplier, -) *resourceController { - return &resourceController{ +) *ResourceController { + return &ResourceController{ options: options, infoSupplier: infoSupplier, memPid: &pid.Controller{ @@ -183,7 +266,7 @@ func newResourceController( } } -func (rc *resourceController) pidDecision() (bool, error) { +func (rc *ResourceController) pidDecision() (bool, error) { rc.mu.Lock() defer rc.mu.Unlock() @@ -202,19 +285,19 @@ func (rc *resourceController) pidDecision() (bool, error) { elapsedTime = 1 * time.Millisecond } rc.memPid.Update(pid.ControllerInput{ - ReferenceSignal: rc.options.memTargetPercent, + ReferenceSignal: rc.options.MemTargetPercent, ActualSignal: memUsage, SamplingInterval: elapsedTime, }) rc.cpuPid.Update(pid.ControllerInput{ - ReferenceSignal: rc.options.cpuTargetPercent, + ReferenceSignal: rc.options.CpuTargetPercent, ActualSignal: cpuUsage, SamplingInterval: elapsedTime, }) rc.lastRefresh = time.Now() - return rc.memPid.State.ControlSignal > rc.options.memOutputThreshold && - rc.cpuPid.State.ControlSignal > rc.options.cpuOutputThreshold, nil + return rc.memPid.State.ControlSignal > rc.options.MemOutputThreshold && + rc.cpuPid.State.ControlSignal > rc.options.CpuOutputThreshold, nil } type psUtilSystemInfoSupplier struct { diff --git a/contrib/resourcetuner/resourcetuner_test.go b/contrib/resourcetuner/resourcetuner_test.go index d1efe7395..c103ff816 100644 --- a/contrib/resourcetuner/resourcetuner_test.go +++ b/contrib/resourcetuner/resourcetuner_test.go @@ -44,9 +44,9 @@ func (f FakeSystemInfoSupplier) GetCpuUsage() (float64, error) { func TestPidDecisions(t *testing.T) { fakeSupplier := &FakeSystemInfoSupplier{memUse: 0.5, cpuUse: 0.5} rcOpts := DefaultResourceControllerOptions() - rcOpts.memTargetPercent = 0.8 - rcOpts.cpuTargetPercent = 0.9 - rc := newResourceController(rcOpts, fakeSupplier) + rcOpts.MemTargetPercent = 0.8 + rcOpts.CpuTargetPercent = 0.9 + rc := NewResourceControllerWithInfoSupplier(rcOpts, fakeSupplier) for i := 0; i < 10; i++ { decision, err := rc.pidDecision() diff --git a/internal/cmd/tools/copyright/licensegen.go b/internal/cmd/tools/copyright/licensegen.go index 788ee588b..0e5b2d3d3 100644 --- a/internal/cmd/tools/copyright/licensegen.go +++ b/internal/cmd/tools/copyright/licensegen.go @@ -150,7 +150,7 @@ func (task *addLicenseHeaderTask) handleFile(path string, fileInfo os.FileInfo, // at this point, src file is missing the header if task.config.verifyOnly { if !isFileAutogenerated(path) { - return fmt.Errorf("%v missing license header", path) + return fmt.Errorf("%v missing license header, go run ./internal/cmd/tools/copyright/licensegen.go to fix", path) } } diff --git a/internal/tuning.go b/internal/tuning.go index 132d77d62..b522c6ceb 100644 --- a/internal/tuning.go +++ b/internal/tuning.go @@ -28,56 +28,30 @@ import ( "sync" "sync/atomic" - "go.temporal.io/sdk/internal/common/metrics" - "golang.org/x/sync/semaphore" + + "go.temporal.io/sdk/internal/common/metrics" ) // WorkerTuner allows for the dynamic customization of some aspects of worker behavior. +// +// WARNING: Custom implementations of SlotSupplier are currently experimental. type WorkerTuner interface { GetWorkflowTaskSlotSupplier() SlotSupplier GetActivityTaskSlotSupplier() SlotSupplier GetLocalActivitySlotSupplier() SlotSupplier } -// CompositeTuner allows you to build a tuner from multiple slot suppliers. -type CompositeTuner struct { - workflowSlotSupplier SlotSupplier - activitySlotSupplier SlotSupplier - localActivitySlotSupplier SlotSupplier -} - -func (c *CompositeTuner) GetWorkflowTaskSlotSupplier() SlotSupplier { - return c.workflowSlotSupplier -} -func (c *CompositeTuner) GetActivityTaskSlotSupplier() SlotSupplier { - return c.activitySlotSupplier -} -func (c *CompositeTuner) GetLocalActivitySlotSupplier() SlotSupplier { - return c.localActivitySlotSupplier -} - -func CreateFixedSizeTuner(numWorkflowSlots, numActivitySlots, numLocalActivitySlots int) WorkerTuner { - return &CompositeTuner{ - workflowSlotSupplier: NewFixedSizeSlotSupplier(numWorkflowSlots), - activitySlotSupplier: NewFixedSizeSlotSupplier(numActivitySlots), - localActivitySlotSupplier: NewFixedSizeSlotSupplier(numLocalActivitySlots), - } -} - -func CreateCompositeTuner(workflowSlotSupplier, activitySlotSupplier, localActivitySlotSupplier SlotSupplier) WorkerTuner { - return &CompositeTuner{ - workflowSlotSupplier: workflowSlotSupplier, - activitySlotSupplier: activitySlotSupplier, - localActivitySlotSupplier: localActivitySlotSupplier, - } -} - +// SlotPermit is a permit to use a slot. +// +// WARNING: Custom implementations of SlotSupplier are currently experimental. type SlotPermit struct { //lint:ignore U1000 pointless to guarantee uniqueness for now int } +// SlotReserveContext contains information that SlotSupplier instances can use during +// reservation calls. type SlotReserveContext interface { TaskQueue() string NumIssuedSlots() int @@ -88,6 +62,8 @@ type SlotReserveContext interface { // // Currently, you cannot implement your own slot supplier. You can use the provided // FixedSizeSlotSupplier and ResourceBasedSlotSupplier slot suppliers. +// +// WARNING: Custom implementations of SlotSupplier are currently experimental. type SlotSupplier interface { // ReserveSlot is called before polling for new tasks. The implementation should block until // a slot is available, then return a permit to use that slot. Implementations must be @@ -110,6 +86,45 @@ type SlotSupplier interface { MaximumSlots() int } +// CompositeTuner allows you to build a tuner from multiple slot suppliers. +// +// WARNING: Custom implementations of SlotSupplier are currently experimental. +type CompositeTuner struct { + workflowSlotSupplier SlotSupplier + activitySlotSupplier SlotSupplier + localActivitySlotSupplier SlotSupplier +} + +func (c *CompositeTuner) GetWorkflowTaskSlotSupplier() SlotSupplier { + return c.workflowSlotSupplier +} +func (c *CompositeTuner) GetActivityTaskSlotSupplier() SlotSupplier { + return c.activitySlotSupplier +} +func (c *CompositeTuner) GetLocalActivitySlotSupplier() SlotSupplier { + return c.localActivitySlotSupplier +} + +// CreateCompositeTuner creates a WorkerTuner that uses a combination of slot suppliers. +// +// WARNING: Custom implementations of SlotSupplier are currently experimental. +func CreateCompositeTuner(workflowSlotSupplier, activitySlotSupplier, localActivitySlotSupplier SlotSupplier) WorkerTuner { + return &CompositeTuner{ + workflowSlotSupplier: workflowSlotSupplier, + activitySlotSupplier: activitySlotSupplier, + localActivitySlotSupplier: localActivitySlotSupplier, + } +} + +// CreateFixedSizeTuner creates a WorkerTuner that uses fixed size slot suppliers. +func CreateFixedSizeTuner(numWorkflowSlots, numActivitySlots, numLocalActivitySlots int) WorkerTuner { + return &CompositeTuner{ + workflowSlotSupplier: NewFixedSizeSlotSupplier(numWorkflowSlots), + activitySlotSupplier: NewFixedSizeSlotSupplier(numActivitySlots), + localActivitySlotSupplier: NewFixedSizeSlotSupplier(numLocalActivitySlots), + } +} + // FixedSizeSlotSupplier is a slot supplier that will only ever issue at most a fixed number of // slots. type FixedSizeSlotSupplier struct { @@ -119,6 +134,7 @@ type FixedSizeSlotSupplier struct { sem *semaphore.Weighted } +// NewFixedSizeSlotSupplier creates a new FixedSizeSlotSupplier with the given number of slots. func NewFixedSizeSlotSupplier(numSlots int) *FixedSizeSlotSupplier { return &FixedSizeSlotSupplier{ NumSlots: numSlots, diff --git a/test/integration_test.go b/test/integration_test.go index 2ea9258ec..df0ceb8df 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -3039,49 +3039,11 @@ func (ts *IntegrationTestSuite) TestResourceBasedSlotSupplierWorks() { laWorkertags := []string{"worker_type", "LocalActivityWorker", "task_queue", ts.taskQueueName} wfWorkertags := []string{"worker_type", "WorkflowWorker", "task_queue", ts.taskQueueName} - actRunning := atomic.Int32{} - laRunning := atomic.Int32{} - actStruct := &highWaterMarkActivities{currentlyRunning: &actRunning, maxConcurrent: 900} - laStruct := &highWaterMarkActivities{currentlyRunning: &laRunning, maxConcurrent: 900} - - noExceedLimitsWf := func(ctx workflow.Context) error { - futures := make([]workflow.Future, 0) - for i := 0; i < 5; i++ { - ao := workflow.LocalActivityOptions{ - StartToCloseTimeout: time.Minute, - RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 3, InitialInterval: time.Millisecond, BackoffCoefficient: 1}, - } - ctx = workflow.WithLocalActivityOptions(ctx, ao) - a := workflow.ExecuteLocalActivity(ctx, func(ctx context.Context, i int) error { return laStruct.DoActivity(ctx, i) }, i) - futures = append(futures, a) - } - for i := 0; i < 5; i++ { - ao := workflow.ActivityOptions{ - StartToCloseTimeout: time.Minute, - RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 3, InitialInterval: time.Millisecond, BackoffCoefficient: 1}, - } - ctx = workflow.WithActivityOptions(ctx, ao) - a := workflow.ExecuteActivity(ctx, actStruct.DoActivity, i) - futures = append(futures, a) - } - - for _, f := range futures { - err := f.Get(ctx, nil) - if err != nil { - return err - } - } - return nil - } - - ts.worker.RegisterWorkflow(noExceedLimitsWf) - ts.worker.RegisterActivity(actStruct) - wfRuns := make([]client.WorkflowRun, 0) for i := 0; i < 1; i++ { run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("resource-based-slot-supplier"+strconv.Itoa(i)), - noExceedLimitsWf) + ts.workflows.RunsLocalAndNonlocalActsWithRetries, 2) ts.NoError(err) ts.NotNil(run) ts.NoError(err) diff --git a/test/worker_tuner_test.go b/test/worker_tuner_test.go new file mode 100644 index 000000000..9f5dd567d --- /dev/null +++ b/test/worker_tuner_test.go @@ -0,0 +1,108 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package test_test + +import ( + "context" + "testing" + + "go.temporal.io/sdk/worker" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/contrib/resourcetuner" +) + +type WorkerTunerTestSuite struct { + *require.Assertions + suite.Suite + ConfigAndClientSuiteBase + workflows *Workflows + activities *Activities +} + +func TestWorkerTunerTestSuite(t *testing.T) { + suite.Run(t, new(WorkerTunerTestSuite)) +} + +func (ts *WorkerTunerTestSuite) SetupSuite() { + ts.Assertions = require.New(ts.T()) + ts.workflows = &Workflows{} + ts.activities = &Activities{} + ts.NoError(ts.InitConfigAndNamespace()) + ts.NoError(ts.InitClient()) +} + +func (ts *WorkerTunerTestSuite) TearDownSuite() { + ts.Assertions = require.New(ts.T()) + ts.client.Close() +} + +func (ts *WorkerTunerTestSuite) SetupTest() { + ts.taskQueueName = taskQueuePrefix + "-" + ts.T().Name() +} + +func (ts *WorkerTunerTestSuite) TestFixedSizeWorkerTuner() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + tuner := worker.CreateFixedSizeTuner(10, 10, 5) + + ts.runTheWorkflow(tuner, ctx) +} + +func (ts *WorkerTunerTestSuite) TestCompositeWorkerTuner() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + wfSS := worker.NewFixedSizeSlotSupplier(10) + controllerOpts := resourcetuner.DefaultResourceControllerOptions() + controllerOpts.MemTargetPercent = 0.8 + controllerOpts.CpuTargetPercent = 0.9 + controller := resourcetuner.NewResourceController(controllerOpts) + actSS := resourcetuner.NewResourceBasedSlotSupplier(controller, + resourcetuner.ResourceBasedSlotSupplierOptions{ + MinSlots: 10, + MaxSlots: 20, + RampThrottle: 0, + }) + laCss := worker.NewFixedSizeSlotSupplier(5) + tuner := worker.CreateCompositeTuner(wfSS, actSS, laCss) + + ts.runTheWorkflow(tuner, ctx) +} + +func (ts *WorkerTunerTestSuite) runTheWorkflow(tuner worker.WorkerTuner, ctx context.Context) { + workerOptions := worker.Options{Tuner: tuner} + myWorker := worker.New(ts.client, ts.taskQueueName, workerOptions) + ts.workflows.register(myWorker) + ts.activities.register(myWorker) + ts.NoError(myWorker.Start()) + defer myWorker.Stop() + + handle, err := ts.client.ExecuteWorkflow(ctx, + ts.startWorkflowOptions(ts.T().Name()), + ts.workflows.RunsLocalAndNonlocalActsWithRetries, 2) + ts.NoError(err) + ts.NoError(handle.Get(ctx, nil)) +} diff --git a/test/workflow_test.go b/test/workflow_test.go index 9bc3007cf..a34df8352 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -3011,6 +3011,37 @@ func (w *Workflows) UpsertMemo(ctx workflow.Context, memo map[string]interface{} return workflow.GetInfo(ctx).Memo, nil } +func (w *Workflows) RunsLocalAndNonlocalActsWithRetries(ctx workflow.Context, actFailTimes int) error { + var activities *Activities + futures := make([]workflow.Future, 0) + for i := 0; i < 5; i++ { + ao := workflow.LocalActivityOptions{ + StartToCloseTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 3, InitialInterval: time.Millisecond, BackoffCoefficient: 1}, + } + ctx = workflow.WithLocalActivityOptions(ctx, ao) + a := workflow.ExecuteLocalActivity(ctx, activities.failNTimes, actFailTimes, i) + futures = append(futures, a) + } + for i := 0; i < 5; i++ { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 3, InitialInterval: time.Millisecond, BackoffCoefficient: 1}, + } + ctx = workflow.WithActivityOptions(ctx, ao) + a := workflow.ExecuteActivity(ctx, activities.failNTimes, actFailTimes, i) + futures = append(futures, a) + } + + for _, f := range futures { + err := f.Get(ctx, nil) + if err != nil { + return err + } + } + return nil +} + func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ActivityCancelRepro) worker.RegisterWorkflow(w.ActivityCompletionUsingID) @@ -3140,6 +3171,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.UpdateOrdering) worker.RegisterWorkflow(w.UpdateSetHandlerOnly) worker.RegisterWorkflow(w.Echo) + worker.RegisterWorkflow(w.RunsLocalAndNonlocalActsWithRetries) } func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions { diff --git a/worker/tuning.go b/worker/tuning.go new file mode 100644 index 000000000..26e57b171 --- /dev/null +++ b/worker/tuning.go @@ -0,0 +1,61 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package worker + +import ( + "go.temporal.io/sdk/internal" +) + +// WorkerTuner allows for the dynamic customization of some aspects of worker behavior. +type WorkerTuner = internal.WorkerTuner + +// SlotPermit is a permit to use a slot. +type SlotPermit = internal.SlotPermit + +// SlotSupplier controls how slots are handed out for workflow and activity tasks as well as +// local activities when used in conjunction with a WorkerTuner. +// +// Currently, you cannot implement your own slot supplier. You can use the provided +// FixedSizeSlotSupplier and ResourceBasedSlotSupplier slot suppliers. +type SlotSupplier = internal.SlotSupplier + +// SlotReserveContext contains information that SlotSupplier instances can use during +// reservation calls. +type SlotReserveContext = internal.SlotReserveContext + +// CreateFixedSizeTuner creates a WorkerTuner that uses fixed size slot suppliers. +func CreateFixedSizeTuner(numWorkflowSlots, numActivitySlots, numLocalActivitySlots int) WorkerTuner { + return internal.CreateFixedSizeTuner(numWorkflowSlots, numActivitySlots, numLocalActivitySlots) +} + +// CreateCompositeTuner creates a WorkerTuner that uses a combination of slot suppliers. +func CreateCompositeTuner(workflowSlotSupplier, activitySlotSupplier, localActivitySlotSupplier SlotSupplier) WorkerTuner { + return internal.CreateCompositeTuner(workflowSlotSupplier, activitySlotSupplier, localActivitySlotSupplier) +} + +// NewFixedSizeSlotSupplier creates a new FixedSizeSlotSupplier with the given number of slots. +func NewFixedSizeSlotSupplier(numSlots int) SlotSupplier { + return internal.NewFixedSizeSlotSupplier(numSlots) +} diff --git a/worker/worker.go b/worker/worker.go index 887b604fc..dd4f8687a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -226,21 +226,6 @@ type ( // ReplayWorkflowHistoryOptions are options for replaying a workflow. ReplayWorkflowHistoryOptions = internal.ReplayWorkflowHistoryOptions - - // WorkerTuner allows for the dynamic customization of some aspects of worker behavior. - WorkerTuner = internal.WorkerTuner - - // SlotPermit is a permit to use a slot. - SlotPermit = internal.SlotPermit - - // SlotSupplier controls how slots are handed out for workflow and activity tasks as well as - // local activities when used in conjunction with a WorkerTuner. - // - // Currently, you cannot implement your own slot supplier. You can use the provided - // FixedSizeSlotSupplier and ResourceBasedSlotSupplier slot suppliers. - SlotSupplier = internal.SlotSupplier - - SlotReserveContext = internal.SlotReserveContext ) const ( @@ -320,13 +305,3 @@ func SetBinaryChecksum(checksum string) { func InterruptCh() <-chan interface{} { return internal.InterruptCh() } - -// CreateFixedSizeTuner creates a WorkerTuner that uses fixed size slot suppliers. -func CreateFixedSizeTuner(numWorkflowSlots, numActivitySlots, numLocalActivitySlots int) WorkerTuner { - return internal.CreateFixedSizeTuner(numWorkflowSlots, numActivitySlots, numLocalActivitySlots) -} - -// CreateCompositeTuner creates a WorkerTuner that uses a combination of slot suppliers. -func CreateCompositeTuner(workflowSlotSupplier, activitySlotSupplier, localActivitySlotSupplier SlotSupplier) WorkerTuner { - return internal.CreateCompositeTuner(workflowSlotSupplier, activitySlotSupplier, localActivitySlotSupplier) -}