Skip to content

Commit

Permalink
Docstrings / visibility / package organization / construction tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jul 25, 2024
1 parent f394158 commit 9643c0a
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 135 deletions.
147 changes: 115 additions & 32 deletions contrib/resourcetuner/resourcetuner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,85 @@ import (
"go.temporal.io/sdk/worker"
)

// CreateResourceBasedTuner creates a WorkerTuner that dynamically adjusts the number of slots based
// on system resources. Specify the target CPU and memory usage as a value between 0 and 1.
//
// WARNING: Resource based tuning is currently experimental.
func CreateResourceBasedTuner(targetCpu, targetMem float64) (worker.WorkerTuner, error) {
options := DefaultResourceControllerOptions()
options.memTargetPercent = targetMem
options.cpuTargetPercent = targetCpu
controller := newResourceController(options, &psUtilSystemInfoSupplier{})
// TODO: configurable
wfSS := &ResourceBasedSlotSupplier{controller: controller, minSlots: 5, maxSlots: 1000}
actSS := &ResourceBasedSlotSupplier{controller: controller, minSlots: 1, maxSlots: 1000}
laSS := &ResourceBasedSlotSupplier{controller: controller, minSlots: 1, maxSlots: 1000}
options.MemTargetPercent = targetMem
options.CpuTargetPercent = targetCpu
controller := NewResourceControllerWithInfoSupplier(options, &psUtilSystemInfoSupplier{})
wfSS := &ResourceBasedSlotSupplier{controller: controller,
options: defaultWorkflowResourceBasedSlotSupplierOptions()}
actSS := &ResourceBasedSlotSupplier{controller: controller,
options: defaultActivityResourceBasedSlotSupplierOptions()}
laSS := &ResourceBasedSlotSupplier{controller: controller,
options: defaultActivityResourceBasedSlotSupplierOptions()}
compositeTuner := worker.CreateCompositeTuner(wfSS, actSS, laSS)
return compositeTuner, nil
}

// ResourceBasedSlotSupplierOptions configures a particular ResourceBasedSlotSupplier.
//
// WARNING: Resource based tuning is currently experimental.
type ResourceBasedSlotSupplierOptions struct {
// MinSlots is minimum number of slots that will be issued without any resource checks.
MinSlots int
// MaxSlots is the maximum number of slots that will ever be issued.
MaxSlots int
// RampThrottle is time to wait between slot issuance. This value matters (particularly for
// activities) because how many resources a task will use cannot be determined ahead of time,
// and thus the system should wait to see how much resources are used before issuing more slots.
RampThrottle time.Duration
}

func defaultWorkflowResourceBasedSlotSupplierOptions() ResourceBasedSlotSupplierOptions {
return ResourceBasedSlotSupplierOptions{
MinSlots: 5,
MaxSlots: 1000,
RampThrottle: 0 * time.Second,
}
}
func defaultActivityResourceBasedSlotSupplierOptions() ResourceBasedSlotSupplierOptions {
return ResourceBasedSlotSupplierOptions{
MinSlots: 1,
MaxSlots: 1000,
RampThrottle: 50 * time.Millisecond,
}
}

// ResourceBasedSlotSupplier is a worker.SlotSupplier that issues slots based on system resource
// usage.
//
// WARNING: Resource based tuning is currently experimental.
type ResourceBasedSlotSupplier struct {
controller *resourceController
minSlots int
maxSlots int
rampThrottle time.Duration
controller *ResourceController
options ResourceBasedSlotSupplierOptions

lastIssuedMu sync.Mutex
lastSlotIssuedAt time.Time
}

// NewResourceBasedSlotSupplier creates a ResourceBasedSlotSupplier given the provided
// ResourceController and ResourceBasedSlotSupplierOptions. All ResourceBasedSlotSupplier instances
// must use the same ResourceController.
//
// WARNING: Resource based tuning is currently experimental.
func NewResourceBasedSlotSupplier(
controller *ResourceController,
options ResourceBasedSlotSupplierOptions,
) *ResourceBasedSlotSupplier {
return &ResourceBasedSlotSupplier{controller: controller, options: options}
}

func (r *ResourceBasedSlotSupplier) ReserveSlot(ctx context.Context, reserveCtx worker.SlotReserveContext) (*worker.SlotPermit, error) {
for {
if reserveCtx.NumIssuedSlots() < r.minSlots {
if reserveCtx.NumIssuedSlots() < r.options.MinSlots {
return &worker.SlotPermit{}, nil
}
r.lastIssuedMu.Lock()
mustWaitFor := r.rampThrottle - time.Since(r.lastSlotIssuedAt)
mustWaitFor := r.options.RampThrottle - time.Since(r.lastSlotIssuedAt)
r.lastIssuedMu.Unlock()
if mustWaitFor > 0 {
select {
Expand All @@ -89,8 +138,8 @@ func (r *ResourceBasedSlotSupplier) TryReserveSlot(reserveCtx worker.SlotReserve
defer r.lastIssuedMu.Unlock()

numIssued := reserveCtx.NumIssuedSlots()
if numIssued < r.minSlots || (numIssued < r.maxSlots &&
time.Since(r.lastSlotIssuedAt) > r.rampThrottle) {
if numIssued < r.options.MinSlots || (numIssued < r.options.MaxSlots &&
time.Since(r.lastSlotIssuedAt) > r.options.RampThrottle) {
decision, err := r.controller.pidDecision()
if err != nil {
// TODO: log
Expand All @@ -110,6 +159,9 @@ func (r *ResourceBasedSlotSupplier) MaximumSlots() int {
return 0
}

// SystemInfoSupplier implementations provide information about system resources.
//
// WARNING: Resource based tuning is currently experimental.
type SystemInfoSupplier interface {
// GetMemoryUsage returns the current system memory usage as a fraction of total memory between
// 0 and 1.
Expand All @@ -119,12 +171,21 @@ type SystemInfoSupplier interface {
GetCpuUsage() (float64, error)
}

// ResourceControllerOptions contains configurable parameters for a ResourceController.
// It is recommended to use DefaultResourceControllerOptions to create a ResourceControllerOptions
// and only modify the mem/cpu target percent fields.
//
// WARNING: Resource based tuning is currently experimental.
type ResourceControllerOptions struct {
memTargetPercent float64
cpuTargetPercent float64
// MemTargetPercent is the target overall system memory usage as value 0 and 1 that the
// controller will attempt to maintain.
MemTargetPercent float64
// CpuTargetPercent is the target overall system CPU usage as value 0 and 1 that the controller
// will attempt to maintain.
CpuTargetPercent float64

memOutputThreshold float64
cpuOutputThreshold float64
MemOutputThreshold float64
CpuOutputThreshold float64

MemPGain float64
MemIGain float64
Expand All @@ -134,12 +195,15 @@ type ResourceControllerOptions struct {
CpuDGain float64
}

// DefaultResourceControllerOptions returns a ResourceControllerOptions with default values.
//
// WARNING: Resource based tuning is currently experimental.
func DefaultResourceControllerOptions() ResourceControllerOptions {
return ResourceControllerOptions{
memTargetPercent: 0.8,
cpuTargetPercent: 0.9,
memOutputThreshold: 0.25,
cpuOutputThreshold: 0.05,
MemTargetPercent: 0.8,
CpuTargetPercent: 0.9,
MemOutputThreshold: 0.25,
CpuOutputThreshold: 0.05,
MemPGain: 5,
MemIGain: 0,
MemDGain: 1,
Expand All @@ -149,7 +213,11 @@ func DefaultResourceControllerOptions() ResourceControllerOptions {
}
}

type resourceController struct {
// A ResourceController is used by ResourceBasedSlotSupplier to make decisions about whether slots
// should be issued based on system resource usage.
//
// WARNING: Resource based tuning is currently experimental.
type ResourceController struct {
options ResourceControllerOptions

mu sync.Mutex
Expand All @@ -159,11 +227,26 @@ type resourceController struct {
cpuPid *pid.Controller
}

func newResourceController(
// NewResourceController creates a new ResourceController with the provided options.
// WARNING: It is important that you do not create multiple ResourceController instances. Since
// the controller looks at overall system resources, multiple instances with different configs can
// only conflict with one another.
//
// WARNING: Resource based tuning is currently experimental.
func NewResourceController(options ResourceControllerOptions) *ResourceController {
return NewResourceControllerWithInfoSupplier(options, &psUtilSystemInfoSupplier{})
}

// NewResourceControllerWithInfoSupplier creates a new ResourceController with the provided options
// and system information supplier. Only use this if you need to override the default system info
// supplier.
//
// WARNING: Resource based tuning is currently experimental.
func NewResourceControllerWithInfoSupplier(
options ResourceControllerOptions,
infoSupplier SystemInfoSupplier,
) *resourceController {
return &resourceController{
) *ResourceController {
return &ResourceController{
options: options,
infoSupplier: infoSupplier,
memPid: &pid.Controller{
Expand All @@ -183,7 +266,7 @@ func newResourceController(
}
}

func (rc *resourceController) pidDecision() (bool, error) {
func (rc *ResourceController) pidDecision() (bool, error) {
rc.mu.Lock()
defer rc.mu.Unlock()

Expand All @@ -202,19 +285,19 @@ func (rc *resourceController) pidDecision() (bool, error) {
elapsedTime = 1 * time.Millisecond
}
rc.memPid.Update(pid.ControllerInput{
ReferenceSignal: rc.options.memTargetPercent,
ReferenceSignal: rc.options.MemTargetPercent,
ActualSignal: memUsage,
SamplingInterval: elapsedTime,
})
rc.cpuPid.Update(pid.ControllerInput{
ReferenceSignal: rc.options.cpuTargetPercent,
ReferenceSignal: rc.options.CpuTargetPercent,
ActualSignal: cpuUsage,
SamplingInterval: elapsedTime,
})
rc.lastRefresh = time.Now()

return rc.memPid.State.ControlSignal > rc.options.memOutputThreshold &&
rc.cpuPid.State.ControlSignal > rc.options.cpuOutputThreshold, nil
return rc.memPid.State.ControlSignal > rc.options.MemOutputThreshold &&
rc.cpuPid.State.ControlSignal > rc.options.CpuOutputThreshold, nil
}

type psUtilSystemInfoSupplier struct {
Expand Down
6 changes: 3 additions & 3 deletions contrib/resourcetuner/resourcetuner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ func (f FakeSystemInfoSupplier) GetCpuUsage() (float64, error) {
func TestPidDecisions(t *testing.T) {
fakeSupplier := &FakeSystemInfoSupplier{memUse: 0.5, cpuUse: 0.5}
rcOpts := DefaultResourceControllerOptions()
rcOpts.memTargetPercent = 0.8
rcOpts.cpuTargetPercent = 0.9
rc := newResourceController(rcOpts, fakeSupplier)
rcOpts.MemTargetPercent = 0.8
rcOpts.CpuTargetPercent = 0.9
rc := NewResourceControllerWithInfoSupplier(rcOpts, fakeSupplier)

for i := 0; i < 10; i++ {
decision, err := rc.pidDecision()
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/tools/copyright/licensegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (task *addLicenseHeaderTask) handleFile(path string, fileInfo os.FileInfo,
// at this point, src file is missing the header
if task.config.verifyOnly {
if !isFileAutogenerated(path) {
return fmt.Errorf("%v missing license header", path)
return fmt.Errorf("%v missing license header, go run ./internal/cmd/tools/copyright/licensegen.go to fix", path)
}
}

Expand Down
86 changes: 51 additions & 35 deletions internal/tuning.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,56 +28,30 @@ import (
"sync"
"sync/atomic"

"go.temporal.io/sdk/internal/common/metrics"

"golang.org/x/sync/semaphore"

"go.temporal.io/sdk/internal/common/metrics"
)

// WorkerTuner allows for the dynamic customization of some aspects of worker behavior.
//
// WARNING: Custom implementations of SlotSupplier are currently experimental.
type WorkerTuner interface {
GetWorkflowTaskSlotSupplier() SlotSupplier
GetActivityTaskSlotSupplier() SlotSupplier
GetLocalActivitySlotSupplier() SlotSupplier
}

// CompositeTuner allows you to build a tuner from multiple slot suppliers.
type CompositeTuner struct {
workflowSlotSupplier SlotSupplier
activitySlotSupplier SlotSupplier
localActivitySlotSupplier SlotSupplier
}

func (c *CompositeTuner) GetWorkflowTaskSlotSupplier() SlotSupplier {
return c.workflowSlotSupplier
}
func (c *CompositeTuner) GetActivityTaskSlotSupplier() SlotSupplier {
return c.activitySlotSupplier
}
func (c *CompositeTuner) GetLocalActivitySlotSupplier() SlotSupplier {
return c.localActivitySlotSupplier
}

func CreateFixedSizeTuner(numWorkflowSlots, numActivitySlots, numLocalActivitySlots int) WorkerTuner {
return &CompositeTuner{
workflowSlotSupplier: NewFixedSizeSlotSupplier(numWorkflowSlots),
activitySlotSupplier: NewFixedSizeSlotSupplier(numActivitySlots),
localActivitySlotSupplier: NewFixedSizeSlotSupplier(numLocalActivitySlots),
}
}

func CreateCompositeTuner(workflowSlotSupplier, activitySlotSupplier, localActivitySlotSupplier SlotSupplier) WorkerTuner {
return &CompositeTuner{
workflowSlotSupplier: workflowSlotSupplier,
activitySlotSupplier: activitySlotSupplier,
localActivitySlotSupplier: localActivitySlotSupplier,
}
}

// SlotPermit is a permit to use a slot.
//
// WARNING: Custom implementations of SlotSupplier are currently experimental.
type SlotPermit struct {
//lint:ignore U1000 pointless to guarantee uniqueness for now
int
}

// SlotReserveContext contains information that SlotSupplier instances can use during
// reservation calls.
type SlotReserveContext interface {
TaskQueue() string
NumIssuedSlots() int
Expand All @@ -88,6 +62,8 @@ type SlotReserveContext interface {
//
// Currently, you cannot implement your own slot supplier. You can use the provided
// FixedSizeSlotSupplier and ResourceBasedSlotSupplier slot suppliers.
//
// WARNING: Custom implementations of SlotSupplier are currently experimental.
type SlotSupplier interface {
// ReserveSlot is called before polling for new tasks. The implementation should block until
// a slot is available, then return a permit to use that slot. Implementations must be
Expand All @@ -110,6 +86,45 @@ type SlotSupplier interface {
MaximumSlots() int
}

// CompositeTuner allows you to build a tuner from multiple slot suppliers.
//
// WARNING: Custom implementations of SlotSupplier are currently experimental.
type CompositeTuner struct {
workflowSlotSupplier SlotSupplier
activitySlotSupplier SlotSupplier
localActivitySlotSupplier SlotSupplier
}

func (c *CompositeTuner) GetWorkflowTaskSlotSupplier() SlotSupplier {
return c.workflowSlotSupplier
}
func (c *CompositeTuner) GetActivityTaskSlotSupplier() SlotSupplier {
return c.activitySlotSupplier
}
func (c *CompositeTuner) GetLocalActivitySlotSupplier() SlotSupplier {
return c.localActivitySlotSupplier
}

// CreateCompositeTuner creates a WorkerTuner that uses a combination of slot suppliers.
//
// WARNING: Custom implementations of SlotSupplier are currently experimental.
func CreateCompositeTuner(workflowSlotSupplier, activitySlotSupplier, localActivitySlotSupplier SlotSupplier) WorkerTuner {
return &CompositeTuner{
workflowSlotSupplier: workflowSlotSupplier,
activitySlotSupplier: activitySlotSupplier,
localActivitySlotSupplier: localActivitySlotSupplier,
}
}

// CreateFixedSizeTuner creates a WorkerTuner that uses fixed size slot suppliers.
func CreateFixedSizeTuner(numWorkflowSlots, numActivitySlots, numLocalActivitySlots int) WorkerTuner {
return &CompositeTuner{
workflowSlotSupplier: NewFixedSizeSlotSupplier(numWorkflowSlots),
activitySlotSupplier: NewFixedSizeSlotSupplier(numActivitySlots),
localActivitySlotSupplier: NewFixedSizeSlotSupplier(numLocalActivitySlots),
}
}

// FixedSizeSlotSupplier is a slot supplier that will only ever issue at most a fixed number of
// slots.
type FixedSizeSlotSupplier struct {
Expand All @@ -119,6 +134,7 @@ type FixedSizeSlotSupplier struct {
sem *semaphore.Weighted
}

// NewFixedSizeSlotSupplier creates a new FixedSizeSlotSupplier with the given number of slots.
func NewFixedSizeSlotSupplier(numSlots int) *FixedSizeSlotSupplier {
return &FixedSizeSlotSupplier{
NumSlots: numSlots,
Expand Down
Loading

0 comments on commit 9643c0a

Please sign in to comment.