diff --git a/lib/good_job/active_job_extensions/concurrency.rb b/lib/good_job/active_job_extensions/concurrency.rb index 14762221..257210f1 100644 --- a/lib/good_job/active_job_extensions/concurrency.rb +++ b/lib/good_job/active_job_extensions/concurrency.rb @@ -28,28 +28,77 @@ def deserialize(job_data) class_attribute :good_job_concurrency_config, instance_accessor: false, default: {} attr_writer :good_job_concurrency_key - if ActiveJob.gem_version >= Gem::Version.new("6.1.0") - before_enqueue do |job| - good_job_enqueue_concurrency_check(job, on_abort: -> { throw(:abort) }, on_enqueue: nil) - end - else - around_enqueue do |job, block| - good_job_enqueue_concurrency_check(job, on_abort: nil, on_enqueue: block) - end - end - wait_key = if ActiveJob.gem_version >= Gem::Version.new("7.1.0.a") :polynomially_longer else :exponentially_longer end - retry_on( GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError, attempts: Float::INFINITY, wait: wait_key ) + before_enqueue do |job| + # Don't attempt to enforce concurrency limits with other queue adapters. + next unless job.class.queue_adapter.is_a?(GoodJob::Adapter) + + # Always allow jobs to be retried because the current job's execution will complete momentarily + next if CurrentThread.active_job_id == job.job_id + + # Only generate the concurrency key on the initial enqueue in case it is dynamic + job.good_job_concurrency_key ||= job._good_job_concurrency_key + key = job.good_job_concurrency_key + next if key.blank? + + enqueue_limit = job.class.good_job_concurrency_config[:enqueue_limit] + enqueue_limit = instance_exec(&enqueue_limit) if enqueue_limit.respond_to?(:call) + enqueue_limit = nil unless enqueue_limit.present? && (0...Float::INFINITY).cover?(enqueue_limit) + + unless enqueue_limit + total_limit = job.class.good_job_concurrency_config[:total_limit] + total_limit = instance_exec(&total_limit) if total_limit.respond_to?(:call) + total_limit = nil unless total_limit.present? && (0...Float::INFINITY).cover?(total_limit) + end + + enqueue_throttle = job.class.good_job_concurrency_config[:enqueue_throttle] + enqueue_throttle = instance_exec(&enqueue_throttle) if enqueue_throttle.respond_to?(:call) + enqueue_throttle = nil unless enqueue_throttle.present? && enqueue_throttle.is_a?(Array) && enqueue_throttle.size == 2 + + limit = enqueue_limit || total_limit + throttle = enqueue_throttle + next unless limit || throttle + + GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_lock") do + if limit + enqueue_concurrency = if enqueue_limit + GoodJob::Job.where(concurrency_key: key).unfinished.advisory_unlocked.count + else + GoodJob::Job.where(concurrency_key: key).unfinished.count + end + + # The job has not yet been enqueued, so check if adding it will go over the limit + if (enqueue_concurrency + 1) > limit + logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its enqueue limit of #{limit} #{'job'.pluralize(limit)}" + throw :abort + end + end + + if throttle + throttle_limit = throttle[0] + throttle_period = throttle[1] + enqueued_within_period = GoodJob::Job.where(concurrency_key: key) + .where(GoodJob::Job.arel_table[:created_at].gt(throttle_period.ago)) + .count + + if (enqueued_within_period + 1) > throttle_limit + logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its throttle limit of #{limit} #{'job'.pluralize(limit)}" + throw :abort + end + end + end + end + before_perform do |job| # Don't attempt to enforce concurrency limits with other queue adapters. next unless job.class.queue_adapter.is_a?(GoodJob::Adapter) @@ -139,72 +188,6 @@ def _good_job_concurrency_key def _good_job_default_concurrency_key self.class.name.to_s end - - private - - def good_job_enqueue_concurrency_check(job, on_abort:, on_enqueue:) - # Don't attempt to enforce concurrency limits with other queue adapters. - return on_enqueue&.call unless job.class.queue_adapter.is_a?(GoodJob::Adapter) - - # Always allow jobs to be retried because the current job's execution will complete momentarily - return on_enqueue&.call if CurrentThread.active_job_id == job.job_id - - # Only generate the concurrency key on the initial enqueue in case it is dynamic - job.good_job_concurrency_key ||= job._good_job_concurrency_key - key = job.good_job_concurrency_key - return on_enqueue&.call if key.blank? - - enqueue_limit = job.class.good_job_concurrency_config[:enqueue_limit] - enqueue_limit = instance_exec(&enqueue_limit) if enqueue_limit.respond_to?(:call) - enqueue_limit = nil unless enqueue_limit.present? && (0...Float::INFINITY).cover?(enqueue_limit) - - unless enqueue_limit - total_limit = job.class.good_job_concurrency_config[:total_limit] - total_limit = instance_exec(&total_limit) if total_limit.respond_to?(:call) - total_limit = nil unless total_limit.present? && (0...Float::INFINITY).cover?(total_limit) - end - - enqueue_throttle = job.class.good_job_concurrency_config[:enqueue_throttle] - enqueue_throttle = instance_exec(&enqueue_throttle) if enqueue_throttle.respond_to?(:call) - enqueue_throttle = nil unless enqueue_throttle.present? && enqueue_throttle.is_a?(Array) && enqueue_throttle.size == 2 - - limit = enqueue_limit || total_limit - throttle = enqueue_throttle - return on_enqueue&.call unless limit || throttle - - GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_lock") do - if limit - enqueue_concurrency = if enqueue_limit - GoodJob::Job.where(concurrency_key: key).unfinished.advisory_unlocked.count - else - GoodJob::Job.where(concurrency_key: key).unfinished.count - end - - # The job has not yet been enqueued, so check if adding it will go over the limit - if (enqueue_concurrency + 1) > limit - logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its enqueue limit of #{limit} #{'job'.pluralize(limit)}" - on_abort&.call - break - end - end - - if throttle - throttle_limit = throttle[0] - throttle_period = throttle[1] - enqueued_within_period = GoodJob::Job.where(concurrency_key: key) - .where(GoodJob::Job.arel_table[:created_at].gt(throttle_period.ago)) - .count - - if (enqueued_within_period + 1) > throttle_limit - logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its throttle limit of #{limit} #{'job'.pluralize(limit)}" - on_abort&.call - break - end - end - - on_enqueue&.call - end - end end end end