Skip to content

Commit

Permalink
Remove default limit of overall max eager activities and add max per …
Browse files Browse the repository at this point in the history
…task (#952)

Fixes #950
  • Loading branch information
cretz authored Nov 10, 2022
1 parent d82b001 commit 133a6a0
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
system.forceSearchAttributesCacheRefreshOnRead:
- value: true # Dev setup only. Please don't turn this on in production.
constraints: {}
system.enableActivityLocalDispatch:
system.enableActivityEagerExecution:
- value: true
18 changes: 15 additions & 3 deletions internal/internal_eager_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,30 @@ func newEagerActivityExecutor(options eagerActivityExecutorOptions) *eagerActivi
func (e *eagerActivityExecutor) applyToRequest(
req *workflowservice.RespondWorkflowTaskCompletedRequest,
) (amountActivitySlotsReserved int) {
// Don't allow more than this hardcoded amount per workflow task for now
const maxPerTask = 3

// Go over every command checking for activities that can be eagerly executed
eagerRequestsThisTask := 0
for _, command := range req.Commands {
if attrs := command.GetScheduleActivityTaskCommandAttributes(); attrs != nil {
// If not present, disabled, or on a different task queue, we must mark as
// If not present, disabled, not requested, no activity worker, on a
// different task queue, or reached max for task, we must mark as
// explicitly disabled
if e == nil || e.disabled || e.activityWorker == nil || e.taskQueue != attrs.TaskQueue.GetName() {
eagerDisallowed := e == nil ||
e.disabled ||
!attrs.RequestEagerExecution ||
e.activityWorker == nil ||
e.taskQueue != attrs.TaskQueue.GetName() ||
eagerRequestsThisTask >= maxPerTask
if eagerDisallowed {
attrs.RequestEagerExecution = false
} else if attrs.RequestEagerExecution {
} else {
// If it has been requested, attempt to reserve one pending
attrs.RequestEagerExecution = e.reserveOnePendingSlot()
if attrs.RequestEagerExecution {
amountActivitySlotsReserved++
eagerRequestsThisTask++
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions internal/internal_eager_activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,26 @@ func TestEagerActivityWrongTaskQueue(t *testing.T) {
require.False(t, req.Commands[1].GetScheduleActivityTaskCommandAttributes().RequestEagerExecution)
}

func TestEagerActivityMaxPerTask(t *testing.T) {
exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1"})
exec.activityWorker = newActivityWorker(nil,
workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil)
// Fill up the poller request channel
for i := 0; i < 10; i++ {
exec.activityWorker.worker.pollerRequestCh <- struct{}{}
}

// Add 8, but it limits to only the first 3
var req workflowservice.RespondWorkflowTaskCompletedRequest
for i := 0; i < 8; i++ {
addScheduleTaskCommand(&req, "task-queue1")
}
require.Equal(t, 3, exec.applyToRequest(&req))
for i := 0; i < 8; i++ {
require.Equal(t, i < 3, req.Commands[i].GetScheduleActivityTaskCommandAttributes().RequestEagerExecution)
}
}

func TestEagerActivityCounts(t *testing.T) {
// We'll create an eager activity executor with 3 max eager concurrent and 5
// max concurrent
Expand Down
8 changes: 2 additions & 6 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ const (
// center. And the poll API latency is about 5ms. With 2 poller, we could achieve around 300~400 RPS.
defaultConcurrentPollRoutineSize = 2

defaultMaxConcurrentActivityExecutionSize = 1000 // Large concurrent activity execution size (1k)
defaultMaxConcurrentEagerActivityExecutionSize = 3
defaultWorkerActivitiesPerSecond = 100000 // Large activity executions/sec (unlimited)
defaultMaxConcurrentActivityExecutionSize = 1000 // Large concurrent activity execution size (1k)
defaultWorkerActivitiesPerSecond = 100000 // Large activity executions/sec (unlimited)

defaultMaxConcurrentLocalActivityExecutionSize = 1000 // Large concurrent activity execution size (1k)
defaultWorkerLocalActivitiesPerSecond = 100000 // Large activity executions/sec (unlimited)
Expand Down Expand Up @@ -1603,9 +1602,6 @@ func setWorkerOptionsDefaults(options *WorkerOptions) {
if options.MaxHeartbeatThrottleInterval == 0 {
options.MaxHeartbeatThrottleInterval = defaultMaxHeartbeatThrottleInterval
}
if options.MaxConcurrentEagerActivityExecutionSize == 0 {
options.MaxConcurrentEagerActivityExecutionSize = defaultMaxConcurrentEagerActivityExecutionSize
}
}

// setClientDefaults should be needed only in unit tests.
Expand Down
3 changes: 1 addition & 2 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ type (
// workflow task schedules 3 more, only the first 2 will request eager
// execution.
//
// If unset/0, this defaults to 3. Users wanting effectively unlimited can
// set this to a high value. This is still bounded by
// The default of 0 means unlimited and therefore only bound by
// MaxConcurrentActivityExecutionSize.
//
// See DisableEagerActivities for a description of eager activity execution.
Expand Down

0 comments on commit 133a6a0

Please sign in to comment.