Skip to content

Commit

Permalink
Move logger/handler into newBaseWorker & limit session creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jul 31, 2024
1 parent 1fb6014 commit 69a7a34
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 51 deletions.
10 changes: 3 additions & 7 deletions internal/internal_nexus_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ package internal
import (
"github.com/nexus-rpc/sdk-go/nexus"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/log"
)

type nexusWorkerOptions struct {
Expand Down Expand Up @@ -63,21 +61,19 @@ func newNexusWorker(opts nexusWorkerOptions) (*nexusWorker, error) {
params,
)

workerType := "NexusWorker"
logger := log.With(params.Logger, tagWorkerType, workerType)
metricsHandler := params.MetricsHandler.WithTags(metrics.WorkerTags(workerType))
baseWorker := newBaseWorker(baseWorkerOptions{
pollerCount: params.MaxConcurrentNexusTaskQueuePollers,
pollerRate: defaultPollerRate,
slotSupplier: params.Tuner.GetNexusSlotSupplier(),
maxTaskPerSecond: defaultWorkerTaskExecutionRate,
taskWorker: poller,
workerType: "NexusWorker",
identity: params.Identity,
buildId: params.getBuildID(),
logger: logger,
logger: params.Logger,
stopTimeout: params.WorkerStopTimeout,
fatalErrCb: params.WorkerFatalErrorCallback,
metricsHandler: metricsHandler,
metricsHandler: params.MetricsHandler,
slotReservationData: slotReservationData{
taskQueue: params.TaskQueue,
},
Expand Down
74 changes: 40 additions & 34 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type (
workerOverrides struct {
workflowTaskHandler WorkflowTaskHandler
activityTaskHandler ActivityTaskHandler
slotSupplier SlotSupplier
}

// workerExecutionParameters defines worker configure/execution options.
Expand Down Expand Up @@ -318,21 +319,19 @@ func newWorkflowTaskWorkerInternal(
) *workflowWorker {
ensureRequiredParams(&params)
poller := newWorkflowTaskPoller(taskHandler, contextManager, service, params)
workerType := "WorkflowWorker"
logger := log.With(params.Logger, tagWorkerType, workerType)
metricsHandler := params.MetricsHandler.WithTags(metrics.WorkerTags(workerType))
worker := newBaseWorker(baseWorkerOptions{
pollerCount: params.MaxConcurrentWorkflowTaskQueuePollers,
pollerRate: defaultPollerRate,
slotSupplier: params.Tuner.GetWorkflowTaskSlotSupplier(),
maxTaskPerSecond: defaultWorkerTaskExecutionRate,
taskWorker: poller,
workerType: "WorkflowWorker",
identity: params.Identity,
buildId: params.getBuildID(),
logger: logger,
logger: params.Logger,
stopTimeout: params.WorkerStopTimeout,
fatalErrCb: params.WorkerFatalErrorCallback,
metricsHandler: metricsHandler,
metricsHandler: params.MetricsHandler,
slotReservationData: slotReservationData{
taskQueue: params.TaskQueue,
},
Expand All @@ -349,20 +348,18 @@ func newWorkflowTaskWorkerInternal(

// 2) local activity task poller will poll from laTunnel, and result will be pushed to laTunnel
localActivityTaskPoller := newLocalActivityPoller(params, laTunnel, interceptors)
workerType = "LocalActivityWorker"
logger = log.With(params.Logger, tagWorkerType, workerType)
metricsHandler = params.MetricsHandler.WithTags(metrics.WorkerTags(workerType))
localActivityWorker := newBaseWorker(baseWorkerOptions{
pollerCount: 1, // 1 poller (from local channel) is enough for local activity
slotSupplier: params.Tuner.GetLocalActivitySlotSupplier(),
maxTaskPerSecond: params.WorkerLocalActivitiesPerSecond,
taskWorker: localActivityTaskPoller,
workerType: "LocalActivityWorker",
identity: params.Identity,
buildId: params.getBuildID(),
logger: logger,
logger: params.Logger,
stopTimeout: params.WorkerStopTimeout,
fatalErrCb: params.WorkerFatalErrorCallback,
metricsHandler: metricsHandler,
metricsHandler: params.MetricsHandler,
slotReservationData: slotReservationData{
taskQueue: params.TaskQueue,
},
Expand Down Expand Up @@ -419,6 +416,10 @@ func newSessionWorker(service workflowservice.WorkflowServiceClient, params work

params.MaxConcurrentActivityTaskQueuePollers = 1
params.TaskQueue = creationTaskqueue
if overrides == nil {
overrides = &workerOverrides{}
}
overrides.slotSupplier, _ = NewFixedSizeSlotSupplier(maxConcurrentSessionExecutionSize)
creationWorker := newActivityWorker(service, params, overrides, env, sessionEnvironment.GetTokenBucket())

return &sessionWorker{
Expand Down Expand Up @@ -446,7 +447,13 @@ func (sw *sessionWorker) Stop() {
sw.activityWorker.Stop()
}

func newActivityWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, overrides *workerOverrides, env *registry, sessionTokenBucket *sessionTokenBucket) *activityWorker {
func newActivityWorker(
service workflowservice.WorkflowServiceClient,
params workerExecutionParameters,
overrides *workerOverrides,
env *registry,
sessionTokenBucket *sessionTokenBucket,
) *activityWorker {
workerStopChannel := make(chan struct{}, 1)
params.WorkerStopChannel = getReadOnlyChannel(workerStopChannel)
ensureRequiredParams(&params)
Expand All @@ -458,44 +465,43 @@ func newActivityWorker(service workflowservice.WorkflowServiceClient, params wor
} else {
taskHandler = newActivityTaskHandler(service, params, env)
}
return newActivityTaskWorker(taskHandler, service, params, sessionTokenBucket, workerStopChannel)
}

func newActivityTaskWorker(taskHandler ActivityTaskHandler, service workflowservice.WorkflowServiceClient, workerParams workerExecutionParameters, sessionTokenBucket *sessionTokenBucket, stopC chan struct{}) (worker *activityWorker) {
ensureRequiredParams(&workerParams)
poller := newActivityTaskPoller(taskHandler, service, params)
var slotSupplier SlotSupplier
if overrides != nil && overrides.slotSupplier != nil {
slotSupplier = overrides.slotSupplier
} else {
slotSupplier = params.Tuner.GetActivityTaskSlotSupplier()
}

poller := newActivityTaskPoller(taskHandler, service, workerParams)
workerType := "ActivityWorker"
logger := log.With(workerParams.Logger, tagWorkerType, workerType)
metricsHandler := workerParams.MetricsHandler.WithTags(metrics.WorkerTags(workerType))
base := newBaseWorker(
baseWorkerOptions{
pollerCount: workerParams.MaxConcurrentActivityTaskQueuePollers,
pollerCount: params.MaxConcurrentActivityTaskQueuePollers,
pollerRate: defaultPollerRate,
slotSupplier: workerParams.Tuner.GetActivityTaskSlotSupplier(),
maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond,
slotSupplier: slotSupplier,
maxTaskPerSecond: params.WorkerActivitiesPerSecond,
taskWorker: poller,
identity: workerParams.Identity,
buildId: workerParams.getBuildID(),
logger: logger,
stopTimeout: workerParams.WorkerStopTimeout,
fatalErrCb: workerParams.WorkerFatalErrorCallback,
userContextCancel: workerParams.UserContextCancel,
metricsHandler: metricsHandler,
workerType: "ActivityWorker",
identity: params.Identity,
buildId: params.getBuildID(),
logger: params.Logger,
stopTimeout: params.WorkerStopTimeout,
fatalErrCb: params.WorkerFatalErrorCallback,
userContextCancel: params.UserContextCancel,
metricsHandler: params.MetricsHandler,
sessionTokenBucket: sessionTokenBucket,
slotReservationData: slotReservationData{
taskQueue: workerParams.TaskQueue,
taskQueue: params.TaskQueue,
},
},
)

return &activityWorker{
executionParameters: workerParams,
executionParameters: params,
workflowService: service,
worker: base,
poller: poller,
identity: workerParams.Identity,
stopC: stopC,
identity: params.Identity,
stopC: workerStopChannel,
}
}

Expand Down
11 changes: 7 additions & 4 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ type (
slotSupplier SlotSupplier
maxTaskPerSecond float64
taskWorker taskPoller
workerType string
identity string
buildId string
logger log.Logger
Expand Down Expand Up @@ -276,9 +277,11 @@ func newBaseWorker(
options baseWorkerOptions,
) *baseWorker {
ctx, cancel := context.WithCancel(context.Background())
logger := log.With(options.logger, tagWorkerType, options.workerType)
metricsHandler := options.metricsHandler.WithTags(metrics.WorkerTags(options.workerType))
tss := newTrackingSlotSupplier(options.slotSupplier, trackingSlotSupplierOptions{
logger: options.logger,
metricsHandler: options.metricsHandler,
logger: logger,
metricsHandler: metricsHandler,
workerBuildId: options.buildId,
workerIdentity: options.identity,
})
Expand All @@ -287,8 +290,8 @@ func newBaseWorker(
stopCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: options.logger,
metricsHandler: options.metricsHandler,
logger: logger,
metricsHandler: metricsHandler,

slotSupplier: tss,
// No buffer, so pollers are only able to poll for new tasks after the previous one is
Expand Down
10 changes: 4 additions & 6 deletions internal/tuning.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,8 @@ func NewFixedSizeTuner(options FixedSizeTunerOptions) (WorkerTuner, error) {
// FixedSizeSlotSupplier is a slot supplier that will only ever issue at most a fixed number of
// slots.
type FixedSizeSlotSupplier struct {
// The maximum number of slots that this supplier will ever issue.
NumSlots int

sem *semaphore.Weighted
numSlots int
sem *semaphore.Weighted
}

// NewFixedSizeSlotSupplier creates a new FixedSizeSlotSupplier with the given number of slots.
Expand All @@ -258,7 +256,7 @@ func NewFixedSizeSlotSupplier(numSlots int) (*FixedSizeSlotSupplier, error) {
return nil, fmt.Errorf("NumSlots must be positive")
}
return &FixedSizeSlotSupplier{
NumSlots: numSlots,
numSlots: numSlots,
sem: semaphore.NewWeighted(int64(numSlots)),
}, nil
}
Expand All @@ -283,7 +281,7 @@ func (f *FixedSizeSlotSupplier) ReleaseSlot(SlotReleaseInfo) {
f.sem.Release(1)
}
func (f *FixedSizeSlotSupplier) MaxSlots() int {
return f.NumSlots
return f.numSlots
}

type slotReservationData struct {
Expand Down

0 comments on commit 69a7a34

Please sign in to comment.