Skip to content

Commit

Permalink
Refactor Concurrency extension for Rails 6.1+ compatibility (#1429)
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon authored Jul 16, 2024
1 parent d934264 commit 327cb92
Showing 1 changed file with 60 additions and 77 deletions.
137 changes: 60 additions & 77 deletions lib/good_job/active_job_extensions/concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,77 @@ def deserialize(job_data)
class_attribute :good_job_concurrency_config, instance_accessor: false, default: {}
attr_writer :good_job_concurrency_key

if ActiveJob.gem_version >= Gem::Version.new("6.1.0")
before_enqueue do |job|
good_job_enqueue_concurrency_check(job, on_abort: -> { throw(:abort) }, on_enqueue: nil)
end
else
around_enqueue do |job, block|
good_job_enqueue_concurrency_check(job, on_abort: nil, on_enqueue: block)
end
end

wait_key = if ActiveJob.gem_version >= Gem::Version.new("7.1.0.a")
:polynomially_longer
else
:exponentially_longer
end

retry_on(
GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError,
attempts: Float::INFINITY,
wait: wait_key
)

before_enqueue do |job|
# Don't attempt to enforce concurrency limits with other queue adapters.
next unless job.class.queue_adapter.is_a?(GoodJob::Adapter)

# Always allow jobs to be retried because the current job's execution will complete momentarily
next if CurrentThread.active_job_id == job.job_id

# Only generate the concurrency key on the initial enqueue in case it is dynamic
job.good_job_concurrency_key ||= job._good_job_concurrency_key
key = job.good_job_concurrency_key
next if key.blank?

enqueue_limit = job.class.good_job_concurrency_config[:enqueue_limit]
enqueue_limit = instance_exec(&enqueue_limit) if enqueue_limit.respond_to?(:call)
enqueue_limit = nil unless enqueue_limit.present? && (0...Float::INFINITY).cover?(enqueue_limit)

unless enqueue_limit
total_limit = job.class.good_job_concurrency_config[:total_limit]
total_limit = instance_exec(&total_limit) if total_limit.respond_to?(:call)
total_limit = nil unless total_limit.present? && (0...Float::INFINITY).cover?(total_limit)
end

enqueue_throttle = job.class.good_job_concurrency_config[:enqueue_throttle]
enqueue_throttle = instance_exec(&enqueue_throttle) if enqueue_throttle.respond_to?(:call)
enqueue_throttle = nil unless enqueue_throttle.present? && enqueue_throttle.is_a?(Array) && enqueue_throttle.size == 2

limit = enqueue_limit || total_limit
throttle = enqueue_throttle
next unless limit || throttle

GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_lock") do
if limit
enqueue_concurrency = if enqueue_limit
GoodJob::Job.where(concurrency_key: key).unfinished.advisory_unlocked.count
else
GoodJob::Job.where(concurrency_key: key).unfinished.count
end

# The job has not yet been enqueued, so check if adding it will go over the limit
if (enqueue_concurrency + 1) > limit
logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its enqueue limit of #{limit} #{'job'.pluralize(limit)}"
throw :abort
end
end

if throttle
throttle_limit = throttle[0]
throttle_period = throttle[1]
enqueued_within_period = GoodJob::Job.where(concurrency_key: key)
.where(GoodJob::Job.arel_table[:created_at].gt(throttle_period.ago))
.count

if (enqueued_within_period + 1) > throttle_limit
logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its throttle limit of #{limit} #{'job'.pluralize(limit)}"
throw :abort
end
end
end
end

before_perform do |job|
# Don't attempt to enforce concurrency limits with other queue adapters.
next unless job.class.queue_adapter.is_a?(GoodJob::Adapter)
Expand Down Expand Up @@ -139,72 +188,6 @@ def _good_job_concurrency_key
def _good_job_default_concurrency_key
self.class.name.to_s
end

private

def good_job_enqueue_concurrency_check(job, on_abort:, on_enqueue:)
# Don't attempt to enforce concurrency limits with other queue adapters.
return on_enqueue&.call unless job.class.queue_adapter.is_a?(GoodJob::Adapter)

# Always allow jobs to be retried because the current job's execution will complete momentarily
return on_enqueue&.call if CurrentThread.active_job_id == job.job_id

# Only generate the concurrency key on the initial enqueue in case it is dynamic
job.good_job_concurrency_key ||= job._good_job_concurrency_key
key = job.good_job_concurrency_key
return on_enqueue&.call if key.blank?

enqueue_limit = job.class.good_job_concurrency_config[:enqueue_limit]
enqueue_limit = instance_exec(&enqueue_limit) if enqueue_limit.respond_to?(:call)
enqueue_limit = nil unless enqueue_limit.present? && (0...Float::INFINITY).cover?(enqueue_limit)

unless enqueue_limit
total_limit = job.class.good_job_concurrency_config[:total_limit]
total_limit = instance_exec(&total_limit) if total_limit.respond_to?(:call)
total_limit = nil unless total_limit.present? && (0...Float::INFINITY).cover?(total_limit)
end

enqueue_throttle = job.class.good_job_concurrency_config[:enqueue_throttle]
enqueue_throttle = instance_exec(&enqueue_throttle) if enqueue_throttle.respond_to?(:call)
enqueue_throttle = nil unless enqueue_throttle.present? && enqueue_throttle.is_a?(Array) && enqueue_throttle.size == 2

limit = enqueue_limit || total_limit
throttle = enqueue_throttle
return on_enqueue&.call unless limit || throttle

GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_lock") do
if limit
enqueue_concurrency = if enqueue_limit
GoodJob::Job.where(concurrency_key: key).unfinished.advisory_unlocked.count
else
GoodJob::Job.where(concurrency_key: key).unfinished.count
end

# The job has not yet been enqueued, so check if adding it will go over the limit
if (enqueue_concurrency + 1) > limit
logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its enqueue limit of #{limit} #{'job'.pluralize(limit)}"
on_abort&.call
break
end
end

if throttle
throttle_limit = throttle[0]
throttle_period = throttle[1]
enqueued_within_period = GoodJob::Job.where(concurrency_key: key)
.where(GoodJob::Job.arel_table[:created_at].gt(throttle_period.ago))
.count

if (enqueued_within_period + 1) > throttle_limit
logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its throttle limit of #{limit} #{'job'.pluralize(limit)}"
on_abort&.call
break
end
end

on_enqueue&.call
end
end
end
end
end

0 comments on commit 327cb92

Please sign in to comment.