From 0d3a8bb9037d4b736172b8afcbe6dbe65edb7cf8 Mon Sep 17 00:00:00 2001 From: jazev-stripe <128553781+jazev-stripe@users.noreply.github.com> Date: Tue, 30 Jul 2024 15:20:22 -0700 Subject: [PATCH] Support passing activity task rate limit on worker options (#311) * support passing activity task rate limit on worker options * remove extra space in README --- README.md | 3 +- lib/temporal/activity/poller.rb | 10 ++++- lib/temporal/connection/grpc.rb | 8 +++- lib/temporal/worker.rb | 19 ++++++-- .../unit/lib/temporal/activity/poller_spec.rb | 30 +++++++++++++ spec/unit/lib/temporal/grpc_spec.rb | 43 +++++++++++++++++++ spec/unit/lib/temporal/worker_spec.rb | 35 ++++++++++++--- 7 files changed, 136 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index ff47a2d5..63888d8b 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,8 @@ Temporal::Worker.new( workflow_thread_pool_size: 10, # how many threads poll for workflows binary_checksum: nil, # identifies the version of workflow worker code activity_poll_retry_seconds: 0, # how many seconds to wait after unsuccessful poll for activities - workflow_poll_retry_seconds: 0 # how many seconds to wait after unsuccessful poll for workflows + workflow_poll_retry_seconds: 0, # how many seconds to wait after unsuccessful poll for workflows + activity_max_tasks_per_second: 0 # rate-limit for starting activity tasks (new activities + retries) on the task queue ) ``` diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb index 29c0977d..859fb688 100644 --- a/lib/temporal/activity/poller.rb +++ b/lib/temporal/activity/poller.rb @@ -11,7 +11,8 @@ class Activity class Poller DEFAULT_OPTIONS = { thread_pool_size: 20, - poll_retry_seconds: 0 + poll_retry_seconds: 0, + max_tasks_per_second: 0 # unlimited }.freeze def initialize(namespace, task_queue, activity_lookup, config, middleware = [], options = {}) @@ -91,7 +92,8 @@ def poll_loop end def poll_for_task - connection.poll_activity_task_queue(namespace: namespace, task_queue: task_queue) + connection.poll_activity_task_queue(namespace: namespace, task_queue: task_queue, + max_tasks_per_second: max_tasks_per_second) rescue ::GRPC::Cancelled # We're shutting down and we've already reported that in the logs nil @@ -115,6 +117,10 @@ def poll_retry_seconds @options[:poll_retry_seconds] end + def max_tasks_per_second + @options[:max_tasks_per_second] + end + def thread_pool @thread_pool ||= ThreadPool.new( options[:thread_pool_size], diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index fa246e9f..282a262c 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -256,7 +256,7 @@ def respond_workflow_task_failed(namespace:, task_token:, cause:, exception:, bi client.respond_workflow_task_failed(request) end - def poll_activity_task_queue(namespace:, task_queue:) + def poll_activity_task_queue(namespace:, task_queue:, max_tasks_per_second: 0) request = Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueRequest.new( identity: identity, namespace: namespace, @@ -265,6 +265,12 @@ def poll_activity_task_queue(namespace:, task_queue:) ) ) + if max_tasks_per_second > 0 + request.task_queue_metadata = Temporalio::Api::TaskQueue::V1::TaskQueueMetadata.new( + max_tasks_per_second: Google::Protobuf::DoubleValue.new(value: max_tasks_per_second) + ) + end + poll_mutex.synchronize do return unless can_poll? diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index 5d84df6e..e9a3b2f3 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -9,7 +9,7 @@ module Temporal class Worker # activity_thread_pool_size: number of threads that the poller can use to run activities. # can be set to 1 if you want no paralellism in your activities, at the cost of throughput. - + # # binary_checksum: The binary checksum identifies the version of workflow worker code. It is set on each completed or failed workflow # task. It is present in API responses that return workflow execution info, and is shown in temporal-web and tctl. # It is traditionally a checksum of the application binary. However, Temporal server treats this as an opaque @@ -21,13 +21,25 @@ class Worker # from workers with these bad versions. # # See https://docs.temporal.io/docs/tctl/how-to-use-tctl/#recovery-from-bad-deployment----auto-reset-workflow + # + # activity_max_tasks_per_second: Optional: Sets the rate limiting on number of activities that can be executed per second + # + # This limits new activities being started and activity attempts being scheduled. It does NOT + # limit the number of concurrent activities being executed on this task queue. + # + # This is managed by the server and controls activities per second for the entire task queue + # across all the workers. Notice that the number is represented in double, so that you can set + # it to less than 1 if needed. For example, set the number to 0.1 means you want your activity + # to be executed once every 10 seconds. This can be used to protect down stream services from + # flooding. The zero value of this uses the default value. Default is unlimited. def initialize( config = Temporal.configuration, activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size], binary_checksum: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:binary_checksum], activity_poll_retry_seconds: Temporal::Activity::Poller::DEFAULT_OPTIONS[:poll_retry_seconds], - workflow_poll_retry_seconds: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:poll_retry_seconds] + workflow_poll_retry_seconds: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:poll_retry_seconds], + activity_max_tasks_per_second: Temporal::Activity::Poller::DEFAULT_OPTIONS[:max_tasks_per_second] ) @config = config @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @@ -39,7 +51,8 @@ def initialize( @shutting_down = false @activity_poller_options = { thread_pool_size: activity_thread_pool_size, - poll_retry_seconds: activity_poll_retry_seconds + poll_retry_seconds: activity_poll_retry_seconds, + max_tasks_per_second: activity_max_tasks_per_second } @workflow_poller_options = { thread_pool_size: workflow_thread_pool_size, diff --git a/spec/unit/lib/temporal/activity/poller_spec.rb b/spec/unit/lib/temporal/activity/poller_spec.rb index 76d8396a..3e5d24c7 100644 --- a/spec/unit/lib/temporal/activity/poller_spec.rb +++ b/spec/unit/lib/temporal/activity/poller_spec.rb @@ -199,6 +199,36 @@ def call(_); end end end + context 'when max_tasks_per_second is set' do + subject do + described_class.new( + namespace, + task_queue, + lookup, + config, + middleware, + { + max_tasks_per_second: 32 + } + ) + end + + it 'sends PollActivityTaskQueue requests with the configured task rate-limit' do + times = poll(nil, times: 2) + expect(times).to be >= 2 + + expect(connection).to have_received(:poll_activity_task_queue) + .with( + namespace: namespace, + task_queue: task_queue, + max_tasks_per_second: 32 + ) + .at_least(2) + .times + end + end + + context 'when connection is unable to poll and poll_retry_seconds is set' do subject do described_class.new( diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index 0799c5d0..cb97469f 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -646,6 +646,49 @@ class TestDeserializer end end + describe '#poll_activity_task_queue' do + let(:task_queue) { 'test-task-queue' } + let(:temporal_response) do + Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueResponse.new + end + let(:poll_request) do + instance_double( + "GRPC::ActiveCall::Operation", + execute: temporal_response + ) + end + + before do + allow(grpc_stub).to receive(:poll_activity_task_queue).with(anything, return_op: true).and_return(poll_request) + end + + it 'makes an API request' do + subject.poll_activity_task_queue(namespace: namespace, task_queue: task_queue) + + expect(grpc_stub).to have_received(:poll_activity_task_queue) do |request| + expect(request).to be_an_instance_of(Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueRequest) + expect(request.namespace).to eq(namespace) + expect(request.task_queue.name).to eq(task_queue) + expect(request.identity).to eq(identity) + expect(request.task_queue_metadata).to be_nil + end + end + + it 'makes an API request with max_tasks_per_second in the metadata' do + subject.poll_activity_task_queue(namespace: namespace, task_queue: task_queue, max_tasks_per_second: 10) + + expect(grpc_stub).to have_received(:poll_activity_task_queue) do |request| + expect(request).to be_an_instance_of(Temporalio::Api::WorkflowService::V1::PollActivityTaskQueueRequest) + expect(request.namespace).to eq(namespace) + expect(request.task_queue.name).to eq(task_queue) + expect(request.identity).to eq(identity) + expect(request.task_queue_metadata).to_not be_nil + expect(request.task_queue_metadata.max_tasks_per_second).to_not be_nil + expect(request.task_queue_metadata.max_tasks_per_second.value).to eq(10) + end + end + end + describe '#add_custom_search_attributes' do it 'calls GRPC service with supplied arguments' do allow(grpc_operator_stub).to receive(:add_search_attributes) diff --git a/spec/unit/lib/temporal/worker_spec.rb b/spec/unit/lib/temporal/worker_spec.rb index 685e07a0..4fffc5d8 100644 --- a/spec/unit/lib/temporal/worker_spec.rb +++ b/spec/unit/lib/temporal/worker_spec.rb @@ -293,7 +293,8 @@ def start_and_stop(worker) config, [], thread_pool_size: 20, - poll_retry_seconds: 0 + poll_retry_seconds: 0, + max_tasks_per_second: 0 ) .and_return(activity_poller_1) @@ -306,7 +307,8 @@ def start_and_stop(worker) config, [], thread_pool_size: 20, - poll_retry_seconds: 0 + poll_retry_seconds: 0, + max_tasks_per_second: 0 ) .and_return(activity_poller_2) @@ -333,7 +335,7 @@ def start_and_stop(worker) an_instance_of(Temporal::ExecutableLookup), an_instance_of(Temporal::Configuration), [], - {thread_pool_size: 10, poll_retry_seconds: 0} + {thread_pool_size: 10, poll_retry_seconds: 0, max_tasks_per_second: 0} ) .and_return(activity_poller) @@ -406,7 +408,7 @@ def start_and_stop(worker) an_instance_of(Temporal::ExecutableLookup), an_instance_of(Temporal::Configuration), [], - {thread_pool_size: 20, poll_retry_seconds: 10} + {thread_pool_size: 20, poll_retry_seconds: 10, max_tasks_per_second: 0} ) .and_return(activity_poller) @@ -441,6 +443,28 @@ def start_and_stop(worker) expect(workflow_poller).to have_received(:start) end + it 'can have an activity poller that registers a task rate limit' do + activity_poller = instance_double(Temporal::Activity::Poller, start: nil, stop_polling: nil, cancel_pending_requests: nil, wait: nil) + expect(Temporal::Activity::Poller) + .to receive(:new) + .with( + 'default-namespace', + 'default-task-queue', + an_instance_of(Temporal::ExecutableLookup), + an_instance_of(Temporal::Configuration), + [], + {thread_pool_size: 20, poll_retry_seconds: 0, max_tasks_per_second: 5} + ) + .and_return(activity_poller) + + worker = Temporal::Worker.new(activity_max_tasks_per_second: 5) + worker.register_activity(TestWorkerActivity) + + start_and_stop(worker) + + expect(activity_poller).to have_received(:start) + end + context 'when middleware is configured' do let(:entry_1) { instance_double(Temporal::Middleware::Entry) } let(:entry_2) { instance_double(Temporal::Middleware::Entry) } @@ -492,7 +516,8 @@ def start_and_stop(worker) config, [entry_2], thread_pool_size: 20, - poll_retry_seconds: 0 + poll_retry_seconds: 0, + max_tasks_per_second: 0 ) .and_return(activity_poller_1)