From 69b42bbd900932226cb4083e9e1c08f9109ded1d Mon Sep 17 00:00:00 2001 From: Ben Sheldon Date: Thu, 24 Jun 2021 14:28:13 -0700 Subject: [PATCH] Add concurrency extension for ActiveJob --- README.md | 30 ++++++++ lib/good_job/active_job_extensions.rb | 4 ++ .../active_job_extensions/concurrency.rb | 68 +++++++++++++++++++ lib/good_job/current_execution.rb | 17 +++-- lib/good_job/job.rb | 20 ++++++ lib/good_job/lockable.rb | 13 +++- .../active_job_extensions/concurrency_spec.rb | 64 +++++++++++++++++ spec/lib/good_job/current_execution_spec.rb | 15 ++++ spec/lib/good_job/lockable_spec.rb | 6 ++ spec/support/rails_versions.rb | 4 ++ 10 files changed, 233 insertions(+), 8 deletions(-) create mode 100644 lib/good_job/active_job_extensions.rb create mode 100644 lib/good_job/active_job_extensions/concurrency.rb create mode 100644 spec/lib/good_job/active_job_extensions/concurrency_spec.rb create mode 100644 spec/support/rails_versions.rb diff --git a/README.md b/README.md index 62ff9148d..4b8d220f8 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla - [Configuration options](#configuration-options) - [Global options](#global-options) - [Dashboard](#dashboard) + - [ActiveJob Concurrency](#activejob-concurrency) - [Updating](#updating) - [Go deeper](#go-deeper) - [Exceptions, retries, and reliability](#exceptions-retries-and-reliability) @@ -319,6 +320,35 @@ GoodJob includes a Dashboard as a mountable `Rails::Engine`. end ``` +### ActiveJob Concurrency + +GoodJob can extend ActiveJob to provide limits on concurrently running jobs, either at time of _enqueue_ or at _perform_. + +**Note:** Limiting concurrency at _enqueue_ requires Rails 6.0+ because Rails 5.2 does not support `throw :abort` in ActiveJob callbacks. 
+
+```ruby
+class MyJob < ApplicationJob
+  include GoodJob::ActiveJobExtensions::Concurrency
+
+  good_job_control_concurrency_with(
+    # Maximum number of jobs with the concurrency key to be concurrently enqueued
+    enqueue_limit: 2,
+
+    # Maximum number of jobs with the concurrency key to be concurrently performed
+    perform_limit: 1,
+
+    # A unique key to be globally locked against.
+    # Can be String or Lambda/Proc that is invoked in the context of the job.
+    # Note: Arguments passed to #perform_later must be accessed through `arguments` method.
+    key: -> { "Unique-#{arguments.first}" } # MyJob.perform_later("Alice") => "Unique-Alice"
+  )
+
+  def perform(first_name)
+    # do work
+  end
+end
+```
+
 ### Updating
 
 GoodJob follows semantic versioning, though updates may be encouraged through deprecation warnings in minor versions.
diff --git a/lib/good_job/active_job_extensions.rb b/lib/good_job/active_job_extensions.rb
new file mode 100644
index 000000000..203500d15
--- /dev/null
+++ b/lib/good_job/active_job_extensions.rb
@@ -0,0 +1,4 @@
+module GoodJob
+  module ActiveJobExtensions
+  end
+end
diff --git a/lib/good_job/active_job_extensions/concurrency.rb b/lib/good_job/active_job_extensions/concurrency.rb
new file mode 100644
index 000000000..20aba9513
--- /dev/null
+++ b/lib/good_job/active_job_extensions/concurrency.rb
@@ -0,0 +1,87 @@
+module GoodJob
+  module ActiveJobExtensions
+    # ActiveJob extension that limits how many jobs sharing a concurrency key
+    # may be enqueued and/or performed at the same time. Configure it in the
+    # job class with +good_job_control_concurrency_with+.
+    module Concurrency
+      extend ActiveSupport::Concern
+
+      # Raised by the +before_perform+ callback when the perform limit is exceeded.
+      # The +retry_on+ declared below retries it with +attempts: Float::INFINITY+.
+      ConcurrencyExceededError = Class.new(StandardError)
+
+      included do
+        # Per-class configuration Hash; the keys read here are :enqueue_limit, :perform_limit and :key.
+        class_attribute :good_job_concurrency_config, instance_accessor: false, default: {}
+
+        # Aborts the enqueue when adding this job would exceed :enqueue_limit.
+        before_enqueue do |job|
+          # Always allow jobs to be retried because the current job's execution will complete momentarily
+          next if CurrentExecution.active_job_id == job.job_id
+
+          limit = job.class.good_job_concurrency_config.fetch(:enqueue_limit, Float::INFINITY)
+          # Skip the check unless a finite, non-negative limit is configured.
+          next if limit.blank? || (0...Float::INFINITY).exclude?(limit)
+
+          key = job.good_job_concurrency_key
+          next if key.blank?
+
+          # Count while holding a blocking advisory lock on the key
+          # (presumably to serialize concurrent enqueues of the same key; confirm).
+          GoodJob::Job.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do
+            # TODO: Why is `unscoped` necessary? Nested scope is bleeding into subsequent query?
+            enqueue_concurrency = GoodJob::Job.unscoped.where(concurrency_key: key).unfinished.count
+            # The job has not yet been enqueued, so check if adding it will go over the limit
+            throw :abort if enqueue_concurrency + 1 > limit
+          end
+        end
+
+        retry_on(
+          GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError,
+          attempts: Float::INFINITY,
+          wait: :exponentially_longer
+        )
+
+        # Raises ConcurrencyExceededError when more jobs with this key are
+        # advisory-locked than :perform_limit allows.
+        before_perform do |job|
+          limit = job.class.good_job_concurrency_config.fetch(:perform_limit, Float::INFINITY)
+          next if limit.blank? || (0...Float::INFINITY).exclude?(limit)
+
+          key = job.good_job_concurrency_key
+          next if key.blank?
+
+          GoodJob::Job.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do
+            perform_concurrency = GoodJob::Job.unscoped.where(concurrency_key: key).advisory_locked.count
+            # The current job has already been locked and will appear in the previous query
+            raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError if perform_concurrency > limit
+          end
+        end
+      end
+
+      class_methods do
+        # Stores the concurrency configuration for this job class.
+        # @param config [Hash] options read by the callbacks: :enqueue_limit, :perform_limit, :key
+        # @return [Hash] the config that was assigned
+        def good_job_control_concurrency_with(config)
+          self.good_job_concurrency_config = config
+        end
+      end
+
+      # Resolves the configured :key for this job instance.
+      # A callable key is evaluated with +instance_exec+ on the job (so it can read
+      # +arguments+); any other value is returned unchanged.
+      # @return [Object, nil] the key, or nil when the configured key is blank
+      def good_job_concurrency_key
+        key = self.class.good_job_concurrency_config[:key]
+        return if key.blank?
+
+        if key.respond_to? :call
+          instance_exec(&key)
+        else
+          key
+        end
+      end
+    end
+  end
+end
diff --git a/lib/good_job/current_execution.rb b/lib/good_job/current_execution.rb
index 5ab5dd58f..94326487d 100644
--- a/lib/good_job/current_execution.rb
+++ b/lib/good_job/current_execution.rb
@@ -4,11 +4,11 @@ module GoodJob
   # Thread-local attributes for passing values from Instrumentation.
   # (Cannot use ActiveSupport::CurrentAttributes because ActiveJob resets it)
   module CurrentExecution
-    # @!attribute [rw] error_on_retry
+    # @!attribute [rw] active_job_id
     #   @!scope class
-    #   Error captured by retry_on
-    #   @return [Exception, nil]
-    thread_mattr_accessor :error_on_retry
+    #   ActiveJob ID of the job currently being executed on this thread
+    #   @return [String, nil]
+    thread_mattr_accessor :active_job_id
 
     # @!attribute [rw] error_on_discard
     #   @!scope class
@@ -16,11 +16,18 @@ module CurrentExecution
     #   @return [Exception, nil]
     thread_mattr_accessor :error_on_discard
 
+    # @!attribute [rw] error_on_retry
+    #   @!scope class
+    #   Error captured by retry_on
+    #   @return [Exception, nil]
+    thread_mattr_accessor :error_on_retry
+
     # Resets attributes
     # @return [void]
     def self.reset
-      self.error_on_retry = nil
+      self.active_job_id = nil
       self.error_on_discard = nil
+      self.error_on_retry = nil
     end
 
     # @return [Integer] Current process ID
diff --git a/lib/good_job/job.rb b/lib/good_job/job.rb
index f0c5c2d13..c89a4e994 100644
--- a/lib/good_job/job.rb
+++ b/lib/good_job/job.rb
@@ -219,6 +219,21 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
           DEPRECATION
         end
 
+        if column_names.include?('concurrency_key')
+          good_job_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)
+        else
+          ActiveSupport::Deprecation.warn(<<~DEPRECATION)
+            GoodJob has pending database migrations. To create the migration files, run:
+
+                rails generate good_job:update
+
+            To apply the migration files, run:
+
+                rails db:migrate
+
+          DEPRECATION
+        end
+
         good_job = GoodJob::Job.new(**good_job_args)
 
         instrument_payload[:good_job] = good_job
@@ -264,6 +279,10 @@ def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id) end + def active_job_id + super || serialized_params['job_id'] + end + private # @return [ExecutionResult] @@ -273,6 +292,7 @@ def execute ) GoodJob::CurrentExecution.reset + GoodJob::CurrentExecution.active_job_id = active_job_id ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do value = ActiveJob::Base.execute(params) diff --git a/lib/good_job/lockable.rb b/lib/good_job/lockable.rb index 7e4b4efe4..1310cffff 100644 --- a/lib/good_job/lockable.rb +++ b/lib/good_job/lockable.rb @@ -201,9 +201,16 @@ def pg_or_jdbc_query(query) # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was acquired. def advisory_lock(key: lockable_key, function: advisory_lockable_function) - query = <<~SQL.squish - SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS locked - SQL + query = if function.include? 
"_try_" + <<~SQL.squish + SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS locked + SQL + else + <<~SQL.squish + SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked + SQL + end + binds = [[nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).first['locked'] end diff --git a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb new file mode 100644 index 000000000..fe51a6d22 --- /dev/null +++ b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb @@ -0,0 +1,64 @@ +require 'rails_helper' + +RSpec.describe GoodJob::ActiveJobExtensions::Concurrency do + before do + ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) + + stub_const 'TestJob', (Class.new(ActiveJob::Base) do + include GoodJob::ActiveJobExtensions::Concurrency + + good_job_control_concurrency_with( + enqueue_limit: 2, + perform_limit: 1, + key: -> { arguments.first[:name] } + ) + + def perform(name:) + name && sleep(1) + end + end) + end + + describe '.good_job_control_concurrency_with' do + describe 'enqueue_limit' do + it "does not enqueue if enqueue concurrency limit is exceeded for a particular key" do + expect(TestJob.perform_later(name: "Alice")).to be_present + expect(TestJob.perform_later(name: "Alice")).to be_present + + # Third usage of key does not enqueue + expect(TestJob.perform_later(name: "Alice")).to eq false + + # Usage of different key does enqueue + expect(TestJob.perform_later(name: "Bob")).to be_present + + expect(GoodJob::Job.where(concurrency_key: "Alice").count).to eq 2 + expect(GoodJob::Job.where(concurrency_key: "Bob").count).to eq 1 + end + end + + describe 'perform_limit' do + before do + allow(GoodJob).to receive(:preserve_job_records).and_return(true) + end + + it "will error and retry jobs if concurrency is exceeded" do + TestJob.perform_later(name: 
"Alice") + TestJob.perform_later(name: "Alice") + TestJob.perform_later(name: "Bob") + + performer = GoodJob::JobPerformer.new('*') + scheduler = GoodJob::Scheduler.new(performer, max_threads: 5) + 5.times { scheduler.create_thread } + + sleep_until(max: 10, increments_of: 0.5) do + GoodJob::Job.where(concurrency_key: "Alice").finished.count >= 1 && + GoodJob::Job.where(concurrency_key: "Bob").finished.count == 1 + end + scheduler.shutdown + + expect(GoodJob::Job.count).to be > 3 + expect(GoodJob::Job.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present + end + end + end +end diff --git a/spec/lib/good_job/current_execution_spec.rb b/spec/lib/good_job/current_execution_spec.rb index dcfb1347b..43b94dae4 100644 --- a/spec/lib/good_job/current_execution_spec.rb +++ b/spec/lib/good_job/current_execution_spec.rb @@ -52,4 +52,19 @@ expect(described_class.error_on_retry).to eq nil end end + + describe '.active_job_id' do + it 'is assignable, thread-safe, and resettable' do + described_class.active_job_id = 'duck' + + Thread.new do + described_class.active_job_id = 'bear' + end.join + + expect(described_class.active_job_id).to eq 'duck' + + described_class.reset + expect(described_class.active_job_id).to eq nil + end + end end diff --git a/spec/lib/good_job/lockable_spec.rb b/spec/lib/good_job/lockable_spec.rb index 1ea710389..b038202a2 100644 --- a/spec/lib/good_job/lockable_spec.rb +++ b/spec/lib/good_job/lockable_spec.rb @@ -132,6 +132,12 @@ job.advisory_unlock(key: "alternative") end + + it 'can lock alternative postgres functions' do + job.advisory_lock!(function: "pg_advisory_lock") + expect(job.advisory_locked?).to be true + job.advisory_unlock + end end describe '#advisory_unlock' do diff --git a/spec/support/rails_versions.rb b/spec/support/rails_versions.rb new file mode 100644 index 000000000..533c9303b --- /dev/null +++ b/spec/support/rails_versions.rb @@ -0,0 +1,4 @@ +RSpec.configure do |c| + 
less_than_rails_6 = Gem::Version.new(Rails.version) < Gem::Version.new('6') + c.filter_run_excluding(:skip_rails_5) if less_than_rails_6 +end