Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set job execution thread priority to -3 when in async mode #1560

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def start_async
return unless execute_async?

@capsule.start
@capsule.lower_thread_priority = true if GoodJob.configuration.lower_thread_priority.in?([true, nil])
@_async_started = true
end

Expand Down
10 changes: 9 additions & 1 deletion lib/good_job/capsule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def initialize(configuration: nil)

@shared_executor = GoodJob::SharedExecutor.new
@tracker = GoodJob::CapsuleTracker.new(executor: @shared_executor)
@lower_thread_priority = nil

self.class.instances << self
end
Expand All @@ -38,7 +39,9 @@ def start(force: false)

@notifier = GoodJob::Notifier.new(enable_listening: configuration.enable_listen_notify, capsule: self, executor: @shared_executor)
@poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
@multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true)
@multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true).tap do |multischeduler|
multischeduler.lower_thread_priority = @lower_thread_priority unless @lower_thread_priority.nil?
end
@notifier.recipients.push([@multi_scheduler, :create_thread])
@poller.recipients.push(-> { @multi_scheduler.create_thread({ fanout: true }) })

Expand Down Expand Up @@ -110,6 +113,11 @@ def process_id
@tracker.process_id
end

def lower_thread_priority=(value)
@lower_thread_priority = value
@multi_scheduler&.lower_thread_priority = value
end

private

def configuration
Expand Down
8 changes: 8 additions & 0 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ def in_webserver?
end || false
end

def lower_thread_priority
return options[:lower_thread_priority] unless options[:lower_thread_priority].nil?
return rails_config[:lower_thread_priority] unless rails_config[:lower_thread_priority].nil?
return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_LOWER_THREAD_PRIORITY']) unless env['GOOD_JOB_LOWER_THREAD_PRIORITY'].nil?

nil
end

# Whether to take an advisory lock on the process record in the notifier reactor.
# @return [Boolean]
def advisory_lock_heartbeat
Expand Down
9 changes: 8 additions & 1 deletion lib/good_job/multi_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def self.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_
max_cache: configuration.max_cache,
warm_cache_on_initialize: warm_cache_on_initialize,
cleanup_interval_seconds: configuration.cleanup_interval_seconds,
cleanup_interval_jobs: configuration.cleanup_interval_jobs
cleanup_interval_jobs: configuration.cleanup_interval_jobs,
lower_thread_priority: configuration.lower_thread_priority
)
end

Expand Down Expand Up @@ -85,6 +86,12 @@ def create_thread(state = nil)
end
end

def lower_thread_priority=(value)
schedulers.each do |scheduler|
scheduler.lower_thread_priority = value
end
end

def stats
scheduler_stats = schedulers.map(&:stats)

Expand Down
13 changes: 12 additions & 1 deletion lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class Scheduler
fallback_policy: :discard,
}.freeze

# In CRuby, this sets the thread quantum to ~12.5ms ( 100ms * 2^(-3) ).
LOW_THREAD_PRIORITY = -3

# @!attribute [r] instances
# @!scope class
# List of all instantiated Schedulers in the current process.
Expand All @@ -39,13 +42,18 @@ class Scheduler
# @return [String]
attr_reader :name

# Whether to lower the thread priority to a fixed value
# @return [Boolean]
attr_accessor :lower_thread_priority

# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
# @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory
# @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+
# @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records
# @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil)
# @param lower_thread_priority [Boolean] whether to lower the thread priority of execution threads
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil, lower_thread_priority: false)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

@performer = performer
Expand All @@ -62,6 +70,8 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia
@cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
@executor_options[:name] = name

self.lower_thread_priority = lower_thread_priority

create_executor
warm_cache if warm_cache_on_initialize
self.class.instances << self
Expand Down Expand Up @@ -271,6 +281,7 @@ def create_task(delay = 0, fanout: false)
future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer|
Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
Thread.current[:good_job_scheduler] = thr_scheduler
Thread.current.priority = -3 if thr_scheduler.lower_thread_priority

Rails.application.reloader.wrap do
thr_performer.next do |found|
Expand Down
12 changes: 11 additions & 1 deletion spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def perform(succeed: true)
allow(GoodJob::Job).to receive(:enqueue).and_return(good_job)
allow(GoodJob::Notifier).to receive(:notify)

capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil)
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil)
allow(GoodJob).to receive(:capsule).and_return(capsule)
allow(capsule).to receive(:start)

Expand All @@ -99,6 +99,16 @@ def perform(succeed: true)
expect(capsule).to have_received(:create_thread)
expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default' })
end

it 'lowers the thread priority of the capsule' do
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil)
allow(GoodJob).to receive(:capsule).and_return(capsule)
allow(capsule).to receive(:start)

described_class.new(execution_mode: :async_all)

expect(capsule).to have_received(:lower_thread_priority=).with(true)
end
end
end

Expand Down