From f7c29a6fe1fa589ad411092a9c5d9505c375b03a Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Fri, 6 Dec 2024 19:03:05 -0800 Subject: [PATCH] Set job execution thread priority to `-3` when in async mode --- lib/good_job/adapter.rb | 1 + lib/good_job/capsule.rb | 10 +++++++++- lib/good_job/configuration.rb | 8 ++++++++ lib/good_job/multi_scheduler.rb | 9 ++++++++- lib/good_job/scheduler.rb | 13 ++++++++++++- spec/lib/good_job/adapter_spec.rb | 12 +++++++++++- 6 files changed, 49 insertions(+), 4 deletions(-) diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 9f4cb47ad..bd593f5c3 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -216,6 +216,7 @@ def start_async return unless execute_async? @capsule.start + @capsule.lower_thread_priority = true if GoodJob.configuration.lower_thread_priority.in?([true, nil]) @_async_started = true end diff --git a/lib/good_job/capsule.rb b/lib/good_job/capsule.rb index afa82a2bb..86302a983 100644 --- a/lib/good_job/capsule.rb +++ b/lib/good_job/capsule.rb @@ -24,6 +24,7 @@ def initialize(configuration: nil) @shared_executor = GoodJob::SharedExecutor.new @tracker = GoodJob::CapsuleTracker.new(executor: @shared_executor) + @lower_thread_priority = nil self.class.instances << self end @@ -38,7 +39,9 @@ def start(force: false) @notifier = GoodJob::Notifier.new(enable_listening: configuration.enable_listen_notify, capsule: self, executor: @shared_executor) @poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval) - @multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true) + @multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true).tap do |multischeduler| + multischeduler.lower_thread_priority = @lower_thread_priority unless @lower_thread_priority.nil? + end @notifier.recipients.push([@multi_scheduler, :create_thread]) @poller.recipients.push(-> { @multi_scheduler.create_thread({ fanout: true }) }) @@ -110,6 +113,11 @@ def process_id @tracker.process_id end + def lower_thread_priority=(value) + @lower_thread_priority = value + @multi_scheduler&.lower_thread_priority = value + end + private def configuration diff --git a/lib/good_job/configuration.rb b/lib/good_job/configuration.rb index a0019c875..65ca7d666 100644 --- a/lib/good_job/configuration.rb +++ b/lib/good_job/configuration.rb @@ -383,6 +383,14 @@ def in_webserver? end || false end + def lower_thread_priority + return options[:lower_thread_priority] unless options[:lower_thread_priority].nil? + return rails_config[:lower_thread_priority] unless rails_config[:lower_thread_priority].nil? + return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_LOWER_THREAD_PRIORITY']) unless env['GOOD_JOB_LOWER_THREAD_PRIORITY'].nil? + + nil + end + # Whether to take an advisory lock on the process record in the notifier reactor. # @return [Boolean] def advisory_lock_heartbeat diff --git a/lib/good_job/multi_scheduler.rb b/lib/good_job/multi_scheduler.rb index f0815c10e..a5440587c 100644 --- a/lib/good_job/multi_scheduler.rb +++ b/lib/good_job/multi_scheduler.rb @@ -19,7 +19,8 @@ def self.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_ max_cache: configuration.max_cache, warm_cache_on_initialize: warm_cache_on_initialize, cleanup_interval_seconds: configuration.cleanup_interval_seconds, - cleanup_interval_jobs: configuration.cleanup_interval_jobs + cleanup_interval_jobs: configuration.cleanup_interval_jobs, + lower_thread_priority: configuration.lower_thread_priority ) end @@ -85,6 +86,12 @@ def create_thread(state = nil) end end + def lower_thread_priority=(value) + schedulers.each do |scheduler| + scheduler.lower_thread_priority = value + end + end + def stats scheduler_stats = schedulers.map(&:stats) diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index 9af6b1ca3..32ad6d13e 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -29,6 +29,9 @@ class Scheduler fallback_policy: :discard, }.freeze + # In CRuby, this sets the thread quantum to ~12.5ms ( 100ms * 2^(-3) ). + LOW_THREAD_PRIORITY = -3 + # @!attribute [r] instances # @!scope class # List of all instantiated Schedulers in the current process. @@ -39,13 +42,18 @@ class Scheduler # @return [String] attr_reader :name + # Whether to lower the thread priority to a fixed value + # @return [Boolean] + attr_accessor :lower_thread_priority + # @param performer [GoodJob::JobPerformer] # @param max_threads [Numeric, nil] number of seconds between polls for jobs # @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory # @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+ # @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records # @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records - def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil) + # @param lower_thread_priority [Boolean] whether to lower the thread priority of execution threads + def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil, lower_thread_priority: false) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) @performer = performer @@ -62,6 +70,8 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia @cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs) @executor_options[:name] = name + self.lower_thread_priority = lower_thread_priority + create_executor warm_cache if warm_cache_on_initialize self.class.instances << self @@ -271,6 +281,7 @@ def create_task(delay = 0, fanout: false) future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer| Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name Thread.current[:good_job_scheduler] = thr_scheduler + Thread.current.priority = -3 if thr_scheduler.lower_thread_priority Rails.application.reloader.wrap do thr_performer.next do |found| diff --git a/spec/lib/good_job/adapter_spec.rb b/spec/lib/good_job/adapter_spec.rb index 6d3929493..a0312d836 100644 --- a/spec/lib/good_job/adapter_spec.rb +++ b/spec/lib/good_job/adapter_spec.rb @@ -88,7 +88,7 @@ def perform(succeed: true) allow(GoodJob::Job).to receive(:enqueue).and_return(good_job) allow(GoodJob::Notifier).to receive(:notify) - capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil) + capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil) allow(GoodJob).to receive(:capsule).and_return(capsule) allow(capsule).to receive(:start) @@ -99,6 +99,16 @@ def perform(succeed: true) expect(capsule).to have_received(:create_thread) expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default' }) end + + it 'lowers the thread priority of the capsule' do + capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil) + allow(GoodJob).to receive(:capsule).and_return(capsule) + allow(capsule).to receive(:start) + + described_class.new(execution_mode: :async_all) + + expect(capsule).to have_received(:lower_thread_priority=).with(true) + end end end